diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 6fa3baf8901..a269688b856 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -418,6 +418,8 @@ pub async fn init_l1( }, ); + regenerate_head_state(&store, &blockchain).await?; + let signer = get_signer(datadir); let local_p2p_node = get_local_p2p_node(&opts, &signer); @@ -484,3 +486,43 @@ pub async fn init_l1( local_node_record, )) } + +async fn regenerate_head_state(store: &Store, blockchain: &Arc) -> eyre::Result<()> { + let head_block_number = store.get_latest_block_number().await?; + let Some(last_header) = store.get_block_header(head_block_number)? else { + unreachable!("Database is empty, genesis block should be present"); + }; + + let mut current_last_header = last_header; + + while !store.has_state_root(current_last_header.state_root)? { + let parent_number = current_last_header.number - 1; + debug!("Need to regenerate state for block {parent_number}"); + let Some(parent_header) = store.get_block_header(parent_number)? else { + return Err(eyre::eyre!( + "Parent header for block {parent_number} not found" + )); + }; + current_last_header = parent_header; + } + + let last_state_number = current_last_header.number; + + if last_state_number == head_block_number { + debug!("State is already up to date"); + return Ok(()); + } + info!("Regenerating state from block {last_state_number} to {head_block_number}"); + + for i in (last_state_number + 1)..=head_block_number { + debug!("Re-applying block {i} to regenerate state"); + + let block = store + .get_block_by_number(i) + .await? + .ok_or_else(|| eyre::eyre!("Block {i} not found"))?; + blockchain.add_block(block).await?; + } + info!("Finished regenerating state"); + Ok(()) +} diff --git a/cmd/ethrex/l2/command.rs b/cmd/ethrex/l2/command.rs index bc9a61d31b3..00a6a40ffa8 100644 --- a/cmd/ethrex/l2/command.rs +++ b/cmd/ethrex/l2/command.rs @@ -20,7 +20,7 @@ use ethrex_l2_sdk::call_contract; use ethrex_rpc::{ EthClient, clients::beacon::BeaconClient, types::block_identifier::BlockIdentifier, }; -use ethrex_storage::{EngineType, Store, UpdateBatch}; +use ethrex_storage::{EngineType, Store}; use ethrex_storage_rollup::StoreRollup; use eyre::OptionExt; use itertools::Itertools; @@ -405,11 +405,8 @@ impl Command { // Get genesis let genesis_header = store.get_block_header(0)?.expect("Genesis block not found"); - let genesis_block_hash = genesis_header.hash(); - let mut new_trie = store - .state_trie(genesis_block_hash)? - .expect("Genesis block not found"); + let mut current_state_root = genesis_header.state_root; let mut last_block_number = 0; let mut new_canonical_blocks = vec![]; @@ -433,37 +430,30 @@ impl Command { let state_diff = StateDiff::decode(&blob)?; // Apply all account updates to trie - let account_updates = state_diff.to_account_updates(&new_trie)?; + let trie = store.open_direct_state_trie(current_state_root)?; + + let account_updates = state_diff.to_account_updates(&trie)?; + let account_updates_list = store - .apply_account_updates_from_trie_batch(new_trie, account_updates.values()) + .apply_account_updates_from_trie_batch(trie, account_updates.values()) .await .map_err(|e| format!("Error applying account updates: {e}")) .unwrap(); - let (new_state_root, state_updates, accounts_updates) = ( - account_updates_list.state_trie_hash, - account_updates_list.state_updates, - account_updates_list.storage_updates, - ); + store + .open_direct_state_trie(current_state_root)? 
+ .db() + .put_batch(account_updates_list.state_updates)?; - let pseudo_update_batch = UpdateBatch { - account_updates: state_updates, - storage_updates: accounts_updates, - blocks: vec![], - receipts: vec![], - code_updates: vec![], - }; + current_state_root = account_updates_list.state_trie_hash; store - .store_block_updates(pseudo_update_batch) - .await - .map_err(|e| format!("Error storing trie updates: {e}")) - .unwrap(); + .write_storage_trie_nodes_batch(account_updates_list.storage_updates) + .await?; - new_trie = store - .open_state_trie(new_state_root) - .map_err(|e| format!("Error opening new state trie: {e}")) - .unwrap(); + store + .write_account_code_batch(account_updates_list.code_updates) + .await?; // Get withdrawal hashes let message_hashes = state_diff @@ -479,10 +469,7 @@ impl Command { // Note that its state_root is the root of new_trie. let new_block = BlockHeader { coinbase, - state_root: new_trie - .hash() - .map_err(|e| format!("Error committing state: {e}")) - .unwrap(), + state_root: account_updates_list.state_trie_hash, ..state_diff.last_header }; diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 8801344ecdb..a313e574699 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -416,7 +416,6 @@ impl Blockchain { }; self.storage - .clone() .store_block_updates(update_batch) .await .map_err(|e| e.into()) diff --git a/crates/blockchain/smoke_test.rs b/crates/blockchain/smoke_test.rs index 5b3732849df..d2a2fb42094 100644 --- a/crates/blockchain/smoke_test.rs +++ b/crates/blockchain/smoke_test.rs @@ -310,7 +310,7 @@ mod blockchain_integration_test { }; // Create blockchain - let blockchain = Blockchain::default_with_store(store.clone().clone()); + let blockchain = Blockchain::default_with_store(store.clone()); let block = create_payload(&args, store, Bytes::new()).unwrap(); let result = blockchain.build_payload(block).await.unwrap(); diff --git a/crates/blockchain/tracing.rs b/crates/blockchain/tracing.rs index b10e94533d7..445b98ffe84 100644 --- a/crates/blockchain/tracing.rs +++ b/crates/blockchain/tracing.rs @@ -135,7 +135,7 @@ async fn get_missing_state_parents( let Some(parent_block) = store.get_block_by_hash(parent_hash).await? else { return Err(ChainError::Custom("Parent Block not Found".to_string())); }; - if store.contains_state_node(parent_block.header.state_root)? { + if store.has_state_root(parent_block.header.state_root)? { break; } parent_hash = parent_block.header.parent_hash; diff --git a/crates/common/trie/db.rs b/crates/common/trie/db.rs index 0de923571e8..127d5b8585d 100644 --- a/crates/common/trie/db.rs +++ b/crates/common/trie/db.rs @@ -1,25 +1,28 @@ use ethereum_types::H256; use ethrex_rlp::encode::RLPEncode; -use crate::{Node, NodeHash, NodeRLP, Trie, error::TrieError}; +use crate::{Nibbles, Node, NodeRLP, Trie, error::TrieError}; use std::{ collections::BTreeMap, sync::{Arc, Mutex}, }; +// Nibbles -> encoded node +pub type NodeMap = Arc, Vec>>>; + pub trait TrieDB: Send + Sync { - fn get(&self, key: NodeHash) -> Result>, TrieError>; - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError>; + fn get(&self, key: Nibbles) -> Result>, TrieError>; + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError>; // TODO: replace putbatch with this function. 
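+ // Like put_batch, nodes are keyed here by their path in the trie (nibbles) rather than by their hash.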
- fn put_batch_no_alloc(&self, key_values: &[(NodeHash, Node)]) -> Result<(), TrieError> { + fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> { self.put_batch( key_values .iter() - .map(|node| (node.0, node.1.encode_to_vec())) + .map(|node| (node.0.clone(), node.1.encode_to_vec())) .collect(), ) } - fn put(&self, key: NodeHash, value: Vec) -> Result<(), TrieError> { + fn put(&self, key: Nibbles, value: Vec) -> Result<(), TrieError> { self.put_batch(vec![(key, value)]) } } @@ -27,16 +30,29 @@ pub trait TrieDB: Send + Sync { /// InMemory implementation for the TrieDB trait, with get and put operations. #[derive(Default)] pub struct InMemoryTrieDB { - pub inner: Arc>>>, + inner: NodeMap, + prefix: Option, } impl InMemoryTrieDB { - pub const fn new(map: Arc>>>) -> Self { - Self { inner: map } + pub const fn new(map: NodeMap) -> Self { + Self { + inner: map, + prefix: None, + } + } + + pub const fn new_with_prefix(map: NodeMap, prefix: Nibbles) -> Self { + Self { + inner: map, + prefix: Some(prefix), + } } + pub fn new_empty() -> Self { Self { inner: Default::default(), + prefix: None, } } @@ -45,33 +61,56 @@ impl InMemoryTrieDB { state_nodes: &BTreeMap, ) -> Result { let mut embedded_root = Trie::get_embedded_root(state_nodes, root_hash)?; - let mut hashed_nodes: Vec<(NodeHash, Vec)> = vec![]; - embedded_root.commit(&mut hashed_nodes); + let mut hashed_nodes = vec![]; + embedded_root.commit(Nibbles::default(), &mut hashed_nodes); - let hashed_nodes = hashed_nodes.into_iter().collect(); + let hashed_nodes = hashed_nodes + .into_iter() + .map(|(k, v)| (k.into_vec(), v)) + .collect(); let in_memory_trie = Arc::new(Mutex::new(hashed_nodes)); Ok(Self::new(in_memory_trie)) } + + fn apply_prefix(&self, path: Nibbles) -> Nibbles { + match &self.prefix { + Some(prefix) => prefix.concat(&path), + None => path, + } + } } impl TrieDB for InMemoryTrieDB { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { Ok(self .inner .lock() .map_err(|_| TrieError::LockError)? 
- .get(&key) + .get(self.apply_prefix(key).as_ref()) .cloned()) } - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { let mut db = self.inner.lock().map_err(|_| TrieError::LockError)?; for (key, value) in key_values { - db.insert(key, value); + let prefixed_key = self.apply_prefix(key); + db.insert(prefixed_key.into_vec(), value); } Ok(()) } } + +pub fn nibbles_to_fixed_size(nibbles: Nibbles) -> [u8; 33] { + let node_hash_ref = nibbles.to_bytes(); + let original_len = node_hash_ref.len(); + + let mut buffer = [0u8; 33]; + + // Encode the node as [node_path..., original_len] + buffer[32] = nibbles.len() as u8; + buffer[..original_len].copy_from_slice(&node_hash_ref); + buffer +} diff --git a/crates/common/trie/logger.rs b/crates/common/trie/logger.rs index 0bfbf927140..e1473dd252f 100644 --- a/crates/common/trie/logger.rs +++ b/crates/common/trie/logger.rs @@ -5,7 +5,7 @@ use std::{ use ethrex_rlp::decode::RLPDecode; -use crate::{Node, NodeHash, Trie, TrieDB, TrieError}; +use crate::{Nibbles, Node, Trie, TrieDB, TrieError}; pub type TrieWitness = Arc>>>; @@ -33,7 +33,7 @@ impl TrieLogger { } impl TrieDB for TrieLogger { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { let result = self.inner_db.get(key)?; if let Some(result) = result.as_ref() && let Ok(decoded) = Node::decode(result) @@ -44,11 +44,11 @@ impl TrieDB for TrieLogger { Ok(result) } - fn put(&self, key: NodeHash, value: Vec) -> Result<(), TrieError> { + fn put(&self, key: Nibbles, value: Vec) -> Result<(), TrieError> { self.inner_db.put(key, value) } - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { self.inner_db.put_batch(key_values) } } diff --git a/crates/common/trie/nibbles.rs b/crates/common/trie/nibbles.rs index 4ca0bff05b5..49b70fa45df 100644 --- a/crates/common/trie/nibbles.rs +++ b/crates/common/trie/nibbles.rs @@ -7,10 +7,37 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; +// TODO: move path-tracking logic somewhere else +// PERF: try using a stack-allocated array /// Struct representing a list of nibbles (half-bytes) -#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Default)] pub struct Nibbles { - pub(crate) data: Vec, + data: Vec, + /// Parts of the path that have already been consumed (used for tracking + /// current position when visiting nodes). See `current()`. 
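+ /// Kept out of equality and ordering so paths still compare by their remaining nibbles only (see the manual `PartialEq`/`Ord` impls below).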
+ already_consumed: Vec, +} + +// NOTE: custom impls to ignore the `already_consumed` field + +impl PartialEq for Nibbles { + fn eq(&self, other: &Nibbles) -> bool { + self.data == other.data + } +} + +impl Eq for Nibbles {} + +impl PartialOrd for Nibbles { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Nibbles { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.data.cmp(&other.data) + } } impl std::hash::Hash for Nibbles { @@ -22,7 +49,10 @@ impl std::hash::Hash for Nibbles { impl Nibbles { /// Create `Nibbles` from hex-encoded nibbles pub const fn from_hex(hex: Vec) -> Self { - Self { data: hex } + Self { + data: hex, + already_consumed: vec![], + } } /// Splits incoming bytes into nibbles and appends the leaf flag (a 16 nibble at the end) @@ -40,7 +70,14 @@ impl Nibbles { data.push(16); } - Self { data } + Self { + data, + already_consumed: vec![], + } + } + + pub fn into_vec(self) -> Vec { + self.data } /// Returns the amount of nibbles @@ -58,6 +95,7 @@ impl Nibbles { pub fn skip_prefix(&mut self, prefix: &Nibbles) -> bool { if self.len() >= prefix.len() && &self.data[..prefix.len()] == prefix.as_ref() { self.data = self.data[prefix.len()..].to_vec(); + self.already_consumed.extend(&prefix.data); true } else { false @@ -85,7 +123,10 @@ impl Nibbles { /// Removes and returns the first nibble #[allow(clippy::should_implement_trait)] pub fn next(&mut self) -> Option { - (!self.is_empty()).then(|| self.data.remove(0)) + (!self.is_empty()).then(|| { + self.already_consumed.push(self.data[0]); + self.data.remove(0) + }) } /// Removes and returns the first nibble if it is a suitable choice index (aka < 16) @@ -95,7 +136,9 @@ impl Nibbles { /// Returns the nibbles after the given offset pub fn offset(&self, offset: usize) -> Nibbles { - self.slice(offset, self.len()) + let mut ret = self.slice(offset, self.len()); + ret.already_consumed = [&self.already_consumed, &self.data[0..offset]].concat(); + ret } /// Returns the nibbles beween the start and end indexes @@ -187,9 +230,10 @@ impl Nibbles { } /// Concatenates self and another Nibbles returning a new Nibbles - pub fn concat(&self, other: Nibbles) -> Nibbles { + pub fn concat(&self, other: &Nibbles) -> Nibbles { Nibbles { - data: [self.data.clone(), other.data].concat(), + data: [&self.data[..], &other.data[..]].concat(), + already_consumed: self.already_consumed.clone(), } } @@ -197,6 +241,15 @@ impl Nibbles { pub fn append_new(&self, nibble: u8) -> Nibbles { Nibbles { data: [self.data.clone(), vec![nibble]].concat(), + already_consumed: self.already_consumed.clone(), + } + } + + /// Return already consumed parts of path + pub fn current(&self) -> Nibbles { + Nibbles { + data: self.already_consumed.clone(), + already_consumed: vec![], } } } @@ -217,7 +270,13 @@ impl RLPDecode for Nibbles { fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { let decoder = Decoder::new(rlp)?; let (data, decoder) = decoder.decode_field("data")?; - Ok((Self { data }, decoder.finish()?)) + Ok(( + Self { + data, + already_consumed: vec![], + }, + decoder.finish()?, + )) } } diff --git a/crates/common/trie/node.rs b/crates/common/trie/node.rs index 8818c54f8a0..4edb1655375 100644 --- a/crates/common/trie/node.rs +++ b/crates/common/trie/node.rs @@ -31,15 +31,19 @@ pub enum NodeRef { } impl NodeRef { - pub fn get_node(&self, db: &dyn TrieDB) -> Result, TrieError> { + pub fn get_node(&self, db: &dyn TrieDB, path: Nibbles) -> Result, TrieError> { match *self { NodeRef::Node(ref node, _) 
=> Ok(Some(node.as_ref().clone())), NodeRef::Hash(NodeHash::Inline((data, len))) => { Ok(Some(Node::decode_raw(&data[..len as usize])?)) } - NodeRef::Hash(hash @ NodeHash::Hashed(_)) => db - .get(hash)? - .map(|rlp| Node::decode(&rlp).map_err(TrieError::RLPDecode)) + NodeRef::Hash(hash) => db + .get(path)? + .filter(|rlp| !rlp.is_empty()) + .and_then(|rlp| match Node::decode(&rlp) { + Ok(node) => (node.compute_hash() == hash).then_some(Ok(node)), + Err(err) => Some(Err(TrieError::RLPDecode(err))), + }) .transpose(), } } @@ -51,25 +55,23 @@ impl NodeRef { } } - pub fn commit(&mut self, acc: &mut Vec<(NodeHash, Vec)>) -> NodeHash { + pub fn commit(&mut self, path: Nibbles, acc: &mut Vec<(Nibbles, Vec)>) -> NodeHash { match *self { NodeRef::Node(ref mut node, ref mut hash) => { match Arc::make_mut(node) { Node::Branch(node) => { - for node in &mut node.choices { - node.commit(acc); + for (choice, node) in &mut node.choices.iter_mut().enumerate() { + node.commit(path.append_new(choice as u8), acc); } } Node::Extension(node) => { - node.child.commit(acc); + node.child.commit(path.concat(&node.prefix), acc); } Node::Leaf(_) => {} } + let hash = *hash.get_or_init(|| node.compute_hash()); + acc.push((path.clone(), node.encode_to_vec())); - let hash = hash.get_or_init(|| node.compute_hash()); - acc.push((*hash, node.encode_to_vec())); - - let hash = *hash; *self = hash.into(); hash diff --git a/crates/common/trie/node/branch.rs b/crates/common/trie/node/branch.rs index edcf0da1746..270f7599642 100644 --- a/crates/common/trie/node/branch.rs +++ b/crates/common/trie/node/branch.rs @@ -44,7 +44,9 @@ impl BranchNode { // Delegate to children if present let child_ref = &self.choices[choice]; if child_ref.is_valid() { - let child_node = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child_node = child_ref + .get_node(db, path.current())? + .ok_or(TrieError::InconsistentTree)?; child_node.get(db, path) } else { Ok(None) @@ -74,7 +76,7 @@ impl BranchNode { // Insert into existing child and then update it (choice_ref, ValueOrHash::Value(value)) => { let child_node = choice_ref - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; *choice_ref = child_node.insert(db, path, value)?.into(); @@ -89,7 +91,7 @@ impl BranchNode { )); } else { *choice_ref = choice_ref - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)? .insert(db, path, value)? .into(); @@ -100,6 +102,7 @@ impl BranchNode { // Insert into self self.update(value); } else { + // Value in branches don't happen in our use-case. todo!("handle override case (error?)") } @@ -130,13 +133,14 @@ impl BranchNode { [+1 children] Branch { [childA, childB, ... ], None } -> Branch { [childA, childB, ... ], None } */ + let base_path = path.clone(); // Step 1: Remove value // Check if the value is located in a child subtrie let value = if let Some(choice_index) = path.next_choice() { if self.choices[choice_index].is_valid() { let child_node = self.choices[choice_index] - .get_node(db)? + .get_node(db, path.current())? 
.ok_or(TrieError::InconsistentTree)?; // Remove value from child node let (child_node, old_value) = child_node.remove(db, path.clone())?; @@ -176,7 +180,9 @@ impl BranchNode { // If this node doesn't have a value and has only one child, replace it with its child node (1, false) => { let (choice_index, child_ref) = children[0]; - let child = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child = child_ref + .get_node(db, base_path.current().append_new(choice_index as u8))? + .ok_or(TrieError::InconsistentTree)?; match child { // Replace self with an extension node leading to the child Node::Branch(_) => ExtensionNode::new( @@ -243,7 +249,9 @@ impl BranchNode { // Continue to child let child_ref = &self.choices[choice]; if child_ref.is_valid() { - let child_node = child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; + let child_node = child_ref + .get_node(db, path.current())? + .ok_or(TrieError::InconsistentTree)?; child_node.get_path(db, path, node_path)?; } } diff --git a/crates/common/trie/node/extension.rs b/crates/common/trie/node/extension.rs index 9878a6f5b47..00248990044 100644 --- a/crates/common/trie/node/extension.rs +++ b/crates/common/trie/node/extension.rs @@ -28,7 +28,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; child_node.get(db, path) @@ -56,12 +56,13 @@ impl ExtensionNode { */ let match_index = path.count_prefix(&self.prefix); if match_index == self.prefix.len() { + let path = path.offset(match_index); // Insert into child node let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; - let new_child_node = child_node.insert(db, path.offset(match_index), value)?; + let new_child_node = child_node.insert(db, path, value)?; self.child = new_child_node.into(); Ok(self.into()) } else if match_index == 0 { @@ -72,7 +73,7 @@ impl ExtensionNode { }; let mut choices = BranchNode::EMPTY_CHOICES; let branch_node = if self.prefix.at(0) == 16 { - match new_node.get_node(db)? { + match new_node.get_node(db, path.current())? { Some(Node::Leaf(leaf)) => BranchNode::new_with_value(choices, leaf.value), _ => return Err(TrieError::InconsistentTree), } @@ -107,7 +108,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; // Remove value from child subtrie let (child_node, old_value) = child_node.remove(db, path)?; @@ -174,7 +175,7 @@ impl ExtensionNode { if path.skip_prefix(&self.prefix) { let child_node = self .child - .get_node(db)? + .get_node(db, path.current())? .ok_or(TrieError::InconsistentTree)?; child_node.get_path(db, path, node_path)?; } diff --git a/crates/common/trie/node/leaf.rs b/crates/common/trie/node/leaf.rs index dce9a34416e..831cf3de3e4 100644 --- a/crates/common/trie/node/leaf.rs +++ b/crates/common/trie/node/leaf.rs @@ -72,6 +72,7 @@ impl LeafNode { choices, match value { ValueOrHash::Value(value) => value, + // Value in branches don't happen in our use-case. 
ValueOrHash::Hash(_) => todo!("handle override case (error?)"), }, ) diff --git a/crates/common/trie/trie.rs b/crates/common/trie/trie.rs index 3eafaf226bd..1a6d7915f3e 100644 --- a/crates/common/trie/trie.rs +++ b/crates/common/trie/trie.rs @@ -48,12 +48,13 @@ pub type ValueRLP = Vec; /// RLP-encoded trie node pub type NodeRLP = Vec; /// Represents a node in the Merkle Patricia Trie. -pub type TrieNode = (NodeHash, NodeRLP); +pub type TrieNode = (Nibbles, NodeRLP); -/// Libmdx-based Ethereum Compatible Merkle Patricia Trie +/// Ethereum-compatible Merkle Patricia Trie pub struct Trie { db: Box, pub root: NodeRef, + pending_removal: HashSet, } impl Default for Trie { @@ -68,6 +69,7 @@ impl Trie { Self { db, root: NodeRef::default(), + pending_removal: HashSet::new(), } } @@ -80,6 +82,7 @@ impl Trie { } else { Default::default() }, + pending_removal: HashSet::new(), } } @@ -92,14 +95,19 @@ impl Trie { } /// Retrieve an RLP-encoded value from the trie given its RLP-encoded path. - pub fn get(&self, path: &PathRLP) -> Result, TrieError> { + pub fn get(&self, pathrlp: &PathRLP) -> Result, TrieError> { + let path = Nibbles::from_bytes(pathrlp); + Ok(match self.root { - NodeRef::Node(ref node, _) => node.get(self.db.as_ref(), Nibbles::from_bytes(path))?, - NodeRef::Hash(hash) if hash.is_valid() => { - let rlp = self.db.get(hash)?.ok_or(TrieError::InconsistentTree)?; - let node = Node::decode(&rlp).map_err(TrieError::RLPDecode)?; - node.get(self.db.as_ref(), Nibbles::from_bytes(path))? - } + NodeRef::Node(ref node, _) => node.get(self.db.as_ref(), path)?, + NodeRef::Hash(hash) if hash.is_valid() => Node::decode( + &self + .db + .get(Nibbles::default())? + .ok_or(TrieError::InconsistentTree)?, + ) + .map_err(TrieError::RLPDecode)? + .get(self.db.as_ref(), path)?, _ => None, }) } @@ -107,11 +115,12 @@ impl Trie { /// Insert an RLP-encoded value into the trie. pub fn insert(&mut self, path: PathRLP, value: ValueRLP) -> Result<(), TrieError> { let path = Nibbles::from_bytes(&path); + self.pending_removal.remove(&path); self.root = if self.root.is_valid() { // If the trie is not empty, call the root node's insertion logic. self.root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .insert(self.db.as_ref(), path, value)? .into() @@ -129,11 +138,10 @@ impl Trie { if !self.root.is_valid() { return Ok(None); } - // If the trie is not empty, call the root node's removal logic. let (node, value) = self .root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .remove(self.db.as_ref(), Nibbles::from_bytes(path))?; self.root = node.map(Into::into).unwrap_or_default(); @@ -175,11 +183,8 @@ impl Trie { /// This method will also compute the hash of all internal nodes indirectly. It will not clear /// the cached nodes. 
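+ /// Under the path-keyed layout it also flushes pending removals, which are written as empty values under their paths.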
pub fn commit(&mut self) -> Result<(), TrieError> { - if self.root.is_valid() { - let mut acc = Vec::new(); - self.root.commit(&mut acc); - self.db.put_batch(acc)?; // we'll try to avoid calling this for every commit - } + let acc = self.commit_without_storing(); + self.db.put_batch(acc)?; Ok(()) } @@ -189,8 +194,12 @@ impl Trie { pub fn commit_without_storing(&mut self) -> Vec { let mut acc = Vec::new(); if self.root.is_valid() { - self.root.commit(&mut acc); + self.root.commit(Nibbles::default(), &mut acc); } + if self.root.compute_hash() == NodeHash::Hashed(*EMPTY_TRIE_HASH) { + acc.push((Nibbles::default(), vec![RLP_NULL])) + } + acc.extend(self.pending_removal.drain().map(|nib| (nib, vec![]))); acc } @@ -211,7 +220,7 @@ impl Trie { node_path.push(data[..len as usize].to_vec()); } - let root = match self.root.get_node(self.db.as_ref())? { + let root = match self.root.get_node(self.db.as_ref(), Nibbles::default())? { Some(x) => x, None => return Ok(Vec::new()), }; @@ -233,7 +242,7 @@ impl Trie { if self.root.is_valid() { let encoded_root = self .root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Nibbles::default())? .ok_or(TrieError::InconsistentTree)? .encode_raw(); @@ -351,7 +360,7 @@ impl Trie { struct NullTrieDB; impl TrieDB for NullTrieDB { - fn get(&self, _key: NodeHash) -> Result>, TrieError> { + fn get(&self, _key: Nibbles) -> Result>, TrieError> { Ok(None) } @@ -378,6 +387,7 @@ impl Trie { fn get_node_inner( db: &dyn TrieDB, + current_path: Nibbles, node: Node, mut partial_path: Nibbles, ) -> Result, TrieError> { @@ -390,9 +400,11 @@ impl Trie { Some(idx) => { let child_ref = &branch_node.choices[idx]; if child_ref.is_valid() { - let child_node = - child_ref.get_node(db)?.ok_or(TrieError::InconsistentTree)?; - get_node_inner(db, child_node, partial_path) + let child_path = current_path.append_new(idx as u8); + let child_node = child_ref + .get_node(db, child_path.clone())? + .ok_or(TrieError::InconsistentTree)?; + get_node_inner(db, child_path, child_node, partial_path) } else { Ok(vec![]) } @@ -403,11 +415,12 @@ impl Trie { if partial_path.skip_prefix(&extension_node.prefix) && extension_node.child.is_valid() { + let child_path = partial_path.concat(&extension_node.prefix); let child_node = extension_node .child - .get_node(db)? + .get_node(db, child_path.clone())? .ok_or(TrieError::InconsistentTree)?; - get_node_inner(db, child_node, partial_path) + get_node_inner(db, child_path, child_node, partial_path) } else { Ok(vec![]) } @@ -420,8 +433,9 @@ impl Trie { if self.root.is_valid() { get_node_inner( self.db.as_ref(), + Default::default(), self.root - .get_node(self.db.as_ref())? + .get_node(self.db.as_ref(), Default::default())? .ok_or(TrieError::InconsistentTree)?, partial_path, ) @@ -434,18 +448,12 @@ impl Trie { if self.hash_no_commit() == *EMPTY_TRIE_HASH { return Ok(None); } - self.root.get_node(self.db.as_ref()) + self.root.get_node(self.db.as_ref(), Nibbles::default()) } /// Creates a new Trie based on a temporary InMemory DB fn new_temp() -> Self { - use std::collections::BTreeMap; - use std::sync::Arc; - use std::sync::Mutex; - - let hmap: BTreeMap> = BTreeMap::new(); - let map = Arc::new(Mutex::new(hmap)); - let db = InMemoryTrieDB::new(map); + let db = InMemoryTrieDB::new(Default::default()); Trie::new(Box::new(db)) } } @@ -472,7 +480,7 @@ impl ProofTrie { // If the trie is not empty, call the root node's insertion logic. self.0 .root - .get_node(self.0.db.as_ref())? + .get_node(self.0.db.as_ref(), Nibbles::default())? 
.ok_or(TrieError::InconsistentTree)? .insert(self.0.db.as_ref(), partial_path, external_ref)? .into() diff --git a/crates/common/trie/trie_iter.rs b/crates/common/trie/trie_iter.rs index 45aded9f6fc..3635b9ed04c 100644 --- a/crates/common/trie/trie_iter.rs +++ b/crates/common/trie/trie_iter.rs @@ -42,7 +42,7 @@ impl TrieIterator { node: NodeRef, new_stack: &mut Vec<(Nibbles, NodeRef)>, ) -> Result<(), TrieError> { - let Some(next_node) = node.get_node(db).ok().flatten() else { + let Some(next_node) = node.get_node(db, prefix_nibbles.clone()).ok().flatten() else { return Ok(()); }; match &next_node { @@ -139,7 +139,10 @@ impl Iterator for TrieIterator { }; // Fetch the last node in the stack let (mut path, next_node_ref) = self.stack.pop()?; - let next_node = next_node_ref.get_node(self.db.as_ref()).ok().flatten()?; + let next_node = next_node_ref + .get_node(self.db.as_ref(), path.clone()) + .ok() + .flatten()?; match &next_node { Node::Branch(branch_node) => { // Add all children to the stack (in reverse order so we process first child frist) diff --git a/crates/common/trie/trie_sorted.rs b/crates/common/trie/trie_sorted.rs index 0e0859596f7..9c3f8cbd06b 100644 --- a/crates/common/trie/trie_sorted.rs +++ b/crates/common/trie/trie_sorted.rs @@ -1,5 +1,5 @@ use crate::{ - EMPTY_TRIE_HASH, Nibbles, Node, NodeHash, TrieDB, TrieError, + EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError, node::{BranchNode, ExtensionNode, LeafNode}, }; use crossbeam::channel::{Receiver, Sender, bounded}; @@ -79,7 +79,7 @@ fn create_parent(center_side: &CenterSide, closest_nibbles: &Nibbles) -> StackEl } fn add_center_to_parent_and_write_queue( - nodes_to_write: &mut Vec<(NodeHash, Node)>, + nodes_to_write: &mut Vec<(Nibbles, Node)>, center_side: &CenterSide, parent_element: &mut StackElement, ) -> Result<(), TrieGenerationError> { @@ -90,25 +90,32 @@ fn add_center_to_parent_and_write_queue( let index = path .next() .ok_or(TrieGenerationError::IndexNotFound(center_side.path.clone()))?; - let node: Node = match ¢er_side.element { + let top_path = parent_element.path.append_new(index); + let (target_path, node): (Nibbles, Node) = match ¢er_side.element { CenterSideElement::Branch { node } => { if path.is_empty() { - node.clone().into() + (top_path, node.clone().into()) } else { let hash = node.compute_hash(); - nodes_to_write.push((hash, node.clone().into())); - ExtensionNode { - prefix: path, - child: hash.into(), - } - .into() + nodes_to_write.push((center_side.path.clone(), node.clone().into())); + ( + top_path, + ExtensionNode { + prefix: path, + child: hash.into(), + } + .into(), + ) } } - CenterSideElement::Leaf { value } => LeafNode { - partial: path, - value: value.clone(), - } - .into(), + CenterSideElement::Leaf { value } => ( + top_path, + LeafNode { + partial: path, + value: value.clone(), + } + .into(), + ), }; parent_element.element.choices[index as usize] = node.compute_hash().into(); debug!( @@ -121,14 +128,14 @@ fn add_center_to_parent_and_write_queue( .filter_map(|(index, child)| child.is_valid().then_some(index)) .collect::>() ); - nodes_to_write.push((node.compute_hash(), node)); + nodes_to_write.push((target_path, node)); Ok(()) } fn flush_nodes_to_write( - mut nodes_to_write: Vec<(NodeHash, Node)>, + mut nodes_to_write: Vec<(Nibbles, Node)>, db: &dyn TrieDB, - sender: Sender>, + sender: Sender>, ) -> Result<(), TrieGenerationError> { db.put_batch_no_alloc(&nodes_to_write) .map_err(TrieGenerationError::FlushToDbError)?; @@ -137,13 +144,14 @@ fn flush_nodes_to_write( Ok(()) } +// TODO: why this 
inline(never)? #[inline(never)] pub fn trie_from_sorted_accounts<'scope, T>( db: &'scope dyn TrieDB, data_iter: &mut T, scope: Arc>, - buffer_sender: Sender>, - buffer_receiver: Receiver>, + buffer_sender: Sender>, + buffer_receiver: Receiver>, ) -> Result where T: Iterator)> + Send, @@ -151,7 +159,7 @@ where let Some(initial_value) = data_iter.next() else { return Ok(*EMPTY_TRIE_HASH); }; - let mut nodes_to_write: Vec<(NodeHash, Node)> = buffer_receiver + let mut nodes_to_write: Vec<(Nibbles, Node)> = buffer_receiver .recv() .expect("This channel shouldn't close"); let mut trie_stack: Vec = Vec::with_capacity(64); // Optimized for H256 @@ -238,8 +246,7 @@ where .find(|(_, child)| child.is_valid()) .unwrap(); - debug_assert!(nodes_to_write.last().unwrap().0 == child.compute_hash()); - let (node_hash, node_hash_ref) = nodes_to_write.iter_mut().last().unwrap(); + let (target_path, node_hash_ref) = nodes_to_write.iter_mut().last().unwrap(); match node_hash_ref { Node::Branch(_) => { let node: Node = ExtensionNode { @@ -247,31 +254,37 @@ where child, } .into(); - nodes_to_write.push((node.compute_hash(), node)); + nodes_to_write.push((Nibbles::default(), node)); nodes_to_write .last() .expect("we just inserted") - .0 + .1 + .compute_hash() .finalize() } Node::Extension(extension_node) => { - extension_node.prefix.data.insert(0, index as u8); - *node_hash = extension_node.compute_hash(); - node_hash.finalize() + extension_node.prefix.prepend(index as u8); + // This next works because this target path is always length of 1 element, + // and we're just removing that one element + target_path.next(); + extension_node.compute_hash().finalize() } Node::Leaf(leaf_node) => { - leaf_node.partial.data.insert(0, index as u8); - *node_hash = leaf_node.compute_hash(); - node_hash.finalize() + leaf_node.partial.prepend(index as u8); + // This next works because this target path is always length of 1 element, + // and we're just removing that one element + target_path.next(); + leaf_node.compute_hash().finalize() } } } else { let node: Node = left_side.element.into(); - nodes_to_write.push((node.compute_hash(), node)); + nodes_to_write.push((Nibbles::default(), node)); nodes_to_write .last() .expect("we just inserted") - .0 + .1 + .compute_hash() .finalize() }; @@ -286,7 +299,7 @@ pub fn trie_from_sorted_accounts_wrap( where T: Iterator)> + Send, { - let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); + let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); for _ in 0..BUFFER_COUNT { let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize)); } @@ -415,9 +428,9 @@ mod test { let computed_data = computed_data.lock().unwrap(); let expected_data = expected_data.lock().unwrap(); - for (k, v) in computed_data.iter() { - assert!(expected_data.contains_key(k)); - assert_eq!(*v, expected_data[k]); + for (k, v) in expected_data.iter() { + assert!(computed_data.contains_key(k)); + assert_eq!(*v, computed_data[k]); } } diff --git a/crates/l2/sequencer/block_producer.rs b/crates/l2/sequencer/block_producer.rs index 9a58965d280..85dabb0fb4c 100644 --- a/crates/l2/sequencer/block_producer.rs +++ b/crates/l2/sequencer/block_producer.rs @@ -205,7 +205,6 @@ impl BlockProducer { .store_block(block, account_updates_list, execution_result) .await?; info!("Stored new block {:x}", block_hash); - // WARN: We're not storing the payload into the Store because there's no use to it by the L2 for now. 
self.rollup_store .store_account_updates_by_block_number(block_number, account_updates) diff --git a/crates/l2/tests/state_reconstruct.rs b/crates/l2/tests/state_reconstruct.rs index 5d920366510..a6030956f17 100644 --- a/crates/l2/tests/state_reconstruct.rs +++ b/crates/l2/tests/state_reconstruct.rs @@ -50,10 +50,15 @@ async fn test_state_reconstruct() { }) .collect::>(); - test_state_block(&addresses, 0, 0).await; - test_state_block(&addresses, 6, 50).await; - test_state_block(&addresses, 11, 100).await; - test_state_block(&addresses, 16, 150).await; + // TODO: Historical state is not supported in the DB currently by the client. + // This is due to the newest path-based trie implementation. + // A potential fix would be to store the historical state in the DB through + // diff layers. The commented tests below make no sense until then. + // + // test_state_block(&addresses, 0, 0).await; + // test_state_block(&addresses, 6, 50).await; + // test_state_block(&addresses, 11, 100).await; + // test_state_block(&addresses, 16, 150).await; test_state_block(&addresses, 21, addresses.len() as u64).await; } @@ -68,10 +73,15 @@ async fn test_state_block(addresses: &[Address], block_number: u64, rich_account if index < rich_accounts as usize { assert_eq!( balance, - U256::from_dec_str("500000000000000000000000000").unwrap() + U256::from_dec_str("500000000000000000000000000").unwrap(), + "Balance mismatch for address {address:#x} at block {block_number}. Expected 500000000000000000000000000, got {balance}" ); } else { - assert_eq!(balance, U256::zero()); + assert_eq!( + balance, + U256::zero(), + "Balance should be zero for address {address:#x} at block {block_number}. Expected 0, got {balance}" + ); } } } diff --git a/crates/l2/tests/tests.rs b/crates/l2/tests/tests.rs index 597409f0cd5..97b0071e755 100644 --- a/crates/l2/tests/tests.rs +++ b/crates/l2/tests/tests.rs @@ -1015,12 +1015,13 @@ async fn test_send( test: &str, ) -> Result { let signer: Signer = LocalSigner::new(*private_key).into(); + let calldata = encode_calldata(signature, data).unwrap().into(); let mut tx = build_generic_tx( client, TxType::EIP1559, to, signer.address(), - encode_calldata(signature, data).unwrap().into(), + calldata, Default::default(), ) .await diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 3e1df0a7567..6f31c301a58 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -2067,7 +2067,7 @@ pub enum PeerHandlerError { PeerTableError(#[from] PeerTableError), } -#[derive(Debug, Clone, std::hash::Hash)] +#[derive(Debug, Clone)] pub struct RequestMetadata { pub hash: H256, pub path: Nibbles, diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 4ae690b84ac..750e8ea3c15 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -26,6 +26,7 @@ use crate::{ self, Capability, DisconnectMessage, DisconnectReason, PingMessage, PongMessage, SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, }, + snap::TrieNodes, utils::{log_peer_debug, log_peer_error, log_peer_warn}, }, snap::{ @@ -1022,8 +1023,11 @@ async fn handle_incoming_message( send(state, Message::ByteCodes(response)).await? } Message::GetTrieNodes(req) => { - let response = process_trie_nodes_request(req, state.storage.clone()).await?; - send(state, Message::TrieNodes(response)).await? 
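+ // If building the response fails, answer the peer with an empty TrieNodes message instead of propagating the error.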
+ let id = req.id; + match process_trie_nodes_request(req, state.storage.clone()).await { + Ok(response) => send(state, Message::TrieNodes(response)).await?, + Err(_) => send(state, Message::TrieNodes(TrieNodes { id, nodes: vec![] })).await?, + } } Message::L2(req) if peer_supports_l2 => { handle_based_capability_message(state, req).await?; diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 2e4c3c4418a..133ec8bed07 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -27,12 +27,8 @@ use ethrex_storage::{EngineType, STATE_TRIE_SEGMENTS, Store, error::StoreError}; use ethrex_trie::trie_sorted::TrieGenerationError; use ethrex_trie::{Trie, TrieError}; use rayon::iter::{ParallelBridge, ParallelIterator}; -#[cfg(not(feature = "rocksdb"))] -use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::path::{Path, PathBuf}; -#[cfg(not(feature = "rocksdb"))] -use std::sync::Mutex; use std::time::SystemTime; use std::{ array, @@ -692,13 +688,14 @@ impl FullBlockSyncState { .await { if let Some(batch_failure) = batch_failure { - warn!("Failed to add block during FullSync: {err}"); + let failed_block_hash = batch_failure.failed_block_hash; + warn!(%err, block=%failed_block_hash, "Failed to add block during FullSync"); // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. if let ChainError::InvalidBlock(_) = err { let mut block_hashes_with_invalid_ancestor: Vec = vec![]; if let Some(index) = block_batch_hashes .iter() - .position(|x| x == &batch_failure.failed_block_hash) + .position(|x| x == &failed_block_hash) { block_hashes_with_invalid_ancestor = block_batch_hashes[index..].to_vec(); } @@ -1059,6 +1056,7 @@ impl Syncer { debug_assert!(validate_state_root(store.clone(), pivot_header.state_root).await); debug_assert!(validate_storage_root(store.clone(), pivot_header.state_root).await); + info!("Finished healing"); // Finish code hash collection @@ -1160,26 +1158,23 @@ impl Syncer { } #[cfg(not(feature = "rocksdb"))] -type StorageRoots = (H256, Vec<(ethrex_trie::NodeHash, Vec)>); +type StorageRoots = (H256, Vec<(ethrex_trie::Nibbles, Vec)>); #[cfg(not(feature = "rocksdb"))] fn compute_storage_roots( - maybe_big_account_storage_state_roots: Arc>>, store: Store, account_hash: H256, key_value_pairs: &[(H256, U256)], pivot_hash: H256, ) -> Result { - let account_storage_root = match maybe_big_account_storage_state_roots - .lock() - .map_err(|_| SyncError::MaybeBigAccount)? - .entry(account_hash) - { - Entry::Occupied(occupied_entry) => *occupied_entry.get(), - Entry::Vacant(_vacant_entry) => *EMPTY_TRIE_HASH, - }; + use ethrex_trie::{Nibbles, Node}; - let mut storage_trie = store.open_storage_trie(account_hash, account_storage_root)?; + let storage_trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?; + let trie_hash = match storage_trie.db().get(Nibbles::default())? 
{ + Some(noderlp) => Node::decode(&noderlp)?.compute_hash().finalize(), + None => *EMPTY_TRIE_HASH, + }; + let mut storage_trie = store.open_direct_storage_trie(account_hash, trie_hash)?; for (hashed_key, value) in key_value_pairs { if let Err(err) = storage_trie.insert(hashed_key.0.to_vec(), value.encode_to_vec()) { @@ -1196,13 +1191,7 @@ fn compute_storage_roots( .ok_or(SyncError::AccountState(pivot_hash, account_hash))?; if computed_storage_root == account_state.storage_root { METRICS.storage_tries_state_roots_computed.inc(); - } else { - maybe_big_account_storage_state_roots - .lock() - .map_err(|_| SyncError::MaybeBigAccount)? - .insert(account_hash, computed_storage_root); } - Ok((account_hash, changes)) } @@ -1491,7 +1480,7 @@ async fn insert_accounts( let store_clone = store.clone(); let current_state_root: Result = tokio::task::spawn_blocking(move || -> Result { - let mut trie = store_clone.open_state_trie(computed_state_root)?; + let mut trie = store_clone.open_direct_state_trie(computed_state_root)?; for (account_hash, account) in account_states_snapshot { trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?; @@ -1517,8 +1506,6 @@ async fn insert_storages( pivot_header: &BlockHeader, ) -> Result<(), SyncError> { use rayon::iter::IntoParallelIterator; - let maybe_big_account_storage_state_roots: Arc>> = - Arc::new(Mutex::new(HashMap::new())); for entry in std::fs::read_dir(account_storages_snapshots_dir) .map_err(|_| SyncError::AccountStoragesSnapshotsDirNotFound)? @@ -1546,8 +1533,6 @@ async fn insert_storages( }) .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?; - let maybe_big_account_storage_state_roots_clone = - maybe_big_account_storage_state_roots.clone(); let store_clone = store.clone(); let pivot_hash_moved = pivot_header.hash(); info!("Starting compute of account_storages_snapshot"); @@ -1565,13 +1550,7 @@ async fn insert_storages( .map(move |account| (account, storages.clone())) }) .map(|(account, storages)| { - compute_storage_roots( - maybe_big_account_storage_state_roots_clone.clone(), - store.clone(), - account, - &storages, - pivot_hash_moved, - ) + compute_storage_roots(store.clone(), account, &storages, pivot_hash_moved) }) .collect::, SyncError>>() }) @@ -1596,7 +1575,7 @@ async fn insert_accounts( use crate::utils::get_rocksdb_temp_accounts_dir; use ethrex_trie::trie_sorted::trie_from_sorted_accounts_wrap; - let trie = store.open_state_trie(*EMPTY_TRIE_HASH)?; + let trie = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?; let mut db_options = rocksdb::Options::default(); db_options.create_if_missing(true); let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_accounts_dir(datadir)) @@ -1658,7 +1637,7 @@ async fn insert_storages( use crossbeam::channel::{bounded, unbounded}; use ethrex_threadpool::ThreadPool; use ethrex_trie::{ - Node, NodeHash, + Nibbles, Node, trie_sorted::{BUFFER_COUNT, SIZE_TO_WRITE_DB, trie_from_sorted_accounts}, }; use std::thread::scope; @@ -1718,7 +1697,7 @@ async fn insert_storages( ( account_hash, store - .open_storage_trie(account_hash, *EMPTY_TRIE_HASH) + .open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH) .expect("Should be able to open trie"), ) }) @@ -1730,7 +1709,7 @@ async fn insert_storages( .map(|num| num.into()) .unwrap_or(8); - let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); + let (buffer_sender, buffer_receiver) = bounded::>(BUFFER_COUNT as usize); for _ in 0..BUFFER_COUNT { let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize)); } diff 
--git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs index 6cea48e6667..aef943b9798 100644 --- a/crates/networking/p2p/sync/state_healing.rs +++ b/crates/networking/p2p/sync/state_healing.rs @@ -10,7 +10,7 @@ use std::{ cmp::min, - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::atomic::Ordering, time::{Duration, Instant}, }; @@ -18,7 +18,7 @@ use std::{ use ethrex_common::{H256, constants::EMPTY_KECCACK_HASH, types::AccountState}; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode}; use ethrex_storage::Store; -use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, NodeHash, TrieDB, TrieError}; +use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError}; use tracing::{debug, error, info}; use crate::{ @@ -107,7 +107,8 @@ async fn heal_state_trie( let mut downloads_fail = 0; let mut leafs_healed = 0; let mut empty_try_recv: u64 = 0; - let mut nodes_to_write: Vec = Vec::new(); + let mut heals_per_cycle: u64 = 0; + let mut nodes_to_write: Vec<(Nibbles, Node)> = Vec::new(); let mut db_joinset = tokio::task::JoinSet::new(); // channel to send the tasks to the peers @@ -135,21 +136,13 @@ async fn heal_state_trie( METRICS .healing_empty_try_recv .store(empty_try_recv, Ordering::Relaxed); - if is_stale { - debug!( - "State Healing stopping due to staleness, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}", - global_leafs_healed, - paths.len(), - membatch.len() - ); - } else { - debug!( - "State Healing in Progress, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}", - global_leafs_healed, - paths.len(), - membatch.len() - ); - } + debug!( + "State Healing {}, snap peers available {num_peers}, inflight_tasks: {inflight_tasks}, Maximum depth reached on loop {longest_path_seen}, leafs healed {leafs_healed}, global leafs healed {}, Download success rate {downloads_rate}, Paths to go {}, Membatch size {}, Processing per cycle {heals_per_cycle}", + if is_stale { "stopping" } else { "in progress" }, + global_leafs_healed, + paths.len(), + membatch.len() + ); downloads_success = 0; downloads_fail = 0; } @@ -168,9 +161,8 @@ async fn heal_state_trie( for (node, meta) in nodes.iter().zip(batch.iter()) { if let Node::Leaf(node) = node { let account = AccountState::decode(&node.value)?; - let account_hash = H256::from_slice( - &meta.path.concat(node.partial.clone()).to_bytes(), - ); + let account_hash = + H256::from_slice(&meta.path.concat(&node.partial).to_bytes()); // // Collect valid code hash if account.code_hash != *EMPTY_KECCACK_HASH { @@ -264,6 +256,7 @@ async fn heal_state_trie( // If there is at least one "batch" of nodes to heal, heal it if let Some((nodes, batch)) = nodes_to_heal.pop() { + heals_per_cycle += 1; let return_paths = heal_state_batch( batch, nodes, @@ -271,7 +264,6 @@ async fn heal_state_trie( &mut membatch, &mut nodes_to_write, ) - .await .inspect_err(|err| { error!("We have found a sync error while trying to write to DB a batch: {err}") })?; @@ -281,30 +273,31 @@ async fn heal_state_trie( let is_done = paths.is_empty() && nodes_to_heal.is_empty() && inflight_tasks == 0; if nodes_to_write.len() > 100_000 || is_done || is_stale { - 
let to_write = nodes_to_write; - nodes_to_write = Vec::new(); + // PERF: reuse buffers? + let to_write = std::mem::take(&mut nodes_to_write); let store = store.clone(); - if db_joinset.len() > 3 { - db_joinset.join_next().await; + // NOTE: we keep only a single task in the background to avoid out of order deletes + if !db_joinset.is_empty() { + db_joinset + .join_next() + .await + .expect("we just checked joinset is not empty")?; } - db_joinset.spawn_blocking(|| { - spawned_rt::tasks::block_on(async move { - // TODO: replace put batch with the async version - let trie_db = store - .open_state_trie(*EMPTY_TRIE_HASH) - .expect("Store should open"); - let db = trie_db.db(); - db.put_batch( - to_write - .into_iter() - .filter_map(|node| match node.compute_hash() { - hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())), - NodeHash::Inline(_) => None, - }) - .collect(), - ) + db_joinset.spawn_blocking(move || { + let mut encoded_to_write = BTreeMap::new(); + for (path, node) in to_write { + for i in 0..path.len() { + encoded_to_write.insert(path.slice(0, i), vec![]); + } + encoded_to_write.insert(path, node.encode_to_vec()); + } + let trie_db = store + .open_direct_state_trie(*EMPTY_TRIE_HASH) + .expect("Store should open"); + let db = trie_db.db(); + // PERF: use put_batch_no_alloc (note that it needs to remove nodes too) + db.put_batch(encoded_to_write.into_iter().collect()) .expect("The put batch on the store failed"); - }) }); } @@ -337,14 +330,14 @@ async fn heal_state_trie( /// Receives a set of state trie paths, fetches their respective nodes, stores them, /// and returns their children paths and the paths that couldn't be fetched so they can be returned to the queue -async fn heal_state_batch( +fn heal_state_batch( mut batch: Vec, nodes: Vec, store: Store, membatch: &mut HashMap, - nodes_to_write: &mut Vec, // TODO: change tuple to struct + nodes_to_write: &mut Vec<(Nibbles, Node)>, // TODO: change tuple to struct ) -> Result, SyncError> { - let trie = store.open_state_trie(*EMPTY_TRIE_HASH)?; + let trie = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?; for node in nodes.into_iter() { let path = batch.remove(0); let (missing_children_count, missing_children) = @@ -375,9 +368,9 @@ fn commit_node( path: &Nibbles, parent_path: &Nibbles, membatch: &mut HashMap, - nodes_to_write: &mut Vec, + nodes_to_write: &mut Vec<(Nibbles, Node)>, ) { - nodes_to_write.push(node); + nodes_to_write.push((path.clone(), node)); if parent_path == path { return; // Case where we're saving the root @@ -412,27 +405,49 @@ pub fn node_missing_children( match &node { Node::Branch(node) => { for (index, child) in node.choices.iter().enumerate() { - if child.is_valid() && child.get_node(trie_state)?.is_none() { - missing_children_count += 1; - paths.extend(vec![RequestMetadata { - hash: child.compute_hash().finalize(), - path: path.clone().append_new(index as u8), - parent_path: path.clone(), - }]); + let child_path = path.clone().append_new(index as u8); + if !child.is_valid() { + continue; + } + let validity = child + .get_node(trie_state, child_path.clone()) + .inspect_err(|_| { + error!("Malformed data when doing get child of a branch node") + })? 
+ .is_some(); + if validity { + continue; } - } - } - Node::Extension(node) => { - if node.child.is_valid() && node.child.get_node(trie_state)?.is_none() { - missing_children_count += 1; + missing_children_count += 1; paths.extend(vec![RequestMetadata { - hash: node.child.compute_hash().finalize(), - path: path.concat(node.prefix.clone()), + hash: child.compute_hash().finalize(), + path: child_path, parent_path: path.clone(), }]); } } + Node::Extension(node) => { + let child_path = path.concat(&node.prefix); + if !node.child.is_valid() { + return Ok((0, vec![])); + } + let validity = node + .child + .get_node(trie_state, child_path.clone()) + .inspect_err(|_| error!("Malformed data when doing get child of a branch node"))? + .is_some(); + if validity { + return Ok((0, vec![])); + } + missing_children_count += 1; + + paths.extend(vec![RequestMetadata { + hash: node.child.compute_hash().finalize(), + path: child_path, + parent_path: path.clone(), + }]); + } _ => {} } Ok((missing_children_count, paths)) diff --git a/crates/networking/p2p/sync/storage_healing.rs b/crates/networking/p2p/sync/storage_healing.rs index 3966f66e16d..6e4f1f1aada 100644 --- a/crates/networking/p2p/sync/storage_healing.rs +++ b/crates/networking/p2p/sync/storage_healing.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use ethrex_common::{H256, types::AccountState}; use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError}; use ethrex_storage::{Store, error::StoreError}; -use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, NodeHash}; +use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node}; use rand::random; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{ @@ -157,7 +157,7 @@ pub async fn heal_storage_trie( Result>>, > = JoinSet::new(); - let mut nodes_to_write: HashMap)>> = HashMap::new(); + let mut nodes_to_write: HashMap> = HashMap::new(); let mut db_joinset = tokio::task::JoinSet::new(); // channel to send the tasks to the peers @@ -175,7 +175,7 @@ pub async fn heal_storage_trie( .store(state.empty_count as u64, Ordering::Relaxed); state.last_update = Instant::now(); debug!( - "We are storage healing. Snap Peers {}. Inflight tasks {}. Download Queue {}. Maximum length {}. Leafs Healed {}. Global Leafs Healed {global_leafs_healed}. Roots Healed {}. Good Download Percentage {}. Empty count {}. Disconnected Count {}.", + "We are storage healing. Snap Peers {}. Inflight tasks {}. Download Queue {}. Maximum length {}. Leafs Healed {}. Global Leafs Healed {global_leafs_healed}. Roots Healed {}. Good Downloads {}. Good Download Percentage {}. Empty count {}. 
Disconnected Count {}.", peers .peer_table .peer_count_by_capabilities(&SUPPORTED_SNAP_CAPABILITIES) @@ -186,6 +186,7 @@ pub async fn heal_storage_trie( state.maximum_length_seen, state.leafs_healed, state.roots_healed, + state.succesful_downloads, state.succesful_downloads as f64 / (state.succesful_downloads as f64 + state.failed_downloads as f64), state.empty_count, @@ -201,18 +202,27 @@ pub async fn heal_storage_trie( let is_stale = current_unix_time() > state.staleness_timestamp; if nodes_to_write.values().map(Vec::len).sum::() > 100_000 || is_done || is_stale { - let to_write = nodes_to_write.drain().collect(); + let to_write: Vec<_> = nodes_to_write.drain().collect(); let store = state.store.clone(); - if db_joinset.len() > 3 { + // NOTE: we keep only a single task in the background to avoid out of order deletes + if !db_joinset.is_empty() { db_joinset.join_next().await; } - db_joinset.spawn_blocking(|| { - spawned_rt::tasks::block_on(async move { - store - .write_storage_trie_nodes_batch(to_write) - .await - .expect("db write failed"); - }) + db_joinset.spawn_blocking(move || { + let mut encoded_to_write = vec![]; + for (hashed_account, nodes) in to_write { + let mut account_nodes = vec![]; + for (path, node) in nodes { + for i in 0..path.len() { + account_nodes.push((path.slice(0, i), vec![])); + } + account_nodes.push((path, node.encode_to_vec())); + } + encoded_to_write.push((hashed_account, account_nodes)); + } + // PERF: use put_batch_no_alloc? (it needs to remove parent nodes too) + spawned_rt::tasks::block_on(store.write_storage_trie_nodes_batch(encoded_to_write)) + .expect("db write failed"); }); } @@ -257,7 +267,7 @@ pub async fn heal_storage_trie( &mut state.requests, peers, &mut state.download_queue, - trie_nodes.clone(), // TODO: remove unnecesary clone, needed now for log 🏗️🏗️ + &trie_nodes, &mut state.succesful_downloads, &mut state.failed_downloads, ) @@ -269,7 +279,7 @@ pub async fn heal_storage_trie( process_node_responses( &mut nodes_from_peer, &mut state.download_queue, - state.store.clone(), + &state.store, &mut state.membatch, &mut state.leafs_healed, global_leafs_healed, @@ -277,7 +287,7 @@ pub async fn heal_storage_trie( &mut state.maximum_length_seen, &mut nodes_to_write, ) - .expect("We shouldn't be getting store errors"); // TODO: if we have a stor error we should stop + .expect("We shouldn't be getting store errors"); // TODO: if we have a store error we should stop } Err(RequestStorageTrieNodes::RequestError(id, _err)) => { let inflight_request = state.requests.remove(&id).expect("request disappeared"); @@ -393,7 +403,7 @@ async fn zip_requeue_node_responses_score_peer( requests: &mut HashMap, peer_handler: &mut PeerHandler, download_queue: &mut VecDeque, - trie_nodes: TrieNodes, + trie_nodes: &TrieNodes, succesful_downloads: &mut usize, failed_downloads: &mut usize, ) -> Result>, SyncError> { @@ -461,13 +471,13 @@ async fn zip_requeue_node_responses_score_peer( fn process_node_responses( node_processing_queue: &mut Vec, download_queue: &mut VecDeque, - store: Store, + store: &Store, membatch: &mut Membatch, leafs_healed: &mut usize, global_leafs_healed: &mut u64, roots_healed: &mut usize, maximum_length_seen: &mut usize, - to_write: &mut HashMap)>>, + to_write: &mut HashMap>, ) -> Result<(), StoreError> { while let Some(node_response) = node_processing_queue.pop() { trace!("We are processing node response {:?}", node_response); @@ -482,7 +492,7 @@ fn process_node_responses( ); let (missing_children_nibbles, missing_children_count) = - 
determine_missing_children(&node_response, store.clone()).inspect_err(|err| { + determine_missing_children(&node_response, store).inspect_err(|err| { error!("{err} in determine missing children while searching {node_response:?}") })?; @@ -534,12 +544,7 @@ fn get_initial_downloads( if account.storage_root == *EMPTY_TRIE_HASH { return None; } - if store - .contains_storage_node(*acc_path, account.storage_root) - .expect("We should be able to open the store") - { - return None; - } + Some(NodeRequest { acc_path: Nibbles::from_bytes(&acc_path.0), storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case @@ -556,13 +561,13 @@ fn get_initial_downloads( /// and the number of direct missing children pub fn determine_missing_children( node_response: &NodeResponse, - store: Store, + store: &Store, ) -> Result<(Vec, usize), StoreError> { let mut paths = Vec::new(); let mut count = 0; let node = node_response.node.clone(); let trie = store - .open_storage_trie( + .open_direct_storage_trie( H256::from_slice(&node_response.node_request.acc_path.to_bytes()), *EMPTY_TRIE_HASH, ) @@ -570,54 +575,60 @@ pub fn determine_missing_children( error!("Malformed data when opening the storage trie in determine missing children") })?; let trie_state = trie.db(); + match &node { Node::Branch(node) => { for (index, child) in node.choices.iter().enumerate() { - if child.is_valid() - && child - .get_node(trie_state) - .inspect_err(|_| { - error!("Malformed data when doing get child of a branch node") - })? - .is_none() - { - count += 1; - - paths.extend(vec![NodeRequest { - acc_path: node_response.node_request.acc_path.clone(), - storage_path: node_response - .node_request - .storage_path - .append_new(index as u8), - parent: node_response.node_request.storage_path.clone(), - hash: child.compute_hash().finalize(), - }]); + let child_path = node_response + .node_request + .storage_path + .append_new(index as u8); + if !child.is_valid() { + continue; } - } - } - Node::Extension(node) => { - if node.child.is_valid() - && node - .child - .get_node(trie_state) + let validity = child + .get_node(trie_state, child_path.clone()) .inspect_err(|_| { - error!("Malformed data when doing get child of an extension node") + error!("Malformed data when doing get child of a branch node") })? - .is_none() - { + .is_some(); + + if validity { + continue; + } count += 1; paths.extend(vec![NodeRequest { acc_path: node_response.node_request.acc_path.clone(), - storage_path: node_response - .node_request - .storage_path - .concat(node.prefix.clone()), + storage_path: child_path, parent: node_response.node_request.storage_path.clone(), - hash: node.child.compute_hash().finalize(), + hash: child.compute_hash().finalize(), }]); } } + Node::Extension(node) => { + let child_path = node_response.node_request.storage_path.concat(&node.prefix); + if !node.child.is_valid() { + return Ok((vec![], 0)); + } + let validity = node + .child + .get_node(trie_state, child_path.clone()) + .inspect_err(|_| error!("Malformed data when doing get child of a branch node"))? 
+ .is_some(); + + if validity { + return Ok((vec![], 0)); + } + count += 1; + + paths.extend(vec![NodeRequest { + acc_path: node_response.node_request.acc_path.clone(), + storage_path: child_path, + parent: node_response.node_request.storage_path.clone(), + hash: node.child.compute_hash().finalize(), + }]); + } _ => {} } Ok((paths, count)) @@ -627,13 +638,14 @@ fn commit_node( node: &NodeResponse, membatch: &mut Membatch, roots_healed: &mut usize, - to_write: &mut HashMap)>>, + to_write: &mut HashMap>, ) -> Result<(), StoreError> { let hashed_account = H256::from_slice(&node.node_request.acc_path.to_bytes()); + to_write .entry(hashed_account) .or_default() - .push((node.node.compute_hash(), node.node.encode_to_vec())); + .push((node.node_request.storage_path.clone(), node.node.clone())); // Special case, we have just commited the root, we stop if node.node_request.storage_path == node.node_request.parent { @@ -644,7 +656,7 @@ fn commit_node( return Ok(()); } - let parent_key: (Nibbles, Nibbles) = ( + let parent_key = ( node.node_request.acc_path.clone(), node.node_request.parent.clone(), ); @@ -661,9 +673,9 @@ fn commit_node( membatch, roots_healed, to_write, - )?; + ) } else { membatch.insert(parent_key, parent_entry); + Ok(()) } - Ok(()) } diff --git a/crates/networking/p2p/utils.rs b/crates/networking/p2p/utils.rs index 9b6ce315988..429533ebc4d 100644 --- a/crates/networking/p2p/utils.rs +++ b/crates/networking/p2p/utils.rs @@ -83,9 +83,9 @@ pub fn dump_accounts_to_rocks_db( let writer_options = rocksdb::Options::default(); let mut writer = rocksdb::SstFileWriter::create(&writer_options); writer.open(std::path::Path::new(&path))?; - for (key, acccount) in contents { + for (key, account) in contents { buffer.clear(); - acccount.encode(&mut buffer); + account.encode(&mut buffer); writer.put(key.0.as_ref(), buffer.as_slice())?; } writer.finish() diff --git a/crates/networking/rpc/eth/account.rs b/crates/networking/rpc/eth/account.rs index 6cbf145319c..54c4ef1c981 100644 --- a/crates/networking/rpc/eth/account.rs +++ b/crates/networking/rpc/eth/account.rs @@ -218,44 +218,34 @@ impl RpcHandler for GetProofRequest { let Some(block_number) = self.block.resolve_block_number(storage).await? else { return Ok(Value::Null); }; + let Some(header) = storage.get_block_header(block_number)? else { + return Ok(Value::Null); + }; // Create account proof let Some(account_proof) = storage - .get_account_proof(block_number, &self.address) + .get_account_proof(header.state_root, self.address, &self.storage_keys) .await? else { return Err(RpcErr::Internal("Could not get account proof".to_owned())); }; - let account = storage - .get_account_state(block_number, self.address) - .await?; - // Create storage proofs for all provided storage keys - let mut storage_proofs = Vec::new(); - for storage_key in self.storage_keys.iter() { - let value = storage - .get_storage_at(block_number, self.address, *storage_key) - .await? - .unwrap_or_default(); - let proof = if let Some(account) = &account { - storage.get_storage_proof(self.address, account.storage_root, storage_key)? 
- } else { - Vec::new() - }; - let storage_proof = StorageProof { - key: storage_key.into_uint(), - proof, - value, - }; - storage_proofs.push(storage_proof); - } - let account = account.unwrap_or_default(); + let storage_proof = account_proof + .storage_proof + .into_iter() + .map(|sp| StorageProof { + key: sp.key.into_uint(), + value: sp.value, + proof: sp.proof, + }) + .collect(); + let account = account_proof.account; let account_proof = AccountProof { - account_proof, + account_proof: account_proof.proof, address: self.address, balance: account.balance, code_hash: account.code_hash, nonce: account.nonce, storage_hash: account.storage_root, - storage_proof: storage_proofs, + storage_proof, }; serde_json::to_value(account_proof).map_err(|error| RpcErr::Internal(error.to_string())) } diff --git a/crates/networking/rpc/eth/filter.rs b/crates/networking/rpc/eth/filter.rs index d8461f984b5..d24267f9dc6 100644 --- a/crates/networking/rpc/eth/filter.rs +++ b/crates/networking/rpc/eth/filter.rs @@ -532,7 +532,7 @@ mod tests { async fn background_job_removes_filter_smoke_test() { // Start a test server to start the cleanup // task in the background - let server_handle = tokio::spawn(async move { start_test_api().await }); + let server_handle = start_test_api().await; // Give the server some time to start tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/crates/networking/rpc/utils.rs b/crates/networking/rpc/utils.rs index 3f43ca169ed..65cb175e804 100644 --- a/crates/networking/rpc/utils.rs +++ b/crates/networking/rpc/utils.rs @@ -364,13 +364,13 @@ pub mod test_utils { // like eth_uninstallFilter. // Here's how you would use it: // ``` - // let server_handle = tokio::spawn(async move { start_stest_api().await }) + // let server_handle = start_stest_api().await; // ... - // assert!(something_that_needs_the_server) + // assert!(something_that_needs_the_server); // ... - // server_handle.abort() + // server_handle.abort(); // ``` - pub async fn start_test_api() { + pub async fn start_test_api() -> tokio::task::JoinHandle<()> { let http_addr: SocketAddr = "127.0.0.1:8500".parse().unwrap(); let authrpc_addr: SocketAddr = "127.0.0.1:8501".parse().unwrap(); let storage = @@ -383,23 +383,25 @@ pub mod test_utils { let jwt_secret = Default::default(); let local_p2p_node = example_p2p_node(); let local_node_record = example_local_node_record(); - start_api( - http_addr, - authrpc_addr, - storage, - blockchain, - jwt_secret, - local_p2p_node, - local_node_record, - SyncManager::dummy(), - PeerHandler::dummy(), - "ethrex/test".to_string(), - None, - None, - String::new(), - ) - .await - .unwrap(); + tokio::spawn(async move { + start_api( + http_addr, + authrpc_addr, + storage, + blockchain, + jwt_secret, + local_p2p_node, + local_node_record, + SyncManager::dummy(), + PeerHandler::dummy(), + "ethrex/test".to_string(), + None, + None, + String::new(), + ) + .await + .unwrap() + }) } pub async fn default_context_with_storage(storage: Store) -> RpcApiContext { diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 051289c73bc..22cefb8f3ed 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -7,7 +7,7 @@ use std::{fmt::Debug, panic::RefUnwindSafe}; use crate::UpdateBatch; use crate::{error::StoreError, store::STATE_TRIE_SEGMENTS}; -use ethrex_trie::{Nibbles, NodeHash, Trie}; +use ethrex_trie::{Nibbles, Trie}; // We need async_trait because the stabilized feature lacks support for object safety // (i.e. 
dyn StoreEngine) @@ -220,6 +220,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result; /// Obtain a state trie from the given state root @@ -227,6 +228,20 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Used for internal store operations fn open_state_trie(&self, state_root: H256) -> Result; + /// Obtain a storage trie from the given address and storage_root + /// Doesn't check if the account is stored + /// Used for internal store operations + fn open_direct_storage_trie( + &self, + hashed_address: H256, + storage_root: H256, + ) -> Result; + + /// Obtain a state trie from the given state root + /// Doesn't check if the state root is valid + /// Used for internal store operations + fn open_direct_state_trie(&self, state_root: H256) -> Result; + /// Obtain a state trie locked for reads from the given state root /// Doesn't check if the state root is valid /// Used for internal store operations @@ -241,8 +256,9 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result { - self.open_storage_trie(hashed_address, storage_root) + self.open_storage_trie(hashed_address, storage_root, state_root) } async fn forkchoice_update( @@ -339,7 +355,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { async fn write_storage_trie_nodes_batch( &self, - storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec)>)>, + storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec)>)>, ) -> Result<(), StoreError>; async fn write_account_code_batch( diff --git a/crates/storage/error.rs b/crates/storage/error.rs index 97597b9fb4f..5a87cb21145 100644 --- a/crates/storage/error.rs +++ b/crates/storage/error.rs @@ -36,4 +36,6 @@ pub enum StoreError { IncompatibleChainConfig, #[error("Failed to convert index: {0}")] TryInto(#[from] std::num::TryFromIntError), + #[error("Update batch contains no blocks")] + UpdateBatchNoBlocks, } diff --git a/crates/storage/lib.rs b/crates/storage/lib.rs index 3d3b6e422d4..0e6627ba51e 100644 --- a/crates/storage/lib.rs +++ b/crates/storage/lib.rs @@ -13,3 +13,4 @@ pub use store::{ AccountUpdatesList, EngineType, MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS, Store, UpdateBatch, hash_address, hash_key, }; +pub use trie_db::layering::apply_prefix; diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 8fa690c4b04..4a63df85e73 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1,8 +1,8 @@ -use crate::api::StoreEngine; use crate::error::StoreError; use crate::store_db::in_memory::Store as InMemoryStore; #[cfg(feature = "rocksdb")] use crate::store_db::rocksdb::Store as RocksDBStore; +use crate::{api::StoreEngine, apply_prefix}; use bytes::Bytes; use ethereum_types::{Address, H256, U256}; @@ -16,7 +16,7 @@ use ethrex_common::{ }; use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::{Nibbles, NodeHash, Trie, TrieLogger, TrieNode, TrieWitness}; +use ethrex_trie::{Nibbles, NodeRLP, Trie, TrieLogger, TrieNode, TrieWitness}; use sha3::{Digest as _, Keccak256}; use std::sync::Arc; use std::{ @@ -38,7 +38,7 @@ pub struct Store { pub latest_block_header: Arc>, } -pub type StorageTrieNodes = Vec<(H256, Vec<(NodeHash, Vec)>)>; +pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec)>)>; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EngineType { @@ -60,11 +60,11 @@ pub struct UpdateBatch { pub code_updates: Vec<(H256, Bytes)>, } -type 
StorageUpdates = Vec<(H256, Vec<(NodeHash, Vec)>)>; +type StorageUpdates = Vec<(H256, Vec<(Nibbles, Vec)>)>; pub struct AccountUpdatesList { pub state_trie_hash: H256, - pub state_updates: Vec<(NodeHash, Vec)>, + pub state_updates: Vec<(Nibbles, Vec)>, pub storage_updates: StorageUpdates, pub code_updates: Vec<(H256, Bytes)>, } @@ -383,6 +383,7 @@ impl Store { ) -> Result { let mut ret_storage_updates = Vec::new(); let mut code_updates = Vec::new(); + let state_root = state_trie.hash_no_commit(); for update in account_updates { let hashed_address = hash_address(&update.address); if update.removed { @@ -410,6 +411,7 @@ impl Store { let mut storage_trie = self.engine.open_storage_trie( H256::from_slice(&hashed_address), account_state.storage_root, + state_root, )?; for (storage_key, storage_value) in &update.added_storage { let hashed_key = hash_key(storage_key); @@ -444,6 +446,7 @@ impl Store { account_updates: &[AccountUpdate], mut storage_tries: HashMap, ) -> Result<(Trie, HashMap), StoreError> { + let state_root = state_trie.hash_no_commit(); for update in account_updates.iter() { let hashed_address = hash_address(&update.address); if update.removed { @@ -473,6 +476,7 @@ impl Store { let trie = self.engine.open_storage_trie( H256::from_slice(&hashed_address), account_state.storage_root, + state_root, )?; vacant.insert(TrieLogger::open_trie(trie)) } @@ -500,7 +504,8 @@ impl Store { &self, genesis_accounts: BTreeMap, ) -> Result { - let mut genesis_state_trie = self.engine.open_state_trie(*EMPTY_TRIE_HASH)?; + let mut nodes = HashMap::new(); + let mut genesis_state_trie = self.engine.open_direct_state_trie(*EMPTY_TRIE_HASH)?; for (address, account) in genesis_accounts { let hashed_address = hash_address(&address); // Store account code (as this won't be stored in the trie) @@ -509,14 +514,15 @@ impl Store { // Store the account's storage in a clean storage trie and compute its root let mut storage_trie = self .engine - .open_storage_trie(H256::from_slice(&hashed_address), *EMPTY_TRIE_HASH)?; + .open_direct_storage_trie(H256::from_slice(&hashed_address), *EMPTY_TRIE_HASH)?; for (storage_key, storage_value) in account.storage { if !storage_value.is_zero() { let hashed_key = hash_key(&H256(storage_key.to_big_endian())); storage_trie.insert(hashed_key, storage_value.encode_to_vec())?; } } - let storage_root = storage_trie.hash()?; + let (storage_root, new_nodes) = storage_trie.collect_changes_since_last_hash(); + nodes.insert(H256::from_slice(&hashed_address), new_nodes); // Add account to trie let account_state = AccountState { nonce: account.nonce, @@ -526,7 +532,21 @@ impl Store { }; genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?; } - genesis_state_trie.hash().map_err(StoreError::Trie) + let (state_root, state_nodes) = genesis_state_trie.collect_changes_since_last_hash(); + + // TODO: replace this with a Store method + genesis_state_trie.db().put_batch( + nodes + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(state_nodes) + .collect(), + )?; + Ok(state_root) } pub async fn add_receipt( @@ -823,6 +843,9 @@ impl Store { block_hash: BlockHash, address: Address, ) -> Result, StoreError> { + let Some(header) = self.get_block_header_by_hash(block_hash)? else { + return Ok(None); + }; // Fetch Account from state_trie let Some(state_trie) = self.state_trie(block_hash)? 
else { return Ok(None); @@ -837,6 +860,7 @@ impl Store { Ok(Some(self.engine.open_storage_trie( H256::from_slice(&hashed_address), storage_root, + header.state_root, )?)) } @@ -851,7 +875,7 @@ impl Store { let Some(state_trie) = self.state_trie(block_hash)? else { return Ok(None); }; - self.get_account_state_from_trie(&state_trie, address) + get_account_state_from_trie(&state_trie, address) } pub fn get_account_state_by_hash( @@ -877,31 +901,68 @@ impl Store { Ok(Some(AccountState::decode(&encoded_state)?)) } + /// Constructs a merkle proof for the given account address against a given state. + /// If storage_keys are provided, also constructs the storage proofs for those keys. + /// + /// Returns `None` if the state trie is missing, otherwise returns the proof. pub async fn get_account_proof( &self, - block_number: BlockNumber, - address: &Address, - ) -> Result>>, StoreError> { - let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else { - return Ok(None); - }; - let Some(state_trie) = self.state_trie(block_hash)? else { - return Ok(None); - }; - Ok(Some(state_trie.get_proof(&hash_address(address))).transpose()?) - } - - /// Constructs a merkle proof for the given storage_key in a storage_trie with a known root - pub fn get_storage_proof( - &self, + state_root: H256, address: Address, - storage_root: H256, - storage_key: &H256, - ) -> Result>, StoreError> { - let trie = self - .engine - .open_storage_trie(hash_address_fixed(&address), storage_root)?; - Ok(trie.get_proof(&hash_key(storage_key))?) + storage_keys: &[H256], + ) -> Result, StoreError> { + // TODO: check state root + // let Some(state_trie) = self.open_state_trie(state_trie)? else { + // return Ok(None); + // }; + let state_trie = self.open_state_trie(state_root)?; + let hashed_address = hash_address_fixed(&address); + let address_path = hashed_address.0.to_vec(); + let proof = state_trie.get_proof(&address_path)?; + let account_opt = state_trie + .get(&address_path)? + .map(|encoded_state| AccountState::decode(&encoded_state)) + .transpose()?; + + let mut storage_proof = Vec::with_capacity(storage_keys.len()); + + if let Some(account) = &account_opt { + let storage_trie = self.engine.open_storage_trie( + hashed_address, + account.storage_root, + state_trie.hash_no_commit(), + )?; + + for key in storage_keys { + let hashed_key = hash_key(key); + let proof = storage_trie.get_proof(&hashed_key)?; + let value = storage_trie + .get(&hashed_key)? + .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode)) + .transpose()? + .unwrap_or_default(); + + let slot_proof = StorageSlotProof { + proof, + key: *key, + value, + }; + storage_proof.push(slot_proof); + } + } else { + storage_proof.extend(storage_keys.iter().map(|key| StorageSlotProof { + proof: Vec::new(), + key: *key, + value: U256::zero(), + })); + } + let account = account_opt.unwrap_or_default(); + let account_proof = AccountProof { + proof, + account, + storage_proof, + }; + Ok(Some(account_proof)) } // Returns an iterator across all accounts in the state trie given by the state_root @@ -942,7 +1003,7 @@ impl Store { let storage_root = AccountState::decode(&account_rlp)?.storage_root; let mut iter = self .engine - .open_locked_storage_trie(hashed_address, storage_root)? + .open_locked_storage_trie(hashed_address, storage_root, state_root)? 
.into_iter(); iter.advance(starting_slot.0.to_vec())?; Ok(Some(iter.content().map_while(|(path, value)| { @@ -986,9 +1047,9 @@ impl Store { return Ok(None); }; let storage_root = AccountState::decode(&account_rlp)?.storage_root; - let storage_trie = self - .engine - .open_storage_trie(hashed_address, storage_root)?; + let storage_trie = + self.engine + .open_storage_trie(hashed_address, storage_root, state_root)?; let mut proof = storage_trie.get_proof(&starting_hash.as_bytes().to_vec())?; if let Some(last_hash) = last_hash { proof.extend_from_slice(&storage_trie.get_proof(&last_hash.as_bytes().to_vec())?); @@ -1030,9 +1091,11 @@ impl Store { let Ok(hashed_address) = account_path.clone().try_into().map(H256) else { return Ok(vec![]); }; - let storage_trie = self - .engine - .open_storage_trie(hashed_address, account_state.storage_root)?; + let storage_trie = self.engine.open_storage_trie( + hashed_address, + account_state.storage_root, + state_root, + )?; // Fetch storage trie nodes let mut nodes = vec![]; let mut bytes_used = 0; @@ -1073,14 +1136,24 @@ impl Store { self.engine.open_locked_state_trie(state_root) } + pub fn open_direct_storage_trie(&self, addr: H256, root: H256) -> Result { + self.engine.open_direct_storage_trie(addr, root) + } + + pub fn open_direct_state_trie(&self, root: H256) -> Result { + self.engine.open_direct_state_trie(root) + } + /// Obtain a storage trie from the given address and storage_root. /// Doesn't check if the account is stored pub fn open_storage_trie( &self, account_hash: H256, storage_root: H256, + state_root: H256, ) -> Result { - self.engine.open_storage_trie(account_hash, storage_root) + self.engine + .open_storage_trie(account_hash, storage_root, state_root) } /// Obtain a read-locked storage trie from the given address and storage_root. @@ -1089,33 +1162,24 @@ impl Store { &self, account_hash: H256, storage_root: H256, + state_root: H256, ) -> Result { self.engine - .open_locked_storage_trie(account_hash, storage_root) - } - - /// Returns true if the given node is part of the state trie's internal storage - pub fn contains_state_node(&self, node_hash: H256) -> Result { - // Root is irrelevant, we only care about the internal state - Ok(self - .open_state_trie(*EMPTY_TRIE_HASH)? - .db() - .get(node_hash.into())? - .is_some()) + .open_locked_storage_trie(account_hash, storage_root, state_root) } - /// Returns true if the given node is part of the given storage trie's internal storage - pub fn contains_storage_node( - &self, - hashed_address: H256, - node_hash: H256, - ) -> Result { - // Root is irrelevant, we only care about the internal state - Ok(self - .open_storage_trie(hashed_address, *EMPTY_TRIE_HASH)? - .db() - .get(node_hash.into())? - .is_some()) + pub fn has_state_root(&self, state_root: H256) -> Result { + // Empty state trie is always available + if state_root == *EMPTY_TRIE_HASH { + return Ok(true); + } + let trie = self.engine.open_state_trie(state_root)?; + // NOTE: here we hash the root because the trie doesn't check the state root is correct + let Some(root) = trie.db().get(Nibbles::default())? else { + return Ok(false); + }; + let root_hash = ethrex_trie::Node::decode(&root)?.compute_hash().finalize(); + Ok(state_root == root_hash) } /// Sets the hash of the last header downloaded during a snap sync @@ -1252,6 +1316,8 @@ impl Store { .is_some_and(|h| h == block_hash)) } + /// CAUTION: This method writes directly to the underlying database, bypassing any caching layer. 
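+    /// Nodes are keyed by their path within each account's storage trie rather than by node hash.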
+ /// For updating the state after block execution, use [`Self::store_block_updates`]. pub async fn write_storage_trie_nodes_batch( &self, storage_trie_nodes: StorageTrieNodes, @@ -1269,6 +1335,29 @@ impl Store { } } +pub struct AccountProof { + pub proof: Vec, + pub account: AccountState, + pub storage_proof: Vec, +} + +pub struct StorageSlotProof { + pub proof: Vec, + pub key: H256, + pub value: U256, +} + +fn get_account_state_from_trie( + state_trie: &Trie, + address: Address, +) -> Result, StoreError> { + let hashed_address = hash_address(&address); + let Some(encoded_state) = state_trie.get(&hashed_address)? else { + return Ok(None); + }; + Ok(Some(AccountState::decode(&encoded_state)?)) +} + pub struct AncestorIterator { store: Store, next_hash: BlockHash, @@ -1408,7 +1497,9 @@ mod tests { }) .collect(); slots.sort_by_key(|a| a.0); - let mut trie = store.open_storage_trie(address, *EMPTY_TRIE_HASH).unwrap(); + let mut trie = store + .open_storage_trie(address, *EMPTY_TRIE_HASH, *EMPTY_TRIE_HASH) + .unwrap(); for (slot, value) in &slots { trie.insert(slot.0.to_vec(), value.encode_to_vec()).unwrap(); } diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index e2136f44e8f..6668561abe1 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -1,16 +1,28 @@ -use crate::{UpdateBatch, api::StoreEngine, error::StoreError, store::STATE_TRIE_SEGMENTS}; +use crate::{ + UpdateBatch, + api::StoreEngine, + apply_prefix, + error::StoreError, + store::STATE_TRIE_SEGMENTS, + trie_db::layering::{TrieLayerCache, TrieWrapper}, +}; use bytes::Bytes; use ethereum_types::H256; use ethrex_common::types::{ Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Index, Receipt, }; -use ethrex_trie::{InMemoryTrieDB, Nibbles, NodeHash, Trie}; +use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie, db::NodeMap}; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, fmt::Debug, - sync::{Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard, RwLock}, }; -pub type NodeMap = Arc>>>; + +// NOTE: we use a different commit threshold than rocksdb since tests +// require older states to be available +// TODO: solve this in some other way, maybe adding logic for arbitrary +// state access by applying diffs +const COMMIT_THRESHOLD: usize = 10000; #[derive(Default, Clone)] pub struct Store(Arc>); @@ -27,13 +39,13 @@ pub struct StoreInner { // Maps transaction hashes to their blocks (height+hash) and index within the blocks. transaction_locations: HashMap>, receipts: HashMap>, - pub state_trie_nodes: NodeMap, - // A storage trie for each hashed account address - pub storage_trie_nodes: HashMap, + trie_cache: Arc>, + // Contains account trie nodes + state_trie_nodes: NodeMap, pending_blocks: HashMap, // Stores invalid blocks and their latest valid ancestor invalid_ancestors: HashMap, - // Stores current Snap Sate + // Stores current Snap State snap_state: SnapState, } @@ -75,32 +87,59 @@ impl Store { impl StoreEngine for Store { async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { let mut store = self.inner()?; + + // Store trie updates { - // store account updates - let mut state_trie_store = store + let mut trie = store + .trie_cache + .write() + .map_err(|_| StoreError::LockError)?; + let parent = update_batch + .blocks + .first() + .ok_or(StoreError::UpdateBatchNoBlocks)? 
+ .header + .parent_hash; + + let pre_state_root = store + .headers + .get(&parent) + .map(|header| header.state_root) + .unwrap_or_default(); + + let last_state_root = update_batch + .blocks + .last() + .ok_or(StoreError::UpdateBatchNoBlocks)? + .header + .state_root; + + let mut state_trie = store .state_trie_nodes .lock() .map_err(|_| StoreError::LockError)?; - for (node_hash, node_data) in update_batch.account_updates { - state_trie_store.insert(node_hash, node_data); - } - } - - // store code updates - for (code_hash, code) in update_batch.code_updates { - store.account_codes.insert(code_hash, code); - } - for (hashed_address, nodes) in update_batch.storage_updates { - let mut addr_store = store - .storage_trie_nodes - .entry(hashed_address) - .or_default() - .lock() - .map_err(|_| StoreError::LockError)?; - for (node_hash, node_data) in nodes { - addr_store.insert(node_hash, node_data); + if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) { + let nodes = trie.commit(root).unwrap_or_default(); + for (key, value) in nodes { + if value.is_empty() { + state_trie.remove(&key); + } else { + state_trie.insert(key, value); + } + } } + let key_values = update_batch + .storage_updates + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(update_batch.account_updates) + .collect(); + trie.put_batch(pre_state_root, last_state_root, key_values); } for block in update_batch.blocks { @@ -130,6 +169,11 @@ impl StoreEngine for Store { } } + // store code updates + for (code_hash, code) in update_batch.code_updates { + store.account_codes.insert(code_hash, code); + } + Ok(()) } @@ -388,19 +432,52 @@ impl StoreEngine for Store { &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result { - let mut store = self.inner()?; - let trie_backend = store.storage_trie_nodes.entry(hashed_address).or_default(); - let db = Box::new(InMemoryTrieDB::new(trie_backend.clone())); - Ok(Trie::open(db, storage_root)) + let store = self.inner()?; + let trie_backend = store.state_trie_nodes.clone(); + let db = Box::new(InMemoryTrieDB::new(trie_backend)); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: store.trie_cache.clone(), + db, + prefix: Some(hashed_address), + }); + Ok(Trie::open(wrap_db, storage_root)) } fn open_state_trie(&self, state_root: H256) -> Result { - let trie_backend = self.inner()?.state_trie_nodes.clone(); + let store = self.inner()?; + let trie_backend = store.state_trie_nodes.clone(); + let db = Box::new(InMemoryTrieDB::new(trie_backend)); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: store.trie_cache.clone(), + db, + prefix: None, + }); + Ok(Trie::open(wrap_db, state_root)) + } + + fn open_direct_state_trie(&self, state_root: H256) -> Result { + let store = self.inner()?; + let trie_backend = store.state_trie_nodes.clone(); let db = Box::new(InMemoryTrieDB::new(trie_backend)); Ok(Trie::open(db, state_root)) } + fn open_direct_storage_trie( + &self, + hashed_address: H256, + storage_root: H256, + ) -> Result { + let store = self.inner()?; + let trie_backend = store.state_trie_nodes.clone(); + let prefix = apply_prefix(Some(hashed_address), Default::default()); + let db = Box::new(InMemoryTrieDB::new_with_prefix(trie_backend, prefix)); + Ok(Trie::open(db, storage_root)) + } + async fn get_block_body_by_hash( &self, block_hash: BlockHash, @@ -588,19 +665,18 @@ impl StoreEngine for Store { async fn 
write_storage_trie_nodes_batch( &self, - storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec)>)>, + storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec)>)>, ) -> Result<(), StoreError> { - let mut store = self.inner()?; + let store = self.inner()?; + let mut trie = store + .state_trie_nodes + .lock() + .map_err(|_| StoreError::LockError)?; for (hashed_address, nodes) in storage_trie_nodes { - let mut addr_store = store - .storage_trie_nodes - .entry(hashed_address) - .or_default() - .lock() - .map_err(|_| StoreError::LockError)?; - for (node_hash, node_data) in nodes { - addr_store.insert(node_hash, node_data); + for (node_path, node_data) in nodes { + let full_path = apply_prefix(Some(hashed_address), node_path); + trie.insert(full_path.into_vec(), node_data); } } diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index aec627d3a94..2564a49341a 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -1,4 +1,10 @@ -use crate::{rlp::AccountCodeHashRLP, trie_db::rocksdb_locked::RocksDBLockedTrieDB}; +use crate::{ + rlp::AccountCodeHashRLP, + trie_db::{ + layering::{TrieLayerCache, TrieWrapper, apply_prefix}, + rocksdb_locked::RocksDBLockedTrieDB, + }, +}; use bytes::Bytes; use ethrex_common::{ H256, @@ -7,12 +13,16 @@ use ethrex_common::{ Transaction, }, }; -use ethrex_trie::{Nibbles, NodeHash, Trie}; +use ethrex_trie::{Nibbles, Trie}; use rocksdb::{ BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded, OptimisticTransactionDB, Options, WriteBatchWithTransaction, }; -use std::{collections::HashSet, path::Path, sync::Arc}; +use std::{ + collections::HashSet, + path::Path, + sync::{Arc, RwLock}, +}; use tracing::info; use crate::{ @@ -26,6 +36,9 @@ use crate::{ use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode}; use std::fmt::Debug; +// TODO: use finalized hash to determine when to commit +const COMMIT_THRESHOLD: usize = 128; + /// Canonical block hashes column family: [`u8;_`] => [`Vec`] /// - [`u8;_`] = `block_number.to_le_bytes()` /// - [`Vec`] = `BlockHashRLP::from(block_hash).bytes().clone()` @@ -76,20 +89,10 @@ const CF_CHAIN_DATA: &str = "chain_data"; /// - [`Vec`] = `BlockHashRLP::from(block_hash).bytes().clone()` const CF_SNAP_STATE: &str = "snap_state"; -/// State trie nodes column family: [`NodeHash`] => [`Vec`] -/// - [`NodeHash`] = `node_hash.as_ref()` +/// State trie nodes column family: [`Nibbles`] => [`Vec`] +/// - [`Nibbles`] = `node_hash.as_ref()` /// - [`Vec`] = `node_data` -const CF_STATE_TRIE_NODES: &str = "state_trie_nodes"; - -/// Storage tries nodes column family: [`Vec`] => [`Vec`] -/// - [`Vec`] = Composite key -/// ```rust,no_run -/// // let mut key = Vec::with_capacity(64); -/// // key.extend_from_slice(address_hash.as_bytes()); -/// // key.extend_from_slice(node_hash.as_ref()); -/// ``` -/// - [`Vec`] = `node_data` -const CF_STORAGE_TRIES_NODES: &str = "storage_tries_nodes"; +const CF_TRIE_NODES: &str = "trie_nodes"; /// Pending blocks column family: [`Vec`] => [`Vec`] /// - [`Vec`] = `BlockHashRLP::from(block.hash()).bytes().clone()` @@ -104,6 +107,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors"; #[derive(Debug)] pub struct Store { db: Arc>, + trie_cache: Arc>, } impl Store { @@ -159,8 +163,7 @@ impl Store { CF_TRANSACTION_LOCATIONS, CF_CHAIN_DATA, CF_SNAP_STATE, - CF_STATE_TRIE_NODES, - CF_STORAGE_TRIES_NODES, + CF_TRIE_NODES, CF_PENDING_BLOCKS, CF_INVALID_ANCESTORS, ]; @@ -229,7 +232,7 @@ impl Store { block_opts.set_cache_index_and_filter_blocks(true); 
cf_opts.set_block_based_table_factory(&block_opts); } - CF_STATE_TRIE_NODES | CF_STORAGE_TRIES_NODES => { + CF_TRIE_NODES => { cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB cf_opts.set_max_write_buffer_number(6); @@ -299,7 +302,10 @@ impl Store { } } - Ok(Self { db: Arc::new(db) }) + Ok(Self { + db: Arc::new(db), + trie_cache: Default::default(), + }) } // Helper method to get column family handle @@ -368,7 +374,6 @@ impl Store { batch_ops: Vec<(String, Vec, Vec)>, ) -> Result<(), StoreError> { let db = self.db.clone(); - tokio::task::spawn_blocking(move || { let mut batch = WriteBatchWithTransaction::default(); @@ -441,11 +446,32 @@ impl Store { impl StoreEngine for Store { async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { let db = self.db.clone(); + let trie_cache = self.trie_cache.clone(); + + let parent_state_root = self + .get_block_header_by_hash( + update_batch + .blocks + .first() + .ok_or(StoreError::UpdateBatchNoBlocks)? + .header + .parent_hash, + )? + .map(|header| header.state_root) + .unwrap_or_default(); + + let last_state_root = update_batch + .blocks + .last() + .ok_or(StoreError::UpdateBatchNoBlocks)? + .header + .state_root; tokio::task::spawn_blocking(move || { + let _span = tracing::trace_span!("Block DB update").entered(); + let [ - cf_state, - cf_storage, + cf_trie_nodes, cf_receipts, cf_codes, cf_block_numbers, @@ -455,8 +481,7 @@ impl StoreEngine for Store { ] = open_cfs( &db, [ - CF_STATE_TRIE_NODES, - CF_STORAGE_TRIES_NODES, + CF_TRIE_NODES, CF_RECEIPTS, CF_ACCOUNT_CODES, CF_BLOCK_NUMBERS, @@ -469,19 +494,31 @@ impl StoreEngine for Store { let _span = tracing::trace_span!("Block DB update").entered(); let mut batch = WriteBatchWithTransaction::default(); - for (node_hash, node_data) in update_batch.account_updates { - batch.put_cf(&cf_state, node_hash.as_ref(), node_data); - } - - for (address_hash, storage_updates) in update_batch.storage_updates { - for (node_hash, node_data) in storage_updates { - // Key: address_hash + node_hash - let mut key = Vec::with_capacity(64); - key.extend_from_slice(address_hash.as_bytes()); - key.extend_from_slice(node_hash.as_ref()); - batch.put_cf(&cf_storage, key, node_data); + let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?; + if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) { + let nodes = trie.commit(root).unwrap_or_default(); + for (key, value) in nodes { + if value.is_empty() { + batch.delete_cf(&cf_trie_nodes, key); + } else { + batch.put_cf(&cf_trie_nodes, key, value); + } } } + trie.put_batch( + parent_state_root, + last_state_root, + update_batch + .storage_updates + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(update_batch.account_updates) + .collect(), + ); for block in update_batch.blocks { let block_number = block.header.number; @@ -1071,40 +1108,80 @@ impl StoreEngine for Store { &self, hashed_address: H256, storage_root: H256, + state_root: H256, + ) -> Result { + let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: self.trie_cache.clone(), + db, + prefix: Some(hashed_address), + }); + Ok(Trie::open(wrap_db, storage_root)) + } + + fn open_state_trie(&self, state_root: H256) -> Result { + let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, 
None)?); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: self.trie_cache.clone(), + db, + prefix: None, + }); + Ok(Trie::open(wrap_db, state_root)) + } + + fn open_direct_storage_trie( + &self, + hashed_address: H256, + storage_root: H256, ) -> Result { let db = Box::new(RocksDBTrieDB::new( self.db.clone(), - CF_STORAGE_TRIES_NODES, + CF_TRIE_NODES, Some(hashed_address), )?); Ok(Trie::open(db, storage_root)) } - fn open_state_trie(&self, state_root: H256) -> Result { - let db = Box::new(RocksDBTrieDB::new( - self.db.clone(), - CF_STATE_TRIE_NODES, - None, - )?); + fn open_direct_state_trie(&self, state_root: H256) -> Result { + let db = Box::new(RocksDBTrieDB::new(self.db.clone(), CF_TRIE_NODES, None)?); Ok(Trie::open(db, state_root)) } fn open_locked_state_trie(&self, state_root: H256) -> Result { - let db = RocksDBLockedTrieDB::new(self.db.clone(), CF_STATE_TRIE_NODES, None)?; - Ok(Trie::open(Box::new(db), state_root)) + let db = Box::new(RocksDBLockedTrieDB::new( + self.db.clone(), + CF_TRIE_NODES, + None, + )?); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: self.trie_cache.clone(), + db, + prefix: None, + }); + Ok(Trie::open(wrap_db, state_root)) } fn open_locked_storage_trie( &self, hashed_address: H256, storage_root: H256, + state_root: H256, ) -> Result { - let db = RocksDBLockedTrieDB::new( + let db = Box::new(RocksDBLockedTrieDB::new( self.db.clone(), - CF_STORAGE_TRIES_NODES, - Some(hashed_address), - )?; - Ok(Trie::open(Box::new(db), storage_root)) + CF_TRIE_NODES, + None, + )?); + let wrap_db = Box::new(TrieWrapper { + state_root, + inner: self.trie_cache.clone(), + db, + prefix: Some(hashed_address), + }); + Ok(Trie::open(wrap_db, storage_root)) } async fn forkchoice_update( @@ -1361,21 +1438,31 @@ impl StoreEngine for Store { async fn write_storage_trie_nodes_batch( &self, - storage_trie_nodes: Vec<(H256, Vec<(NodeHash, Vec)>)>, + storage_trie_nodes: Vec<(H256, Vec<(Nibbles, Vec)>)>, ) -> Result<(), StoreError> { - let mut batch_ops = Vec::new(); + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let mut batch = WriteBatchWithTransaction::default(); + let cf = db.cf_handle(CF_TRIE_NODES).ok_or_else(|| { + StoreError::Custom("Column family not found: CF_TRIE_NODES".to_string()) + })?; - for (address_hash, nodes) in storage_trie_nodes { - for (node_hash, node_data) in nodes { - // Create composite key: address_hash + node_hash - let mut key = Vec::with_capacity(64); - key.extend_from_slice(address_hash.as_bytes()); - key.extend_from_slice(node_hash.as_ref()); - batch_ops.push((CF_STORAGE_TRIES_NODES.to_string(), key, node_data)); + for (address_hash, nodes) in storage_trie_nodes { + for (node_hash, node_data) in nodes { + let key = apply_prefix(Some(address_hash), node_hash); + if node_data.is_empty() { + batch.delete_cf(&cf, key.as_ref()); + } else { + batch.put_cf(&cf, key.as_ref(), node_data); + } + } } - } - self.write_batch_async(batch_ops).await + db.write(batch) + .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))) + }) + .await + .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))? 
} async fn write_account_code_batch( diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs new file mode 100644 index 00000000000..1154df48118 --- /dev/null +++ b/crates/storage/trie_db/layering.rs @@ -0,0 +1,159 @@ +use ethrex_common::H256; +use ethrex_rlp::decode::RLPDecode; +use std::{collections::HashMap, sync::Arc, sync::RwLock}; + +use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Node, TrieDB, TrieError}; + +#[derive(Debug)] +struct TrieLayer { + nodes: HashMap, Vec>, + parent: H256, + id: usize, +} + +#[derive(Debug, Default)] +pub struct TrieLayerCache { + /// Monotonically increasing ID for layers, starting at 1. + /// TODO: this implementation panics on overflow + last_id: usize, + layers: HashMap, +} + +impl TrieLayerCache { + pub fn get(&self, state_root: H256, key: Nibbles) -> Option> { + let mut current_state_root = state_root; + while let Some(layer) = self.layers.get(¤t_state_root) { + if let Some(value) = layer.nodes.get(key.as_ref()) { + return Some(value.clone()); + } + current_state_root = layer.parent; + if current_state_root == state_root { + // TODO: check if this is possible in practice + // This can't happen in L1, due to system contracts irreversibly modifying state + // at each block. + // On L2, if no transactions are included in a block, the state root remains the same, + // but we handle that case in put_batch. It may happen, however, if someone modifies + // state with a privileged tx and later reverts it (since it doesn't update nonce). + panic!("State cycle found"); + } + } + None + } + + // TODO: use finalized hash to know when to commit + pub fn get_commitable( + &mut self, + mut state_root: H256, + commit_threshold: usize, + ) -> Option { + let mut counter = 0; + while let Some(layer) = self.layers.get(&state_root) { + state_root = layer.parent; + counter += 1; + if counter > commit_threshold { + return Some(state_root); + } + } + None + } + + pub fn put_batch( + &mut self, + parent: H256, + state_root: H256, + key_values: Vec<(Nibbles, Vec)>, + ) { + if parent == state_root && key_values.is_empty() { + return; + } else if parent == state_root { + tracing::error!("Inconsistent state: parent == state_root but key_values not empty"); + return; + } + self.layers + .entry(state_root) + .or_insert_with(|| { + self.last_id += 1; + TrieLayer { + nodes: HashMap::new(), + parent, + id: self.last_id, + } + }) + .nodes + .extend( + key_values + .into_iter() + .map(|(path, node)| (path.into_vec(), node)), + ); + } + + pub fn commit(&mut self, state_root: H256) -> Option, Vec)>> { + let mut layer = self.layers.remove(&state_root)?; + // ensure parents are commited + let parent_nodes = self.commit(layer.parent); + // older layers are useless + self.layers.retain(|_, item| item.id > layer.id); + Some( + parent_nodes + .unwrap_or_default() + .into_iter() + .chain(layer.nodes.drain()) + .collect(), + ) + } +} + +pub struct TrieWrapper { + pub state_root: H256, + pub inner: Arc>, + pub db: Box, + pub prefix: Option, +} + +pub fn apply_prefix(prefix: Option, path: Nibbles) -> Nibbles { + // Apply a prefix with an invalid nibble (17) as a separator, to + // differentiate between a state trie value and a storage trie root. 
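+    // e.g. (illustrative notation) a storage node at path `p` under account hash `a` ends up keyed
+    // as `nibbles(a) ++ [17] ++ p`, while state trie nodes (no prefix) keep their path unchanged.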
+ match prefix { + Some(prefix) => Nibbles::from_bytes(prefix.as_bytes()) + .append_new(17) + .concat(&path), + None => path, + } +} + +impl TrieDB for TrieWrapper { + fn get(&self, key: Nibbles) -> Result>, TrieError> { + let key = apply_prefix(self.prefix, key); + if let Some(value) = self + .inner + .read() + .map_err(|_| TrieError::LockError)? + .get(self.state_root, key.clone()) + { + return Ok(Some(value)); + } + self.db.get(key) + } + + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { + // TODO: this is unused, because we call `TrieLayerCache::put_batch` directly + let last_pair = key_values.iter().rev().find(|(_path, rlp)| !rlp.is_empty()); + let new_state_root = match last_pair { + Some((_, noderlp)) => { + let root_node = Node::decode(noderlp)?; + root_node.compute_hash().finalize() + } + None => *EMPTY_TRIE_HASH, + }; + let mut inner = self.inner.write().map_err(|_| TrieError::LockError)?; + inner.put_batch( + self.state_root, + new_state_root, + key_values + .into_iter() + .map(move |(path, node)| (apply_prefix(self.prefix, path), node)) + .collect(), + ); + Ok(()) + } +} diff --git a/crates/storage/trie_db/mod.rs b/crates/storage/trie_db/mod.rs index 2a8abfb0072..b6510a1fb3e 100644 --- a/crates/storage/trie_db/mod.rs +++ b/crates/storage/trie_db/mod.rs @@ -2,3 +2,5 @@ pub mod rocksdb; #[cfg(feature = "rocksdb")] pub mod rocksdb_locked; + +pub mod layering; diff --git a/crates/storage/trie_db/rocksdb.rs b/crates/storage/trie_db/rocksdb.rs index 9983ce02bf8..bcca18aaba8 100644 --- a/crates/storage/trie_db/rocksdb.rs +++ b/crates/storage/trie_db/rocksdb.rs @@ -1,9 +1,11 @@ use ethrex_common::H256; use ethrex_rlp::encode::RLPEncode; -use ethrex_trie::{Node, NodeHash, TrieDB, error::TrieError}; +use ethrex_trie::{Nibbles, Node, TrieDB, error::TrieError}; use rocksdb::{MultiThreaded, OptimisticTransactionDB}; use std::sync::Arc; +use crate::trie_db::layering::apply_prefix; + /// RocksDB implementation for the TrieDB trait, with get and put operations. 
pub struct RocksDBTrieDB { /// RocksDB database @@ -41,39 +43,36 @@ impl RocksDBTrieDB { .ok_or_else(|| TrieError::DbError(anyhow::anyhow!("Column family not found"))) } - fn make_key(&self, node_hash: &NodeHash) -> Vec { - match &self.address_prefix { - Some(address) => { - // For storage tries, prefix with address - let mut key = address.as_bytes().to_vec(); - key.extend_from_slice(node_hash.as_ref()); - key - } - None => { - // For state trie, use node hash directly - node_hash.as_ref().to_vec() - } - } + fn make_key(&self, node_hash: Nibbles) -> Vec { + apply_prefix(self.address_prefix, node_hash) + .as_ref() + .to_vec() } } impl TrieDB for RocksDBTrieDB { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { let cf = self.cf_handle()?; - let db_key = self.make_key(&key); + let db_key = self.make_key(key); - self.db - .get_cf(&cf, db_key) - .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB get error: {}", e))) + let res = self + .db + .get_cf(&cf, &db_key) + .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB get error: {}", e)))?; + Ok(res) } - fn put_batch(&self, key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { let cf = self.cf_handle()?; let mut batch = rocksdb::WriteBatchWithTransaction::default(); for (key, value) in key_values { - let db_key = self.make_key(&key); - batch.put_cf(&cf, db_key, value); + let db_key = self.make_key(key); + if value.is_empty() { + batch.delete_cf(&cf, db_key); + } else { + batch.put_cf(&cf, db_key, value); + } } self.db @@ -81,14 +80,14 @@ impl TrieDB for RocksDBTrieDB { .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB batch write error: {}", e))) } - fn put_batch_no_alloc(&self, key_values: &[(NodeHash, Node)]) -> Result<(), TrieError> { + fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> { let cf = self.cf_handle()?; let mut batch = rocksdb::WriteBatchWithTransaction::default(); // 532 is the maximum size of an encoded branch node. 
let mut buffer = Vec::with_capacity(532); for (hash, node) in key_values { - let db_key = self.make_key(hash); + let db_key = self.make_key(hash.clone()); buffer.clear(); node.encode(&mut buffer); batch.put_cf(&cf, db_key, &buffer); @@ -103,7 +102,7 @@ impl TrieDB for RocksDBTrieDB { #[cfg(test)] mod tests { use super::*; - use ethrex_trie::NodeHash; + use ethrex_trie::Nibbles; use rocksdb::{ColumnFamilyDescriptor, MultiThreaded, Options}; use tempfile::TempDir; @@ -130,12 +129,12 @@ mod tests { let trie_db = RocksDBTrieDB::new(db, "test_cf", None).unwrap(); // Test data - let node_hash = NodeHash::from(H256::from([1u8; 32])); + let node_hash = Nibbles::from_hex(vec![1]); let node_data = vec![1, 2, 3, 4, 5]; // Test put_batch trie_db - .put_batch(vec![(node_hash, node_data.clone())]) + .put_batch(vec![(node_hash.clone(), node_data.clone())]) .unwrap(); // Test get @@ -143,7 +142,7 @@ mod tests { assert_eq!(retrieved_data, node_data); // Test get nonexistent - let nonexistent_hash = NodeHash::from(H256::from([2u8; 32])); + let nonexistent_hash = Nibbles::from_hex(vec![2]); assert!(trie_db.get(nonexistent_hash).unwrap().is_none()); } @@ -171,12 +170,12 @@ mod tests { let trie_db = RocksDBTrieDB::new(db, "test_cf", Some(address)).unwrap(); // Test data - let node_hash = NodeHash::from(H256::from([1u8; 32])); + let node_hash = Nibbles::from_hex(vec![1]); let node_data = vec![1, 2, 3, 4, 5]; // Test put_batch trie_db - .put_batch(vec![(node_hash, node_data.clone())]) + .put_batch(vec![(node_hash.clone(), node_data.clone())]) .unwrap(); // Test get @@ -207,10 +206,11 @@ mod tests { let trie_db = RocksDBTrieDB::new(db, "test_cf", None).unwrap(); // Test data + // NOTE: we don't use the same paths to avoid overwriting in the batch let batch_data = vec![ - (NodeHash::from(H256::from([1u8; 32])), vec![1, 2, 3]), - (NodeHash::from(H256::from([2u8; 32])), vec![4, 5, 6]), - (NodeHash::from(H256::from([3u8; 32])), vec![7, 8, 9]), + (Nibbles::from_hex(vec![1]), vec![1, 2, 3]), + (Nibbles::from_hex(vec![1, 2]), vec![4, 5, 6]), + (Nibbles::from_hex(vec![1, 2, 3]), vec![7, 8, 9]), ]; // Test batch put diff --git a/crates/storage/trie_db/rocksdb_locked.rs b/crates/storage/trie_db/rocksdb_locked.rs index a843ba46052..ed3f8f8e781 100644 --- a/crates/storage/trie_db/rocksdb_locked.rs +++ b/crates/storage/trie_db/rocksdb_locked.rs @@ -1,8 +1,10 @@ use ethrex_common::H256; -use ethrex_trie::{NodeHash, TrieDB, error::TrieError}; +use ethrex_trie::{Nibbles, TrieDB, error::TrieError}; use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode}; use std::sync::Arc; +use crate::trie_db::layering::apply_prefix; + /// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot. 
pub struct RocksDBLockedTrieDB { /// RocksDB database @@ -40,19 +42,10 @@ impl RocksDBLockedTrieDB { }) } - fn make_key(&self, node_hash: NodeHash) -> Vec { - match &self.address_prefix { - Some(address) => { - // For storage tries, prefix with address - let mut key = address.as_bytes().to_vec(); - key.extend_from_slice(node_hash.as_ref()); - key - } - None => { - // For state trie, use node hash directly - node_hash.as_ref().to_vec() - } - } + fn make_key(&self, node_hash: Nibbles) -> Vec { + apply_prefix(self.address_prefix, node_hash) + .as_ref() + .to_vec() } } @@ -69,7 +62,7 @@ impl Drop for RocksDBLockedTrieDB { } impl TrieDB for RocksDBLockedTrieDB { - fn get(&self, key: NodeHash) -> Result>, TrieError> { + fn get(&self, key: Nibbles) -> Result>, TrieError> { let db_key = self.make_key(key); self.snapshot @@ -77,7 +70,7 @@ impl TrieDB for RocksDBLockedTrieDB { .map_err(|e| TrieError::DbError(anyhow::anyhow!("RocksDB snapshot get error: {}", e))) } - fn put_batch(&self, _key_values: Vec<(NodeHash, Vec)>) -> Result<(), TrieError> { + fn put_batch(&self, _key_values: Vec<(Nibbles, Vec)>) -> Result<(), TrieError> { Err(TrieError::DbError(anyhow::anyhow!( "LockedTrie is read-only" ))) diff --git a/docs/CLI.md b/docs/CLI.md index 2e05cc9c63e..1aa98f9a9ed 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -126,7 +126,7 @@ Block producer options: --block-producer.extra-data Block extra data message. - [default: "ethrex 0.1.0"] + [default: "ethrex 3.0.0"] ``` diff --git a/tooling/archive_sync/src/main.rs b/tooling/archive_sync/src/main.rs index a4080d4861b..5417fefd563 100644 --- a/tooling/archive_sync/src/main.rs +++ b/tooling/archive_sync/src/main.rs @@ -133,7 +133,7 @@ pub async fn archive_sync( /// This could be improved in the future to use an in_memory trie with async db writes async fn process_dump(dump: Dump, store: Store, current_root: H256) -> eyre::Result { let mut storage_tasks = JoinSet::new(); - let mut state_trie = store.open_state_trie(current_root)?; + let mut state_trie = store.open_direct_state_trie(current_root)?; for (address, dump_account) in dump.accounts.into_iter() { let hashed_address = dump_account .hashed_address @@ -172,7 +172,7 @@ async fn process_dump_storage( hashed_address: H256, storage_root: H256, ) -> eyre::Result<()> { - let mut trie = store.open_storage_trie(hashed_address, *EMPTY_TRIE_HASH)?; + let mut trie = store.open_direct_storage_trie(hashed_address, *EMPTY_TRIE_HASH)?; for (key, val) in dump_storage { // The key we receive is the preimage of the one stored in the trie trie.insert(keccak(key.0).0.to_vec(), val.encode_to_vec())?; diff --git a/tooling/ef_tests/blockchain/test_runner.rs b/tooling/ef_tests/blockchain/test_runner.rs index d8d463e13f2..481ca64763d 100644 --- a/tooling/ef_tests/blockchain/test_runner.rs +++ b/tooling/ef_tests/blockchain/test_runner.rs @@ -309,6 +309,7 @@ fn check_prestate_against_db(test_key: &str, test: &TestUnit, db: &Store) { test_state_root, db_block_header.state_root, "Mismatched genesis state root for database, test: {test_key}" ); + assert!(db.has_state_root(test_state_root).unwrap()); } /// Checks that all accounts in the post-state are present and have the correct values in the DB @@ -394,8 +395,10 @@ async fn re_run_stateless( if test_should_fail && witness.is_err() { // We can't generate witness for a test that should fail. 
return Ok(()); - } else if !test_should_fail && witness.is_err() { - return Err("Failed to create witness for a test that should not fail".into()); + } else if !test_should_fail && let Err(err) = witness { + return Err(format!( + "Failed to create witness for a test that should not fail: {err}" + )); } // At this point witness is guaranteed to be Ok let execution_witness = witness.unwrap();
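Editorial note (reviewer sketch, not part of the patch): the storage changes above move trie persistence from hash-keyed nodes to path-keyed nodes (an account-hash prefix plus the node's path inside the trie) and keep not-yet-committed nodes in an in-memory layer cache keyed by state root (`TrieLayerCache`, consulted through `TrieWrapper` before the database), with older layers flushed to RocksDB once a commit threshold of blocks has accumulated. The sketch below is a minimal, self-contained illustration of that layered-lookup pattern; `LayerCache`, `Layer`, `Root` and `Path` are simplified stand-ins, not the actual ethrex types.

    // Standalone sketch of a layer cache keyed by state root; assumes simplified types.
    use std::collections::HashMap;

    type Root = [u8; 32]; // stand-in for a state root hash
    type Path = Vec<u8>;  // stand-in for a node path (Nibbles in ethrex)

    struct Layer {
        parent: Root,                  // state root of the previous block
        nodes: HashMap<Path, Vec<u8>>, // node path -> encoded node
    }

    #[derive(Default)]
    struct LayerCache {
        layers: HashMap<Root, Layer>,
    }

    impl LayerCache {
        /// Record the nodes produced by one block as a new layer on top of `parent`.
        fn put_batch(&mut self, parent: Root, root: Root, nodes: Vec<(Path, Vec<u8>)>) {
            self.layers
                .entry(root)
                .or_insert_with(|| Layer { parent, nodes: HashMap::new() })
                .nodes
                .extend(nodes);
        }

        /// Look a node up starting from the layer for `root`, walking towards older
        /// layers; `None` means the caller should fall through to the on-disk store.
        fn get(&self, mut root: Root, path: &[u8]) -> Option<Vec<u8>> {
            while let Some(layer) = self.layers.get(&root) {
                if let Some(node) = layer.nodes.get(path) {
                    return Some(node.clone());
                }
                root = layer.parent;
            }
            None
        }
    }

    fn main() {
        let genesis = [0u8; 32];
        let block1 = [1u8; 32];
        let block2 = [2u8; 32];

        let mut cache = LayerCache::default();
        cache.put_batch(genesis, block1, vec![(b"ab".to_vec(), b"node-v1".to_vec())]);
        cache.put_batch(block1, block2, vec![(b"cd".to_vec(), b"node-v2".to_vec())]);

        // Reading from the tip sees both layers; reading from block1 cannot see block2's node.
        assert_eq!(cache.get(block2, b"ab"), Some(b"node-v1".to_vec()));
        assert_eq!(cache.get(block1, b"cd"), None);
    }

Reads in the real implementation follow the same shape: `TrieWrapper::get` first asks `TrieLayerCache::get(state_root, path)` and only then queries the underlying `TrieDB`, while `get_commitable`/`commit` flush layers deeper than the commit threshold into the `trie_nodes` column family.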