Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"ahash",
"armv",
"benmanes",
"CHECKME",
"circleci",
"CLFU",
"clippy",
Expand All @@ -19,10 +20,13 @@
"deqs",
"Deque",
"Deques",
"deschedule",
"Descheduled",
"devcontainer",
"docsrs",
"Einziger",
"else's",
"ENHANCEME",
"Eytan",
"getrandom",
"hashbrown",
Expand Down
1 change: 1 addition & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) mod builder_utils;
pub(crate) mod deque;
pub(crate) mod frequency_sketch;
pub(crate) mod time;
pub(crate) mod timer_wheel;

#[cfg(all(test, any(feature = "sync", feature = "future")))]
pub(crate) mod test_utils;
Expand Down
80 changes: 56 additions & 24 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub(crate) mod debug_counters;

use self::entry_info::EntryInfo;

use super::timer_wheel::TimerNode;

pub(crate) type Weigher<K, V> = Arc<dyn Fn(&K, &V) -> u32 + Send + Sync + 'static>;

pub(crate) trait AccessTime {
Expand All @@ -40,6 +42,7 @@ pub(crate) trait AccessTime {
fn set_last_modified(&self, timestamp: Instant);
}

#[derive(Debug)]
pub(crate) struct KeyHash<K> {
pub(crate) key: Arc<K>,
pub(crate) hash: u64,
Expand All @@ -62,11 +65,11 @@ impl<K> Clone for KeyHash<K> {

pub(crate) struct KeyDate<K> {
key: Arc<K>,
entry_info: TrioArc<EntryInfo>,
entry_info: TrioArc<EntryInfo<K>>,
}

impl<K> KeyDate<K> {
pub(crate) fn new(key: Arc<K>, entry_info: &TrioArc<EntryInfo>) -> Self {
pub(crate) fn new(key: Arc<K>, entry_info: &TrioArc<EntryInfo<K>>) -> Self {
Self {
key,
entry_info: TrioArc::clone(entry_info),
Expand All @@ -91,11 +94,11 @@ impl<K> KeyDate<K> {
pub(crate) struct KeyHashDate<K> {
key: Arc<K>,
hash: u64,
entry_info: TrioArc<EntryInfo>,
entry_info: TrioArc<EntryInfo<K>>,
}

impl<K> KeyHashDate<K> {
pub(crate) fn new(kh: KeyHash<K>, entry_info: &TrioArc<EntryInfo>) -> Self {
pub(crate) fn new(kh: KeyHash<K>, entry_info: &TrioArc<EntryInfo<K>>) -> Self {
Self {
key: kh.key,
hash: kh.hash,
Expand All @@ -111,7 +114,7 @@ impl<K> KeyHashDate<K> {
self.hash
}

pub(crate) fn entry_info(&self) -> &EntryInfo {
pub(crate) fn entry_info(&self) -> &EntryInfo<K> {
&self.entry_info
}
}
Expand Down Expand Up @@ -177,54 +180,63 @@ type KeyDeqNodeAo<K> = TagNonNull<DeqNode<KeyHashDate<K>>, 2>;
// DeqNode for the write order queue.
type KeyDeqNodeWo<K> = NonNull<DeqNode<KeyDate<K>>>;

// DeqNode for the timer wheel.
type DeqNodeTimer<K> = NonNull<DeqNode<TimerNode<K>>>;

pub(crate) struct DeqNodes<K> {
access_order_q_node: Option<KeyDeqNodeAo<K>>,
write_order_q_node: Option<KeyDeqNodeWo<K>>,
timer_node: Option<DeqNodeTimer<K>>,
}

impl<K> Default for DeqNodes<K> {
fn default() -> Self {
Self {
access_order_q_node: None,
write_order_q_node: None,
timer_node: None,
}
}
}

// We need this `unsafe impl` as DeqNodes have NonNull pointers.
unsafe impl<K> Send for DeqNodes<K> {}

impl<K> DeqNodes<K> {
pub(crate) fn set_timer_node(&mut self, timer_node: Option<DeqNodeTimer<K>>) {
self.timer_node = timer_node;
}
}

pub(crate) struct ValueEntry<K, V> {
pub(crate) value: V,
info: TrioArc<EntryInfo>,
nodes: Mutex<DeqNodes<K>>,
info: TrioArc<EntryInfo<K>>,
nodes: TrioArc<Mutex<DeqNodes<K>>>,
}

impl<K, V> ValueEntry<K, V> {
pub(crate) fn new(value: V, entry_info: TrioArc<EntryInfo>) -> Self {
pub(crate) fn new(value: V, entry_info: TrioArc<EntryInfo<K>>) -> Self {
#[cfg(feature = "unstable-debug-counters")]
self::debug_counters::InternalGlobalDebugCounters::value_entry_created();

Self {
value,
info: entry_info,
nodes: Mutex::new(DeqNodes {
access_order_q_node: None,
write_order_q_node: None,
}),
nodes: TrioArc::new(Mutex::new(DeqNodes::default())),
}
}

pub(crate) fn new_from(value: V, entry_info: TrioArc<EntryInfo>, other: &Self) -> Self {
pub(crate) fn new_from(value: V, entry_info: TrioArc<EntryInfo<K>>, other: &Self) -> Self {
#[cfg(feature = "unstable-debug-counters")]
self::debug_counters::InternalGlobalDebugCounters::value_entry_created();

let nodes = {
let other_nodes = other.nodes.lock();
DeqNodes {
access_order_q_node: other_nodes.access_order_q_node,
write_order_q_node: other_nodes.write_order_q_node,
}
};
Self {
value,
info: entry_info,
nodes: Mutex::new(nodes),
nodes: TrioArc::clone(&other.nodes),
}
}

pub(crate) fn entry_info(&self) -> &TrioArc<EntryInfo> {
pub(crate) fn entry_info(&self) -> &TrioArc<EntryInfo<K>> {
&self.info
}

Expand All @@ -249,6 +261,10 @@ impl<K, V> ValueEntry<K, V> {
self.info.policy_weight()
}

pub(crate) fn deq_nodes(&self) -> &TrioArc<Mutex<DeqNodes<K>>> {
&self.nodes
}

pub(crate) fn access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
self.nodes.lock().access_order_q_node
}
Expand All @@ -273,6 +289,18 @@ impl<K, V> ValueEntry<K, V> {
self.nodes.lock().write_order_q_node.take()
}

pub(crate) fn timer_node(&self) -> Option<DeqNodeTimer<K>> {
self.nodes.lock().timer_node
}

pub(crate) fn set_timer_node(&self, node: Option<DeqNodeTimer<K>>) {
self.nodes.lock().timer_node = node;
}

pub(crate) fn take_timer_node(&self) -> Option<DeqNodeTimer<K>> {
self.nodes.lock().timer_node.take()
}

pub(crate) fn unset_q_nodes(&self) {
let mut nodes = self.nodes.lock();
nodes.access_order_q_node = None;
Expand Down Expand Up @@ -310,8 +338,12 @@ impl<K, V> AccessTime for TrioArc<ValueEntry<K, V>> {
}

pub(crate) enum ReadOp<K, V> {
Hit {
value_entry: TrioArc<ValueEntry<K, V>>,
timestamp: Instant,
is_expiry_modified: bool,
},
// u64 is the hash of the key.
Hit(u64, TrioArc<ValueEntry<K, V>>, Instant),
Miss(u64),
}

Expand Down
5 changes: 5 additions & 0 deletions src/common/concurrent/atomic_time/atomic_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
sync::atomic::{AtomicU64, Ordering},
};

#[derive(Debug)]
pub(crate) struct AtomicInstant {
instant: AtomicU64,
}
Expand All @@ -27,6 +28,10 @@ impl AtomicInstant {
ai
}

pub(crate) fn clear(&self) {
self.instant.store(u64::MAX, Ordering::Release);
}

pub(crate) fn is_set(&self) -> bool {
self.instant.load(Ordering::Acquire) != u64::MAX
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/concurrent/atomic_time/atomic_time_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::Instant;

use parking_lot::RwLock;

#[derive(Debug)]
pub(crate) struct AtomicInstant {
instant: RwLock<Option<Instant>>,
}
Expand All @@ -21,6 +22,10 @@ impl AtomicInstant {
ai
}

pub(crate) fn clear(&self) {
*self.instant.write() = None;
}

pub(crate) fn is_set(&self) -> bool {
self.instant.read().is_some()
}
Expand Down
49 changes: 38 additions & 11 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

use super::AccessTime;
use super::{AccessTime, KeyHash};
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};

pub(crate) struct EntryInfo {
#[derive(Debug)]
pub(crate) struct EntryInfo<K> {
key_hash: KeyHash<K>,
/// `is_admitted` indicates that the entry has been admitted to the
/// cache. When `false`, it means the entry is _temporary_ admitted to
/// the cache or evicted from the cache (so it should not have LRU nodes).
Expand All @@ -14,24 +16,32 @@ pub(crate) struct EntryInfo {
is_dirty: AtomicBool,
last_accessed: AtomicInstant,
last_modified: AtomicInstant,
expiration_time: AtomicInstant,
policy_weight: AtomicU32,
}

impl EntryInfo {
impl<K> EntryInfo<K> {
#[inline]
pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self {
pub(crate) fn new(key_hash: KeyHash<K>, timestamp: Instant, policy_weight: u32) -> Self {
#[cfg(feature = "unstable-debug-counters")]
super::debug_counters::InternalGlobalDebugCounters::entry_info_created();

Self {
key_hash,
is_admitted: Default::default(),
is_dirty: AtomicBool::new(true),
last_accessed: AtomicInstant::new(timestamp),
last_modified: AtomicInstant::new(timestamp),
expiration_time: AtomicInstant::default(),
policy_weight: AtomicU32::new(policy_weight),
}
}

#[inline]
pub(crate) fn key_hash(&self) -> &KeyHash<K> {
&self.key_hash
}

#[inline]
pub(crate) fn is_admitted(&self) -> bool {
self.is_admitted.load(Ordering::Acquire)
Expand Down Expand Up @@ -60,16 +70,29 @@ impl EntryInfo {
pub(crate) fn set_policy_weight(&self, size: u32) {
self.policy_weight.store(size, Ordering::Release);
}

#[inline]
pub(crate) fn expiration_time(&self) -> Option<Instant> {
self.expiration_time.instant()
}

pub(crate) fn set_expiration_time(&self, time: Option<Instant>) {
if let Some(t) = time {
self.expiration_time.set_instant(t);
} else {
self.expiration_time.clear();
}
}
}

#[cfg(feature = "unstable-debug-counters")]
impl Drop for EntryInfo {
impl<K> Drop for EntryInfo<K> {
fn drop(&mut self) {
super::debug_counters::InternalGlobalDebugCounters::entry_info_dropped();
}
}

impl AccessTime for EntryInfo {
impl<K> AccessTime for EntryInfo<K> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
self.last_accessed.instant()
Expand Down Expand Up @@ -100,10 +123,14 @@ mod test {
// RUSTFLAGS='--cfg rustver' cargo test --lib --no-default-features --features sync -- common::concurrent::entry_info::test --nocapture
//
// Note: the size of the struct may change in a future version of Rust.
#[cfg_attr(
not(all(rustver, any(target_os = "linux", target_os = "macos"))),
ignore
)]

// TODO: Re-enable this test.

// #[cfg_attr(
// not(all(rustver, any(target_os = "linux", target_os = "macos"))),
// ignore
// )]
#[ignore]
#[test]
fn check_struct_size() {
use std::mem::size_of;
Expand Down Expand Up @@ -152,7 +179,7 @@ mod test {
}

if let Some(size) = expected {
assert_eq!(size_of::<EntryInfo>(), size);
assert_eq!(size_of::<EntryInfo<()>>(), size);
} else {
panic!("No expected size for {:?} with Rust version {}", arch, ver);
}
Expand Down
5 changes: 3 additions & 2 deletions src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ impl<T> Deque<T> {
/// This method takes care not to create mutable references to `element`, to
/// maintain validity of aliasing pointers.
///
/// Panics:
/// IMPORTANT: This method does not drop the node. If the node is no longer
/// needed, use `unlink_and_drop` instead, or drop it at the caller side.
/// Otherwise, the node will leak.
pub(crate) unsafe fn unlink(&mut self, mut node: NonNull<DeqNode<T>>) {
if self.is_at_cursor(node.as_ref()) {
self.advance_cursor();
Expand Down Expand Up @@ -265,7 +267,6 @@ impl<T> Deque<T> {
std::mem::drop(Box::from_raw(node.as_ptr()));
}

#[allow(unused)]
pub(crate) fn reset_cursor(&mut self) {
self.cursor = None;
}
Expand Down
Loading