tokio/util/sharded_list.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use super::linked_list::{Link, LinkedList};
/// An intrusive linked list supporting highly concurrent updates.
///
/// It currently relies on `LinkedList`, so it is the caller's
/// responsibility to ensure the list is empty before dropping it.
///
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
lists: Box<[Mutex<LinkedList<L, T>>]>,
added: MetricAtomicU64,
count: MetricAtomicUsize,
shard_mask: usize,
}
/// Determines which linked list an item should be stored in.
///
/// # Safety
///
/// Implementations must guarantee that the id of an item does not change from
/// call to call.
pub(crate) unsafe trait ShardedListItem: Link {
/// # Safety
/// The provided pointer must point at a valid list item.
unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
}
impl<L, T> ShardedList<L, T> {
/// Creates a new and empty sharded linked list with the specified size.
pub(crate) fn new(sharded_size: usize) -> Self {
assert!(sharded_size.is_power_of_two());
let shard_mask = sharded_size - 1;
let mut lists = Vec::with_capacity(sharded_size);
for _ in 0..sharded_size {
lists.push(Mutex::new(LinkedList::<L, T>::new()))
}
Self {
lists: lists.into_boxed_slice(),
added: MetricAtomicU64::new(0),
count: MetricAtomicUsize::new(0),
shard_mask,
}
}
}
/// Used to get the lock of shard.
pub(crate) struct ShardGuard<'a, L, T> {
lock: MutexGuard<'a, LinkedList<L, T>>,
added: &'a MetricAtomicU64,
count: &'a MetricAtomicUsize,
id: usize,
}
impl<L: ShardedListItem> ShardedList<L, L::Target> {
/// Removes the last element from a list specified by `shard_id` and returns it, or None if it is
/// empty.
pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
let mut lock = self.shard_inner(shard_id);
let node = lock.pop_back();
if node.is_some() {
self.count.decrement();
}
node
}
/// Removes the specified node from the list.
///
/// # Safety
///
/// The caller **must** ensure that exactly one of the following is true:
/// - `node` is currently contained by `self`,
/// - `node` is not contained by any list,
/// - `node` is currently contained by some other `GuardedLinkedList`.
pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
let id = L::get_shard_id(node);
let mut lock = self.shard_inner(id);
// SAFETY: Since the shard id cannot change, it's not possible for this node
// to be in any other list of the same sharded list.
let node = unsafe { lock.remove(node) };
if node.is_some() {
self.count.decrement();
}
node
}
/// Gets the lock of `ShardedList`, makes us have the write permission.
pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(val)) };
ShardGuard {
lock: self.shard_inner(id),
added: &self.added,
count: &self.count,
id,
}
}
/// Gets the count of elements in this list.
pub(crate) fn len(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
cfg_64bit_metrics! {
/// Gets the total number of elements added to this list.
pub(crate) fn added(&self) -> u64 {
self.added.load(Ordering::Relaxed)
}
}
/// Returns whether the linked list does not contain any node.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
/// Gets the shard size of this `SharedList`.
///
/// Used to help us to decide the parameter `shard_id` of the `pop_back` method.
pub(crate) fn shard_size(&self) -> usize {
self.shard_mask + 1
}
#[inline]
fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
// Safety: This modulo operation ensures that the index is not out of bounds.
unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
}
}
impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
/// Push a value to this shard.
pub(crate) fn push(mut self, val: L::Handle) {
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.add(1, Ordering::Relaxed);
self.count.increment();
}
}
cfg_taskdump! {
impl<L: ShardedListItem> ShardedList<L, L::Target> {
pub(crate) fn for_each<F>(&self, mut f: F)
where
F: FnMut(&L::Handle),
{
let mut guards = Vec::with_capacity(self.lists.len());
for list in self.lists.iter() {
guards.push(list.lock());
}
for g in &mut guards {
g.for_each(&mut f);
}
}
}
}