1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
use std::cmp;
use std::time::{Duration, Instant};
/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
/// The metrics batch used to report runtime-level metrics/stats to the
/// user.
batch: MetricsBatch,
/// Instant at which work last resumed (continued after park).
///
/// This duplicates the value stored in `MetricsBatch`. We will unify
/// `Stats` and `MetricsBatch` when we stabilize metrics.
processing_scheduled_tasks_started_at: Instant,
/// Number of tasks polled in the batch of scheduled tasks
tasks_polled_in_batch: usize,
/// Exponentially-weighted moving average of time spent polling scheduled a
/// task.
///
/// Tracked in nanoseconds, stored as a f64 since that is what we use with
/// the EWMA calculations
task_poll_time_ewma: f64,
}
/// How to weigh each individual poll time, value is plucked from thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
/// Ideally, we wouldn't go above this, value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
/// Max value for the global queue interval. This is 2x the previous default
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
/// This is the previous default
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
impl Stats {
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
// Seed the value with what we hope to see.
let task_poll_time_ewma =
TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
Stats {
batch: MetricsBatch::new(worker_metrics),
processing_scheduled_tasks_started_at: Instant::now(),
tasks_polled_in_batch: 0,
task_poll_time_ewma,
}
}
pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
// If an interval is explicitly set, don't tune.
if let Some(configured) = config.global_queue_interval {
return configured;
}
// As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
cmp::max(
// We don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
tasks_per_interval,
),
)
}
pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to, self.task_poll_time_ewma as u64);
}
pub(crate) fn about_to_park(&mut self) {
self.batch.about_to_park();
}
pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.batch.start_processing_scheduled_tasks();
self.processing_scheduled_tasks_started_at = Instant::now();
self.tasks_polled_in_batch = 0;
}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
self.batch.end_processing_scheduled_tasks();
// Update the EWMA task poll time
if self.tasks_polled_in_batch > 0 {
let now = Instant::now();
// If we "overflow" this conversion, we have bigger problems than
// slightly off stats.
let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64;
let num_polls = self.tasks_polled_in_batch as f64;
// Calculate the mean poll duration for a single task in the batch
let mean_poll_duration = elapsed / num_polls;
// Compute the alpha weighted by the number of tasks polled this batch.
let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
// Now compute the new weighted average task poll time.
self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
+ (1.0 - weighted_alpha) * self.task_poll_time_ewma;
}
}
pub(crate) fn start_poll(&mut self) {
self.batch.start_poll();
self.tasks_polled_in_batch += 1;
}
pub(crate) fn end_poll(&mut self) {
self.batch.end_poll();
}
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.batch.incr_steal_count(by);
}
pub(crate) fn incr_steal_operations(&mut self) {
self.batch.incr_steal_operations();
}
pub(crate) fn incr_overflow_count(&mut self) {
self.batch.incr_overflow_count();
}
}