quanta/upkeep.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
use crate::{set_recent, Clock};
use std::{
fmt, io,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
/// Process-wide flag recording whether an upkeep thread is currently running.
///
/// [`Upkeep::start`] flips it from `false` to `true` with a compare-and-exchange before spawning
/// the thread. It is reset to `false` when the upkeep thread leaves its loop, or immediately if
/// spawning the thread fails.
static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);
/// Ultra-low-overhead access to slightly-delayed time.
///
/// In some applications, there can be a need to check the current time very often, so much so that
/// checking the time itself begins to consume a measurable amount of CPU time. For some of these
/// cases, the time may need to be accessed often but does not necessarily need to be incredibly
/// accurate: one millisecond granularity could be entirely acceptable.
///
/// For these cases, we provide a slightly-delayed version of the time to callers via
/// [`Clock::recent`], which is updated by a background upkeep thread. That thread is configured
/// and spawned via [`Upkeep`].
///
/// [`Upkeep`] can construct a new clock (or be passed an existing clock to use) and, given an
/// update interval, it will faithfully attempt to update the global recent time on the
/// specified interval. There is a trade-off to be struck in terms of how often the time is
/// updated versus the required accuracy. Checking the time and updating the global reference is
/// itself not zero-cost, and so care must be taken to analyze the number of readers in order to
/// ensure that, given a particular update interval, the upkeep thread is saving more CPU time than
/// would be spent otherwise by directly querying the current time.
///
/// The recent time is read and written atomically. It is global to an application, so if another
/// codepath creates the upkeep thread, the interval chosen by that instantiation will be the one
/// that all callers of [`Clock::recent`] end up using.
///
/// Multiple upkeep threads cannot exist at the same time. A new upkeep thread can be started if
/// the old one is dropped and returns.
///
/// In terms of performance, reading the recent time can be up to two to three times as fast as
/// reading the current time in the optimized case of using the Time Stamp Counter source. In
/// practice, while a caller might expect to take 12-14ns to read the TSC and scale it to reference
/// time, the recent time can be read in 4-5ns, with no reference scale conversion required.
#[derive(Debug)]
pub struct Upkeep {
/// How long the upkeep thread sleeps between two updates of the recent time.
interval: Duration,
/// Clock whose `now()` reading is published as the recent time.
clock: Clock,
}
/// Handle to a running upkeep thread.
///
/// If a handle is dropped, the upkeep thread will be stopped, and the recent time will cease to
/// update. The upkeep thread can be started again to resume updating the recent time.
///
/// Dropping the handle joins the upkeep thread, so the drop blocks until the thread has observed
/// the stop flag and returned.
#[derive(Debug)]
pub struct Handle {
/// Stop flag shared with the upkeep thread; set to `true` when the handle is dropped.
done: Arc<AtomicBool>,
/// Join handle of the upkeep thread; taken out (leaving `None`) when the handle is dropped.
handle: Option<JoinHandle<()>>,
}
/// Errors that can be returned when creating/spawning the upkeep thread.
#[derive(Debug)]
pub enum Error {
/// An upkeep thread is already running in this process.
UpkeepRunning,
/// An error occurred when trying to spawn the upkeep thread.
///
/// Carries the underlying I/O error reported by the thread builder.
FailedToSpawnUpkeepThread(io::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::UpkeepRunning => write!(f, "upkeep thread already running"),
Error::FailedToSpawnUpkeepThread(e) => {
write!(f, "failed to spawn upkeep thread: {}", e)
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::UpkeepRunning => None,
Self::FailedToSpawnUpkeepThread(e) => Some(e),
}
}
}
impl Upkeep {
/// Creates a new [`Upkeep`].
///
/// This creates a new internal clock for acquiring the current time. If you have an existing
/// [`Clock`] that is already calibrated, it is slightly faster to clone it and construct the
/// builder with [`new_with_clock`](Upkeep::new_with_clock) to avoid recalibrating.
pub fn new(interval: Duration) -> Upkeep {
Self::new_with_clock(interval, Clock::new())
}
/// Creates a new [`Upkeep`] with the specified [`Clock`] instance.
pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep {
Upkeep { interval, clock }
}
/// Start the upkeep thread, periodically updating the global coarse time.
///
/// [`Handle`] represents a drop guard for the upkeep thread if it is successfully spawned.
/// Dropping the handle will also instruct the upkeep thread to stop and exist, so the handle
/// must be held while the upkeep thread should continue to run.
///
/// # Errors
///
/// If either an existing upkeep thread is running, or there was an issue when attempting to
/// spawn the upkeep thread, an error variant will be returned describing the error.
pub fn start(self) -> Result<Handle, Error> {
// If another upkeep thread is running, inform the caller.
let _ = GLOBAL_UPKEEP_RUNNING
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.map_err(|_| Error::UpkeepRunning)?;
let interval = self.interval;
let clock = self.clock;
// Note: spawning `quanta-upkeep` thread may take a significant amount of time. Thus, it is
// possible for a user to read a 0 value from `Clock::recent` before `quanta-upkeep` has
// started. To avoid that, make sure the recent time is initialized to some measurement.
set_recent(clock.now());
let done = Arc::new(AtomicBool::new(false));
let their_done = done.clone();
let result = thread::Builder::new()
.name("quanta-upkeep".to_string())
.spawn(move || {
while !their_done.load(Ordering::Acquire) {
set_recent(clock.now());
thread::sleep(interval);
}
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
})
.map_err(Error::FailedToSpawnUpkeepThread);
// Let another caller attempt to spawn the upkeep thread if we failed to do so.
if result.is_err() {
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
}
let handle = result?;
Ok(Handle {
done,
handle: Some(handle),
})
}
}
impl Drop for Handle {
    /// Asks the upkeep thread to stop and waits for it to finish.
    ///
    /// The upkeep thread only checks the stop flag between sleeps, so this can block for up to
    /// roughly one update interval.
    fn drop(&mut self) {
        // Release pairs with the Acquire load performed by the upkeep loop.
        self.done.store(true, Ordering::Release);

        if let Some(worker) = self.handle.take() {
            // `join` only fails when the upkeep thread panicked. There is nothing useful to do
            // about that while dropping, and panicking here could abort the process during
            // unwinding, so the outcome is deliberately ignored.
            let _ = worker.join();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::Upkeep;
    use std::time::Duration;

    /// Starting a second upkeep thread while the first is still alive must be rejected.
    #[test]
    #[cfg_attr(target_arch = "wasm32", ignore)] // WASM is single threaded
    fn test_spawning_second_upkeep() {
        let interval = Duration::from_millis(250);

        // Keep the first result alive so its upkeep thread is still running below.
        let first = Upkeep::new(interval).start();
        let second = Upkeep::new(interval).start();

        assert!(first.is_ok());

        let second_err = second
            .map_err(|e| e.to_string())
            .expect_err("second upkeep should be error, got handle");
        assert_eq!(second_err, "upkeep thread already running");
    }
}