27 changes: 9 additions & 18 deletions crates/networking/p2p/sync.rs
@@ -25,10 +25,7 @@ use std::{
};
use storage_healing::storage_healer;
use tokio::{
sync::{
mpsc::{self, error::SendError},
Mutex,
},
sync::{mpsc::error::SendError, Mutex},
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
@@ -599,13 +596,13 @@ impl Syncer {
));
};
// Spawn storage healer earlier so we can start healing stale storages
let (storage_healer_sender, storage_healer_receiver) =
mpsc::channel::<Vec<H256>>(MAX_CHANNEL_MESSAGES);
// Create a cancellation token so we can end the storage healer when finished, make it a child so that it also ends upon shutdown
let storage_healer_cancell_token = self.cancel_token.child_token();
let storage_healer_handler = tokio::spawn(storage_healer(
state_root,
storage_healer_receiver,
self.peers.clone(),
store.clone(),
storage_healer_cancell_token.clone(),
));
// Perform state sync if it was not already completed on a previous cycle
// Retrieve storage data to check which snap sync phase we are in
@@ -628,12 +625,11 @@
.unwrap()
.storage_rebuilder_sender
.clone(),
storage_healer_sender.clone(),
)
.await?;
if stale_pivot {
warn!("Stale Pivot, aborting state sync");
storage_healer_sender.send(vec![]).await?;
storage_healer_cancell_token.cancel();
storage_healer_handler.await??;
return Ok(false);
}
@@ -650,15 +646,10 @@
store.clear_snapshot().await?;

// Perform Healing
let state_heal_complete = heal_state_trie(
state_root,
store.clone(),
self.peers.clone(),
storage_healer_sender.clone(),
)
.await?;
// Send empty batch to signal that no more batches are incoming
storage_healer_sender.send(vec![]).await?;
let state_heal_complete =
heal_state_trie(state_root, store.clone(), self.peers.clone()).await?;
// Wait for storage healer to end
storage_healer_cancell_token.cancel();
let storage_heal_complete = storage_healer_handler.await??;
if !(state_heal_complete && storage_heal_complete) {
warn!("Stale pivot, aborting healing");
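Taken together, the sync.rs hunks swap the healer's end-of-stream channel message for a child cancellation token: the healer is spawned up front, state sync and state healing queue work for it through the Store, and the token is cancelled once they finish or the pivot goes stale. A condensed sketch of the new lifecycle (names taken from the diff; error handling and unrelated setup elided):

let storage_healer_cancell_token = self.cancel_token.child_token();
let storage_healer_handler = tokio::spawn(storage_healer(
    state_root,
    self.peers.clone(),
    store.clone(),
    storage_healer_cancell_token.clone(),
));
// ... state sync and state healing run here, persisting storages to heal via the Store ...
storage_healer_cancell_token.cancel(); // replaces the old empty-batch end signal
let storage_heal_complete = storage_healer_handler.await??;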
12 changes: 8 additions & 4 deletions crates/networking/p2p/sync/state_healing.rs
@@ -36,7 +36,6 @@ pub(crate) async fn heal_state_trie(
state_root: H256,
store: Store,
peers: PeerHandler,
storage_healer_sender: Sender<Vec<H256>>,
) -> Result<bool, SyncError> {
let mut paths = store.get_state_heal_paths()?.unwrap_or_default();
// Spawn a bytecode fetcher for this block
@@ -59,7 +58,6 @@
batch,
peers.clone(),
store.clone(),
storage_healer_sender.clone(),
bytecode_sender.clone(),
));
// End loop if we have no more paths to fetch
@@ -98,7 +96,6 @@ async fn heal_state_batch(
mut batch: Vec<Nibbles>,
peers: PeerHandler,
store: Store,
storage_sender: Sender<Vec<H256>>,
bytecode_sender: Sender<Vec<H256>>,
) -> Result<(Vec<Nibbles>, bool), SyncError> {
if let Some(nodes) = peers
@@ -144,7 +141,14 @@
}
// Send storage & bytecode requests
if !hashed_addresses.is_empty() {
storage_sender.send(hashed_addresses).await?;
store
.set_storage_heal_paths(
hashed_addresses
.into_iter()
.map(|hash| (hash, vec![Nibbles::default()]))
.collect(),
)
.await?;
}
if !code_hashes.is_empty() {
bytecode_sender.send(code_hashes).await?;
4 changes: 0 additions & 4 deletions crates/networking/p2p/sync/state_sync.rs
@@ -41,7 +41,6 @@ pub(crate) async fn state_sync(
peers: PeerHandler,
key_checkpoints: Option<[H256; STATE_TRIE_SEGMENTS]>,
storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
storage_healer_sender: Sender<Vec<H256>>,
) -> Result<bool, SyncError> {
// Spawn tasks to fetch each state trie segment
let mut state_trie_tasks = tokio::task::JoinSet::new();
@@ -58,7 +57,6 @@
key_checkpoints.map(|chs| chs[i]),
state_sync_progress.clone(),
storage_trie_rebuilder_sender.clone(),
storage_healer_sender.clone(),
));
}
show_progress_handle.await?;
@@ -91,7 +89,6 @@ async fn state_sync_segment(
checkpoint: Option<H256>,
state_sync_progress: StateSyncProgress,
storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
storage_healer_sender: Sender<Vec<H256>>,
) -> Result<(usize, bool, H256), SyncError> {
// Resume download from checkpoint if available or start from an empty trie
let mut start_account_hash = checkpoint.unwrap_or(STATE_TRIE_SEGMENTS_START[segment_number]);
@@ -125,7 +122,6 @@
store.clone(),
state_root,
storage_trie_rebuilder_sender.clone(),
storage_healer_sender.clone(),
));
info!("Starting/Resuming state trie download of segment number {segment_number} from key {start_account_hash}");
// Fetch Account Ranges
29 changes: 16 additions & 13 deletions crates/networking/p2p/sync/storage_fetcher.rs
@@ -11,8 +11,9 @@ use std::cmp::min;

use ethrex_common::H256;
use ethrex_storage::Store;
use ethrex_trie::Nibbles;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{debug, info};
use tracing::debug;

use crate::{
peer_handler::PeerHandler,
@@ -40,7 +41,6 @@ pub(crate) async fn storage_fetcher(
store: Store,
state_root: H256,
storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
storage_healer_sender: Sender<Vec<H256>>,
) -> Result<(), SyncError> {
// Spawn large storage fetcher
let (large_storage_sender, large_storage_receiver) =
@@ -51,7 +51,6 @@
store.clone(),
state_root,
storage_trie_rebuilder_sender.clone(),
storage_healer_sender.clone(),
));
// Pending list of storages to fetch
let mut pending_storage: Vec<(H256, H256)> = vec![];
@@ -117,8 +116,13 @@
pending_storage.len()
);
if !pending_storage.is_empty() {
storage_healer_sender
.send(pending_storage.into_iter().map(|(hash, _)| hash).collect())
store
.set_storage_heal_paths(
pending_storage
.into_iter()
.map(|(hash, _)| (hash, vec![Nibbles::default()]))
.collect(),
)
.await?;
}
// Signal large storage fetcher
@@ -197,7 +201,6 @@ async fn large_storage_fetcher(
store: Store,
state_root: H256,
storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
storage_healer_sender: Sender<Vec<H256>>,
) -> Result<(), SyncError> {
// Pending list of storages to fetch
// (account_hash, storage_root, last_key)
@@ -250,22 +253,22 @@
}
}
}
info!(
debug!(
"Concluding large storage fetcher, {} large storages left in queue to be healed later",
pending_storage.len()
);
if !pending_storage.is_empty() {
// Send incomplete storages to the rebuilder and healer
// As these are large storages we should rebuild the partial tries instead of delegating them fully to the healer
let account_hashes: Vec<H256> = pending_storage
.into_iter()
.map(|req| req.account_hash)
let heal_paths = pending_storage
.iter()
.map(|req| (req.account_hash, vec![Nibbles::default()]))
.collect();
let account_hashes_and_roots: Vec<(H256, H256)> = account_hashes
let account_hashes_and_roots: Vec<(H256, H256)> = pending_storage
.iter()
.map(|hash| (*hash, REBUILDER_INCOMPLETE_STORAGE_ROOT))
.map(|req| (req.account_hash, REBUILDER_INCOMPLETE_STORAGE_ROOT))
.collect();
storage_healer_sender.send(account_hashes).await?;
store.set_storage_heal_paths(heal_paths).await?;
storage_trie_rebuilder_sender
.send(account_hashes_and_roots)
.await?;
56 changes: 21 additions & 35 deletions crates/networking/p2p/sync/storage_healing.rs
@@ -10,39 +10,49 @@ use std::collections::BTreeMap;
use ethrex_common::H256;
use ethrex_storage::Store;
use ethrex_trie::{Nibbles, EMPTY_TRIE_HASH};
use tokio::sync::mpsc::Receiver;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::{peer_handler::PeerHandler, sync::node_missing_children};

use super::{SyncError, MAX_CHANNEL_READS, MAX_PARALLEL_FETCHES, NODE_BATCH_SIZE};
/// Minimum number of storages to keep in the storage healer queue
/// More paths will be read from the Store if the amount goes below this value
const MINUMUM_STORAGES_IN_QUEUE: usize = 400;

use super::{SyncError, MAX_PARALLEL_FETCHES, NODE_BATCH_SIZE};

/// Heals the storage tries of the accounts queued for healing in the Store, downloading their root nodes and children from peers until the full tries are stored
/// If the pivot becomes stale or the healer is cancelled, the remaining paths are written back to the Store so healing can resume later
// Returns true if there are no more pending storages in the queue (aka storage healing was completed)
pub(crate) async fn storage_healer(
state_root: H256,
mut receiver: Receiver<Vec<H256>>,
peers: PeerHandler,
store: Store,
cancel_token: CancellationToken,
) -> Result<bool, SyncError> {
let mut pending_paths: BTreeMap<H256, Vec<Nibbles>> = store
.get_storage_heal_paths()?
.unwrap_or_default()
.into_iter()
.collect();
// List of paths in need of healing, grouped by hashed address
let mut pending_paths = BTreeMap::<H256, Vec<Nibbles>>::new();
// The pivot may become stale while the fetcher is active, we will still keep the process
// alive until the end signal so we don't lose queued messages
let mut stale = false;
let mut incoming = true;
while incoming {
while !(stale || cancel_token.is_cancelled()) {
// If we have few storages in queue, fetch more from the store
// We won't be retrieving all of them as the read can become quite long and we may not end up using all of the paths in this cycle
if pending_paths.len() < MINUMUM_STORAGES_IN_QUEUE {
pending_paths.extend(
store
.take_storage_heal_paths(MINUMUM_STORAGES_IN_QUEUE)
.await?
.into_iter(),
);
}
// If we have enough pending storages to fill a batch
// or if we have no more incoming batches, spawn a fetch process
// If the pivot became stale don't process anything and just save incoming requests
let mut storage_tasks = tokio::task::JoinSet::new();
let mut task_num = 0;
while !stale && !pending_paths.is_empty() && task_num < MAX_PARALLEL_FETCHES {
while !pending_paths.is_empty() && task_num < MAX_PARALLEL_FETCHES {
let mut next_batch: BTreeMap<H256, Vec<Nibbles>> = BTreeMap::new();
// Fill batch
let mut batch_size = 0;
@@ -65,30 +75,6 @@ pub(crate) async fn storage_healer(
pending_paths.extend(remaining);
stale |= is_stale;
}

// Read incoming requests that are already awaiting on the receiver
// Don't wait for requests unless we have no pending paths left
if incoming && (!receiver.is_empty() || pending_paths.is_empty()) {
// Fetch incoming requests
let mut msg_buffer = vec![];
if receiver.recv_many(&mut msg_buffer, MAX_CHANNEL_READS).await != 0 {
for account_hashes in msg_buffer {
if !account_hashes.is_empty() {
pending_paths.extend(
account_hashes
.into_iter()
.map(|acc_path| (acc_path, vec![Nibbles::default()])),
);
} else {
// Empty message signaling no more bytecodes to sync
incoming = false
}
}
} else {
// Disconnect
incoming = false
}
}
}
let healing_complete = pending_paths.is_empty();
// Store pending paths
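With the receiver gone, the healer's main loop runs until it is cancelled or the pivot goes stale, topping up its queue from the Store instead of draining a channel. A simplified sketch of the resulting control flow (peer fetching elided; the tail that writes leftover paths back is truncated in the diff, so that part is an assumption here):

while !(stale || cancel_token.is_cancelled()) {
    if pending_paths.len() < MINUMUM_STORAGES_IN_QUEUE {
        // Refill the queue with up to MINUMUM_STORAGES_IN_QUEUE storages taken from the Store
        pending_paths.extend(store.take_storage_heal_paths(MINUMUM_STORAGES_IN_QUEUE).await?);
    }
    // ... spawn up to MAX_PARALLEL_FETCHES heal tasks over `pending_paths`, which may set `stale` ...
}
// Persist whatever is left so a later healer run can resume from the Store
let healing_complete = pending_paths.is_empty();
store.set_storage_heal_paths(pending_paths.into_iter().collect()).await?;
Ok(healing_complete)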
5 changes: 4 additions & 1 deletion crates/networking/p2p/sync/trie_rebuild.rs
@@ -8,7 +8,7 @@
use ethrex_common::{BigEndianHash, H256, U256, U512};
use ethrex_rlp::encode::RLPEncode;
use ethrex_storage::{Store, MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS};
use ethrex_trie::EMPTY_TRIE_HASH;
use ethrex_trie::{Nibbles, EMPTY_TRIE_HASH};
use std::array;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
@@ -272,6 +272,9 @@ async fn rebuild_storage_trie(
}
if expected_root != REBUILDER_INCOMPLETE_STORAGE_ROOT && storage_trie.hash()? != expected_root {
warn!("Mismatched storage root for account {account_hash}");
store
.set_storage_heal_paths(vec![(account_hash, vec![Nibbles::default()])])
.await?;
}
Ok(())
}
9 changes: 7 additions & 2 deletions crates/storage/api.rs
@@ -273,15 +273,20 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
&self,
) -> Result<Option<[H256; STATE_TRIE_SEGMENTS]>, StoreError>;

/// Sets the storage trie paths in need of healing, grouped by hashed address
/// Sets storage trie paths in need of healing, grouped by hashed address
/// This will overwrite previously stored paths for the received storages but will not remove other storages' paths
async fn set_storage_heal_paths(
&self,
accounts: Vec<(H256, Vec<Nibbles>)>,
) -> Result<(), StoreError>;

/// Gets the storage trie paths in need of healing, grouped by hashed address
/// Gets paths from at most `limit` storage tries and removes them from the store
#[allow(clippy::type_complexity)]
fn get_storage_heal_paths(&self) -> Result<Option<Vec<(H256, Vec<Nibbles>)>>, StoreError>;
async fn take_storage_heal_paths(
&self,
limit: usize,
) -> Result<Vec<(H256, Vec<Nibbles>)>, StoreError>;

/// Sets the state trie paths in need of healing
async fn set_state_heal_paths(&self, paths: Vec<Nibbles>) -> Result<(), StoreError>;
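The trait now pairs set_storage_heal_paths (overwrite the paths of the given accounts, leave the rest alone) with take_storage_heal_paths (return the paths of at most `limit` storages and remove them). A hypothetical in-memory sketch, not the actual engine code, just to pin down that contract:

use std::collections::BTreeMap;
use std::sync::Mutex;

use ethrex_common::H256;
use ethrex_trie::Nibbles;

// Illustrative only: a minimal in-memory queue with the same set/take semantics.
struct HealPathsQueue {
    paths: Mutex<BTreeMap<H256, Vec<Nibbles>>>,
}

impl HealPathsQueue {
    // Overwrites the paths stored for the given accounts without touching other accounts.
    fn set_storage_heal_paths(&self, accounts: Vec<(H256, Vec<Nibbles>)>) {
        self.paths.lock().unwrap().extend(accounts);
    }

    // Returns the paths of at most `limit` accounts and removes them from the queue.
    fn take_storage_heal_paths(&self, limit: usize) -> Vec<(H256, Vec<Nibbles>)> {
        let mut map = self.paths.lock().unwrap();
        let keys: Vec<H256> = map.keys().take(limit).copied().collect();
        keys.into_iter()
            .filter_map(|key| map.remove(&key).map(|paths| (key, paths)))
            .collect()
    }
}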
2 changes: 2 additions & 0 deletions crates/storage/rlp.rs
@@ -9,6 +9,7 @@ use ethrex_common::{
H256,
};
use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode};
use ethrex_trie::Nibbles;
#[cfg(feature = "libmdbx")]
use libmdbx::orm::{Decodable, Encodable};
#[cfg(feature = "redb")]
@@ -21,6 +22,7 @@ pub type AccountCodeHashRLP = Rlp<H256>;
pub type AccountCodeRLP = Rlp<Bytes>;
pub type AccountHashRLP = Rlp<H256>;
pub type AccountStateRLP = Rlp<AccountState>;
pub type TriePathsRLP = Rlp<Vec<Nibbles>>;

// Block types
pub type BlockHashRLP = Rlp<BlockHash>;
15 changes: 10 additions & 5 deletions crates/storage/store.rs
@@ -952,18 +952,23 @@ impl Store {
self.engine.get_state_trie_key_checkpoint()
}

/// Sets the storage trie paths in need of healing, grouped by hashed address
/// Sets storage trie paths in need of healing, grouped by hashed address
/// This will overwrite previously stored paths for the received storages but will not remove other storages' paths
pub async fn set_storage_heal_paths(
&self,
accounts: Vec<(H256, Vec<Nibbles>)>,
paths: Vec<(H256, Vec<Nibbles>)>,
) -> Result<(), StoreError> {
self.engine.set_storage_heal_paths(accounts).await
self.engine.set_storage_heal_paths(paths).await
}

/// Gets the storage trie paths in need of healing, grouped by hashed address
/// Gets paths from at most `limit` storage tries and removes them from the Store
#[allow(clippy::type_complexity)]
pub fn get_storage_heal_paths(&self) -> Result<Option<Vec<(H256, Vec<Nibbles>)>>, StoreError> {
self.engine.get_storage_heal_paths()
pub async fn take_storage_heal_paths(
&self,
limit: usize,
) -> Result<Vec<(H256, Vec<Nibbles>)>, StoreError> {
self.engine.take_storage_heal_paths(limit).await
}

/// Sets the state trie paths in need of healing
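In practice the producers (state healing, the storage fetchers, and the trie rebuilder) queue an account by writing its hashed address rooted at the empty path, and the healer later drains the queue in bounded reads. A minimal round-trip sketch, assuming a `store: Store` and a hashed account address `account_hash` are in scope inside an async function:

// Queue an account's storage trie for healing, starting from its root path
store
    .set_storage_heal_paths(vec![(account_hash, vec![Nibbles::default()])])
    .await?;
// Later, the healer takes up to `limit` queued storages in one read;
// the returned entries are removed from the Store so they are not handed out twice
let pending: Vec<(H256, Vec<Nibbles>)> = store.take_storage_heal_paths(400).await?;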