Skip to content

Commit dc5c8e1

Browse files
authored
Merge da28be2 into d413c55
2 parents d413c55 + da28be2 commit dc5c8e1

4 files changed

Lines changed: 80 additions & 53 deletions

File tree

crates/common/trie/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub trait TrieDB: Send + Sync {
3333
/// InMemory implementation for the TrieDB trait, with get and put operations.
3434
#[derive(Default)]
3535
pub struct InMemoryTrieDB {
36-
inner: NodeMap,
36+
pub inner: NodeMap,
3737
prefix: Option<Nibbles>,
3838
}
3939

crates/storage/store_db/in_memory.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::{
1515
collections::HashMap,
1616
fmt::Debug,
1717
path::Path,
18-
sync::{Arc, Mutex, MutexGuard},
18+
sync::{Arc, Mutex, MutexGuard, RwLock},
1919
};
2020

2121
// NOTE: we use a different commit threshold than rocksdb since tests
@@ -39,7 +39,7 @@ pub struct StoreInner {
3939
// Maps transaction hashes to their blocks (height+hash) and index within the blocks.
4040
transaction_locations: HashMap<H256, Vec<(BlockNumber, BlockHash, Index)>>,
4141
receipts: HashMap<BlockHash, HashMap<Index, Receipt>>,
42-
trie_cache: Arc<TrieLayerCache>,
42+
trie_cache: Arc<RwLock<TrieLayerCache>>,
4343
// Contains account trie nodes
4444
state_trie_nodes: NodeMap,
4545
pending_blocks: HashMap<BlockHash, Block>,
@@ -91,7 +91,6 @@ impl StoreEngine for Store {
9191
let mut store = self.inner()?;
9292

9393
// Store trie updates
94-
let mut trie = TrieLayerCache::clone(&store.trie_cache);
9594
let parent = update_batch
9695
.blocks
9796
.first()
@@ -117,9 +116,18 @@ impl StoreEngine for Store {
117116
.state_trie_nodes
118117
.lock()
119118
.map_err(|_| StoreError::LockError)?;
120-
121-
if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) {
122-
let nodes = trie.commit(root).unwrap_or_default();
119+
if let Some(root) = store
120+
.trie_cache
121+
.read()
122+
.map_err(|_| StoreError::LockError)?
123+
.get_commitable(pre_state_root, COMMIT_THRESHOLD)
124+
{
125+
let nodes = store
126+
.trie_cache
127+
.write()
128+
.map_err(|_| StoreError::LockError)?
129+
.commit(root)
130+
.unwrap_or_default();
123131
for (key, value) in nodes {
124132
if value.is_empty() {
125133
state_trie.remove(&key);
@@ -140,8 +148,11 @@ impl StoreEngine for Store {
140148
})
141149
.chain(update_batch.account_updates)
142150
.collect();
143-
trie.put_batch(pre_state_root, last_state_root, key_values);
144-
store.trie_cache = Arc::new(trie);
151+
store
152+
.trie_cache
153+
.write()
154+
.map_err(|_| StoreError::LockError)?
155+
.put_batch(pre_state_root, last_state_root, key_values);
145156

146157
for block in update_batch.blocks {
147158
// store block

crates/storage/store_db/rocksdb.rs

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{
2222
collections::HashSet,
2323
path::Path,
2424
sync::{
25-
Arc, Mutex,
25+
Arc, Mutex, RwLock,
2626
mpsc::{SyncSender, sync_channel},
2727
},
2828
};
@@ -139,7 +139,7 @@ enum FKVGeneratorControlMessage {
139139
#[derive(Debug, Clone)]
140140
pub struct Store {
141141
db: Arc<DBWithThreadMode<MultiThreaded>>,
142-
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
142+
trie_cache: Arc<RwLock<TrieLayerCache>>,
143143
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
144144
trie_update_worker_tx: TriedUpdateWorkerTx,
145145
last_computed_flatkeyvalue: Arc<Mutex<Vec<u8>>>,
@@ -706,7 +706,6 @@ impl Store {
706706
) -> Result<(), StoreError> {
707707
let db = &*self.db;
708708
let fkv_ctl = &self.flatkeyvalue_control_tx;
709-
let trie_cache = &self.trie_cache;
710709

711710
// Phase 1: update the in-memory diff-layers only, then notify block production.
712711
let new_layer = storage_updates
@@ -718,36 +717,40 @@ impl Store {
718717
})
719718
.chain(account_updates)
720719
.collect();
721-
// Read-Copy-Update the trie cache with a new layer.
722-
let trie = trie_cache
723-
.lock()
720+
// Update trie cache with a new layer under write lock.
721+
self.trie_cache
722+
.write()
724723
.map_err(|_| StoreError::LockError)?
725-
.clone();
726-
let mut trie_mut = (*trie).clone();
727-
trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
728-
let trie = Arc::new(trie_mut);
729-
*trie_cache.lock().map_err(|_| StoreError::LockError)? = trie.clone();
724+
.put_batch(parent_state_root, child_state_root, new_layer);
730725
// Update finished, signal block processing.
731726
notify.send(Ok(())).map_err(|_| StoreError::LockError)?;
732727

733728
// Phase 2: update disk layer.
734-
let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else {
729+
let Some(root) = self
730+
.trie_cache
731+
.read()
732+
.map_err(|_| StoreError::LockError)?
733+
.get_commitable(parent_state_root, COMMIT_THRESHOLD)
734+
else {
735735
// Nothing to commit to disk, move on.
736736
return Ok(());
737737
};
738738
// Stop the flat-key-value generator thread, as the underlying trie is about to change.
739739
// Ignore the error, if the channel is closed it means there is no worker to notify.
740740
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);
741741

742-
// RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
743-
let mut trie_mut = (*trie).clone();
744742
let mut batch = WriteBatch::default();
745743
let [cf_trie_nodes, cf_flatkeyvalue, cf_misc] =
746744
open_cfs(db, [CF_TRIE_NODES, CF_FLATKEYVALUE, CF_MISC_VALUES])?;
747745

748746
let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default();
749747
// Commit removes the bottom layer and returns it, this is the mutation step.
750-
let nodes = trie_mut.commit(root).unwrap_or_default();
748+
let nodes = self
749+
.trie_cache
750+
.write()
751+
.map_err(|_| StoreError::LockError)?
752+
.commit(root)
753+
.unwrap_or_default();
751754
for (key, value) in nodes {
752755
let is_leaf = key.len() == 65 || key.len() == 131;
753756

@@ -769,8 +772,7 @@ impl Store {
769772
// We want to send this message even if there was an error during the batch write
770773
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue);
771774
result?;
772-
// Phase 3: update diff layers with the removal of bottom layer.
773-
*trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
775+
// Phase 3: diff layers already updated by commit mutation.
774776
Ok(())
775777
}
776778

@@ -1478,11 +1480,7 @@ impl StoreEngine for Store {
14781480
)?);
14791481
let wrap_db = Box::new(TrieWrapper {
14801482
state_root,
1481-
inner: self
1482-
.trie_cache
1483-
.lock()
1484-
.map_err(|_| StoreError::LockError)?
1485-
.clone(),
1483+
inner: self.trie_cache.clone(),
14861484
db,
14871485
prefix: Some(hashed_address),
14881486
});
@@ -1499,11 +1497,7 @@ impl StoreEngine for Store {
14991497
)?);
15001498
let wrap_db = Box::new(TrieWrapper {
15011499
state_root,
1502-
inner: self
1503-
.trie_cache
1504-
.lock()
1505-
.map_err(|_| StoreError::LockError)?
1506-
.clone(),
1500+
inner: self.trie_cache.clone(),
15071501
db,
15081502
prefix: None,
15091503
});
@@ -1543,11 +1537,7 @@ impl StoreEngine for Store {
15431537
)?);
15441538
let wrap_db = Box::new(TrieWrapper {
15451539
state_root,
1546-
inner: self
1547-
.trie_cache
1548-
.lock()
1549-
.map_err(|_| StoreError::LockError)?
1550-
.clone(),
1540+
inner: self.trie_cache.clone(),
15511541
db,
15521542
prefix: None,
15531543
});
@@ -1568,11 +1558,7 @@ impl StoreEngine for Store {
15681558
)?);
15691559
let wrap_db = Box::new(TrieWrapper {
15701560
state_root,
1571-
inner: self
1572-
.trie_cache
1573-
.lock()
1574-
.map_err(|_| StoreError::LockError)?
1575-
.clone(),
1561+
inner: self.trie_cache.clone(),
15761562
db,
15771563
prefix: Some(hashed_address),
15781564
});

crates/storage/trie_db/layering.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use ethrex_common::H256;
1+
use ethrex_common::{H256, utils::keccak};
22
use rayon::iter::{ParallelBridge, ParallelIterator};
33
use rustc_hash::FxHashMap;
4-
use std::sync::Arc;
4+
use std::sync::{Arc, RwLock};
55

66
use ethrex_trie::{Nibbles, TrieDB, TrieError};
77

@@ -198,7 +198,7 @@ impl TrieLayerCache {
198198

199199
pub struct TrieWrapper {
200200
pub state_root: H256,
201-
pub inner: Arc<TrieLayerCache>,
201+
pub inner: Arc<RwLock<TrieLayerCache>>,
202202
pub db: Box<dyn TrieDB>,
203203
pub prefix: Option<H256>,
204204
}
@@ -221,14 +221,44 @@ impl TrieDB for TrieWrapper {
221221
}
222222
fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
223223
let key = apply_prefix(self.prefix, key);
224-
if let Some(value) = self.inner.get(self.state_root, key.clone()) {
224+
if let Some(value) = self
225+
.inner
226+
.read()
227+
.map_err(|_| TrieError::LockError)?
228+
.get(self.state_root, key.clone())
229+
{
225230
return Ok(Some(value));
226231
}
227232
self.db.get(key)
228233
}
229234

230-
fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
231-
// TODO: Get rid of this.
232-
unimplemented!("This function should not be called");
235+
// Warning, use this carefully as it's not battle tested. It's useful for manipulating the trie though.
236+
fn put_batch(&self, mut key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
237+
// No-op for empty batches. This occurs when committing without logical changes.
238+
if key_values.is_empty() {
239+
return Ok(());
240+
}
241+
242+
// State root is the hash of the node that doesn't have nibbles in path.
243+
// If state root is not in the batch I believe it should be invalid.
244+
let state_root = key_values
245+
.iter()
246+
.find(|(key, _)| *key == Nibbles::default())
247+
.map(|(_, value)| keccak(value))
248+
.ok_or(TrieError::InvalidInput)?;
249+
250+
// Apply prefix to every key
251+
for (key, _value) in &mut key_values {
252+
*key = apply_prefix(self.prefix, key.clone());
253+
}
254+
255+
let parent = self.state_root;
256+
257+
self.inner
258+
.write()
259+
.map_err(|_| TrieError::LockError)?
260+
.put_batch(parent, state_root, key_values);
261+
262+
Ok(())
233263
}
234264
}

0 commit comments

Comments
 (0)