tokio/runtime/io/
registration_set.rsuse crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::io::ScheduledIo;
use crate::util::linked_list::{self, LinkedList};
use std::io;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::Arc;
pub(super) struct RegistrationSet {
num_pending_release: AtomicUsize,
}
pub(super) struct Synced {
is_shutdown: bool,
registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>,
pending_release: Vec<Arc<ScheduledIo>>,
}
impl RegistrationSet {
pub(super) fn new() -> (RegistrationSet, Synced) {
let set = RegistrationSet {
num_pending_release: AtomicUsize::new(0),
};
let synced = Synced {
is_shutdown: false,
registrations: LinkedList::new(),
pending_release: Vec::with_capacity(16),
};
(set, synced)
}
pub(super) fn is_shutdown(&self, synced: &Synced) -> bool {
synced.is_shutdown
}
pub(super) fn needs_release(&self) -> bool {
self.num_pending_release.load(Acquire) != 0
}
pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
if synced.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
let ret = Arc::new(ScheduledIo::default());
synced.registrations.push_front(ret.clone());
Ok(ret)
}
pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool {
const NOTIFY_AFTER: usize = 16;
synced.pending_release.push(registration.clone());
let len = synced.pending_release.len();
self.num_pending_release.store(len, Release);
len == NOTIFY_AFTER
}
pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> {
if synced.is_shutdown {
return vec![];
}
synced.is_shutdown = true;
synced.pending_release.clear();
let mut ret = vec![];
while let Some(io) = synced.registrations.pop_back() {
ret.push(io);
}
ret
}
pub(super) fn release(&self, synced: &mut Synced) {
let pending = std::mem::take(&mut synced.pending_release);
for io in pending {
unsafe { self.remove(synced, io.as_ref()) }
}
self.num_pending_release.store(0, Release);
}
pub(super) unsafe fn remove(&self, synced: &mut Synced, io: &ScheduledIo) {
super::EXPOSE_IO.unexpose_provenance(io);
let _ = synced.registrations.remove(io.into());
}
}
unsafe impl linked_list::Link for Arc<ScheduledIo> {
type Handle = Arc<ScheduledIo>;
type Target = ScheduledIo;
fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> {
unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) }
}
unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> {
unsafe { Arc::from_raw(ptr.as_ptr() as *const _) }
}
unsafe fn pointers(
target: NonNull<Self::Target>,
) -> NonNull<linked_list::Pointers<ScheduledIo>> {
NonNull::new_unchecked(target.as_ref().linked_list_pointers.get())
}
}