Skip to content
4 changes: 4 additions & 0 deletions crates/common/trie/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl InMemoryTrieDB {
None => path,
}
}

pub fn inner(&self) -> NodeMap {
Arc::clone(&self.inner)
}
}

impl TrieDB for InMemoryTrieDB {
Expand Down
31 changes: 21 additions & 10 deletions crates/storage/store_db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
collections::HashMap,
fmt::Debug,
path::Path,
sync::{Arc, Mutex, MutexGuard},
sync::{Arc, Mutex, MutexGuard, RwLock},
};

// NOTE: we use a different commit threshold than rocksdb since tests
Expand All @@ -39,7 +39,7 @@ pub struct StoreInner {
// Maps transaction hashes to their blocks (height+hash) and index within the blocks.
transaction_locations: HashMap<H256, Vec<(BlockNumber, BlockHash, Index)>>,
receipts: HashMap<BlockHash, HashMap<Index, Receipt>>,
trie_cache: Arc<TrieLayerCache>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
// Contains account trie nodes
state_trie_nodes: NodeMap,
pending_blocks: HashMap<BlockHash, Block>,
Expand Down Expand Up @@ -91,7 +91,6 @@ impl StoreEngine for Store {
let mut store = self.inner()?;

// Store trie updates
let mut trie = TrieLayerCache::clone(&store.trie_cache);
let parent = update_batch
.blocks
.first()
Expand All @@ -117,9 +116,18 @@ impl StoreEngine for Store {
.state_trie_nodes
.lock()
.map_err(|_| StoreError::LockError)?;

if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) {
let nodes = trie.commit(root).unwrap_or_default();
if let Some(root) = store
.trie_cache
.read()
.map_err(|_| StoreError::LockError)?
.get_commitable(pre_state_root, COMMIT_THRESHOLD)
{
let nodes = store
.trie_cache
.write()
.map_err(|_| StoreError::LockError)?
.commit(root)
.unwrap_or_default();
for (key, value) in nodes {
if value.is_empty() {
state_trie.remove(&key);
Expand All @@ -140,8 +148,11 @@ impl StoreEngine for Store {
})
.chain(update_batch.account_updates)
.collect();
trie.put_batch(pre_state_root, last_state_root, key_values);
store.trie_cache = Arc::new(trie);
store
.trie_cache
.write()
.map_err(|_| StoreError::LockError)?
.put_batch(pre_state_root, last_state_root, key_values);

for block in update_batch.blocks {
// store block
Expand Down Expand Up @@ -439,7 +450,7 @@ impl StoreEngine for Store {
let trie_backend = store.state_trie_nodes.clone();
let db = Box::new(InMemoryTrieDB::new(trie_backend));
let wrap_db = Box::new(TrieWrapper {
state_root,
state_root: RwLock::new(state_root),
inner: store.trie_cache.clone(),
db,
prefix: Some(hashed_address),
Expand All @@ -452,7 +463,7 @@ impl StoreEngine for Store {
let trie_backend = store.state_trie_nodes.clone();
let db = Box::new(InMemoryTrieDB::new(trie_backend));
let wrap_db = Box::new(TrieWrapper {
state_root,
state_root: RwLock::new(state_root),
inner: store.trie_cache.clone(),
db,
prefix: None,
Expand Down
68 changes: 27 additions & 41 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
collections::HashSet,
path::Path,
sync::{
Arc, Mutex,
Arc, Mutex, RwLock,
mpsc::{SyncSender, sync_channel},
},
};
Expand Down Expand Up @@ -139,7 +139,7 @@ enum FKVGeneratorControlMessage {
#[derive(Debug, Clone)]
pub struct Store {
db: Arc<DBWithThreadMode<MultiThreaded>>,
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
trie_update_worker_tx: TriedUpdateWorkerTx,
last_computed_flatkeyvalue: Arc<Mutex<Vec<u8>>>,
Expand Down Expand Up @@ -706,7 +706,6 @@ impl Store {
) -> Result<(), StoreError> {
let db = &*self.db;
let fkv_ctl = &self.flatkeyvalue_control_tx;
let trie_cache = &self.trie_cache;

// Phase 1: update the in-memory diff-layers only, then notify block production.
let new_layer = storage_updates
Expand All @@ -718,36 +717,40 @@ impl Store {
})
.chain(account_updates)
.collect();
// Read-Copy-Update the trie cache with a new layer.
let trie = trie_cache
.lock()
// Update trie cache with a new layer under write lock.
self.trie_cache
.write()
.map_err(|_| StoreError::LockError)?
.clone();
let mut trie_mut = (*trie).clone();
trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
let trie = Arc::new(trie_mut);
*trie_cache.lock().map_err(|_| StoreError::LockError)? = trie.clone();
.put_batch(parent_state_root, child_state_root, new_layer);
// Update finished, signal block processing.
notify.send(Ok(())).map_err(|_| StoreError::LockError)?;

// Phase 2: update disk layer.
let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else {
let Some(root) = self
.trie_cache
.read()
.map_err(|_| StoreError::LockError)?
.get_commitable(parent_state_root, COMMIT_THRESHOLD)
else {
// Nothing to commit to disk, move on.
return Ok(());
};
// Stop the flat-key-value generator thread, as the underlying trie is about to change.
// Ignore the error, if the channel is closed it means there is no worker to notify.
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);

// RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
let mut trie_mut = (*trie).clone();
let mut batch = WriteBatch::default();
let [cf_trie_nodes, cf_flatkeyvalue, cf_misc] =
open_cfs(db, [CF_TRIE_NODES, CF_FLATKEYVALUE, CF_MISC_VALUES])?;

let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default();
// Commit removes the bottom layer and returns it, this is the mutation step.
let nodes = trie_mut.commit(root).unwrap_or_default();
let nodes = self
.trie_cache
.write()
.map_err(|_| StoreError::LockError)?
.commit(root)
.unwrap_or_default();
for (key, value) in nodes {
let is_leaf = key.len() == 65 || key.len() == 131;

Expand All @@ -769,8 +772,7 @@ impl Store {
// We want to send this message even if there was an error during the batch write
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue);
result?;
// Phase 3: update diff layers with the removal of bottom layer.
*trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
// Phase 3: diff layers already updated by commit mutation.
Ok(())
}

Expand Down Expand Up @@ -1477,12 +1479,8 @@ impl StoreEngine for Store {
self.last_written()?,
)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self
.trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone(),
state_root: RwLock::new(state_root),
inner: self.trie_cache.clone(),
db,
prefix: Some(hashed_address),
});
Expand All @@ -1498,12 +1496,8 @@ impl StoreEngine for Store {
self.last_written()?,
)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self
.trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone(),
state_root: RwLock::new(state_root),
inner: self.trie_cache.clone(),
db,
prefix: None,
});
Expand Down Expand Up @@ -1542,12 +1536,8 @@ impl StoreEngine for Store {
self.last_written()?,
)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self
.trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone(),
state_root: RwLock::new(state_root),
inner: self.trie_cache.clone(),
db,
prefix: None,
});
Expand All @@ -1567,12 +1557,8 @@ impl StoreEngine for Store {
self.last_written()?,
)?);
let wrap_db = Box::new(TrieWrapper {
state_root,
inner: self
.trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone(),
state_root: RwLock::new(state_root),
inner: self.trie_cache.clone(),
db,
prefix: Some(hashed_address),
});
Expand Down
53 changes: 45 additions & 8 deletions crates/storage/trie_db/layering.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ethrex_common::H256;
use ethrex_common::{H256, utils::keccak};
use rayon::iter::{ParallelBridge, ParallelIterator};
use rustc_hash::FxHashMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use ethrex_trie::{Nibbles, TrieDB, TrieError};

Expand Down Expand Up @@ -197,8 +197,9 @@ impl TrieLayerCache {
}

pub struct TrieWrapper {
pub state_root: H256,
pub inner: Arc<TrieLayerCache>,
// RwLock because we should update state root if we put_batch
pub state_root: RwLock<H256>,
pub inner: Arc<RwLock<TrieLayerCache>>,
pub db: Box<dyn TrieDB>,
pub prefix: Option<H256>,
}
Expand All @@ -221,14 +222,50 @@ impl TrieDB for TrieWrapper {
}
fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
let key = apply_prefix(self.prefix, key);
if let Some(value) = self.inner.get(self.state_root, key.clone()) {
let state_root = *self.state_root.read().map_err(|_| TrieError::LockError)?;
if let Some(value) = self
.inner
.read()
.map_err(|_| TrieError::LockError)?
.get(state_root, key.clone())
{
return Ok(Some(value));
}
self.db.get(key)
}

fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
// TODO: Get rid of this.
unimplemented!("This function should not be called");
// Warning, use this carefully as it's not battle tested. It's useful for manipulating the trie though.
/// Inserts `key_values` as a new diff layer on top of the current state
/// root, then publishes the new root derived from the batch's root node.
///
/// The entire read-modify-publish sequence holds the `state_root` write
/// lock so two concurrent `put_batch` calls cannot interleave — the
/// original code read the parent under a read lock, released it, and
/// re-acquired a write lock later, allowing a racing call to chain its
/// layer onto a stale parent and clobber the published root.
///
/// Returns `Err(TrieError::InvalidInput)` if the batch contains no root
/// node, and `Err(TrieError::LockError)` on a poisoned lock.
fn put_batch(&self, mut key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
    // No-op for empty batches. This occurs when committing without logical changes.
    if key_values.is_empty() {
        return Ok(());
    }

    // The root node is the one with an empty nibble path; the new state
    // root is its hash. A batch without a root node is presumably invalid
    // — NOTE(review): confirm this holds for prefixed (storage) tries.
    let new_state_root = key_values
        .iter()
        .find(|(key, _)| *key == Nibbles::default())
        .map(|(_, value)| keccak(value))
        .ok_or(TrieError::InvalidInput)?;

    // Namespace every key by the account prefix, if any.
    for (key, _value) in &mut key_values {
        *key = apply_prefix(self.prefix, key.clone());
    }

    // Hold the write lock across read-modify-publish so the parent we
    // chain onto and the root we publish stay consistent. Lock order
    // (state_root, then inner) matches `get`, so no deadlock is possible.
    let mut state_root = self.state_root.write().map_err(|_| TrieError::LockError)?;
    let parent = *state_root;

    self.inner
        .write()
        .map_err(|_| TrieError::LockError)?
        .put_batch(parent, new_state_root, key_values);

    *state_root = new_state_root;

    Ok(())
}
}
Loading