Skip to content

Commit 1f19eac

Browse files
authored
kad: Limit MemoryStore entries (#78)
This PR ensures that the memory store entries are not growing indefinitely, which may cause: - a memory leak and process being terminated - using outdated records The following changes are introduced: - records are stored in the memory store iff space is available - a record can replace another record iff it outlives the currently stored record - records are removed from the store if they expire when the record is inspected - record value must not exceed a sensible number of bytes cc @dmitry-markin --------- Signed-off-by: Alexandru Vasile <[email protected]>
1 parent 84267ad commit 1f19eac

File tree

2 files changed

+161
-5
lines changed

2 files changed

+161
-5
lines changed

src/protocol/libp2p/kademlia/record.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl Record {
104104
}
105105

106106
/// Checks whether the record is expired w.r.t. the given `Instant`.
107-
pub fn _is_expired(&self, now: Instant) -> bool {
107+
pub fn is_expired(&self, now: Instant) -> bool {
108108
self.expires.map_or(false, |t| now >= t)
109109
}
110110
}

src/protocol/libp2p/kademlia/store.rs

Lines changed: 160 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
#![allow(unused)]
2424
use crate::protocol::libp2p::kademlia::record::{Key, Record};
2525

26-
use std::collections::HashMap;
26+
use std::collections::{hash_map::Entry, HashMap};
2727

2828
/// Memory store events.
2929
pub enum MemoryStoreEvent {}
@@ -32,28 +32,184 @@ pub enum MemoryStoreEvent {}
3232
pub struct MemoryStore {
3333
/// Records.
3434
records: HashMap<Key, Record>,
35+
/// Configuration.
36+
config: MemoryStoreConfig,
3537
}
3638

3739
impl MemoryStore {
3840
/// Create new [`MemoryStore`].
3941
pub fn new() -> Self {
4042
Self {
4143
records: HashMap::new(),
44+
config: MemoryStoreConfig::default(),
45+
}
46+
}
47+
48+
/// Create new [`MemoryStore`] with the provided configuration.
49+
pub fn with_config(config: MemoryStoreConfig) -> Self {
50+
Self {
51+
records: HashMap::new(),
52+
config,
4253
}
4354
}
4455

4556
/// Try to get record from local store for `key`.
46-
pub fn get(&self, key: &Key) -> Option<&Record> {
47-
self.records.get(key)
57+
pub fn get(&mut self, key: &Key) -> Option<&Record> {
58+
let is_expired = self
59+
.records
60+
.get(key)
61+
.map_or(false, |record| record.is_expired(std::time::Instant::now()));
62+
63+
if is_expired {
64+
self.records.remove(key);
65+
None
66+
} else {
67+
self.records.get(key)
68+
}
4869
}
4970

5071
/// Store record.
5172
pub fn put(&mut self, record: Record) {
52-
self.records.insert(record.key.clone(), record);
73+
if record.value.len() >= self.config.max_record_size_bytes {
74+
return;
75+
}
76+
77+
let len = self.records.len();
78+
match self.records.entry(record.key.clone()) {
79+
Entry::Occupied(mut entry) => {
80+
// Lean towards the new record.
81+
if let (Some(stored_record_ttl), Some(new_record_ttl)) =
82+
(entry.get().expires, record.expires)
83+
{
84+
if stored_record_ttl > new_record_ttl {
85+
return;
86+
}
87+
}
88+
89+
entry.insert(record);
90+
}
91+
92+
Entry::Vacant(entry) => {
93+
if len >= self.config.max_records {
94+
return;
95+
}
96+
97+
entry.insert(record);
98+
}
99+
}
53100
}
54101

55102
/// Poll next event from the store.
56103
async fn next_event() -> Option<MemoryStoreEvent> {
57104
None
58105
}
59106
}
107+
108+
pub struct MemoryStoreConfig {
109+
/// Maximum number of records to store.
110+
pub max_records: usize,
111+
112+
/// Maximum size of a record in bytes.
113+
pub max_record_size_bytes: usize,
114+
}
115+
116+
impl Default for MemoryStoreConfig {
117+
fn default() -> Self {
118+
Self {
119+
max_records: 1024,
120+
max_record_size_bytes: 65 * 1024,
121+
}
122+
}
123+
}
124+
125+
#[cfg(test)]
126+
mod tests {
127+
use super::*;
128+
129+
#[test]
130+
fn test_memory_store() {
131+
let mut store = MemoryStore::new();
132+
let key = Key::from(vec![1, 2, 3]);
133+
let record = Record::new(key.clone(), vec![4, 5, 6]);
134+
135+
store.put(record.clone());
136+
assert_eq!(store.get(&key), Some(&record));
137+
}
138+
139+
#[test]
140+
fn test_memory_store_length() {
141+
let mut store = MemoryStore::with_config(MemoryStoreConfig {
142+
max_records: 1,
143+
max_record_size_bytes: 1024,
144+
});
145+
146+
let key1 = Key::from(vec![1, 2, 3]);
147+
let key2 = Key::from(vec![4, 5, 6]);
148+
let record1 = Record::new(key1.clone(), vec![4, 5, 6]);
149+
let record2 = Record::new(key2.clone(), vec![7, 8, 9]);
150+
151+
store.put(record1.clone());
152+
store.put(record2.clone());
153+
154+
assert_eq!(store.get(&key1), Some(&record1));
155+
assert_eq!(store.get(&key2), None);
156+
}
157+
158+
#[test]
159+
fn test_memory_store_remove_old_records() {
160+
let mut store = MemoryStore::new();
161+
let key = Key::from(vec![1, 2, 3]);
162+
let record = Record {
163+
key: key.clone(),
164+
value: vec![4, 5, 6],
165+
publisher: None,
166+
expires: Some(std::time::Instant::now() - std::time::Duration::from_secs(5)),
167+
};
168+
// Record is already expired.
169+
assert!(record.is_expired(std::time::Instant::now()));
170+
171+
store.put(record.clone());
172+
assert_eq!(store.get(&key), None);
173+
}
174+
175+
#[test]
176+
fn test_memory_store_replace_new_records() {
177+
let mut store = MemoryStore::new();
178+
let key = Key::from(vec![1, 2, 3]);
179+
let record1 = Record {
180+
key: key.clone(),
181+
value: vec![4, 5, 6],
182+
publisher: None,
183+
expires: Some(std::time::Instant::now() + std::time::Duration::from_secs(100)),
184+
};
185+
let record2 = Record {
186+
key: key.clone(),
187+
value: vec![4, 5, 6],
188+
publisher: None,
189+
expires: Some(std::time::Instant::now() + std::time::Duration::from_secs(1000)),
190+
};
191+
192+
store.put(record1.clone());
193+
assert_eq!(store.get(&key), Some(&record1));
194+
195+
store.put(record2.clone());
196+
assert_eq!(store.get(&key), Some(&record2));
197+
}
198+
199+
#[test]
200+
fn test_memory_store_max_record_size() {
201+
let mut store = MemoryStore::with_config(MemoryStoreConfig {
202+
max_records: 1024,
203+
max_record_size_bytes: 2,
204+
});
205+
206+
let key = Key::from(vec![1, 2, 3]);
207+
let record = Record::new(key.clone(), vec![4, 5]);
208+
store.put(record.clone());
209+
assert_eq!(store.get(&key), None);
210+
211+
let record = Record::new(key.clone(), vec![4]);
212+
store.put(record.clone());
213+
assert_eq!(store.get(&key), Some(&record));
214+
}
215+
}

0 commit comments

Comments
 (0)