diff --git a/.github/workflows/Miri.yml b/.github/workflows/Miri.yml index 03fa7ba2..e496f737 100644 --- a/.github/workflows/Miri.yml +++ b/.github/workflows/Miri.yml @@ -45,3 +45,9 @@ jobs: with: command: miri args: test deque + + - name: Run Miri test (timer_wheel) + uses: actions-rs/cargo@v1 + with: + command: miri + args: test timer_wheel diff --git a/.vscode/settings.json b/.vscode/settings.json index 8f962be7..d477aafa 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "ahash", "armv", "benmanes", + "CHECKME", "circleci", "CLFU", "clippy", @@ -19,10 +20,13 @@ "deqs", "Deque", "Deques", + "deschedule", + "Descheduled", "devcontainer", "docsrs", "Einziger", "else's", + "ENHANCEME", "Eytan", "getrandom", "hashbrown", diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d16265c..d385d8b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Moka Cache — Change Log +## Version 0.11.0 + +### Breaking Changes + +- (Will change the return type of the `invalidate` method from `()` to `Option<V>`) + +### Added + +- Added support for per-entry expiration. ([#248][gh-pull-0248]) + - In addition to the existing TTL and TTI (time-to-idle) expiration times that + apply to all entries in the cache, the `sync` and `future` caches can now allow + different expiration times for individual entries. + + ## Version 0.10.2 Bumped the minimum supported Rust version (MSRV) to 1.60 (2022-04-07). @@ -624,6 +638,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25). 
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/ [gh-pull-0251]: https://github.com/moka-rs/moka/pull/251/ +[gh-pull-0248]: https://github.com/moka-rs/moka/pull/248/ [gh-pull-0216]: https://github.com/moka-rs/moka/pull/216/ [gh-pull-0199]: https://github.com/moka-rs/moka/pull/199/ [gh-pull-0195]: https://github.com/moka-rs/moka/pull/195/ diff --git a/Cargo.toml b/Cargo.toml index 92b1649a..8890578b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.10.2" +version = "0.11.0" edition = "2018" rust-version = "1.60" # Released on April 7, 2022, supporting 2021 edition. diff --git a/README.md b/README.md index 495f31de..3ff9b5e4 100644 --- a/README.md +++ b/README.md @@ -53,8 +53,9 @@ high level of concurrency for concurrent access. - Eviction from a cache is controlled by the Least Recently Used (LRU) policy. - [More details and some benchmark results are available here][tiny-lfu]. - Supports expiration policies: - - Time to live - - Time to idle + - Time to live. + - Time to idle. + - Per-entry variable expiration. - Supports eviction listener, a callback function that will be called when an entry is removed from the cache. @@ -66,7 +67,7 @@ and can be overkill for your use case. 
Sometimes simpler caches like The following table shows the trade-offs between the different cache implementations: -| Feature | Moka v0.10 | Mini Moka v0.10 | Quick Cache v0.2 | +| Feature | Moka v0.11 | Mini Moka v0.10 | Quick Cache v0.2 | |:------- |:---- |:--------- |:----------- | | Thread-safe, sync cache | ✅ | ✅ | ✅ | | Thread-safe, async cache | ✅ | ❌ | ❌ | @@ -74,16 +75,17 @@ The following table shows the trade-offs between the different cache implementat | Bounded by the maximum number of entries | ✅ | ✅ | ✅ | | Bounded by the total weighted size of entries | ✅ | ✅ | ✅ | | Near optimal hit ratio | ✅ TinyLFU | ✅ TinyLFU | ✅ CLOCK-Pro | -| Expiration policies | ✅ | ✅ | ❌ | +| Cache-level expiration policies (Time-to-live and time-to-idle) | ✅ | ✅ | ❌ | +| Per-entry variable expiration | ✅ | ❌ | ❌ | | Eviction listener | ✅ | ❌ | ❌ | | Per-key, atomic insertion | ✅ `get_with` family methods | ❌ | ❌ | | Lock-free, concurrent iterator | ✅ | ❌ | ❌ | | Lock-per-shard, concurrent iterator | ❌ | ✅ | ❌ | -| Performance | Moka v0.10 | Mini Moka v0.10 | Quick Cache v0.2 | +| Performance | Moka v0.11 | Mini Moka v0.10 | Quick Cache v0.2 | |:------- |:---- |:--------- |:----------- | | Small overhead compared to a concurrent hash table | ❌ | ❌ | ✅ | -| Does not use background threads | ❌ Will be removed from v0.11 | ✅ | ✅ | +| Does not use background threads | ❌ Will be removed from v0.12 or v0.13 | ✅ | ✅ | | Small dependency tree | ❌ | ✅ | ✅ | [tiny-lfu]: https://github.com/moka-rs/moka/wiki#admission-and-eviction-policies @@ -154,14 +156,14 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -moka = "0.10" +moka = "0.11" ``` To use the asynchronous cache, enable a crate feature called "future". 
```toml [dependencies] -moka = { version = "0.10", features = ["future"] } +moka = { version = "0.11", features = ["future"] } ``` @@ -263,7 +265,7 @@ Here is a similar program to the previous example, but using asynchronous cache // Cargo.toml // // [dependencies] -// moka = { version = "0.10", features = ["future"] } +// moka = { version = "0.11", features = ["future"] } // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } // futures-util = "0.3" @@ -502,9 +504,9 @@ to the dependency declaration. ```toml:Cargo.toml [dependencies] -moka = { version = "0.10", default-features = false } +moka = { version = "0.11", default-features = false } # Or -moka = { version = "0.10", default-features = false, features = ["future"] } +moka = { version = "0.11", default-features = false, features = ["future"] } ``` This will make Moka to switch to a fall-back implementation, so it will compile. @@ -547,13 +549,15 @@ $ cargo +nightly -Z unstable-options --config 'build.rustdocflags="--cfg docsrs" - `blocking_insert(K, V)` → `blocking().insert(K, V)` - `time_to_live()` → `policy().time_to_live()` - [x] Notifications on eviction. (`v0.9.0` via [#145][gh-pull-145]) +- [x] The variable (per-entry) expiration, using a hierarchical timer wheel. + (`v0.11.0` via [#248][gh-pull-248]) - [ ] Cache statistics. (Hit rate, etc.) - [ ] Upgrade TinyLFU to Window-TinyLFU. ([details][tiny-lfu]) -- [ ] The variable (per-entry) expiration, using a hierarchical timer wheel. 
[gh-pull-024]: https://github.com/moka-rs/moka/pull/24 [gh-pull-105]: https://github.com/moka-rs/moka/pull/105 [gh-pull-145]: https://github.com/moka-rs/moka/pull/145 +[gh-pull-248]: https://github.com/moka-rs/moka/pull/248 ## About the Name diff --git a/src/common.rs b/src/common.rs index c056e523..5b718cad 100644 --- a/src/common.rs +++ b/src/common.rs @@ -13,6 +13,7 @@ pub(crate) mod builder_utils; pub(crate) mod deque; pub(crate) mod frequency_sketch; pub(crate) mod time; +pub(crate) mod timer_wheel; #[cfg(all(test, any(feature = "sync", feature = "future")))] pub(crate) mod test_utils; diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 3da5b898..66862efe 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -31,6 +31,8 @@ pub(crate) mod debug_counters; use self::entry_info::EntryInfo; +use super::timer_wheel::TimerNode; + pub(crate) type Weigher = Arc u32 + Send + Sync + 'static>; pub(crate) trait AccessTime { @@ -40,6 +42,7 @@ pub(crate) trait AccessTime { fn set_last_modified(&self, timestamp: Instant); } +#[derive(Debug)] pub(crate) struct KeyHash { pub(crate) key: Arc, pub(crate) hash: u64, @@ -60,60 +63,36 @@ impl Clone for KeyHash { } } -pub(crate) struct KeyDate { - key: Arc, - entry_info: TrioArc, -} - -impl KeyDate { - pub(crate) fn new(key: Arc, entry_info: &TrioArc) -> Self { - Self { - key, - entry_info: TrioArc::clone(entry_info), - } - } - - pub(crate) fn key(&self) -> &Arc { - &self.key - } - - #[cfg(any(feature = "sync", feature = "future"))] - pub(crate) fn last_modified(&self) -> Option { - self.entry_info.last_modified() - } - - #[cfg(any(feature = "sync", feature = "future"))] - pub(crate) fn is_dirty(&self) -> bool { - self.entry_info.is_dirty() - } -} - pub(crate) struct KeyHashDate { - key: Arc, - hash: u64, - entry_info: TrioArc, + entry_info: TrioArc>, } impl KeyHashDate { - pub(crate) fn new(kh: KeyHash, entry_info: &TrioArc) -> Self { + pub(crate) fn new(entry_info: &TrioArc>) -> Self { Self { - 
key: kh.key, - hash: kh.hash, entry_info: TrioArc::clone(entry_info), } } pub(crate) fn key(&self) -> &Arc { - &self.key + &self.entry_info.key_hash().key } pub(crate) fn hash(&self) -> u64 { - self.hash + self.entry_info.key_hash().hash } - pub(crate) fn entry_info(&self) -> &EntryInfo { + pub(crate) fn entry_info(&self) -> &EntryInfo { &self.entry_info } + + pub(crate) fn last_modified(&self) -> Option { + self.entry_info.last_modified() + } + + pub(crate) fn is_dirty(&self) -> bool { + self.entry_info.is_dirty() + } } pub(crate) struct KvEntry { @@ -127,28 +106,6 @@ impl KvEntry { } } -impl AccessTime for DeqNode> { - #[inline] - fn last_accessed(&self) -> Option { - None - } - - #[inline] - fn set_last_accessed(&self, _timestamp: Instant) { - unreachable!(); - } - - #[inline] - fn last_modified(&self) -> Option { - self.element.entry_info.last_modified() - } - - #[inline] - fn set_last_modified(&self, timestamp: Instant) { - self.element.entry_info.set_last_modified(timestamp); - } -} - impl AccessTime for DeqNode> { #[inline] fn last_accessed(&self) -> Option { @@ -162,12 +119,12 @@ impl AccessTime for DeqNode> { #[inline] fn last_modified(&self) -> Option { - None + self.element.entry_info.last_modified() } #[inline] - fn set_last_modified(&self, _timestamp: Instant) { - unreachable!(); + fn set_last_modified(&self, timestamp: Instant) { + self.element.entry_info.set_last_modified(timestamp); } } @@ -175,56 +132,65 @@ impl AccessTime for DeqNode> { type KeyDeqNodeAo = TagNonNull>, 2>; // DeqNode for the write order queue. -type KeyDeqNodeWo = NonNull>>; +type KeyDeqNodeWo = NonNull>>; + +// DeqNode for the timer wheel. 
+type DeqNodeTimer = NonNull>>; pub(crate) struct DeqNodes { access_order_q_node: Option>, write_order_q_node: Option>, + timer_node: Option>, +} + +impl Default for DeqNodes { + fn default() -> Self { + Self { + access_order_q_node: None, + write_order_q_node: None, + timer_node: None, + } + } } // We need this `unsafe impl` as DeqNodes have NonNull pointers. unsafe impl Send for DeqNodes {} +impl DeqNodes { + pub(crate) fn set_timer_node(&mut self, timer_node: Option>) { + self.timer_node = timer_node; + } +} + pub(crate) struct ValueEntry { pub(crate) value: V, - info: TrioArc, - nodes: Mutex>, + info: TrioArc>, + nodes: TrioArc>>, } impl ValueEntry { - pub(crate) fn new(value: V, entry_info: TrioArc) -> Self { + pub(crate) fn new(value: V, entry_info: TrioArc>) -> Self { #[cfg(feature = "unstable-debug-counters")] self::debug_counters::InternalGlobalDebugCounters::value_entry_created(); Self { value, info: entry_info, - nodes: Mutex::new(DeqNodes { - access_order_q_node: None, - write_order_q_node: None, - }), + nodes: TrioArc::new(Mutex::new(DeqNodes::default())), } } - pub(crate) fn new_from(value: V, entry_info: TrioArc, other: &Self) -> Self { + pub(crate) fn new_from(value: V, entry_info: TrioArc>, other: &Self) -> Self { #[cfg(feature = "unstable-debug-counters")] self::debug_counters::InternalGlobalDebugCounters::value_entry_created(); - - let nodes = { - let other_nodes = other.nodes.lock(); - DeqNodes { - access_order_q_node: other_nodes.access_order_q_node, - write_order_q_node: other_nodes.write_order_q_node, - } - }; Self { value, info: entry_info, - nodes: Mutex::new(nodes), + nodes: TrioArc::clone(&other.nodes), } } - pub(crate) fn entry_info(&self) -> &TrioArc { + pub(crate) fn entry_info(&self) -> &TrioArc> { &self.info } @@ -249,6 +215,10 @@ impl ValueEntry { self.info.policy_weight() } + pub(crate) fn deq_nodes(&self) -> &TrioArc>> { + &self.nodes + } + pub(crate) fn access_order_q_node(&self) -> Option> { self.nodes.lock().access_order_q_node 
} @@ -273,6 +243,18 @@ impl ValueEntry { self.nodes.lock().write_order_q_node.take() } + pub(crate) fn timer_node(&self) -> Option> { + self.nodes.lock().timer_node + } + + pub(crate) fn set_timer_node(&self, node: Option>) { + self.nodes.lock().timer_node = node; + } + + pub(crate) fn take_timer_node(&self) -> Option> { + self.nodes.lock().timer_node.take() + } + pub(crate) fn unset_q_nodes(&self) { let mut nodes = self.nodes.lock(); nodes.access_order_q_node = None; @@ -310,8 +292,12 @@ impl AccessTime for TrioArc> { } pub(crate) enum ReadOp { + Hit { + value_entry: TrioArc>, + timestamp: Instant, + is_expiry_modified: bool, + }, // u64 is the hash of the key. - Hit(u64, TrioArc>, Instant), Miss(u64), } diff --git a/src/common/concurrent/atomic_time/atomic_time.rs b/src/common/concurrent/atomic_time/atomic_time.rs index 45a85c2f..c24a895b 100644 --- a/src/common/concurrent/atomic_time/atomic_time.rs +++ b/src/common/concurrent/atomic_time/atomic_time.rs @@ -5,6 +5,7 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, }; +#[derive(Debug)] pub(crate) struct AtomicInstant { instant: AtomicU64, } @@ -27,6 +28,10 @@ impl AtomicInstant { ai } + pub(crate) fn clear(&self) { + self.instant.store(u64::MAX, Ordering::Release); + } + pub(crate) fn is_set(&self) -> bool { self.instant.load(Ordering::Acquire) != u64::MAX } diff --git a/src/common/concurrent/atomic_time/atomic_time_compat.rs b/src/common/concurrent/atomic_time/atomic_time_compat.rs index 2a65cc46..d9e73ab3 100644 --- a/src/common/concurrent/atomic_time/atomic_time_compat.rs +++ b/src/common/concurrent/atomic_time/atomic_time_compat.rs @@ -2,6 +2,7 @@ use super::Instant; use parking_lot::RwLock; +#[derive(Debug)] pub(crate) struct AtomicInstant { instant: RwLock>, } @@ -21,6 +22,10 @@ impl AtomicInstant { ai } + pub(crate) fn clear(&self) { + *self.instant.write() = None; + } + pub(crate) fn is_set(&self) -> bool { self.instant.read().is_some() } diff --git a/src/common/concurrent/deques.rs 
b/src/common/concurrent/deques.rs index 3fdd74db..e2c75267 100644 --- a/src/common/concurrent/deques.rs +++ b/src/common/concurrent/deques.rs @@ -1,4 +1,4 @@ -use super::{KeyDate, KeyHashDate, ValueEntry}; +use super::{KeyHashDate, ValueEntry}; use crate::common::{ deque::{DeqNode, Deque}, CacheRegion, @@ -11,7 +11,7 @@ pub(crate) struct Deques { pub(crate) window: Deque>, // Not used yet. pub(crate) probation: Deque>, pub(crate) protected: Deque>, // Not used yet. - pub(crate) write_order: Deque>, + pub(crate) write_order: Deque>, } #[cfg(feature = "future")] @@ -51,7 +51,11 @@ impl Deques { entry.set_access_order_q_node(Some(tagged_node)); } - pub(crate) fn push_back_wo(&mut self, kd: KeyDate, entry: &TrioArc>) { + pub(crate) fn push_back_wo( + &mut self, + kd: KeyHashDate, + entry: &TrioArc>, + ) { let node = Box::new(DeqNode::new(kd)); let node = self.write_order.push_back(node); entry.set_write_order_q_node(Some(node)); @@ -107,7 +111,7 @@ impl Deques { } pub(crate) fn move_to_back_wo_in_deque( - deq: &mut Deque>, + deq: &mut Deque>, entry: &TrioArc>, ) { if let Some(node) = entry.write_order_q_node() { @@ -134,7 +138,7 @@ impl Deques { } } - pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &TrioArc>) { + pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &TrioArc>) { if let Some(node) = entry.take_write_order_q_node() { Self::unlink_node_wo(deq, node); } @@ -177,7 +181,10 @@ impl Deques { } } - pub(crate) fn unlink_node_wo(deq: &mut Deque>, node: NonNull>>) { + pub(crate) fn unlink_node_wo( + deq: &mut Deque>, + node: NonNull>>, + ) { unsafe { let p = node.as_ref(); if deq.contains(p) { diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs index 82efc71e..60460203 100644 --- a/src/common/concurrent/entry_info.rs +++ b/src/common/concurrent/entry_info.rs @@ -1,9 +1,11 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use super::AccessTime; +use super::{AccessTime, KeyHash}; use 
crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant}; -pub(crate) struct EntryInfo { +#[derive(Debug)] +pub(crate) struct EntryInfo { + key_hash: KeyHash, /// `is_admitted` indicates that the entry has been admitted to the /// cache. When `false`, it means the entry is _temporary_ admitted to /// the cache or evicted from the cache (so it should not have LRU nodes). @@ -14,24 +16,32 @@ pub(crate) struct EntryInfo { is_dirty: AtomicBool, last_accessed: AtomicInstant, last_modified: AtomicInstant, + expiration_time: AtomicInstant, policy_weight: AtomicU32, } -impl EntryInfo { +impl EntryInfo { #[inline] - pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self { + pub(crate) fn new(key_hash: KeyHash, timestamp: Instant, policy_weight: u32) -> Self { #[cfg(feature = "unstable-debug-counters")] super::debug_counters::InternalGlobalDebugCounters::entry_info_created(); Self { + key_hash, is_admitted: Default::default(), is_dirty: AtomicBool::new(true), last_accessed: AtomicInstant::new(timestamp), last_modified: AtomicInstant::new(timestamp), + expiration_time: AtomicInstant::default(), policy_weight: AtomicU32::new(policy_weight), } } + #[inline] + pub(crate) fn key_hash(&self) -> &KeyHash { + &self.key_hash + } + #[inline] pub(crate) fn is_admitted(&self) -> bool { self.is_admitted.load(Ordering::Acquire) @@ -60,16 +70,29 @@ impl EntryInfo { pub(crate) fn set_policy_weight(&self, size: u32) { self.policy_weight.store(size, Ordering::Release); } + + #[inline] + pub(crate) fn expiration_time(&self) -> Option { + self.expiration_time.instant() + } + + pub(crate) fn set_expiration_time(&self, time: Option) { + if let Some(t) = time { + self.expiration_time.set_instant(t); + } else { + self.expiration_time.clear(); + } + } } #[cfg(feature = "unstable-debug-counters")] -impl Drop for EntryInfo { +impl Drop for EntryInfo { fn drop(&mut self) { super::debug_counters::InternalGlobalDebugCounters::entry_info_dropped(); } } -impl AccessTime for 
EntryInfo { +impl AccessTime for EntryInfo { #[inline] fn last_accessed(&self) -> Option { self.last_accessed.instant() @@ -135,12 +158,12 @@ mod test { }; let expected_sizes = match (arch, is_quanta_enabled) { - (Linux64, true) => vec![("1.51", 24)], - (Linux32, true) => vec![("1.51", 24)], - (MacOS64, true) => vec![("1.62", 24)], - (Linux64, false) => vec![("1.66", 56), ("1.51", 72)], - (Linux32, false) => vec![("1.66", 56), ("1.62", 72), ("1.51", 40)], - (MacOS64, false) => vec![("1.62", 56)], + (Linux64, true) => vec![("1.51", 48)], + (Linux32, true) => vec![("1.51", 48)], + (MacOS64, true) => vec![("1.62", 48)], + (Linux64, false) => vec![("1.66", 96), ("1.60", 120)], + (Linux32, false) => vec![("1.66", 96), ("1.62", 120), ("1.60", 72)], + (MacOS64, false) => vec![("1.62", 96)], }; let mut expected = None; @@ -152,7 +175,7 @@ mod test { } if let Some(size) = expected { - assert_eq!(size_of::(), size); + assert_eq!(size_of::>(), size); } else { panic!("No expected size for {:?} with Rust version {}", arch, ver); } diff --git a/src/common/deque.rs b/src/common/deque.rs index c0c2115c..424fa921 100644 --- a/src/common/deque.rs +++ b/src/common/deque.rs @@ -157,8 +157,7 @@ impl Deque { }) } - #[cfg(test)] - fn peek_back(&self) -> Option<&DeqNode> { + pub(crate) fn peek_back(&self) -> Option<&DeqNode> { // This method takes care not to create mutable references to whole nodes, // to maintain validity of aliasing pointers into `element`. self.tail.as_ref().map(|node| unsafe { node.as_ref() }) @@ -227,7 +226,9 @@ impl Deque { /// This method takes care not to create mutable references to `element`, to /// maintain validity of aliasing pointers. /// - /// Panics: + /// IMPORTANT: This method does not drop the node. If the node is no longer + /// needed, use `unlink_and_drop` instead, or drop it at the caller side. + /// Otherwise, the node will leak. 
pub(crate) unsafe fn unlink(&mut self, mut node: NonNull>) { if self.is_at_cursor(node.as_ref()) { self.advance_cursor(); @@ -265,7 +266,6 @@ impl Deque { std::mem::drop(Box::from_raw(node.as_ptr())); } - #[allow(unused)] pub(crate) fn reset_cursor(&mut self) { self.cursor = None; } diff --git a/src/common/time.rs b/src/common/time.rs index 163955d2..6014d3f6 100644 --- a/src/common/time.rs +++ b/src/common/time.rs @@ -7,18 +7,21 @@ pub(crate) mod clock; pub(crate) use clock::Clock; #[cfg(test)] -#[cfg(all(test, feature = "sync"))] pub(crate) use clock::Mock; /// a wrapper type over Instant to force checked additions and prevent /// unintentional overflow. The type preserve the Copy semantics for the wrapped -#[derive(PartialEq, PartialOrd, Clone, Copy)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] pub(crate) struct Instant(clock::Instant); pub(crate) trait CheckedTimeOps { fn checked_add(&self, duration: Duration) -> Option where Self: Sized; + + fn checked_duration_since(&self, earlier: Self) -> Option + where + Self: Sized; } impl Instant { @@ -40,4 +43,11 @@ impl CheckedTimeOps for Instant { fn checked_add(&self, duration: Duration) -> Option { self.0.checked_add(duration).map(Instant) } + + fn checked_duration_since(&self, earlier: Self) -> Option + where + Self: Sized, + { + self.0.checked_duration_since(earlier.0) + } } diff --git a/src/common/time/clock_quanta.rs b/src/common/time/clock_quanta.rs index b26f04df..5b64d303 100644 --- a/src/common/time/clock_quanta.rs +++ b/src/common/time/clock_quanta.rs @@ -1,5 +1,5 @@ pub(crate) type Clock = quanta::Clock; pub(crate) type Instant = quanta::Instant; -#[cfg(all(test, feature = "sync"))] +#[cfg(test)] pub(crate) type Mock = quanta::Mock; diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs new file mode 100644 index 00000000..60126207 --- /dev/null +++ b/src/common/timer_wheel.rs @@ -0,0 +1,829 @@ +// License and Copyright Notice: +// +// Some of the code and doc comments in 
this module were ported or copied from +// a Java class `com.github.benmanes.caffeine.cache.TimerWheel` of Caffeine. +// https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +// +// The original code/comments from Caffeine are licensed under the Apache License, +// Version 2.0 +// +// Copyrights of the original code/comments are retained by their contributors. +// For full authorship information, see the version control history of +// https://github.com/ben-manes/caffeine/ + +use std::{ptr::NonNull, time::Duration}; + +use super::{ + concurrent::{entry_info::EntryInfo, DeqNodes}, + deque::{DeqNode, Deque}, + time::{CheckedTimeOps, Instant}, +}; + +use parking_lot::Mutex; +use triomphe::Arc as TrioArc; + +const BUCKET_COUNTS: &[u64] = &[ + 64, // roughly seconds + 64, // roughly minutes + 32, // roughly hours + 4, // roughly days + 1, // overflow (> ~6.5 days) +]; + +const OVERFLOW_QUEUE_INDEX: usize = BUCKET_COUNTS.len() - 1; +const NUM_LEVELS: usize = OVERFLOW_QUEUE_INDEX - 1; + +const DAY: Duration = Duration::from_secs(60 * 60 * 24); + +const SPANS: &[u64] = &[ + aligned_duration(Duration::from_secs(1)), // 1.07s + aligned_duration(Duration::from_secs(60)), // 1.14m + aligned_duration(Duration::from_secs(60 * 60)), // 1.22h + aligned_duration(DAY), // 1.63d + BUCKET_COUNTS[3] * aligned_duration(DAY), // 6.5d + BUCKET_COUNTS[3] * aligned_duration(DAY), // 6.5d +]; + +const SHIFT: &[u64] = &[ + SPANS[0].trailing_zeros() as u64, + SPANS[1].trailing_zeros() as u64, + SPANS[2].trailing_zeros() as u64, + SPANS[3].trailing_zeros() as u64, + SPANS[4].trailing_zeros() as u64, +]; + +/// Returns the next power of two of the duration in nanoseconds. +const fn aligned_duration(duration: Duration) -> u64 { + // NOTE: as_nanos() returns u128, so convert it to u64 by using `as`. + // We cannot call TryInto::try_into() here because it is not a const fn. 
+ (duration.as_nanos() as u64).next_power_of_two() +} + +/// A timer node stored in a bucket of a timer wheel. +pub(crate) enum TimerNode { + /// A sentinel node that is used to mark the end of a timer wheel bucket. + Sentinel, + /// A timer entry that is holding Arc pointers to the data structures in a cache + /// entry. + Entry { + /// The position (level and index) of the timer wheel bucket. + pos: Option<(u8, u8)>, + /// An Arc pointer to the `EntryInfo` of the cache entry (`ValueEntry`). + entry_info: TrioArc>, + /// An Arc pointer to the `DeqNodes` of the cache entry (`ValueEntry`). + deq_nodes: TrioArc>>, + }, +} + +impl TimerNode { + fn new( + entry_info: TrioArc>, + deq_nodes: TrioArc>>, + level: usize, + index: usize, + ) -> Self { + Self::Entry { + pos: Some((level as u8, index as u8)), + entry_info, + deq_nodes, + } + } + + /// Returns the position (level and index) of the timer wheel bucket. + fn position(&self) -> Option<(usize, usize)> { + if let Self::Entry { pos, .. } = &self { + pos.map(|(level, index)| (level as usize, index as usize)) + } else { + unreachable!() + } + } + + fn set_position(&mut self, level: usize, index: usize) { + if let Self::Entry { pos, .. } = self { + *pos = Some((level as u8, index as u8)); + } else { + unreachable!() + } + } + + fn unset_position(&mut self) { + if let Self::Entry { pos, .. } = self { + *pos = None; + } else { + unreachable!() + } + } + + fn is_sentinel(&self) -> bool { + matches!(self, Self::Sentinel) + } + + pub(crate) fn entry_info(&self) -> &TrioArc> { + if let Self::Entry { entry_info, .. } = &self { + entry_info + } else { + unreachable!() + } + } + + fn unset_timer_node_in_deq_nodes(&self) { + if let Self::Entry { deq_nodes, .. 
} = &self { + deq_nodes.lock().set_timer_node(None); + } else { + unreachable!(); + } + } +} + +type Bucket = Deque>; + +#[must_use = "this `ReschedulingResult` may be an `Removed` variant, which should be handled"] +pub(crate) enum ReschedulingResult { + /// The timer event was rescheduled. + Rescheduled, + /// The timer event was not rescheduled because the entry has no expiration time. + Removed(Box>>), +} + +/// A hierarchical timer wheel to add, remove, and fire expiration events in +/// amortized O(1) time. +/// +/// The expiration events are deferred until the timer is advanced, which is +/// performed as part of the cache's housekeeping cycle. +pub(crate) struct TimerWheel { + /// The hierarchical timer wheels. + wheels: Box<[Box<[Bucket]>]>, + /// The time when this `TimerWheel` was created. + origin: Instant, + /// The time when this `TimerWheel` was last advanced. + current: Instant, +} + +#[cfg(feature = "future")] +// TODO: https://github.com/moka-rs/moka/issues/54 +#[allow(clippy::non_send_fields_in_send_ty)] +// Multi-threaded async runtimes require base_cache::Inner to be Send, but it will +// not be without this `unsafe impl`. This is because DeqNodes have NonNull +// pointers. +unsafe impl Send for TimerWheel {} + +impl TimerWheel { + pub(crate) fn new(now: Instant) -> Self { + Self { + wheels: Default::default(), // Empty. + origin: now, + current: now, + } + } + + #[cfg(test)] + pub(crate) fn set_origin(&mut self, time: Instant) { + self.origin = time; + self.current = time; + } + + pub(crate) fn is_enabled(&self) -> bool { + !self.wheels.is_empty() + } + + pub(crate) fn enable(&mut self) { + assert!(!self.is_enabled()); + + // Populate each bucket with a queue having a sentinel node. 
+ self.wheels = BUCKET_COUNTS + .iter() + .map(|b| { + (0..*b) + .map(|_| { + let mut deq = Deque::new(super::CacheRegion::Other); + deq.push_back(Box::new(DeqNode::new(TimerNode::Sentinel))); + deq + }) + .collect::>() + .into_boxed_slice() + }) + .collect::>() + .into_boxed_slice(); + } + + /// Schedules a timer event for the node. + pub(crate) fn schedule( + &mut self, + entry_info: TrioArc>, + deq_nodes: TrioArc>>, + ) -> Option>>> { + debug_assert!(self.is_enabled()); + + if let Some(t) = entry_info.expiration_time() { + let (level, index) = self.bucket_indices(t); + let node = Box::new(DeqNode::new(TimerNode::new( + entry_info, deq_nodes, level, index, + ))); + let node = self.wheels[level][index].push_back(node); + Some(node) + } else { + None + } + } + + fn schedule_existing_node( + &mut self, + mut node: NonNull>>, + ) -> ReschedulingResult { + debug_assert!(self.is_enabled()); + + // Since cache entry's ValueEntry has a pointer to this node, we must reuse + // the node. + // + // SAFETY on `node.as_mut()`: The self (`TimerWheel`) is the only owner of + // the node, and we have `&mut self` here. We are the only one who can mutate + // the node. + if let entry @ TimerNode::Entry { .. } = &mut unsafe { node.as_mut() }.element { + if let Some(t) = entry.entry_info().expiration_time() { + let (level, index) = self.bucket_indices(t); + entry.set_position(level, index); + let node = unsafe { Box::from_raw(node.as_ptr()) }; + self.wheels[level][index].push_back(node); + ReschedulingResult::Rescheduled + } else { + entry.unset_position(); + entry.unset_timer_node_in_deq_nodes(); + ReschedulingResult::Removed(unsafe { Box::from_raw(node.as_ptr()) }) + } + } else { + unreachable!() + } + } + + /// Reschedules an active timer event for the node. 
+ pub(crate) fn reschedule( + &mut self, + node: NonNull>>, + ) -> ReschedulingResult { + debug_assert!(self.is_enabled()); + unsafe { self.unlink_timer(node) }; + self.schedule_existing_node(node) + } + + /// Removes a timer event for this node if present. + pub(crate) fn deschedule(&mut self, node: NonNull>>) { + debug_assert!(self.is_enabled()); + unsafe { + self.unlink_timer(node); + Self::drop_node(node); + } + } + + /// Removes a timer event for this node if present. + /// + /// IMPORTANT: This method does not drop the node. + unsafe fn unlink_timer(&mut self, mut node: NonNull>>) { + // SAFETY: The self (`TimerWheel`) is the only owner of the node, and we have + // `&mut self` here. We are the only one who can mutate the node. + let p = node.as_mut(); + if let entry @ TimerNode::Entry { .. } = &mut p.element { + if let Some((level, index)) = entry.position() { + self.wheels[level][index].unlink(node); + entry.unset_position(); + } + } else { + unreachable!(); + } + } + + unsafe fn drop_node(node: NonNull>>) { + std::mem::drop(Box::from_raw(node.as_ptr())); + } + + /// Advances the timer wheel to the current time, and returns an iterator over + /// timer events. + pub(crate) fn advance( + &mut self, + current_time: Instant, + ) -> impl Iterator> + '_ { + debug_assert!(self.is_enabled()); + + let previous_time = self.current; + self.current = current_time; + TimerEventsIter::new(self, previous_time, current_time) + } + + /// Returns a pointer to the timer event (cache entry) at the front of the queue. + /// Returns `None` if the front node is a sentinel. + fn pop_timer_node(&mut self, level: usize, index: usize) -> Option>>> { + if let Some(node) = self.wheels[level][index].peek_front() { + if node.element.is_sentinel() { + return None; + } + } + + self.wheels[level][index].pop_front() + } + + /// Reset the positions of the nodes in the queue at the given level and index. 
+ fn reset_timer_node_positions(&mut self, level: usize, index: usize) { + if let Some(node) = self.wheels[level][index].peek_back() { + if node.element.is_sentinel() { + // The sentinel is at the back of the queue. We are already set. + return; + } + } else { + panic!( + "BUG: The queue is empty. level: {}, index: {}", + level, index + ) + } + + // Rotate the nodes in the queue until we see the sentinel at the back of the + // queue. + loop { + // Safe to unwrap because we already checked the queue is not empty. + let node = self.wheels[level][index].pop_front().unwrap(); + let is_sentinel = node.element.is_sentinel(); + + // Move the front node to the back. + self.wheels[level][index].push_back(node); + + // If the node we just moved was the sentinel, we are done. + if is_sentinel { + break; + } + } + } + + /// Returns the bucket indices to locate the bucket that the timer event + /// should be added to. + fn bucket_indices(&self, time: Instant) -> (usize, usize) { + let duration_nanos = self.duration_nanos_since_last_advanced(time); + let time_nanos = self.time_nanos(time); + for level in 0..=NUM_LEVELS { + if duration_nanos < SPANS[level + 1] { + let ticks = time_nanos >> SHIFT[level]; + let index = ticks & (BUCKET_COUNTS[level] - 1); + return (level, index as usize); + } + } + (OVERFLOW_QUEUE_INDEX, 0) + } + + // Returns nano-seconds between the given `time` and the time when this timer + // wheel was last advanced. If the `time` is earlier than that, returns zero. + fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 { + time.checked_duration_since(self.current) + // If `time` is earlier than `self.current`, use zero. This could happen + // when a user-provided `Expiry` method returned zero or a very short + // duration. + .unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`. + .as_nanos() as u64 + } + + // Returns nano-seconds between the given `time` and `self.origin`, the time when + // this timer wheel was created. 
+ // + // - If the `time` is earlier than `self.origin`, returns zero. + // - If the `time` is later than `self.origin + u64::MAX`, returns `u64::MAX`, + // which is ~584 years in nanoseconds. + // + fn time_nanos(&self, time: Instant) -> u64 { + // `TryInto` will be in the prelude starting in Rust 2021 Edition. + use std::convert::TryInto; + + let nanos_u128 = time + .checked_duration_since(self.origin) + // If `time` is earlier than `self.origin`, use zero. This would never + // happen in practice as there should be some delay between when the timer + // wheel was created and when the first timer event is scheduled. But we will + // do this just in case. + .unwrap_or_default() // Assuming `Duration::default()` returns `ZERO`. + .as_nanos(); + + // Convert a `u128` into a `u64`. If the value is too large, use `u64::MAX` + // (~584 years) + nanos_u128.try_into().unwrap_or(u64::MAX) + } +} + +/// A timer event, which is either an expired/rescheduled cache entry, or a +/// descheduled timer. `TimerWheel::advance` method returns an iterator over timer +/// events. +#[derive(Debug)] +pub(crate) enum TimerEvent { + /// This cache entry has expired. + Expired(Box>>), + /// This cache entry has been rescheduled. Rescheduling includes moving a timer + /// from one wheel to another in a lower level of the hierarchy. (This variant + /// is mainly used for testing) + Rescheduled(TrioArc>), + /// This timer node (containing a cache entry) has been removed from the timer. + /// (This variant is mainly used for testing) + Descheduled(Box>>), +} + +/// An iterator over timer events. 
+pub(crate) struct TimerEventsIter<'iter, K> { + timer_wheel: &'iter mut TimerWheel, + previous_time: Instant, + current_time: Instant, + is_done: bool, + level: usize, + index: u8, + end_index: u8, + index_mask: u64, + is_new_level: bool, + is_new_index: bool, +} + +impl<'iter, K> TimerEventsIter<'iter, K> { + fn new( + timer_wheel: &'iter mut TimerWheel, + previous_time: Instant, + current_time: Instant, + ) -> Self { + Self { + timer_wheel, + previous_time, + current_time, + is_done: false, + level: 0, + index: 0, + end_index: 0, + index_mask: 0, + is_new_level: true, + is_new_index: true, + } + } +} + +impl<'iter, K> Drop for TimerEventsIter<'iter, K> { + fn drop(&mut self) { + if !self.is_done { + // This iterator was dropped before consuming all events. Reset the + // `current` to the time when the timer wheel was last successfully + // advanced. + self.timer_wheel.current = self.previous_time; + } + } +} + +impl<'iter, K> Iterator for TimerEventsIter<'iter, K> { + type Item = TimerEvent; + + /// NOTE: When necessary, this iterator will unset the timer node pointer in the + /// `ValueEntry`. 
+ fn next(&mut self) -> Option { + if self.is_done { + return None; + } + + loop { + if self.is_new_level { + let previous_time_nanos = self.timer_wheel.time_nanos(self.previous_time); + let current_time_nanos = self.timer_wheel.time_nanos(self.current_time); + let previous_ticks = previous_time_nanos >> SHIFT[self.level]; + let current_ticks = current_time_nanos >> SHIFT[self.level]; + + if current_ticks <= previous_ticks { + self.is_done = true; + return None; + } + + self.index_mask = BUCKET_COUNTS[self.level] - 1; + self.index = (previous_ticks & self.index_mask) as u8; + let steps = + (current_ticks - previous_ticks + 1).min(BUCKET_COUNTS[self.level]) as u8; + self.end_index = self.index + steps; + + self.is_new_level = false; + self.is_new_index = true; + + // dbg!(self.level, self.index, self.end_index); + } + + let i = self.index & self.index_mask as u8; + + if self.is_new_index { + // Move the sentinel to the back of the queue. + self.timer_wheel + .reset_timer_node_positions(self.level, i as usize); + + self.is_new_index = false; + } + + // Pop the next timer event (cache entry) from the queue at the current + // level and index. + // + // We will repeat processing this level until we see the sentinel. + // (`pop_timer_node` will return `None` when it sees the sentinel) + match self.timer_wheel.pop_timer_node(self.level, i as usize) { + Some(node) => { + let expiration_time = node.as_ref().element.entry_info().expiration_time(); + if let Some(t) = expiration_time { + if t <= self.current_time { + // The cache entry has expired. Unset the timer node from + // the ValueEntry and return the node. + node.as_ref().element.unset_timer_node_in_deq_nodes(); + return Some(TimerEvent::Expired(node)); + } else { + // The cache entry has not expired. Reschedule it. 
+ let node_p = NonNull::new(Box::into_raw(node)).expect("Got a null ptr"); + match self.timer_wheel.schedule_existing_node(node_p) { + ReschedulingResult::Rescheduled => { + let entry_info = + unsafe { node_p.as_ref() }.element.entry_info(); + return Some(TimerEvent::Rescheduled(TrioArc::clone( + entry_info, + ))); + } + ReschedulingResult::Removed(node) => { + // The timer event has been removed from the timer + // wheel. Unset the timer node from the ValueEntry. + node.as_ref().element.unset_timer_node_in_deq_nodes(); + return Some(TimerEvent::Descheduled(node)); + } + } + } + } + } + // Done with the current queue (`None` means we just saw the + // sentinel). Move to the next index and/or next level. + None => { + self.index += 1; + self.is_new_index = true; + + if self.index >= self.end_index { + self.level += 1; + // No more levels to process. We are done. + if self.level >= BUCKET_COUNTS.len() { + self.is_done = true; + return None; + } + self.is_new_level = true; + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use super::{TimerEvent, TimerWheel, SPANS}; + use crate::common::{ + concurrent::{entry_info::EntryInfo, KeyHash}, + time::{CheckedTimeOps, Clock, Instant, Mock}, + }; + + use triomphe::Arc as TrioArc; + + #[test] + fn test_bucket_indices() { + fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) { + let t = now.checked_add(dur).unwrap(); + timer.bucket_indices(t) + } + + let (clock, mock) = Clock::mock(); + let now = now(&clock); + + let mut timer = TimerWheel::<()>::new(now); + timer.enable(); + + assert_eq!(timer.bucket_indices(now), (0, 0)); + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 0)); + assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 63)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1)); + 
assert_eq!(bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * 63)), (1, 63)); + + // Level 2: 1.22h + assert_eq!(bi(&timer, now, n2d(SPANS[1] * 64)), (2, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[2])), (2, 1)); + assert_eq!( + bi( + &timer, + now, + n2d(SPANS[2] * 31 + SPANS[1] * 63 + SPANS[0] * 63) + ), + (2, 31) + ); + + // Level 3: 1.63d + assert_eq!(bi(&timer, now, n2d(SPANS[2] * 32)), (3, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[3])), (3, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[3] * 3)), (3, 3)); + + // Overflow + assert_eq!(bi(&timer, now, n2d(SPANS[3] * 4)), (4, 0)); + assert_eq!(bi(&timer, now, n2d(SPANS[4])), (4, 0)); + assert_eq!(bi(&timer, now, n2d(SPANS[4] * 100)), (4, 0)); + + // Increment the clock by 5 ticks. (1 tick ~= 1.07s) + let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 5)); + timer.current = now; + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 5)); + assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 6)); + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 4)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1)); + assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1)); + assert_eq!( + bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 5))), + (1, 63) + ); + + // Increment the clock by 61 ticks. 
(total 66 ticks) + let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 61)); + timer.current = now; + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 2)); + assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 3)); + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 1)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 2)); + assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 2)); + assert_eq!( + bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 2))), + (1, 0) + ); + } + + #[test] + fn test_advance() { + fn schedule_timer(timer: &mut TimerWheel, key: u32, now: Instant, ttl: Duration) { + let hash = key as u64; + let key_hash = KeyHash::new(Arc::new(key), hash); + let policy_weight = 0; + let entry_info = TrioArc::new(EntryInfo::new(key_hash, now, policy_weight)); + entry_info.set_expiration_time(Some(now.checked_add(ttl).unwrap())); + let deq_nodes = Default::default(); + let timer_node = timer.schedule(entry_info, TrioArc::clone(&deq_nodes)); + deq_nodes.lock().set_timer_node(timer_node); + } + + fn expired_key(maybe_entry: Option>) -> u32 { + let entry = maybe_entry.expect("entry is none"); + match entry { + TimerEvent::Expired(node) => *node.element.entry_info().key_hash().key, + _ => panic!("Expected an expired entry. Got {:?}", entry), + } + } + + fn rescheduled_key(maybe_entry: Option>) -> u32 { + let entry = maybe_entry.expect("entry is none"); + match entry { + TimerEvent::Rescheduled(entry) => *entry.key_hash().key, + _ => panic!("Expected a rescheduled entry. Got {:?}", entry), + } + } + + let (clock, mock) = Clock::mock(); + let now = advance_clock(&clock, &mock, s2d(10)); + + let mut timer = TimerWheel::::new(now); + timer.enable(); + + // Add timers that will expire in some seconds. 
+ schedule_timer(&mut timer, 1, now, s2d(5)); + schedule_timer(&mut timer, 2, now, s2d(1)); + schedule_timer(&mut timer, 3, now, s2d(63)); + schedule_timer(&mut timer, 4, now, s2d(3)); + + let now = advance_clock(&clock, &mock, s2d(4)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 2); + assert_eq!(expired_key(expired_entries.next()), 4); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(4)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 1); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(64 - 8)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 3); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + // Add timers that will expire in some minutes. + const MINUTES: u64 = 60; + schedule_timer(&mut timer, 1, now, s2d(5 * MINUTES)); + #[allow(clippy::identity_op)] + schedule_timer(&mut timer, 2, now, s2d(1 * MINUTES)); + schedule_timer(&mut timer, 3, now, s2d(63 * MINUTES)); + schedule_timer(&mut timer, 4, now, s2d(3 * MINUTES)); + + let now = advance_clock(&clock, &mock, s2d(4 * MINUTES)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 2); + assert_eq!(expired_key(expired_entries.next()), 4); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(4 * MINUTES)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 1); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d((64 - 8) * MINUTES)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 3); + assert!(expired_entries.next().is_none()); + 
drop(expired_entries); + + // Add timers that will expire in some hours. + const HOURS: u64 = 60 * 60; + schedule_timer(&mut timer, 1, now, s2d(5 * HOURS)); + #[allow(clippy::identity_op)] + schedule_timer(&mut timer, 2, now, s2d(1 * HOURS)); + schedule_timer(&mut timer, 3, now, s2d(31 * HOURS)); + schedule_timer(&mut timer, 4, now, s2d(3 * HOURS)); + + let now = advance_clock(&clock, &mock, s2d(4 * HOURS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 2); + assert_eq!(expired_key(expired_entries.next()), 4); + assert_eq!(rescheduled_key(expired_entries.next()), 1); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(4 * HOURS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 1); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d((32 - 8) * HOURS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 3); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + // Add timers that will expire in a few days. + const DAYS: u64 = 24 * 60 * 60; + schedule_timer(&mut timer, 1, now, s2d(5 * DAYS)); + #[allow(clippy::identity_op)] + schedule_timer(&mut timer, 2, now, s2d(1 * DAYS)); + schedule_timer(&mut timer, 3, now, s2d(2 * DAYS)); + // Longer than ~6.5 days, so this should be stored in the overflow area. 
+ schedule_timer(&mut timer, 4, now, s2d(8 * DAYS)); + + let now = advance_clock(&clock, &mock, s2d(3 * DAYS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 2); + assert_eq!(expired_key(expired_entries.next()), 3); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(3 * DAYS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 1); + assert_eq!(rescheduled_key(expired_entries.next()), 4); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + + let now = advance_clock(&clock, &mock, s2d(3 * DAYS)); + let mut expired_entries = timer.advance(now); + assert_eq!(expired_key(expired_entries.next()), 4); + assert!(expired_entries.next().is_none()); + drop(expired_entries); + } + + // + // Utility functions + // + + fn now(clock: &Clock) -> Instant { + Instant::new(clock.now()) + } + + fn advance_clock(clock: &Clock, mock: &Arc, duration: Duration) -> Instant { + mock.increment(duration); + now(clock) + } + + /// Convert nano-seconds to duration. + fn n2d(nanos: u64) -> Duration { + Duration::from_nanos(nanos) + } + + /// Convert seconds to duration. 
+ fn s2d(secs: u64) -> Duration { + Duration::from_secs(secs) + } +} diff --git a/src/future/builder.rs b/src/future/builder.rs index 4157e967..95b9cb59 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -2,6 +2,8 @@ use super::Cache; use crate::{ common::{builder_utils, concurrent::Weigher}, notification::{self, DeliveryMode, EvictionListener, RemovalCause}, + policy::ExpirationPolicy, + Expiry, }; use std::{ @@ -22,7 +24,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.10", features = ["future"] } +/// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures = "0.3" /// @@ -60,8 +62,7 @@ pub struct CacheBuilder { weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, cache_type: PhantomData, } @@ -79,8 +80,7 @@ where weigher: None, eviction_listener: None, eviction_listener_conf: None, - time_to_live: None, - time_to_idle: None, + expiration_policy: Default::default(), invalidator_enabled: false, cache_type: Default::default(), } @@ -110,7 +110,8 @@ where /// expiration. 
pub fn build(self) -> Cache { let build_hasher = RandomState::default(); - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); Cache::with_everything( self.name, self.max_capacity, @@ -119,8 +120,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(true), ) @@ -208,7 +208,8 @@ where where S: BuildHasher + Clone + Send + Sync + 'static, { - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); Cache::with_everything( self.name, self.max_capacity, @@ -217,8 +218,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(true), ) @@ -302,10 +302,9 @@ impl CacheBuilder { /// than 1000 years. This is done to protect against overflow when computing key /// expiration. pub fn time_to_live(self, duration: Duration) -> Self { - Self { - time_to_live: Some(duration), - ..self - } + let mut builder = self; + builder.expiration_policy.set_time_to_live(duration); + builder } /// Sets the time to idle of the cache. @@ -319,10 +318,15 @@ impl CacheBuilder { /// than 1000 years. This is done to protect against overflow when computing key /// expiration. 
pub fn time_to_idle(self, duration: Duration) -> Self { - Self { - time_to_idle: Some(duration), - ..self - } + let mut builder = self; + builder.expiration_policy.set_time_to_idle(duration); + builder + } + + pub fn expire_after(self, expiry: impl Expiry + Send + Sync + 'static) -> Self { + let mut builder = self; + builder.expiration_policy.set_expiry(Arc::new(expiry)); + builder } /// Enables support for [Cache::invalidate_entries_if][cache-invalidate-if] diff --git a/src/future/cache.rs b/src/future/cache.rs index 3673f76a..b0168045 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -13,6 +13,7 @@ use crate::{ time::Instant, }, notification::{self, EvictionListener}, + policy::ExpirationPolicy, sync_base::base_cache::{BaseCache, HouseKeeperArc}, Entry, Policy, PredicateError, }; @@ -79,7 +80,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.10", features = ["future"] } +/// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures-util = "0.3" /// @@ -163,7 +164,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.10", features = ["future"] } +/// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures-util = "0.3" /// @@ -225,7 +226,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.10", features = ["future"] } +/// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures-util = "0.3" /// @@ -589,7 +590,7 @@ impl Cache { /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// use 
moka::future::Cache; /// @@ -661,8 +662,7 @@ where None, None, None, - None, - None, + Default::default(), false, housekeeper::Configuration::new_thread_pool(true), ) @@ -693,8 +693,7 @@ where weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, housekeeper_conf: housekeeper::Configuration, ) -> Self { @@ -707,8 +706,7 @@ where weigher, eviction_listener, eviction_listener_conf, - time_to_live, - time_to_idle, + expiration_policy, invalidator_enabled, housekeeper_conf, ), @@ -763,7 +761,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -803,7 +801,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -848,7 +846,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // futures-util = "0.3" /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// use moka::future::Cache; @@ -975,7 +973,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // futures-util = "0.3" /// // reqwest = "0.11" /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } @@ -1099,7 +1097,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] 
} + /// // moka = { version = "0.11", features = ["future"] } /// // futures-util = "0.3" /// // reqwest = "0.11" /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } @@ -1369,7 +1367,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// use moka::future::Cache; /// @@ -1448,7 +1446,7 @@ where ) -> Entry { let maybe_entry = self.base - .get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut(), need_key); + .get_with_hash_and_ignore_if(&key, hash, replace_if.as_mut(), need_key); if let Some(entry) = maybe_entry { entry } else { @@ -1471,7 +1469,7 @@ where { let maybe_entry = self.base - .get_with_hash_but_ignore_if(key, hash, replace_if.as_mut(), need_key); + .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key); if let Some(entry) = maybe_entry { entry } else { @@ -1493,7 +1491,7 @@ where let get = || { self.base - .get_with_hash_but_no_recording(&key, hash, replace_if.as_mut()) + .get_with_hash_without_recording(&key, hash, replace_if.as_mut()) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed(); @@ -1615,7 +1613,7 @@ where let get = || { let ignore_if = None as Option<&mut fn(&V) -> bool>; self.base - .get_with_hash_but_no_recording(&key, hash, ignore_if) + .get_with_hash_without_recording(&key, hash, ignore_if) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed(); @@ -1698,7 +1696,7 @@ where let get = || { let ignore_if = None as Option<&mut fn(&V) -> bool>; self.base - .get_with_hash_but_no_recording(&key, hash, ignore_if) + .get_with_hash_without_recording(&key, hash, ignore_if) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed(); @@ -1860,11 +1858,18 @@ where #[cfg(test)] mod tests { use super::{Cache, ConcurrentCacheExt}; - use crate::{common::time::Clock, 
notification::RemovalCause}; + use crate::{ + common::time::Clock, notification::RemovalCause, policy::test_utils::ExpiryCallCounters, + Expiry, + }; use async_io::Timer; use parking_lot::Mutex; - use std::{convert::Infallible, sync::Arc, time::Duration}; + use std::{ + convert::Infallible, + sync::Arc, + time::{Duration, Instant as StdInstant}, + }; #[tokio::test] async fn max_capacity_zero() { @@ -2516,6 +2521,323 @@ mod tests { verify_notification_vec(&cache, actual, &expected); } + #[tokio::test] + async fn time_to_live_by_expiry_type() { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Define an expiry type. + struct MyExpiry { + counters: Arc, + } + + impl MyExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for MyExpiry { + fn expire_after_create( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + ) -> Option { + self.counters.incl_actual_creations(); + Some(Duration::from_secs(10)) + } + + fn expire_after_update( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + ) -> Option { + self.counters.incl_actual_updates(); + Some(Duration::from_secs(10)) + } + } + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + + // Create expiry counters and the expiry. + let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + + // Create a cache with the expiry and eviction listener. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .eviction_listener_with_queued_delivery_mode(listener) + .build(); + cache.reconfigure_for_testing(); + + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. 
+ let cache = cache; + + cache.insert("a", "alice").await; + expiry_counters.incl_expected_creations(); + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 5 secs from the start. + cache.sync(); + + assert_eq!(cache.get(&"a"), Some("alice")); + assert!(cache.contains_key(&"a")); + + mock.increment(Duration::from_secs(5)); // 10 secs. + expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); + assert_eq!(cache.get(&"a"), None); + assert!(!cache.contains_key(&"a")); + + assert_eq!(cache.iter().count(), 0); + + cache.sync(); + assert!(cache.is_table_empty()); + + cache.insert("b", "bob").await; + expiry_counters.incl_expected_creations(); + cache.sync(); + + assert_eq!(cache.entry_count(), 1); + + mock.increment(Duration::from_secs(5)); // 15 secs. + cache.sync(); + + assert_eq!(cache.get(&"b"), Some("bob")); + assert!(cache.contains_key(&"b")); + assert_eq!(cache.entry_count(), 1); + + cache.insert("b", "bill").await; + expected.push((Arc::new("b"), "bob", RemovalCause::Replaced)); + expiry_counters.incl_expected_updates(); + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 20 secs + cache.sync(); + + assert_eq!(cache.get(&"b"), Some("bill")); + assert!(cache.contains_key(&"b")); + assert_eq!(cache.entry_count(), 1); + + mock.increment(Duration::from_secs(5)); // 25 secs + expected.push((Arc::new("b"), "bill", RemovalCause::Expired)); + + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), None); + assert!(!cache.contains_key(&"a")); + assert!(!cache.contains_key(&"b")); + + assert_eq!(cache.iter().count(), 0); + + cache.sync(); + assert!(cache.is_table_empty()); + + expiry_counters.verify(); + verify_notification_vec(&cache, actual, &expected); + } + + #[tokio::test] + async fn time_to_idle_by_expiry_type() { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Define an expiry type. 
+ struct MyExpiry { + counters: Arc, + } + + impl MyExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for MyExpiry { + fn expire_after_read( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + _last_modified_at: StdInstant, + ) -> Option { + self.counters.incl_actual_reads(); + Some(Duration::from_secs(10)) + } + } + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + + // Create expiry counters and the expiry. + let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + + // Create a cache with the expiry and eviction listener. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .eviction_listener_with_queued_delivery_mode(listener) + .build(); + cache.reconfigure_for_testing(); + + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice").await; + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 5 secs from the start. + cache.sync(); + + assert_eq!(cache.get(&"a"), Some("alice")); + expiry_counters.incl_expected_reads(); + + mock.increment(Duration::from_secs(5)); // 10 secs. + cache.sync(); + + cache.insert("b", "bob").await; + cache.sync(); + + assert_eq!(cache.entry_count(), 2); + + mock.increment(Duration::from_secs(2)); // 12 secs. + cache.sync(); + + // contains_key does not reset the idle timer for the key. + assert!(cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + cache.sync(); + + assert_eq!(cache.entry_count(), 2); + + mock.increment(Duration::from_secs(3)); // 15 secs. 
+ expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); + + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some("bob")); + expiry_counters.incl_expected_reads(); + assert!(!cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + + assert_eq!(cache.iter().count(), 1); + + cache.sync(); + assert_eq!(cache.entry_count(), 1); + + mock.increment(Duration::from_secs(10)); // 25 secs + expected.push((Arc::new("b"), "bob", RemovalCause::Expired)); + + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), None); + assert!(!cache.contains_key(&"a")); + assert!(!cache.contains_key(&"b")); + + assert_eq!(cache.iter().count(), 0); + + cache.sync(); + assert!(cache.is_table_empty()); + + expiry_counters.verify(); + verify_notification_vec(&cache, actual, &expected); + } + + /// Verify that the `Expiry::expire_after_read()` method is called in `get_with` + /// only when the key was already present in the cache. + #[tokio::test] + async fn test_expiry_using_get_with() { + // Define an expiry type, which always returns `None`. + struct NoExpiry { + counters: Arc, + } + + impl NoExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for NoExpiry { + fn expire_after_create( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + ) -> Option { + self.counters.incl_actual_creations(); + None + } + + fn expire_after_read( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + _last_modified_at: StdInstant, + ) -> Option { + self.counters.incl_actual_reads(); + None + } + + fn expire_after_update( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + ) -> Option { + unreachable!("The `expire_after_update()` method should not be called."); + } + } + + // Create expiry counters and the expiry. 
+ let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = NoExpiry::new(Arc::clone(&expiry_counters)); + + // Create a cache with the expiry. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + // The key is not present. + cache.get_with("a", async { "alice" }).await; + expiry_counters.incl_expected_creations(); + cache.sync(); + + // The key is present. + cache.get_with("a", async { "alex" }).await; + expiry_counters.incl_expected_reads(); + cache.sync(); + + // The key is not present. + cache.invalidate("a").await; + cache.get_with("a", async { "amanda" }).await; + expiry_counters.incl_expected_creations(); + cache.sync(); + + expiry_counters.verify(); + } + #[tokio::test] async fn test_iter() { const NUM_KEYS: usize = 50; diff --git a/src/future/entry_selector.rs b/src/future/entry_selector.rs index 1257c153..9616e792 100644 --- a/src/future/entry_selector.rs +++ b/src/future/entry_selector.rs @@ -52,7 +52,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -94,7 +94,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -135,7 +135,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; 
@@ -218,7 +218,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -293,7 +293,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -403,7 +403,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -444,7 +444,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -484,7 +484,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -571,7 +571,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; @@ -645,7 +645,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.10", features = ["future"] } + /// // moka = { version = "0.11", features = 
["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// /// use moka::future::Cache; diff --git a/src/lib.rs b/src/lib.rs index 79d67eb6..4f1ef642 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,7 +188,7 @@ pub use common::error::PredicateError; #[cfg(any(feature = "sync", feature = "future"))] pub use common::entry::Entry; -pub use policy::Policy; +pub use policy::{Expiry, Policy}; #[cfg(feature = "dash")] compile_error!( diff --git a/src/policy.rs b/src/policy.rs index 2c2212e2..fba73f7e 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; #[derive(Clone, Debug)] /// The policy of a cache. @@ -54,3 +57,256 @@ impl Policy { self.time_to_idle } } + +/// Calculates when cache entries expire. A single expiration time is retained on +/// each entry so that the lifetime of an entry may be extended or reduced by +/// subsequent evaluations. +/// +/// `Expiry` trait provides three methods. They specify the expiration time of an +/// entry by returning a `Some(duration)` until the entry expires: +/// +/// - [`expire_after_create`](#method.expire_after_create) — Returns the +/// duration (or none) after the entry's creation. +/// - [`expire_after_read`](#method.expire_after_read) — Returns the duration +/// (or none) after its last read. +/// - [`expire_after_update`](#method.expire_after_update) — Returns the +/// duration (or none) after its last update. +/// +/// The default implementations are provided that return `None` (no expiration) or +/// `current_duration: Option` (not modify the current expiration time). +/// Override some of them as you need. +/// +pub trait Expiry { + /// Specifies that the entry should be automatically removed from the cache once + /// the duration has elapsed after the entry's creation. 
This method is called + /// for cache write methods such as `insert` and `get_with` but only when the key + /// is not present in the cache. + /// + /// # Parameters + /// + /// - `key` — A reference to the key of the entry. + /// - `value` — A reference to the value of the entry. + /// - `current_time` — The current instant. + /// + /// # Returning `None` + /// + /// - Returning `None` indicates no expiration for the entry. + /// - The default implementation returns `None`. + /// + /// # Notes on `time_to_live` and `time_to_idle` policies + /// + /// When the cache is configured with `time_to_live` and/or `time_to_idle` + /// policies, the entry will be evicted after the earliest of the expiration time + /// returned by this expiry, the `time_to_live` and `time_to_idle` policies. + #[allow(unused_variables)] + fn expire_after_create(&self, key: &K, value: &V, current_time: Instant) -> Option { + None + } + + /// Specifies that the entry should be automatically removed from the cache once + /// the duration has elapsed after its last read. This method is called for cache + /// read methods such as `get` and `get_with` but only when the key is present + /// in the cache. + /// + /// # Parameters + /// + /// - `key` — A reference to the key of the entry. + /// - `value` — A reference to the value of the entry. + /// - `current_time` — The current instant. + /// - `current_duration` — The remaining duration until the entry expires. + /// - `last_modified_at` — The instant when the entry was created or + /// updated. + /// + /// # Returning `None` or `current_duration` + /// + /// - Returning `None` indicates no expiration for the entry. + /// - Returning `current_duration` will not modify the expiration time. 
+ /// - The default implementation returns `current_duration` (not modify the + /// expiration time) + /// + /// # Notes on `time_to_live` and `time_to_idle` policies + /// + /// When the cache is configured with `time_to_live` and/or `time_to_idle` + /// policies, then: + /// + /// - The entry will be evicted after the earliest of the expiration time + /// returned by this expiry, the `time_to_live` and `time_to_idle` policies. + /// - The `current_duration` takes in account the `time_to_live` and + /// `time_to_idle` policies. + #[allow(unused_variables)] + fn expire_after_read( + &self, + key: &K, + value: &V, + current_time: Instant, + current_duration: Option, + last_modified_at: Instant, + ) -> Option { + current_duration + } + + /// Specifies that the entry should be automatically removed from the cache once + /// the duration has elapsed after the replacement of its value. This method is + /// called for cache write methods such as `insert` but only when the key is + /// already present in the cache. + /// + /// # Parameters + /// + /// - `key` — A reference to the key of the entry. + /// - `value` — A reference to the value of the entry. + /// - `current_time` — The current instant. + /// - `current_duration` — The remaining duration until the entry expires. + /// + /// # Returning `None` or `current_duration` + /// + /// - Returning `None` indicates no expiration for the entry. + /// - Returning `current_duration` will not modify the expiration time. + /// - The default implementation returns `current_duration` (not modify the + /// expiration time) + /// + /// # Notes on `time_to_live` and `time_to_idle` policies + /// + /// When the cache is configured with `time_to_live` and/or `time_to_idle` + /// policies, then: + /// + /// - The entry will be evicted after the earliest of the expiration time + /// returned by this expiry, the `time_to_live` and `time_to_idle` policies. 
+ /// - The `current_duration` takes in account the `time_to_live` and + /// `time_to_idle` policies. + #[allow(unused_variables)] + fn expire_after_update( + &self, + key: &K, + value: &V, + current_time: Instant, + current_duration: Option, + ) -> Option { + current_duration + } +} + +pub(crate) struct ExpirationPolicy { + time_to_live: Option, + time_to_idle: Option, + expiry: Option + Send + Sync + 'static>>, +} + +impl Default for ExpirationPolicy { + fn default() -> Self { + Self { + time_to_live: None, + time_to_idle: None, + expiry: None, + } + } +} + +impl Clone for ExpirationPolicy { + fn clone(&self) -> Self { + Self { + time_to_live: self.time_to_live, + time_to_idle: self.time_to_idle, + expiry: self.expiry.as_ref().map(Arc::clone), + } + } +} + +impl ExpirationPolicy { + #[cfg(test)] + pub(crate) fn new( + time_to_live: Option, + time_to_idle: Option, + expiry: Option + Send + Sync + 'static>>, + ) -> Self { + Self { + time_to_live, + time_to_idle, + expiry, + } + } + + /// Returns the `time_to_live` of the cache. + pub(crate) fn time_to_live(&self) -> Option { + self.time_to_live + } + + pub(crate) fn set_time_to_live(&mut self, duration: Duration) { + self.time_to_live = Some(duration); + } + + /// Returns the `time_to_idle` of the cache. 
+ pub(crate) fn time_to_idle(&self) -> Option { + self.time_to_idle + } + + pub(crate) fn set_time_to_idle(&mut self, duration: Duration) { + self.time_to_idle = Some(duration); + } + + pub(crate) fn expiry(&self) -> Option + Send + Sync + 'static>> { + self.expiry.as_ref().map(Arc::clone) + } + + pub(crate) fn set_expiry(&mut self, expiry: Arc + Send + Sync + 'static>) { + self.expiry = Some(expiry); + } +} + +#[cfg(test)] +pub(crate) mod test_utils { + use std::sync::atomic::{AtomicU8, Ordering}; + + #[derive(Default)] + pub(crate) struct ExpiryCallCounters { + expected_creations: AtomicU8, + expected_reads: AtomicU8, + expected_updates: AtomicU8, + actual_creations: AtomicU8, + actual_reads: AtomicU8, + actual_updates: AtomicU8, + } + + impl ExpiryCallCounters { + pub(crate) fn incl_expected_creations(&self) { + self.expected_creations.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incl_expected_reads(&self) { + self.expected_reads.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incl_expected_updates(&self) { + self.expected_updates.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incl_actual_creations(&self) { + self.actual_creations.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incl_actual_reads(&self) { + self.actual_reads.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incl_actual_updates(&self) { + self.actual_updates.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn verify(&self) { + assert_eq!( + self.expected_creations.load(Ordering::Relaxed), + self.actual_creations.load(Ordering::Relaxed), + "expected_creations != actual_creations" + ); + assert_eq!( + self.expected_reads.load(Ordering::Relaxed), + self.actual_reads.load(Ordering::Relaxed), + "expected_reads != actual_reads" + ); + assert_eq!( + self.expected_updates.load(Ordering::Relaxed), + self.actual_updates.load(Ordering::Relaxed), + "expected_updates != actual_updates" + ); + } + } +} diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 
d26687f4..aba6d740 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -2,6 +2,8 @@ use super::{Cache, SegmentedCache}; use crate::{ common::{builder_utils, concurrent::Weigher}, notification::{self, EvictionListener, RemovalCause}, + policy::ExpirationPolicy, + Expiry, }; use std::{ @@ -53,8 +55,7 @@ pub struct CacheBuilder { weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, thread_pool_enabled: bool, cache_type: PhantomData, @@ -74,10 +75,9 @@ where weigher: None, eviction_listener: None, eviction_listener_conf: None, - time_to_live: None, - time_to_idle: None, + expiration_policy: Default::default(), invalidator_enabled: false, - // TODO: Change this to `false` in Moka 0.10.0. + // TODO: Change this to `false` in Moka v0.12.0 or v0.13.0. thread_pool_enabled: true, cache_type: Default::default(), } @@ -117,8 +117,7 @@ where weigher: self.weigher, eviction_listener: self.eviction_listener, eviction_listener_conf: self.eviction_listener_conf, - time_to_live: self.time_to_live, - time_to_idle: self.time_to_idle, + expiration_policy: self.expiration_policy, invalidator_enabled: self.invalidator_enabled, thread_pool_enabled: self.thread_pool_enabled, cache_type: PhantomData::default(), @@ -137,7 +136,8 @@ where /// expiration. 
pub fn build(self) -> Cache { let build_hasher = RandomState::default(); - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); Cache::with_everything( self.name, self.max_capacity, @@ -146,8 +146,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(self.thread_pool_enabled), ) @@ -225,7 +224,8 @@ where where S: BuildHasher + Clone + Send + Sync + 'static, { - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); Cache::with_everything( self.name, self.max_capacity, @@ -234,8 +234,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(self.thread_pool_enabled), ) @@ -259,7 +258,8 @@ where /// expiration. 
pub fn build(self) -> SegmentedCache { let build_hasher = RandomState::default(); - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); SegmentedCache::with_everything( self.name, self.max_capacity, @@ -269,8 +269,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(self.thread_pool_enabled), ) @@ -349,7 +348,8 @@ where where S: BuildHasher + Clone + Send + Sync + 'static, { - builder_utils::ensure_expirations_or_panic(self.time_to_live, self.time_to_idle); + let exp = &self.expiration_policy; + builder_utils::ensure_expirations_or_panic(exp.time_to_live(), exp.time_to_idle()); SegmentedCache::with_everything( self.name, self.max_capacity, @@ -359,8 +359,7 @@ where self.weigher, self.eviction_listener, self.eviction_listener_conf, - self.time_to_live, - self.time_to_idle, + self.expiration_policy, self.invalidator_enabled, builder_utils::housekeeper_conf(true), ) @@ -469,10 +468,9 @@ impl CacheBuilder { /// than 1000 years. This is done to protect against overflow when computing key /// expiration. pub fn time_to_live(self, duration: Duration) -> Self { - Self { - time_to_live: Some(duration), - ..self - } + let mut builder = self; + builder.expiration_policy.set_time_to_live(duration); + builder } /// Sets the time to idle of the cache. @@ -486,10 +484,15 @@ impl CacheBuilder { /// than 1000 years. This is done to protect against overflow when computing key /// expiration. 
pub fn time_to_idle(self, duration: Duration) -> Self { - Self { - time_to_idle: Some(duration), - ..self - } + let mut builder = self; + builder.expiration_policy.set_time_to_idle(duration); + builder + } + + pub fn expire_after(self, expiry: impl Expiry + Send + Sync + 'static) -> Self { + let mut builder = self; + builder.expiration_policy.set_expiry(Arc::new(expiry)); + builder } /// Enables support for [Cache::invalidate_entries_if][cache-invalidate-if] @@ -514,7 +517,7 @@ impl CacheBuilder { /// necessary. /// /// NOTE: The default value will be changed to `false` in a future release - /// (v0.10.0 or v0.11.0). + /// (v0.12.0 or v0.13.0). pub fn thread_pool_enabled(self, v: bool) -> Self { Self { thread_pool_enabled: v, diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 3a423b25..af29eb21 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -12,6 +12,7 @@ use crate::{ time::Instant, }, notification::{self, EvictionListener}, + policy::ExpirationPolicy, sync::{Iter, PredicateId}, sync_base::{ base_cache::{BaseCache, HouseKeeperArc}, @@ -814,8 +815,7 @@ where None, None, None, - None, - None, + Default::default(), false, housekeeper_conf, ) @@ -846,8 +846,7 @@ where weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, housekeeper_conf: housekeeper::Configuration, ) -> Self { @@ -860,8 +859,7 @@ where weigher, eviction_listener, eviction_listener_conf, - time_to_live, - time_to_idle, + expiration_policy, invalidator_enabled, housekeeper_conf, ), @@ -1104,7 +1102,7 @@ where need_key: bool, ) -> Entry { self.base - .get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut(), need_key) + .get_with_hash_and_ignore_if(&key, hash, replace_if.as_mut(), need_key) .unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)) } @@ -1126,7 +1124,7 @@ where Q: ToOwned + Hash + Eq + ?Sized, { 
self.base - .get_with_hash_but_ignore_if(key, hash, replace_if.as_mut(), need_key) + .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key) .unwrap_or_else(|| { let key = Arc::new(key.to_owned()); self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key) @@ -1143,7 +1141,7 @@ where ) -> Entry { let get = || { self.base - .get_with_hash_but_no_recording(&key, hash, replace_if.as_mut()) + .get_with_hash_without_recording(&key, hash, replace_if.as_mut()) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v); @@ -1368,7 +1366,7 @@ where let get = || { let ignore_if = None as Option<&mut fn(&V) -> bool>; self.base - .get_with_hash_but_no_recording(&key, hash, ignore_if) + .get_with_hash_without_recording(&key, hash, ignore_if) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v); @@ -1563,7 +1561,7 @@ where let get = || { let ignore_if = None as Option<&mut fn(&V) -> bool>; self.base - .get_with_hash_but_no_recording(&key, hash, ignore_if) + .get_with_hash_without_recording(&key, hash, ignore_if) }; let insert = |v| self.insert_with_hash(key.clone(), hash, v); @@ -1911,10 +1909,16 @@ mod tests { macros::{assert_eq_with_mode, assert_with_mode}, DeliveryMode, RemovalCause, }, + policy::test_utils::ExpiryCallCounters, + Expiry, }; use parking_lot::Mutex; - use std::{convert::Infallible, sync::Arc, time::Duration}; + use std::{ + convert::Infallible, + sync::Arc, + time::{Duration, Instant as StdInstant}, + }; #[test] fn max_capacity_zero() { @@ -2527,6 +2531,339 @@ mod tests { } } + #[test] + fn time_to_live_by_expiry_type() { + run_test(DeliveryMode::Immediate); + run_test(DeliveryMode::Queued); + + // Define an expiry type. 
+ struct MyExpiry { + counters: Arc, + } + + impl MyExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for MyExpiry { + fn expire_after_create( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + ) -> Option { + self.counters.incl_actual_creations(); + Some(Duration::from_secs(10)) + } + + fn expire_after_update( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + ) -> Option { + self.counters.incl_actual_updates(); + Some(Duration::from_secs(10)) + } + } + + fn run_test(delivery_mode: DeliveryMode) { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Create expiry counters and the expiry. + let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let listener_conf = notification::Configuration::builder() + .delivery_mode(delivery_mode) + .build(); + + // Create a cache with the eviction listener. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.reconfigure_for_testing(); + + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice"); + expiry_counters.incl_expected_creations(); + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 5 secs from the start. + cache.sync(); + + assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode); + assert_with_mode!(cache.contains_key(&"a"), delivery_mode); + + mock.increment(Duration::from_secs(5)); // 10 secs. 
+ expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); + assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode); + assert_with_mode!(!cache.contains_key(&"a"), delivery_mode); + + assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode); + + cache.sync(); + assert_with_mode!(cache.is_table_empty(), delivery_mode); + + cache.insert("b", "bob"); + expiry_counters.incl_expected_creations(); + cache.sync(); + + assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode); + + mock.increment(Duration::from_secs(5)); // 15 secs. + cache.sync(); + + assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode); + assert_with_mode!(cache.contains_key(&"b"), delivery_mode); + assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode); + + cache.insert("b", "bill"); + expected.push((Arc::new("b"), "bob", RemovalCause::Replaced)); + expiry_counters.incl_expected_updates(); + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 20 secs + cache.sync(); + + assert_eq_with_mode!(cache.get(&"b"), Some("bill"), delivery_mode); + assert_with_mode!(cache.contains_key(&"b"), delivery_mode); + assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode); + + mock.increment(Duration::from_secs(5)); // 25 secs + expected.push((Arc::new("b"), "bill", RemovalCause::Expired)); + + assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode); + assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode); + assert_with_mode!(!cache.contains_key(&"a"), delivery_mode); + assert_with_mode!(!cache.contains_key(&"b"), delivery_mode); + + assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode); + + cache.sync(); + assert_with_mode!(cache.is_table_empty(), delivery_mode); + + expiry_counters.verify(); + verify_notification_vec(&cache, actual, &expected, delivery_mode); + } + } + + #[test] + fn time_to_idle_by_expiry_type() { + run_test(DeliveryMode::Immediate); + run_test(DeliveryMode::Queued); + + // Define an expiry type. 
+ struct MyExpiry { + counters: Arc, + } + + impl MyExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for MyExpiry { + fn expire_after_read( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + _last_modified_at: StdInstant, + ) -> Option { + self.counters.incl_actual_reads(); + Some(Duration::from_secs(10)) + } + } + + fn run_test(delivery_mode: DeliveryMode) { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Create expiry counters and the expiry. + let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = MyExpiry::new(Arc::clone(&expiry_counters)); + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + let listener_conf = notification::Configuration::builder() + .delivery_mode(delivery_mode) + .build(); + + // Create a cache with the eviction listener. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.reconfigure_for_testing(); + + let (clock, mock) = Clock::mock(); + cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice"); + cache.sync(); + + mock.increment(Duration::from_secs(5)); // 5 secs from the start. + cache.sync(); + + assert_eq_with_mode!(cache.get(&"a"), Some("alice"), delivery_mode); + expiry_counters.incl_expected_reads(); + + mock.increment(Duration::from_secs(5)); // 10 secs. + cache.sync(); + + cache.insert("b", "bob"); + cache.sync(); + + assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode); + + mock.increment(Duration::from_secs(2)); // 12 secs. + cache.sync(); + + // contains_key does not reset the idle timer for the key. 
+ assert_with_mode!(cache.contains_key(&"a"), delivery_mode); + assert_with_mode!(cache.contains_key(&"b"), delivery_mode); + cache.sync(); + + assert_eq_with_mode!(cache.entry_count(), 2, delivery_mode); + + mock.increment(Duration::from_secs(3)); // 15 secs. + expected.push((Arc::new("a"), "alice", RemovalCause::Expired)); + + assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode); + assert_eq_with_mode!(cache.get(&"b"), Some("bob"), delivery_mode); + expiry_counters.incl_expected_reads(); + assert_with_mode!(!cache.contains_key(&"a"), delivery_mode); + assert_with_mode!(cache.contains_key(&"b"), delivery_mode); + + assert_eq_with_mode!(cache.iter().count(), 1, delivery_mode); + + cache.sync(); + assert_eq_with_mode!(cache.entry_count(), 1, delivery_mode); + + mock.increment(Duration::from_secs(10)); // 25 secs + expected.push((Arc::new("b"), "bob", RemovalCause::Expired)); + + assert_eq_with_mode!(cache.get(&"a"), None, delivery_mode); + assert_eq_with_mode!(cache.get(&"b"), None, delivery_mode); + assert_with_mode!(!cache.contains_key(&"a"), delivery_mode); + assert_with_mode!(!cache.contains_key(&"b"), delivery_mode); + + assert_eq_with_mode!(cache.iter().count(), 0, delivery_mode); + + cache.sync(); + assert_with_mode!(cache.is_table_empty(), delivery_mode); + + expiry_counters.verify(); + verify_notification_vec(&cache, actual, &expected, delivery_mode); + } + } + + /// Verify that the `Expiry::expire_after_read()` method is called in `get_with` + /// only when the key was already present in the cache. + #[test] + fn test_expiry_using_get_with() { + // Define an expiry type, which always return `None`. 
+ struct NoExpiry { + counters: Arc, + } + + impl NoExpiry { + fn new(counters: Arc) -> Self { + Self { counters } + } + } + + impl Expiry<&str, &str> for NoExpiry { + fn expire_after_create( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + ) -> Option { + self.counters.incl_actual_creations(); + None + } + + fn expire_after_read( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + _last_modified_at: StdInstant, + ) -> Option { + self.counters.incl_actual_reads(); + None + } + + fn expire_after_update( + &self, + _key: &&str, + _value: &&str, + _current_time: StdInstant, + _current_duration: Option, + ) -> Option { + unreachable!("The `expire_after_update()` method should not be called."); + } + } + + // Create expiry counters and the expiry. + let expiry_counters = Arc::new(ExpiryCallCounters::default()); + let expiry = NoExpiry::new(Arc::clone(&expiry_counters)); + + // Create a cache with the expiry and eviction listener. + let mut cache = Cache::builder() + .max_capacity(100) + .expire_after(expiry) + .build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + // The key is not present. + cache.get_with("a", || "alice"); + expiry_counters.incl_expected_creations(); + cache.sync(); + + // The key is present. + cache.get_with("a", || "alex"); + expiry_counters.incl_expected_reads(); + cache.sync(); + + // The key is not present. 
+ cache.invalidate("a"); + cache.get_with("a", || "amanda"); + expiry_counters.incl_expected_creations(); + cache.sync(); + + expiry_counters.verify(); + } + #[test] fn test_iter() { const NUM_KEYS: usize = 50; diff --git a/src/sync/segment.rs b/src/sync/segment.rs index c1eabe04..0009f6fa 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -4,6 +4,7 @@ use super::{ use crate::{ common::concurrent::{housekeeper, Weigher}, notification::{self, EvictionListener}, + policy::ExpirationPolicy, sync_base::iter::{Iter, ScanningGet}, Entry, Policy, PredicateError, }; @@ -14,7 +15,6 @@ use std::{ fmt, hash::{BuildHasher, Hash, Hasher}, sync::Arc, - time::Duration, }; /// A thread-safe concurrent in-memory cache, with multiple internal segments. @@ -106,8 +106,7 @@ where None, None, None, - None, - None, + Default::default(), false, housekeeper::Configuration::new_thread_pool(true), ) @@ -216,8 +215,7 @@ where weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, housekeeper_conf: housekeeper::Configuration, ) -> Self { @@ -231,8 +229,7 @@ where weigher, eviction_listener, eviction_listener_conf, - time_to_live, - time_to_idle, + expiration_policy, invalidator_enabled, housekeeper_conf, )), @@ -678,7 +675,7 @@ struct MockExpirationClock { #[cfg(test)] impl MockExpirationClock { - fn increment(&mut self, duration: Duration) { + fn increment(&mut self, duration: std::time::Duration) { for mock in &mut self.mocks { mock.increment(duration); } @@ -711,8 +708,7 @@ where weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, housekeeper_conf: housekeeper::Configuration, ) -> Self { @@ -736,8 +732,7 @@ where weigher.as_ref().map(Arc::clone), eviction_listener.as_ref().map(Arc::clone), 
eviction_listener_conf.clone(), - time_to_live, - time_to_idle, + expiration_policy.clone(), invalidator_enabled, housekeeper_conf.clone(), ) diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 3eedbdc8..8efbbafd 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -17,12 +17,12 @@ use crate::{ deques::Deques, entry_info::EntryInfo, housekeeper::{self, Housekeeper, InnerSync, SyncPace}, - AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, - WriteOp, + AccessTime, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, time::{CheckedTimeOps, Clock, Instant}, + timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, }, notification::{ @@ -30,7 +30,8 @@ use crate::{ notifier::{RemovalNotifier, RemovedEntry}, EvictionListener, RemovalCause, }, - Entry, Policy, PredicateError, + policy::ExpirationPolicy, + Entry, Expiry, Policy, PredicateError, }; #[cfg(feature = "unstable-debug-counters")] @@ -50,7 +51,7 @@ use std::{ atomic::{AtomicBool, AtomicU8, Ordering}, Arc, }, - time::Duration, + time::{Duration, Instant as StdInstant}, }; use triomphe::Arc as TrioArc; @@ -162,8 +163,7 @@ where weigher: Option>, eviction_listener: Option>, eviction_listener_conf: Option, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, housekeeper_conf: housekeeper::Configuration, ) -> Self { @@ -186,8 +186,7 @@ where eviction_listener_conf, r_rcv, w_rcv, - time_to_live, - time_to_idle, + expiration_policy, invalidator_enabled, )); if invalidator_enabled { @@ -222,7 +221,8 @@ where let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); let now = self.current_time_from_expiration_clock(); - !is_expired_entry_wo(ttl, va, entry, now) + !is_expired_by_per_entry_ttl(entry.entry_info(), now) + && !is_expired_entry_wo(ttl, va, entry, now) && 
!is_expired_entry_ao(tti, va, entry, now) && !i.is_invalidated_entry(k, entry) }) @@ -243,7 +243,7 @@ where self.do_get_with_hash(key, hash, record, ignore_if, need_key) } - pub(crate) fn get_with_hash_but_ignore_if( + pub(crate) fn get_with_hash_and_ignore_if( &self, key: &Q, hash: u64, @@ -263,7 +263,7 @@ where self.do_get_with_hash(key, hash, record, ignore_if, need_key) } - pub(crate) fn get_with_hash_but_no_recording( + pub(crate) fn get_with_hash_without_recording( &self, key: &Q, hash: u64, @@ -298,7 +298,7 @@ where return None; } - let now = self.current_time_from_expiration_clock(); + let mut now = self.current_time_from_expiration_clock(); let maybe_entry = self .inner @@ -313,7 +313,8 @@ where let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - if is_expired_entry_wo(ttl, va, entry, now) + if is_expired_by_per_entry_ttl(entry.entry_info(), now) + || is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now) || i.is_invalidated_entry(k, entry) { @@ -322,13 +323,63 @@ where } else { // Valid entry. let maybe_key = if need_key { Some(Arc::clone(k)) } else { None }; - Some((maybe_key, TrioArc::clone(entry), now)) + Some((maybe_key, TrioArc::clone(entry))) } }); - if let Some((maybe_key, entry, now)) = maybe_entry { + if let Some((maybe_key, entry)) = maybe_entry { + let mut is_expiry_modified = false; + + // Call the user supplied `expire_after_read` method if any. + if let Some(expiry) = &self.inner.expiration_policy.expiry() { + let lm = entry.last_modified().expect("Last modified is not set"); + // Check if the `last_modified` of entry is earlier than or equals to + // `now`. If not, update the `now` to `last_modified`. This is needed + // because there is a small chance that other threads have inserted + // the entry _after_ we obtained `now`. + now = now.max(lm); + + // Convert `last_modified` from `moka::common::time::Instant` to + // `std::time::Instant`. 
+ let lm = self.inner.clocks().to_std_instant(lm); + + // Call the user supplied `expire_after_read` method. + // + // We will put the return value (`is_expiry_modified: bool`) to a + // `ReadOp` so that `apply_reads` method can determine whether or not + // to reschedule the timer for the entry. + // + // NOTE: It is not guaranteed that the `ReadOp` is passed to + // `apply_reads`. Here are the corner cases that the `ReadOp` will + // not be passed to `apply_reads`: + // + // - If the bounded `read_op_ch` channel is full, the `ReadOp` will + // be discarded. + // - If we were called by `get_with_hash_without_recording` method, + // the `ReadOp` will not be recorded at all. + // + // These cases are okay because when the timer wheel tries to expire + // the entry, it will check if the entry is actually expired. If not, + // the timer wheel will reschedule the expiration timer for the + // entry. + is_expiry_modified = Self::expire_after_read_or_update( + |k, v, t, d| expiry.expire_after_read(k, v, t, d, lm), + &entry.entry_info().key_hash().key, + &entry, + self.inner.expiration_policy.time_to_live(), + self.inner.expiration_policy.time_to_idle(), + now, + self.inner.clocks(), + ); + } + let v = entry.value.clone(); - read_recorder(ReadOp::Hit(hash, entry, now), now); + let op = ReadOp::Hit { + value_entry: entry, + timestamp: now, + is_expiry_modified, + }; + read_recorder(op, now); Some(Entry::new(maybe_key, v, false)) } else { read_recorder(ReadOp::Miss(hash), now); @@ -405,7 +456,8 @@ where let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); let now = self.current_time_from_expiration_clock(); - if is_expired_entry_wo(ttl, va, entry, now) + if is_expired_by_per_entry_ttl(entry.entry_info(), now) + || is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now) || i.is_invalidated_entry(k, entry) { @@ -478,7 +530,7 @@ where hash, // on_insert || { - let entry = self.new_value_entry(value.clone(), ts, weight); + 
let entry = self.new_value_entry(&key, hash, value.clone(), ts, weight); let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); op1 = Some(( cnt, @@ -518,9 +570,29 @@ where ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => (ins_op, ts), + (Some((_cnt, ins_op)), None) => { + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &ins_op) + { + Self::expire_after_create(expiry, &key, value_entry, ts, self.inner.clocks()); + } + (ins_op, ts) + } (None, Some((_cnt, old_entry, (old_last_accessed, old_last_modified), upd_op))) => { - old_entry.unset_q_nodes(); + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &upd_op) + { + Self::expire_after_read_or_update( + |k, v, t, d| expiry.expire_after_update(k, v, t, d), + &key, + value_entry, + self.inner.expiration_policy.time_to_live(), + self.inner.expiration_policy.time_to_idle(), + ts, + self.inner.clocks(), + ); + } + if self.is_removal_notifier_enabled() { self.inner .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified); @@ -533,9 +605,33 @@ where Some((cnt2, old_entry, (old_last_accessed, old_last_modified), upd_op)), ) => { if cnt1 > cnt2 { + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &ins_op) + { + Self::expire_after_create( + expiry, + &key, + value_entry, + ts, + self.inner.clocks(), + ); + } (ins_op, ts) } else { - old_entry.unset_q_nodes(); + if let (Some(expiry), WriteOp::Upsert { value_entry, .. 
}) = + (&self.inner.expiration_policy.expiry(), &upd_op) + { + Self::expire_after_read_or_update( + |k, v, t, d| expiry.expire_after_update(k, v, t, d), + &key, + value_entry, + self.inner.expiration_policy.time_to_live(), + self.inner.expiration_policy.time_to_idle(), + ts, + self.inner.clocks(), + ); + } + if self.is_removal_notifier_enabled() { self.inner.notify_upsert( key, @@ -552,14 +648,40 @@ where } } + #[inline] + fn apply_reads_if_needed(&self, inner: &Inner, now: Instant) { + let len = self.read_op_ch.len(); + + if let Some(hk) = &self.housekeeper { + if Self::should_apply_reads(hk, len, now) { + hk.try_sync(inner); + } + } + } + + #[inline] + fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + hk.should_apply_reads(ch_len, now) + } + + #[inline] + fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + hk.should_apply_writes(ch_len, now) + } +} + +impl BaseCache { #[inline] fn new_value_entry( &self, + key: &Arc, + hash: u64, value: V, timestamp: Instant, policy_weight: u32, ) -> TrioArc> { - let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight)); + let key_hash = KeyHash::new(Arc::clone(key), hash); + let info = TrioArc::new(EntryInfo::new(key_hash, timestamp, policy_weight)); TrioArc::new(ValueEntry::new(value, info)) } @@ -581,25 +703,59 @@ where TrioArc::new(ValueEntry::new_from(value, info, other)) } - #[inline] - fn apply_reads_if_needed(&self, inner: &Inner, now: Instant) { - let len = self.read_op_ch.len(); - - if let Some(hk) = &self.housekeeper { - if Self::should_apply_reads(hk, len, now) { - hk.try_sync(inner); - } - } + fn expire_after_create( + expiry: &Arc + Send + Sync + 'static>, + key: &K, + value_entry: &ValueEntry, + ts: Instant, + clocks: &Clocks, + ) { + let duration = + expiry.expire_after_create(key, &value_entry.value, clocks.to_std_instant(ts)); + let expiration_time = duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + value_entry + 
.entry_info() + .set_expiration_time(expiration_time); } - #[inline] - fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { - hk.should_apply_reads(ch_len, now) - } + fn expire_after_read_or_update( + expiry: impl FnOnce(&K, &V, StdInstant, Option) -> Option, + key: &K, + value_entry: &ValueEntry, + ttl: Option, + tti: Option, + ts: Instant, + clocks: &Clocks, + ) -> bool { + let current_time = clocks.to_std_instant(ts); + let ei = &value_entry.entry_info(); + + let exp_time = IntoIterator::into_iter([ + ei.expiration_time(), + ttl.and_then(|dur| ei.last_modified().and_then(|ts| ts.checked_add(dur))), + tti.and_then(|dur| ei.last_accessed().and_then(|ts| ts.checked_add(dur))), + ]) + .flatten() + .min(); + + let current_duration = exp_time.and_then(|time| { + let std_time = clocks.to_std_instant(time); + std_time.checked_duration_since(current_time) + }); - #[inline] - fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { - hk.should_apply_writes(ch_len, now) + let duration = expiry(key, &value_entry.value, current_time, current_duration); + + if duration != current_duration { + let expiration_time = + duration.map(|duration| ts.checked_add(duration).expect("Overflow")); + value_entry + .entry_info() + .set_expiration_time(expiration_time); + // The `expiration_time` has changed from `None` to `Some` or vice versa. + true + } else { + false + } } } @@ -759,6 +915,46 @@ enum AdmissionResult { type CacheStore = crate::cht::SegmentedHashMap, TrioArc>, S>; +struct Clocks { + has_expiration_clock: AtomicBool, + expiration_clock: RwLock>, + /// The time (`moka::common::time`) when this timer wheel was created. + origin: Instant, + /// The time (`StdInstant`) when this timer wheel was created. + origin_std: StdInstant, + /// Mutable version of `origin` and `origin_std`. Used when the + /// `expiration_clock` is set. 
+ mutable_origin: RwLock>, +} + +impl Clocks { + fn new(time: Instant, std_time: StdInstant) -> Self { + Self { + has_expiration_clock: Default::default(), + expiration_clock: Default::default(), + origin: time, + origin_std: std_time, + mutable_origin: Default::default(), + } + } + + fn to_std_instant(&self, time: Instant) -> StdInstant { + let (origin, origin_std) = if self.has_expiration_clock.load(Ordering::Relaxed) { + self.mutable_origin + .read() + .expect("mutable_origin is not set") + } else { + (self.origin, self.origin_std) + }; + origin_std + (time.checked_duration_since(origin).unwrap()) + } + + #[cfg(test)] + fn set_origin(&self, time: Instant, std_time: StdInstant) { + *self.mutable_origin.write() = Some((time, std_time)); + } +} + pub(crate) struct Inner { name: Option, max_capacity: Option, @@ -767,30 +963,33 @@ pub(crate) struct Inner { cache: CacheStore, build_hasher: S, deques: Mutex>, + timer_wheel: Mutex>, frequency_sketch: RwLock, frequency_sketch_enabled: AtomicBool, read_op_ch: Receiver>, write_op_ch: Receiver>, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, weigher: Option>, removal_notifier: Option>, key_locks: Option>, invalidator_enabled: bool, invalidator: RwLock>>, - has_expiration_clock: AtomicBool, - expiration_clock: RwLock>, + clocks: Clocks, } +// // functions/methods used by BaseCache +// + impl Inner { fn name(&self) -> Option<&str> { self.name.as_deref() } fn policy(&self) -> Policy { - Policy::new(self.max_capacity, 1, self.time_to_live, self.time_to_idle) + let exp = &self.expiration_policy; + Policy::new(self.max_capacity, 1, exp.time_to_live(), exp.time_to_idle()) } #[inline] @@ -817,6 +1016,14 @@ impl Inner { .unwrap_or_default() } + fn maybe_key_lock(&self, key: &Arc) -> Option> + where + K: Hash + Eq, + S: BuildHasher, + { + self.key_locks.as_ref().map(|kls| kls.key_lock(key)) + } + #[cfg(feature = "unstable-debug-counters")] pub fn debug_stats(&self) 
-> CacheDebugStats { let ec = self.entry_count.load(); @@ -832,9 +1039,10 @@ impl Inner { #[inline] fn current_time_from_expiration_clock(&self) -> Instant { - if self.has_expiration_clock.load(Ordering::Relaxed) { + if self.clocks.has_expiration_clock.load(Ordering::Relaxed) { Instant::new( - self.expiration_clock + self.clocks + .expiration_clock .read() .as_ref() .expect("Cannot get the expiration clock") @@ -845,28 +1053,33 @@ impl Inner { } } + fn clocks(&self) -> &Clocks { + &self.clocks + } + fn num_cht_segments(&self) -> usize { self.cache.actual_num_segments() } #[inline] fn time_to_live(&self) -> Option { - self.time_to_live + self.expiration_policy.time_to_live() } #[inline] fn time_to_idle(&self) -> Option { - self.time_to_idle + self.expiration_policy.time_to_idle() } #[inline] fn has_expiry(&self) -> bool { - self.time_to_live.is_some() || self.time_to_idle.is_some() + let exp = &self.expiration_policy; + exp.time_to_live().is_some() || exp.time_to_idle().is_some() } #[inline] fn is_write_order_queue_enabled(&self) -> bool { - self.time_to_live.is_some() || self.invalidator_enabled + self.expiration_policy.time_to_live().is_some() || self.invalidator_enabled } #[inline] @@ -885,18 +1098,6 @@ impl Inner { } } -// functions/methods used by BaseCache -impl Inner -where - K: Hash + Eq, - S: BuildHasher, -{ - fn maybe_key_lock(&self, key: &Arc) -> Option> { - self.key_locks.as_ref().map(|kls| kls.key_lock(key)) - } -} - -// functions/methods used by BaseCache impl Inner where K: Hash + Eq + Send + Sync + 'static, @@ -916,8 +1117,7 @@ where eviction_listener_conf: Option, read_op_ch: Receiver>, write_op_ch: Receiver>, - time_to_live: Option, - time_to_idle: Option, + expiration_policy: ExpirationPolicy, invalidator_enabled: bool, ) -> Self { let (num_segments, initial_capacity) = if max_capacity == Some(0) { @@ -933,6 +1133,14 @@ where initial_capacity, build_hasher.clone(), ); + + // Assume that getting `moka::common::Instant::now` has lower latency than + 
// `StdInstant::now`. + let now_std = StdInstant::now(); + let now = Instant::now(); + let clocks = Clocks::new(now, now_std); + let timer_wheel = Mutex::new(TimerWheel::new(now)); + let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener { let rn = RemovalNotifier::new( listener, @@ -956,22 +1164,21 @@ where weighted_size: Default::default(), cache, build_hasher, - deques: Mutex::new(Default::default()), + deques: Default::default(), + timer_wheel, frequency_sketch: RwLock::new(Default::default()), frequency_sketch_enabled: Default::default(), read_op_ch, write_op_ch, - time_to_live, - time_to_idle, + expiration_policy, valid_after: Default::default(), weigher, removal_notifier, key_locks, invalidator_enabled, // When enabled, this field will be set later via the set_invalidator method. - invalidator: RwLock::new(None), - has_expiration_clock: AtomicBool::new(false), - expiration_clock: RwLock::new(None), + invalidator: Default::default(), + clocks, } } @@ -1045,6 +1252,7 @@ where } } + /// Returns `true` if the entry is invalidated by `invalidate_entries_if` method. 
#[inline] fn is_invalidated_entry(&self, key: &Arc, entry: &TrioArc>) -> bool { if self.invalidator_enabled { @@ -1124,6 +1332,7 @@ where } let mut deqs = self.deques.lock(); + let mut timer_wheel = self.timer_wheel.lock(); let mut calls = 0; let current_ec = self.entry_count.load(); let current_ws = self.weighted_size.load(); @@ -1135,12 +1344,12 @@ where while should_process_logs && calls <= max_repeats { let r_len = self.read_op_ch.len(); if r_len > 0 { - self.apply_reads(&mut deqs, r_len); + self.apply_reads(&mut deqs, &mut timer_wheel, r_len); } let w_len = self.write_op_ch.len(); if w_len > 0 { - self.apply_writes(&mut deqs, w_len, &mut eviction_state); + self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state); } if self.should_enable_frequency_sketch(&eviction_state.counters) { @@ -1152,9 +1361,18 @@ where || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT; } + if timer_wheel.is_enabled() { + self.evict_expired_entries_using_timers( + &mut timer_wheel, + &mut deqs, + &mut eviction_state, + ); + } + if self.has_expiry() || self.has_valid_after() { - self.evict_expired( + self.evict_expired_entries_using_deqs( &mut deqs, + &mut timer_wheel, batch_size::EVICTION_BATCH_SIZE, &mut eviction_state, ); @@ -1166,6 +1384,7 @@ where self.invalidate_entries( invalidator, &mut deqs, + &mut timer_wheel, batch_size::INVALIDATION_BATCH_SIZE, &mut eviction_state, ); @@ -1178,6 +1397,7 @@ where if weights_to_evict > 0 { self.evict_lru_entries( &mut deqs, + &mut timer_wheel, batch_size::EVICTION_BATCH_SIZE, weights_to_evict, &mut eviction_state, @@ -1272,16 +1492,24 @@ where self.frequency_sketch_enabled.store(true, Ordering::Release); } - fn apply_reads(&self, deqs: &mut Deques, count: usize) { + fn apply_reads(&self, deqs: &mut Deques, timer_wheel: &mut TimerWheel, count: usize) { use ReadOp::*; let mut freq = self.frequency_sketch.write(); let ch = &self.read_op_ch; for _ in 0..count { match ch.try_recv() { - Ok(Hit(hash, entry, timestamp)) => { - 
freq.increment(hash); - entry.set_last_accessed(timestamp); - deqs.move_to_back_ao(&entry) + Ok(Hit { + value_entry, + timestamp, + is_expiry_modified, + }) => { + let kh = value_entry.entry_info().key_hash(); + freq.increment(kh.hash); + value_entry.set_last_accessed(timestamp); + if is_expiry_modified { + self.update_timer_wheel(&value_entry, timer_wheel); + } + deqs.move_to_back_ao(&value_entry); } Ok(Miss(hash)) => freq.increment(hash), Err(_) => break, @@ -1292,6 +1520,7 @@ where fn apply_writes( &self, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, count: usize, eviction_state: &mut EvictionState<'_, K, V>, ) where @@ -1314,11 +1543,12 @@ where old_weight, new_weight, deqs, + timer_wheel, &freq, eviction_state, ), Ok(Remove(KvEntry { key: _key, entry })) => { - Self::handle_remove(deqs, entry, &mut eviction_state.counters) + Self::handle_remove(deqs, timer_wheel, entry, &mut eviction_state.counters) } Err(_) => break, }; @@ -1333,6 +1563,7 @@ where old_weight: u32, new_weight: u32, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, freq: &FrequencySketch, eviction_state: &mut EvictionState<'_, K, V>, ) where @@ -1347,6 +1578,7 @@ where // The entry has been already admitted, so treat this as an update. counters.saturating_sub(0, old_weight); counters.saturating_add(0, new_weight); + self.update_timer_wheel(&entry, timer_wheel); deqs.move_to_back_ao(&entry); deqs.move_to_back_wo(&entry); return; @@ -1355,7 +1587,7 @@ where if self.has_enough_capacity(new_weight, counters) { // There are enough room in the cache (or the cache is unbounded). // Add the candidate to the deques. - self.handle_admit(kh, &entry, new_weight, deqs, counters); + self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters); return; } } @@ -1409,7 +1641,12 @@ where ); } // And then remove the victim from the deques. 
- Self::handle_remove(deqs, vic_entry, &mut eviction_state.counters); + Self::handle_remove( + deqs, + timer_wheel, + vic_entry, + &mut eviction_state.counters, + ); } else { // Could not remove the victim from the cache. Skip this // victim node as its ValueEntry might have been @@ -1420,7 +1657,13 @@ where skipped_nodes = skipped; // Add the candidate to the deques. - self.handle_admit(kh, &entry, new_weight, deqs, &mut eviction_state.counters); + self.handle_admit( + &entry, + new_weight, + deqs, + timer_wheel, + &mut eviction_state.counters, + ); } AdmissionResult::Rejected { skipped_nodes: s } => { skipped_nodes = s; @@ -1440,6 +1683,10 @@ where // Move the skipped nodes to the back of the deque. We do not unlink (drop) // them because ValueEntries in the write op queue should be pointing them. + // + // TODO FIXME: This `move_to_back()` will be considered UB as violating the + // aliasing rule because these skipped nodes were acquired by `peek_front` or + // `next_node`. (They both return `&node` instead of `&mut node`). for node in skipped_nodes { unsafe { deqs.probation.move_to_back(node) }; } @@ -1526,26 +1773,90 @@ where fn handle_admit( &self, - kh: KeyHash, entry: &TrioArc>, policy_weight: u32, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, counters: &mut EvictionCounters, ) { - let key = Arc::clone(&kh.key); counters.saturating_add(1, policy_weight); + + self.update_timer_wheel(entry, timer_wheel); + + // Update the deques. deqs.push_back_ao( CacheRegion::MainProbation, - KeyHashDate::new(kh, entry.entry_info()), + KeyHashDate::new(entry.entry_info()), entry, ); if self.is_write_order_queue_enabled() { - deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry); + deqs.push_back_wo(KeyHashDate::new(entry.entry_info()), entry); } entry.set_admitted(true); } + /// NOTE: This method may enable the timer wheel. + fn update_timer_wheel( + &self, + entry: &TrioArc>, + timer_wheel: &mut TimerWheel, + ) { + // Enable the timer wheel if needed. 
+ if entry.entry_info().expiration_time().is_some() && !timer_wheel.is_enabled() { + timer_wheel.enable(); + } + + // Update the timer wheel. + match ( + entry.entry_info().expiration_time().is_some(), + entry.timer_node(), + ) { + // Do nothing; the cache entry has no expiration time and not registered + // to the timer wheel. + (false, None) => (), + // Register the cache entry to the timer wheel; the cache entry has an + // expiration time and not registered to the timer wheel. + (true, None) => { + let timer = timer_wheel.schedule( + TrioArc::clone(entry.entry_info()), + TrioArc::clone(entry.deq_nodes()), + ); + entry.set_timer_node(timer); + } + // Reschedule the cache entry in the timer wheel; the cache entry has an + // expiration time and already registered to the timer wheel. + (true, Some(tn)) => { + let result = timer_wheel.reschedule(tn); + if let ReschedulingResult::Removed(removed_tn) = result { + // The timer node was removed from the timer wheel because the + // expiration time has been unset by other thread after we + // checked. + entry.set_timer_node(None); + drop(removed_tn); + } + } + // Unregister the cache entry from the timer wheel; the cache entry has + // no expiration time but registered to the timer wheel. 
+ (false, Some(tn)) => { + entry.set_timer_node(None); + timer_wheel.deschedule(tn); + } + } + } + fn handle_remove( + deqs: &mut Deques, + timer_wheel: &mut TimerWheel, + entry: TrioArc>, + counters: &mut EvictionCounters, + ) { + if let Some(timer_node) = entry.take_timer_node() { + timer_wheel.deschedule(timer_node); + } + Self::handle_remove_without_timer_wheel(deqs, entry, counters); + } + + fn handle_remove_without_timer_wheel( deqs: &mut Deques, entry: TrioArc>, counters: &mut EvictionCounters, @@ -1564,10 +1875,14 @@ where fn handle_remove_with_deques( ao_deq_name: &str, ao_deq: &mut Deque>, - wo_deq: &mut Deque>, + wo_deq: &mut Deque>, + timer_wheel: &mut TimerWheel, entry: TrioArc>, counters: &mut EvictionCounters, ) { + if let Some(timer) = entry.take_timer_node() { + timer_wheel.deschedule(timer); + } if entry.is_admitted() { entry.set_admitted(false); counters.saturating_sub(1, entry.policy_weight()); @@ -1579,9 +1894,68 @@ where } } - fn evict_expired( + fn evict_expired_entries_using_timers( &self, + timer_wheel: &mut TimerWheel, deqs: &mut Deques, + eviction_state: &mut EvictionState<'_, K, V>, + ) where + V: Clone, + { + use crate::common::timer_wheel::TimerEvent; + + let now = self.current_time_from_expiration_clock(); + + // NOTE: When necessary, the iterator returned from advance() will unset the + // timer node pointer in the `ValueEntry`, so we do not have to do it here. + for event in timer_wheel.advance(now) { + // We do not have to do anything if event is `TimerEvent::Descheduled(_)` + // or `TimerEvent::Rescheduled(_)`. + if let TimerEvent::Expired(node) = event { + let entry_info = node.element.entry_info(); + let kh = entry_info.key_hash(); + let key = &kh.key; + let hash = kh.hash; + + // Lock the key for removal if blocking removal notification is + // enabled. + let kl = self.maybe_key_lock(key); + let _klg = &kl.as_ref().map(|kl| kl.lock()); + + // Remove the key from the map only when the entry is really + // expired. 
+ let maybe_entry = self.cache.remove_if( + hash, + |k| k == key, + |_, v| is_expired_by_per_entry_ttl(v.entry_info(), now), + ); + + if let Some(entry) = maybe_entry { + if eviction_state.is_notifier_enabled() { + let key = Arc::clone(key); + eviction_state.add_removed_entry(key, &entry, RemovalCause::Expired); + } + Self::handle_remove_without_timer_wheel( + deqs, + entry, + &mut eviction_state.counters, + ); + } else { + // Other thread might have updated or invalidated the entry + // already. We have nothing to do here as the `advance()` + // iterator has unset the timer node pointer in the old + // `ValueEntry`. (In the case of update, the timer node will be + // recreated for the new `ValueEntry` when it is processed by the + // `handle_upsert` method.) + } + } + } + } + + fn evict_expired_entries_using_deqs( + &self, + deqs: &mut Deques, + timer_wheel: &mut TimerWheel, batch_size: usize, eviction_state: &mut EvictionState<'_, K, V>, ) where @@ -1590,10 +1964,10 @@ where let now = self.current_time_from_expiration_clock(); if self.is_write_order_queue_enabled() { - self.remove_expired_wo(deqs, batch_size, now, eviction_state); + self.remove_expired_wo(deqs, timer_wheel, batch_size, now, eviction_state); } - if self.time_to_idle.is_some() || self.has_valid_after() { + if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() { let (window, probation, protected, wo) = ( &mut deqs.window, &mut deqs.probation, @@ -1601,8 +1975,9 @@ where &mut deqs.write_order, ); - let mut rm_expired_ao = - |name, deq| self.remove_expired_ao(name, deq, wo, batch_size, now, eviction_state); + let mut rm_expired_ao = |name, deq| { + self.remove_expired_ao(name, deq, wo, timer_wheel, batch_size, now, eviction_state) + }; rm_expired_ao("window", window); rm_expired_ao("probation", probation); @@ -1610,19 +1985,21 @@ where } } + #[allow(clippy::too_many_arguments)] #[inline] fn remove_expired_ao( &self, deq_name: &str, deq: &mut Deque>, - write_order_deq: &mut 
Deque>, + write_order_deq: &mut Deque>, + timer_wheel: &mut TimerWheel, batch_size: usize, now: Instant, eviction_state: &mut EvictionState<'_, K, V>, ) where V: Clone, { - let tti = &self.time_to_idle; + let tti = &self.expiration_policy.time_to_idle(); let va = &self.valid_after(); for _ in 0..batch_size { // Peek the front node of the deque and check if it is expired. @@ -1675,6 +2052,7 @@ where deq_name, deq, write_order_deq, + timer_wheel, entry, &mut eviction_state.counters, ); @@ -1691,7 +2069,7 @@ where hash: u64, deq_name: &str, deq: &mut Deque>, - write_order_deq: &mut Deque>, + write_order_deq: &mut Deque>, ) -> bool { if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) { if entry.is_dirty() { @@ -1705,9 +2083,13 @@ where } } else { // Skip this entry as the key might have been invalidated. Since the - // invalidated ValueEntry (which should be still in the write op - // queue) has a pointer to this node, move the node to the back of - // the deque instead of popping (dropping) it. + // invalidated ValueEntry (which should be still in the write op queue) + // has a pointer to this node, move the node to the back of the deque + // instead of popping (dropping) it. + // + // TODO FIXME: This `peek_front()` and `move_to_back()` combo will be + // considered UB as violating the aliasing rule. (`peek_front` returns + // `&node` instead of `&mut node`). 
if let Some(node) = deq.peek_front() { let node = NonNull::from(node); unsafe { deq.move_to_back(node) }; @@ -1720,13 +2102,14 @@ where fn remove_expired_wo( &self, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, batch_size: usize, now: Instant, eviction_state: &mut EvictionState<'_, K, V>, ) where V: Clone, { - let ttl = &self.time_to_live; + let ttl = &self.expiration_policy.time_to_live(); let va = &self.valid_after(); for _ in 0..batch_size { let key_cause = deqs.write_order.peek_front().and_then( @@ -1760,8 +2143,9 @@ where let key = Arc::clone(key); eviction_state.add_removed_entry(key, &entry, *cause); } - Self::handle_remove(deqs, entry, &mut eviction_state.counters); + Self::handle_remove(deqs, timer_wheel, entry, &mut eviction_state.counters); } else if let Some(entry) = self.cache.get(hash, |k| k == key) { + // TODO: CHECKME: Should we check `entry.is_dirty()` instead? if entry.last_modified().is_none() { deqs.move_to_back_ao(&entry); deqs.move_to_back_wo(&entry); @@ -1774,6 +2158,10 @@ where // invalidated ValueEntry (which should be still in the write op // queue) has a pointer to this node, move the node to the back of // the deque instead of popping (dropping) it. + // + // TODO FIXME: This `peek_front()` and `move_to_back()` combo will be + // considered UB as violating the aliasing rule (`peek_front` returns + // `&node` instead of `&mut node`). 
if let Some(node) = deqs.write_order.peek_front() { let node = NonNull::from(node); unsafe { deqs.write_order.move_to_back(node) }; @@ -1786,12 +2174,13 @@ where &self, invalidator: &Invalidator, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, batch_size: usize, eviction_state: &mut EvictionState<'_, K, V>, ) where V: Clone, { - self.process_invalidation_result(invalidator, deqs, eviction_state); + self.process_invalidation_result(invalidator, deqs, timer_wheel, eviction_state); self.submit_invalidation_task(invalidator, &mut deqs.write_order, batch_size); } @@ -1799,6 +2188,7 @@ where &self, invalidator: &Invalidator, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, eviction_state: &mut EvictionState<'_, K, V>, ) where V: Clone, @@ -1809,7 +2199,7 @@ where }) = invalidator.task_result() { for KvEntry { key: _key, entry } in invalidated { - Self::handle_remove(deqs, entry, &mut eviction_state.counters); + Self::handle_remove(deqs, timer_wheel, entry, &mut eviction_state.counters); } if is_done { deqs.write_order.reset_cursor(); @@ -1820,7 +2210,7 @@ where fn submit_invalidation_task( &self, invalidator: &Invalidator, - write_order: &mut Deque>, + write_order: &mut Deque>, batch_size: usize, ) where V: Clone, @@ -1862,6 +2252,7 @@ where fn evict_lru_entries( &self, deqs: &mut Deques, + timer_wheel: &mut TimerWheel, batch_size: usize, weights_to_evict: u64, eviction_state: &mut EvictionState<'_, K, V>, @@ -1927,6 +2318,7 @@ where DEQ_NAME, deq, write_order_deq, + timer_wheel, entry, &mut eviction_state.counters, ); @@ -1963,17 +2355,18 @@ where last_modified: Option, ) { let now = self.current_time_from_expiration_clock(); + let exp = &self.expiration_policy; let mut cause = RemovalCause::Replaced; if let Some(last_accessed) = last_accessed { - if is_expired_by_tti(&self.time_to_idle, last_accessed, now) { + if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) { cause = RemovalCause::Expired; } } if let Some(last_modified) = last_modified { - if 
is_expired_by_ttl(&self.time_to_live, last_modified, now) { + if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) { cause = RemovalCause::Expired; } else if is_invalid_entry(&self.valid_after(), last_modified) { cause = RemovalCause::Explicit; @@ -1986,17 +2379,18 @@ where #[inline] fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) { let now = self.current_time_from_expiration_clock(); + let exp = &self.expiration_policy; let mut cause = RemovalCause::Explicit; if let Some(last_accessed) = entry.last_accessed() { - if is_expired_by_tti(&self.time_to_idle, last_accessed, now) { + if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) { cause = RemovalCause::Expired; } } if let Some(last_modified) = entry.last_modified() { - if is_expired_by_ttl(&self.time_to_live, last_modified, now) { + if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) { cause = RemovalCause::Expired; } } @@ -2023,12 +2417,20 @@ where } fn set_expiration_clock(&self, clock: Option) { - let mut exp_clock = self.expiration_clock.write(); + let mut exp_clock = self.clocks.expiration_clock.write(); if let Some(clock) = clock { + let std_now = StdInstant::now(); + let now = Instant::new(clock.now()); *exp_clock = Some(clock); - self.has_expiration_clock.store(true, Ordering::SeqCst); + self.clocks + .has_expiration_clock + .store(true, Ordering::SeqCst); + self.clocks.set_origin(now, std_now); + self.timer_wheel.lock().set_origin(now); } else { - self.has_expiration_clock.store(false, Ordering::SeqCst); + self.clocks + .has_expiration_clock + .store(false, Ordering::SeqCst); *exp_clock = None; } } @@ -2037,6 +2439,20 @@ where // // private free-standing functions // + +/// Returns `true` if this entry is expired by its per-entry TTL. 
+#[inline] +fn is_expired_by_per_entry_ttl(entry_info: &TrioArc>, now: Instant) -> bool { + if let Some(ts) = entry_info.expiration_time() { + return ts <= now; + } + false +} + +/// Returns `true` when one of the followings conditions is met: +/// +/// - This entry is expired by the time-to-idle config of this cache instance. +/// - Or, it is invalidated by the `invalidate_all` method. #[inline] fn is_expired_entry_ao( time_to_idle: &Option, @@ -2052,6 +2468,10 @@ fn is_expired_entry_ao( false } +/// Returns `true` when one of the following conditions is met: +/// +/// - This entry is expired by the time-to-live (TTL) config of this cache instance. +/// - Or, it is invalidated by the `invalidate_all` method. #[inline] fn is_expired_entry_wo( time_to_live: &Option, @@ -2141,7 +2561,7 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { - use crate::common::concurrent::housekeeper; + use crate::{common::concurrent::housekeeper, policy::ExpirationPolicy}; use super::BaseCache; @@ -2162,8 +2582,7 @@ mod tests { None, None, None, - None, - None, + Default::default(), false, housekeeper::Configuration::new_thread_pool(true), ); @@ -2209,4 +2628,85 @@ mod tests { } }; } + + #[test] + fn test_per_entry_expiration() { + use super::InnerSync; + use crate::{common::time::Clock, Expiry}; + + use std::{ + collections::hash_map::RandomState, + sync::Arc, + time::{Duration, Instant as StdInstant}, + }; + + struct MyExpiry; + + impl Expiry for MyExpiry { + fn expire_after_create( + &self, + key: &u32, + _value: &char, + _current_time: StdInstant, + ) -> Option { + if key == &1 { + Some(Duration::from_secs(1)) + } else { + None + } + } + } + + let mut cache = BaseCache::::new( + None, + Some(100), + None, + RandomState::default(), + None, + None, + None, + ExpirationPolicy::new(None, None, Some(Arc::new(MyExpiry))), + false, + housekeeper::Configuration::new_blocking(), + ); + cache.reconfigure_for_testing(); + + let (clock, mock) = Clock::mock(); + 
cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. + let cache = cache; + + mock.increment(Duration::from_millis(10)); + + // Insert an entry. It will have a per-entry TTL of 1 second. + let key = 1; + let hash = cache.hash(&key); + let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, 'a'); + cache.write_op_ch.send(op).expect("Failed to send"); + + // Run a sync to register the entry to the internal data structures including + // the timer wheel. + cache.inner.sync(1); + assert_eq!(cache.entry_count(), 1); + + // Increment the time by 999ms. The entry should still be in the cache. + mock.increment(Duration::from_millis(999)); + cache.inner.sync(1); + assert!(cache.contains_key_with_hash(&key, hash)); + assert_eq!(cache.entry_count(), 1); + + // Increment the time by 1ms. The entry should be expired, so + // contains_key_with_hash should return false no matter if the entry is in + // the cache or not. + mock.increment(Duration::from_millis(1)); + cache.inner.sync(1); + assert!(!cache.contains_key_with_hash(&key, hash)); + + // Increment the time more to ensure the entry has been evicted from the + // cache. + mock.increment(Duration::from_secs(1)); + cache.inner.sync(1); + assert_eq!(cache.entry_count(), 0); + } } diff --git a/src/sync_base/invalidator.rs b/src/sync_base/invalidator.rs index a0885a9c..b21d7cdb 100644 --- a/src/sync_base/invalidator.rs +++ b/src/sync_base/invalidator.rs @@ -1,5 +1,3 @@ -#![allow(unused)] - use super::{base_cache::Inner, PredicateId, PredicateIdStr}; use crate::{ common::{