use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use futures_util::FutureExt;
use crate::notification::{AsyncEvictionListener, RemovalCause};
pub(crate) struct RemovalNotifier<K, V> {
listener: AsyncEvictionListener<K, V>,
is_enabled: AtomicBool,
#[cfg(feature = "logging")]
cache_name: Option<String>,
}
impl<K, V> RemovalNotifier<K, V> {
pub(crate) fn new(listener: AsyncEvictionListener<K, V>, _cache_name: Option<String>) -> Self {
Self {
listener,
is_enabled: AtomicBool::new(true),
#[cfg(feature = "logging")]
cache_name: _cache_name,
}
}
pub(crate) async fn notify(&self, key: Arc<K>, value: V, cause: RemovalCause) {
use std::panic::{catch_unwind, AssertUnwindSafe};
if !self.is_enabled.load(Ordering::Acquire) {
return;
}
macro_rules! try_or_disable {
($match_expr:expr) => {
match $match_expr {
Ok(v) => v,
Err(_payload) => {
self.is_enabled.store(false, Ordering::Release);
#[cfg(feature = "logging")]
log_panic(&*_payload, self.cache_name.as_deref());
return;
}
}
};
}
let listener_clo = || (self.listener)(key, value, cause);
let fut = try_or_disable!(catch_unwind(AssertUnwindSafe(listener_clo)));
try_or_disable!(AssertUnwindSafe(fut).catch_unwind().await);
}
}
#[cfg(feature = "logging")]
fn log_panic(payload: &(dyn std::any::Any + Send + 'static), cache_name: Option<&str>) {
let message: Option<std::borrow::Cow<'_, str>> =
(payload.downcast_ref::<&str>().map(|s| (*s).into()))
.or_else(|| payload.downcast_ref::<String>().map(Into::into));
let cn = cache_name
.map(|name| format!("[{name}] "))
.unwrap_or_default();
if let Some(m) = message {
log::error!("{cn}Disabled the eviction listener because it panicked at '{m}'");
} else {
log::error!("{cn}Disabled the eviction listener because it panicked");
}
}