use std::{ptr::NonNull, time::Duration};
use super::{
concurrent::{entry_info::EntryInfo, DeqNodes},
deque::{DeqNode, Deque},
time::{CheckedTimeOps, Instant},
};
use parking_lot::Mutex;
use triomphe::Arc as TrioArc;
const BUCKET_COUNTS: &[u64] = &[
64, 64, 32, 4, 1, ];
const OVERFLOW_QUEUE_INDEX: usize = BUCKET_COUNTS.len() - 1;
const NUM_LEVELS: usize = OVERFLOW_QUEUE_INDEX - 1;
const DAY: Duration = Duration::from_secs(60 * 60 * 24);
const SPANS: &[u64] = &[
aligned_duration(Duration::from_secs(1)), aligned_duration(Duration::from_secs(60)), aligned_duration(Duration::from_secs(60 * 60)), aligned_duration(DAY), BUCKET_COUNTS[3] * aligned_duration(DAY), BUCKET_COUNTS[3] * aligned_duration(DAY), ];
const SHIFT: &[u64] = &[
SPANS[0].trailing_zeros() as u64,
SPANS[1].trailing_zeros() as u64,
SPANS[2].trailing_zeros() as u64,
SPANS[3].trailing_zeros() as u64,
SPANS[4].trailing_zeros() as u64,
];
const fn aligned_duration(duration: Duration) -> u64 {
(duration.as_nanos() as u64).next_power_of_two()
}
pub(crate) enum TimerNode<K> {
Sentinel,
Entry {
pos: Option<(u8, u8)>,
entry_info: TrioArc<EntryInfo<K>>,
deq_nodes: TrioArc<Mutex<DeqNodes<K>>>,
},
}
impl<K> TimerNode<K> {
fn new(
entry_info: TrioArc<EntryInfo<K>>,
deq_nodes: TrioArc<Mutex<DeqNodes<K>>>,
level: usize,
index: usize,
) -> Self {
Self::Entry {
pos: Some((level as u8, index as u8)),
entry_info,
deq_nodes,
}
}
fn position(&self) -> Option<(usize, usize)> {
if let Self::Entry { pos, .. } = &self {
pos.map(|(level, index)| (level as usize, index as usize))
} else {
unreachable!()
}
}
fn set_position(&mut self, level: usize, index: usize) {
if let Self::Entry { pos, .. } = self {
*pos = Some((level as u8, index as u8));
} else {
unreachable!()
}
}
fn unset_position(&mut self) {
if let Self::Entry { pos, .. } = self {
*pos = None;
} else {
unreachable!()
}
}
fn is_sentinel(&self) -> bool {
matches!(self, Self::Sentinel)
}
pub(crate) fn entry_info(&self) -> &TrioArc<EntryInfo<K>> {
if let Self::Entry { entry_info, .. } = &self {
entry_info
} else {
unreachable!()
}
}
fn unset_timer_node_in_deq_nodes(&self) {
if let Self::Entry { deq_nodes, .. } = &self {
deq_nodes.lock().set_timer_node(None);
} else {
unreachable!();
}
}
}
type Bucket<K> = Deque<TimerNode<K>>;
#[must_use = "this `ReschedulingResult` may be an `Removed` variant, which should be handled"]
pub(crate) enum ReschedulingResult<K> {
Rescheduled,
Removed(Box<DeqNode<TimerNode<K>>>),
}
pub(crate) struct TimerWheel<K> {
wheels: Box<[Box<[Bucket<K>]>]>,
origin: Instant,
current: Instant,
}
#[cfg(feature = "future")]
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K> Send for TimerWheel<K> {}
impl<K> TimerWheel<K> {
pub(crate) fn new(now: Instant) -> Self {
Self {
wheels: Box::default(), origin: now,
current: now,
}
}
#[cfg(test)]
pub(crate) fn set_origin(&mut self, time: Instant) {
self.origin = time;
self.current = time;
}
pub(crate) fn is_enabled(&self) -> bool {
!self.wheels.is_empty()
}
pub(crate) fn enable(&mut self) {
assert!(!self.is_enabled());
self.wheels = BUCKET_COUNTS
.iter()
.map(|b| {
(0..*b)
.map(|_| {
let mut deq = Deque::new(super::CacheRegion::Other);
deq.push_back(Box::new(DeqNode::new(TimerNode::Sentinel)));
deq
})
.collect::<Vec<_>>()
.into_boxed_slice()
})
.collect::<Vec<_>>()
.into_boxed_slice();
}
pub(crate) fn schedule(
&mut self,
entry_info: TrioArc<EntryInfo<K>>,
deq_nodes: TrioArc<Mutex<DeqNodes<K>>>,
) -> Option<NonNull<DeqNode<TimerNode<K>>>> {
debug_assert!(self.is_enabled());
if let Some(t) = entry_info.expiration_time() {
let (level, index) = self.bucket_indices(t);
let node = Box::new(DeqNode::new(TimerNode::new(
entry_info, deq_nodes, level, index,
)));
let node = self.wheels[level][index].push_back(node);
Some(node)
} else {
None
}
}
fn schedule_existing_node(
&mut self,
mut node: NonNull<DeqNode<TimerNode<K>>>,
) -> ReschedulingResult<K> {
debug_assert!(self.is_enabled());
if let entry @ TimerNode::Entry { .. } = &mut unsafe { node.as_mut() }.element {
if let Some(t) = entry.entry_info().expiration_time() {
let (level, index) = self.bucket_indices(t);
entry.set_position(level, index);
let node = unsafe { Box::from_raw(node.as_ptr()) };
self.wheels[level][index].push_back(node);
ReschedulingResult::Rescheduled
} else {
entry.unset_position();
entry.unset_timer_node_in_deq_nodes();
ReschedulingResult::Removed(unsafe { Box::from_raw(node.as_ptr()) })
}
} else {
unreachable!()
}
}
pub(crate) fn reschedule(
&mut self,
node: NonNull<DeqNode<TimerNode<K>>>,
) -> ReschedulingResult<K> {
debug_assert!(self.is_enabled());
unsafe { self.unlink_timer(node) };
self.schedule_existing_node(node)
}
pub(crate) fn deschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) {
debug_assert!(self.is_enabled());
unsafe {
self.unlink_timer(node);
Self::drop_node(node);
}
}
unsafe fn unlink_timer(&mut self, mut node: NonNull<DeqNode<TimerNode<K>>>) {
let p = node.as_mut();
if let entry @ TimerNode::Entry { .. } = &mut p.element {
if let Some((level, index)) = entry.position() {
self.wheels[level][index].unlink(node);
entry.unset_position();
}
} else {
unreachable!();
}
}
unsafe fn drop_node(node: NonNull<DeqNode<TimerNode<K>>>) {
std::mem::drop(Box::from_raw(node.as_ptr()));
}
pub(crate) fn advance(
&mut self,
current_time: Instant,
) -> impl Iterator<Item = TimerEvent<K>> + '_ {
debug_assert!(self.is_enabled());
let previous_time = self.current;
self.current = current_time;
TimerEventsIter::new(self, previous_time, current_time)
}
fn pop_timer_node(&mut self, level: usize, index: usize) -> Option<Box<DeqNode<TimerNode<K>>>> {
let deque = &mut self.wheels[level][index];
if let Some(node) = deque.peek_front() {
if node.element.is_sentinel() {
return None;
}
}
deque.pop_front()
}
fn reset_timer_node_positions(&mut self, level: usize, index: usize) {
let deque = &mut self.wheels[level][index];
debug_assert!(
deque.len() > 0,
"BUG: The queue is empty. level: {level}, index: {index}"
);
while !deque.peek_back().unwrap().element.is_sentinel() {
deque.move_front_to_back();
}
}
fn bucket_indices(&self, time: Instant) -> (usize, usize) {
let duration_nanos = self.duration_nanos_since_last_advanced(time);
let time_nanos = self.time_nanos(time);
for level in 0..=NUM_LEVELS {
if duration_nanos < SPANS[level + 1] {
let ticks = time_nanos >> SHIFT[level];
let index = ticks & (BUCKET_COUNTS[level] - 1);
return (level, index as usize);
}
}
(OVERFLOW_QUEUE_INDEX, 0)
}
fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 {
time.checked_duration_since(self.current)
.unwrap_or_default() .as_nanos() as u64
}
fn time_nanos(&self, time: Instant) -> u64 {
let nanos_u128 = time
.checked_duration_since(self.origin)
.unwrap_or_default() .as_nanos();
nanos_u128.try_into().unwrap_or(u64::MAX)
}
}
#[derive(Debug)]
pub(crate) enum TimerEvent<K> {
Expired(Box<DeqNode<TimerNode<K>>>),
#[cfg(test)]
Rescheduled(TrioArc<EntryInfo<K>>),
#[cfg(not(test))]
Rescheduled(()),
Descheduled,
}
pub(crate) struct TimerEventsIter<'iter, K> {
timer_wheel: &'iter mut TimerWheel<K>,
previous_time: Instant,
current_time: Instant,
is_done: bool,
level: usize,
index: u8,
end_index: u8,
index_mask: u64,
is_new_level: bool,
is_new_index: bool,
}
impl<'iter, K> TimerEventsIter<'iter, K> {
fn new(
timer_wheel: &'iter mut TimerWheel<K>,
previous_time: Instant,
current_time: Instant,
) -> Self {
Self {
timer_wheel,
previous_time,
current_time,
is_done: false,
level: 0,
index: 0,
end_index: 0,
index_mask: 0,
is_new_level: true,
is_new_index: true,
}
}
}
impl<'iter, K> Drop for TimerEventsIter<'iter, K> {
fn drop(&mut self) {
if !self.is_done {
self.timer_wheel.current = self.previous_time;
}
}
}
impl<'iter, K> Iterator for TimerEventsIter<'iter, K> {
type Item = TimerEvent<K>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
return None;
}
loop {
if self.is_new_level {
let previous_time_nanos = self.timer_wheel.time_nanos(self.previous_time);
let current_time_nanos = self.timer_wheel.time_nanos(self.current_time);
let previous_ticks = previous_time_nanos >> SHIFT[self.level];
let current_ticks = current_time_nanos >> SHIFT[self.level];
if current_ticks <= previous_ticks {
self.is_done = true;
return None;
}
self.index_mask = BUCKET_COUNTS[self.level] - 1;
self.index = (previous_ticks & self.index_mask) as u8;
let steps =
(current_ticks - previous_ticks + 1).min(BUCKET_COUNTS[self.level]) as u8;
self.end_index = self.index + steps;
self.is_new_level = false;
self.is_new_index = true;
}
let i = self.index & self.index_mask as u8;
if self.is_new_index {
self.timer_wheel
.reset_timer_node_positions(self.level, i as usize);
self.is_new_index = false;
}
if let Some(node) = self.timer_wheel.pop_timer_node(self.level, i as usize) {
let expiration_time = node.as_ref().element.entry_info().expiration_time();
if let Some(t) = expiration_time {
if t <= self.current_time {
node.as_ref().element.unset_timer_node_in_deq_nodes();
return Some(TimerEvent::Expired(node));
}
let node_p = NonNull::new(Box::into_raw(node)).expect("Got a null ptr");
#[cfg(test)]
let entry_info =
TrioArc::clone(unsafe { node_p.as_ref() }.element.entry_info());
match self.timer_wheel.schedule_existing_node(node_p) {
ReschedulingResult::Rescheduled => {
#[cfg(test)]
return Some(TimerEvent::Rescheduled(entry_info));
#[cfg(not(test))]
return Some(TimerEvent::Rescheduled(()));
}
ReschedulingResult::Removed(node) => {
node.as_ref().element.unset_timer_node_in_deq_nodes();
return Some(TimerEvent::Descheduled);
}
}
}
} else {
self.index += 1;
self.is_new_index = true;
if self.index >= self.end_index {
self.level += 1;
if self.level >= BUCKET_COUNTS.len() {
self.is_done = true;
return None;
}
self.is_new_level = true;
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use super::{TimerEvent, TimerWheel, SPANS};
use crate::common::{
concurrent::{entry_info::EntryInfo, KeyHash},
time::{CheckedTimeOps, Clock, Instant, Mock},
};
use triomphe::Arc as TrioArc;
#[test]
fn test_bucket_indices() {
fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) {
let t = now.checked_add(dur).unwrap();
timer.bucket_indices(t)
}
let (clock, mock) = Clock::mock();
let now = now(&clock);
let mut timer = TimerWheel::<()>::new(now);
timer.enable();
assert_eq!(timer.bucket_indices(now), (0, 0));
assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 0));
assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 63));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * 63)), (1, 63));
assert_eq!(bi(&timer, now, n2d(SPANS[1] * 64)), (2, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[2])), (2, 1));
assert_eq!(
bi(
&timer,
now,
n2d(SPANS[2] * 31 + SPANS[1] * 63 + SPANS[0] * 63)
),
(2, 31)
);
assert_eq!(bi(&timer, now, n2d(SPANS[2] * 32)), (3, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[3])), (3, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[3] * 3)), (3, 3));
assert_eq!(bi(&timer, now, n2d(SPANS[3] * 4)), (4, 0));
assert_eq!(bi(&timer, now, n2d(SPANS[4])), (4, 0));
assert_eq!(bi(&timer, now, n2d(SPANS[4] * 100)), (4, 0));
let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 5));
timer.current = now;
assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 5));
assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 6));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 4));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
assert_eq!(
bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 5))),
(1, 63)
);
let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 61));
timer.current = now;
assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 2));
assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 3));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 1));
assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 2));
assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 2));
assert_eq!(
bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 2))),
(1, 0)
);
}
#[test]
fn test_advance() {
fn schedule_timer(timer: &mut TimerWheel<u32>, key: u32, now: Instant, ttl: Duration) {
let hash = key as u64;
let key_hash = KeyHash::new(Arc::new(key), hash);
let policy_weight = 0;
let entry_info = TrioArc::new(EntryInfo::new(key_hash, now, policy_weight));
entry_info.set_expiration_time(Some(now.checked_add(ttl).unwrap()));
let deq_nodes = Default::default();
let timer_node = timer.schedule(entry_info, TrioArc::clone(&deq_nodes));
deq_nodes.lock().set_timer_node(timer_node);
}
fn expired_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
let entry = maybe_entry.expect("entry is none");
match entry {
TimerEvent::Expired(node) => *node.element.entry_info().key_hash().key,
_ => panic!("Expected an expired entry. Got {entry:?}"),
}
}
fn rescheduled_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
let entry = maybe_entry.expect("entry is none");
match entry {
TimerEvent::Rescheduled(entry) => *entry.key_hash().key,
_ => panic!("Expected a rescheduled entry. Got {entry:?}"),
}
}
let (clock, mock) = Clock::mock();
let now = advance_clock(&clock, &mock, s2d(10));
let mut timer = TimerWheel::<u32>::new(now);
timer.enable();
schedule_timer(&mut timer, 1, now, s2d(5));
schedule_timer(&mut timer, 2, now, s2d(1));
schedule_timer(&mut timer, 3, now, s2d(63));
schedule_timer(&mut timer, 4, now, s2d(3));
let now = advance_clock(&clock, &mock, s2d(4));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(4));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(64 - 8));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);
const MINUTES: u64 = 60;
schedule_timer(&mut timer, 1, now, s2d(5 * MINUTES));
#[allow(clippy::identity_op)]
schedule_timer(&mut timer, 2, now, s2d(1 * MINUTES));
schedule_timer(&mut timer, 3, now, s2d(63 * MINUTES));
schedule_timer(&mut timer, 4, now, s2d(3 * MINUTES));
let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d((64 - 8) * MINUTES));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);
const HOURS: u64 = 60 * 60;
schedule_timer(&mut timer, 1, now, s2d(5 * HOURS));
#[allow(clippy::identity_op)]
schedule_timer(&mut timer, 2, now, s2d(1 * HOURS));
schedule_timer(&mut timer, 3, now, s2d(31 * HOURS));
schedule_timer(&mut timer, 4, now, s2d(3 * HOURS));
let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 4);
assert_eq!(rescheduled_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 1);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d((32 - 8) * HOURS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);
const DAYS: u64 = 24 * 60 * 60;
schedule_timer(&mut timer, 1, now, s2d(5 * DAYS));
#[allow(clippy::identity_op)]
schedule_timer(&mut timer, 2, now, s2d(1 * DAYS));
schedule_timer(&mut timer, 3, now, s2d(2 * DAYS));
schedule_timer(&mut timer, 4, now, s2d(8 * DAYS));
let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 2);
assert_eq!(expired_key(expired_entries.next()), 3);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 1);
assert_eq!(rescheduled_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);
let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
let mut expired_entries = timer.advance(now);
assert_eq!(expired_key(expired_entries.next()), 4);
assert!(expired_entries.next().is_none());
drop(expired_entries);
}
fn now(clock: &Clock) -> Instant {
Instant::new(clock.now())
}
fn advance_clock(clock: &Clock, mock: &Arc<Mock>, duration: Duration) -> Instant {
mock.increment(duration);
now(clock)
}
fn n2d(nanos: u64) -> Duration {
Duration::from_nanos(nanos)
}
fn s2d(secs: u64) -> Duration {
Duration::from_secs(secs)
}
}