Commit a3dc64e

feat(l1): move storage heal paths to their own table (#2359)
**Motivation**

During state sync, we store the account hashes of the storages we failed to fetch, along with their root path, so the storage healer can later read and heal them. For this we used the `SnapState` table, where the whole pending-storage-paths map was stored as a single value. This worked fine at a smaller scale, but once the map grows too big, reading and writing it becomes very expensive and can disrupt other processes. This PR moves the pending storage paths to their own table and changes how we interact with them:

* The storage healer no longer fetches the whole map; instead, it reads a set amount of storages from it whenever its queue runs low.
* The storage healer no longer uses a channel; it reads incoming requests directly from the store.
* Fetchers that need to communicate with the storage healer now do so by adding paths to the store.

**Description**

* Remove storage heal paths from snap state.
* Add a new DB table for storage heal paths.
* Remove the channel from the storage healer and instead manage incoming and outgoing storage heal paths through the store. (This also solves the rebuilder not being able to input storage heal requests, and the storage healer being kept alive indefinitely upon forced shutdown.)
1 parent 9544514 commit a3dc64e

13 files changed: 187 additions & 119 deletions
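
Taken together, the producers (state healer, storage fetcher, trie rebuilder) and the consumer (storage healer) now rendezvous through this dedicated table instead of a channel. Below is a condensed sketch of the two sides of the new handoff, using the `Store` methods introduced in this commit; the wrapper functions themselves are illustrative, not ethrex code:

```rust
use ethrex_common::H256;
use ethrex_storage::{Store, StoreError};
use ethrex_trie::Nibbles;

// Producer side (state healer, storage fetchers, trie rebuilder):
// queue an account's storage for healing by writing its hashed address
// and the trie root path (`Nibbles::default()`) into the heal-paths table.
async fn queue_for_healing(store: &Store, account_hash: H256) -> Result<(), StoreError> {
    store
        .set_storage_heal_paths(vec![(account_hash, vec![Nibbles::default()])])
        .await
}

// Consumer side (storage healer): top up the in-memory queue by draining
// at most `limit` storages from the table; the read also removes them,
// so no request is handed out twice.
async fn refill_queue(
    store: &Store,
    limit: usize,
) -> Result<Vec<(H256, Vec<Nibbles>)>, StoreError> {
    store.take_storage_heal_paths(limit).await
}
```

Because `take_storage_heal_paths` deletes what it returns, the healer persists any still-pending paths back with `set_storage_heal_paths` before exiting, which is what lets a later sync cycle resume where it left off.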


crates/networking/p2p/sync.rs

Lines changed: 9 additions & 18 deletions

```diff
@@ -25,10 +25,7 @@ use std::{
 };
 use storage_healing::storage_healer;
 use tokio::{
-    sync::{
-        mpsc::{self, error::SendError},
-        Mutex,
-    },
+    sync::{mpsc::error::SendError, Mutex},
     time::{Duration, Instant},
 };
 use tokio_util::sync::CancellationToken;
@@ -599,13 +596,13 @@ impl Syncer {
            ));
        };
        // Spawn storage healer earlier so we can start healing stale storages
-        let (storage_healer_sender, storage_healer_receiver) =
-            mpsc::channel::<Vec<H256>>(MAX_CHANNEL_MESSAGES);
+        // Create a cancellation token so we can end the storage healer when finished, make it a child so that it also ends upon shutdown
+        let storage_healer_cancell_token = self.cancel_token.child_token();
        let storage_healer_handler = tokio::spawn(storage_healer(
            state_root,
-            storage_healer_receiver,
            self.peers.clone(),
            store.clone(),
+            storage_healer_cancell_token.clone(),
        ));
        // Perform state sync if it was not already completed on a previous cycle
        // Retrieve storage data to check which snap sync phase we are in
@@ -628,12 +625,11 @@
                .unwrap()
                .storage_rebuilder_sender
                .clone(),
-            storage_healer_sender.clone(),
        )
        .await?;
        if stale_pivot {
            warn!("Stale Pivot, aborting state sync");
-            storage_healer_sender.send(vec![]).await?;
+            storage_healer_cancell_token.cancel();
            storage_healer_handler.await??;
            return Ok(false);
        }
@@ -650,15 +646,10 @@
        store.clear_snapshot().await?;

        // Perform Healing
-        let state_heal_complete = heal_state_trie(
-            state_root,
-            store.clone(),
-            self.peers.clone(),
-            storage_healer_sender.clone(),
-        )
-        .await?;
-        // Send empty batch to signal that no more batches are incoming
-        storage_healer_sender.send(vec![]).await?;
+        let state_heal_complete =
+            heal_state_trie(state_root, store.clone(), self.peers.clone()).await?;
+        // Wait for storage healer to end
+        storage_healer_cancell_token.cancel();
        let storage_heal_complete = storage_healer_handler.await??;
        if !(state_heal_complete && storage_heal_complete) {
            warn!("Stale pivot, aborting healing");
```

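The shutdown handshake also changes here: instead of sending an empty batch as an end-of-stream sentinel, the syncer cancels a child `CancellationToken`, which additionally fires when the parent token is cancelled on forced shutdown. A minimal stand-alone sketch of that `tokio_util` pattern (the task and variable names are illustrative, not ethrex code):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let parent = CancellationToken::new();
    // A child token is cancelled either explicitly or whenever its parent
    // is cancelled, so the worker stops on normal completion *and* on
    // global shutdown.
    let child = parent.child_token();

    let worker_token = child.clone();
    let worker = tokio::spawn(async move {
        let mut iterations = 0u32;
        while !worker_token.is_cancelled() {
            // ... one unit of work, e.g. heal one batch of trie nodes ...
            iterations += 1;
            tokio::task::yield_now().await;
        }
        iterations
    });

    // Either call stops the worker: a targeted `child.cancel()` when this
    // phase is done, or `parent.cancel()` on global shutdown.
    child.cancel();

    let iterations = worker.await.expect("worker panicked");
    println!("worker ran {iterations} iterations before cancellation");
}
```

A side effect of this design, noted in the commit message, is that the healer can no longer be kept alive indefinitely: it always observes cancellation on its next loop iteration, whether the stop is deliberate or a forced shutdown.
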
crates/networking/p2p/sync/state_healing.rs

Lines changed: 8 additions & 4 deletions

```diff
@@ -36,7 +36,6 @@ pub(crate) async fn heal_state_trie(
     state_root: H256,
     store: Store,
     peers: PeerHandler,
-    storage_healer_sender: Sender<Vec<H256>>,
 ) -> Result<bool, SyncError> {
     let mut paths = store.get_state_heal_paths()?.unwrap_or_default();
     // Spawn a bytecode fetcher for this block
@@ -59,7 +58,6 @@ pub(crate) async fn heal_state_trie(
            batch,
            peers.clone(),
            store.clone(),
-            storage_healer_sender.clone(),
            bytecode_sender.clone(),
        ));
        // End loop if we have no more paths to fetch
@@ -98,7 +96,6 @@ async fn heal_state_batch(
     mut batch: Vec<Nibbles>,
     peers: PeerHandler,
     store: Store,
-    storage_sender: Sender<Vec<H256>>,
     bytecode_sender: Sender<Vec<H256>>,
 ) -> Result<(Vec<Nibbles>, bool), SyncError> {
     if let Some(nodes) = peers
@@ -144,7 +141,14 @@ async fn heal_state_batch(
    }
    // Send storage & bytecode requests
    if !hashed_addresses.is_empty() {
-        storage_sender.send(hashed_addresses).await?;
+        store
+            .set_storage_heal_paths(
+                hashed_addresses
+                    .into_iter()
+                    .map(|hash| (hash, vec![Nibbles::default()]))
+                    .collect(),
+            )
+            .await?;
    }
    if !code_hashes.is_empty() {
        bytecode_sender.send(code_hashes).await?;
```

crates/networking/p2p/sync/state_sync.rs

Lines changed: 0 additions & 4 deletions

```diff
@@ -41,7 +41,6 @@ pub(crate) async fn state_sync(
     peers: PeerHandler,
     key_checkpoints: Option<[H256; STATE_TRIE_SEGMENTS]>,
     storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
-    storage_healer_sender: Sender<Vec<H256>>,
 ) -> Result<bool, SyncError> {
     // Spawn tasks to fetch each state trie segment
     let mut state_trie_tasks = tokio::task::JoinSet::new();
@@ -58,7 +57,6 @@ pub(crate) async fn state_sync(
            key_checkpoints.map(|chs| chs[i]),
            state_sync_progress.clone(),
            storage_trie_rebuilder_sender.clone(),
-            storage_healer_sender.clone(),
        ));
    }
    show_progress_handle.await?;
@@ -91,7 +89,6 @@ async fn state_sync_segment(
     checkpoint: Option<H256>,
     state_sync_progress: StateSyncProgress,
     storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
-    storage_healer_sender: Sender<Vec<H256>>,
 ) -> Result<(usize, bool, H256), SyncError> {
     // Resume download from checkpoint if available or start from an empty trie
     let mut start_account_hash = checkpoint.unwrap_or(STATE_TRIE_SEGMENTS_START[segment_number]);
@@ -125,7 +122,6 @@ async fn state_sync_segment(
        store.clone(),
        state_root,
        storage_trie_rebuilder_sender.clone(),
-        storage_healer_sender.clone(),
    ));
    info!("Starting/Resuming state trie download of segment number {segment_number} from key {start_account_hash}");
    // Fetch Account Ranges
```

crates/networking/p2p/sync/storage_fetcher.rs

Lines changed: 16 additions & 13 deletions

```diff
@@ -11,8 +11,9 @@ use std::cmp::min;

 use ethrex_common::H256;
 use ethrex_storage::Store;
+use ethrex_trie::Nibbles;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tracing::{debug, info};
+use tracing::debug;

 use crate::{
     peer_handler::PeerHandler,
@@ -40,7 +41,6 @@ pub(crate) async fn storage_fetcher(
     store: Store,
     state_root: H256,
     storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
-    storage_healer_sender: Sender<Vec<H256>>,
 ) -> Result<(), SyncError> {
     // Spawn large storage fetcher
     let (large_storage_sender, large_storage_receiver) =
@@ -51,7 +51,6 @@ pub(crate) async fn storage_fetcher(
        store.clone(),
        state_root,
        storage_trie_rebuilder_sender.clone(),
-        storage_healer_sender.clone(),
    ));
    // Pending list of storages to fetch
    let mut pending_storage: Vec<(H256, H256)> = vec![];
@@ -117,8 +116,13 @@ pub(crate) async fn storage_fetcher(
        pending_storage.len()
    );
    if !pending_storage.is_empty() {
-        storage_healer_sender
-            .send(pending_storage.into_iter().map(|(hash, _)| hash).collect())
+        store
+            .set_storage_heal_paths(
+                pending_storage
+                    .into_iter()
+                    .map(|(hash, _)| (hash, vec![Nibbles::default()]))
+                    .collect(),
+            )
            .await?;
    }
    // Signal large storage fetcher
@@ -197,7 +201,6 @@ async fn large_storage_fetcher(
     store: Store,
     state_root: H256,
     storage_trie_rebuilder_sender: Sender<Vec<(H256, H256)>>,
-    storage_healer_sender: Sender<Vec<H256>>,
 ) -> Result<(), SyncError> {
     // Pending list of storages to fetch
     // (account_hash, storage_root, last_key)
@@ -250,22 +253,22 @@ async fn large_storage_fetcher(
            }
        }
    }
-    info!(
+    debug!(
        "Concluding large storage fetcher, {} large storages left in queue to be healed later",
        pending_storage.len()
    );
    if !pending_storage.is_empty() {
        // Send incomplete storages to the rebuilder and healer
        // As these are large storages we should rebuild the partial tries instead of delegating them fully to the healer
-        let account_hashes: Vec<H256> = pending_storage
-            .into_iter()
-            .map(|req| req.account_hash)
+        let heal_paths = pending_storage
+            .iter()
+            .map(|req| (req.account_hash, vec![Nibbles::default()]))
            .collect();
-        let account_hashes_and_roots: Vec<(H256, H256)> = account_hashes
+        let account_hashes_and_roots: Vec<(H256, H256)> = pending_storage
            .iter()
-            .map(|hash| (*hash, REBUILDER_INCOMPLETE_STORAGE_ROOT))
+            .map(|req| (req.account_hash, REBUILDER_INCOMPLETE_STORAGE_ROOT))
            .collect();
-        storage_healer_sender.send(account_hashes).await?;
+        store.set_storage_heal_paths(heal_paths).await?;
        storage_trie_rebuilder_sender
            .send(account_hashes_and_roots)
            .await?;
```

crates/networking/p2p/sync/storage_healing.rs

Lines changed: 21 additions & 35 deletions

```diff
@@ -10,39 +10,49 @@ use std::collections::BTreeMap;
 use ethrex_common::H256;
 use ethrex_storage::Store;
 use ethrex_trie::{Nibbles, EMPTY_TRIE_HASH};
-use tokio::sync::mpsc::Receiver;
+use tokio_util::sync::CancellationToken;
 use tracing::debug;

 use crate::{peer_handler::PeerHandler, sync::node_missing_children};

-use super::{SyncError, MAX_CHANNEL_READS, MAX_PARALLEL_FETCHES, NODE_BATCH_SIZE};
+/// Minumum amount of storages to keep in the storage healer queue
+/// More paths will be read from the Store if the amount goes below this value
+const MINUMUM_STORAGES_IN_QUEUE: usize = 400;
+
+use super::{SyncError, MAX_PARALLEL_FETCHES, NODE_BATCH_SIZE};

 /// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval
 /// Also retrieves their children nodes until we have the full storage trie stored
 /// If the state becomes stale while fetching, returns its current queued account hashes
 // Returns true if there are no more pending storages in the queue (aka storage healing was completed)
 pub(crate) async fn storage_healer(
     state_root: H256,
-    mut receiver: Receiver<Vec<H256>>,
     peers: PeerHandler,
     store: Store,
+    cancel_token: CancellationToken,
 ) -> Result<bool, SyncError> {
-    let mut pending_paths: BTreeMap<H256, Vec<Nibbles>> = store
-        .get_storage_heal_paths()?
-        .unwrap_or_default()
-        .into_iter()
-        .collect();
+    // List of paths in need of healing, grouped by hashed address
+    let mut pending_paths = BTreeMap::<H256, Vec<Nibbles>>::new();
     // The pivot may become stale while the fetcher is active, we will still keep the process
     // alive until the end signal so we don't lose queued messages
     let mut stale = false;
-    let mut incoming = true;
-    while incoming {
+    while !(stale || cancel_token.is_cancelled()) {
+        // If we have few storages in queue, fetch more from the store
+        // We won't be retrieving all of them as the read can become quite long and we may not end up using all of the paths in this cycle
+        if pending_paths.len() < MINUMUM_STORAGES_IN_QUEUE {
+            pending_paths.extend(
+                store
+                    .take_storage_heal_paths(MINUMUM_STORAGES_IN_QUEUE)
+                    .await?
+                    .into_iter(),
+            );
+        }
        // If we have enough pending storages to fill a batch
        // or if we have no more incoming batches, spawn a fetch process
        // If the pivot became stale don't process anything and just save incoming requests
        let mut storage_tasks = tokio::task::JoinSet::new();
        let mut task_num = 0;
-        while !stale && !pending_paths.is_empty() && task_num < MAX_PARALLEL_FETCHES {
+        while !pending_paths.is_empty() && task_num < MAX_PARALLEL_FETCHES {
            let mut next_batch: BTreeMap<H256, Vec<Nibbles>> = BTreeMap::new();
            // Fill batch
            let mut batch_size = 0;
@@ -65,30 +75,6 @@ pub(crate) async fn storage_healer(
            pending_paths.extend(remaining);
            stale |= is_stale;
        }
-
-        // Read incoming requests that are already awaiting on the receiver
-        // Don't wait for requests unless we have no pending paths left
-        if incoming && (!receiver.is_empty() || pending_paths.is_empty()) {
-            // Fetch incoming requests
-            let mut msg_buffer = vec![];
-            if receiver.recv_many(&mut msg_buffer, MAX_CHANNEL_READS).await != 0 {
-                for account_hashes in msg_buffer {
-                    if !account_hashes.is_empty() {
-                        pending_paths.extend(
-                            account_hashes
-                                .into_iter()
-                                .map(|acc_path| (acc_path, vec![Nibbles::default()])),
-                        );
-                    } else {
-                        // Empty message signaling no more bytecodes to sync
-                        incoming = false
-                    }
-                }
-            } else {
-                // Disconnect
-                incoming = false
-            }
-        }
    }
    let healing_complete = pending_paths.is_empty();
    // Store pending paths
```

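The hunks above are the core of the PR, so it is worth seeing the healer's new lifecycle in one piece: refill the queue from the store when it runs low, heal, and persist leftovers on exit. A simplified restatement of that loop follows; `healer_skeleton` and `heal_round` are illustrative stand-ins for the real function and its fetch tasks, and the error type is collapsed to `StoreError`:

```rust
use std::collections::BTreeMap;

use ethrex_common::H256;
use ethrex_storage::{Store, StoreError};
use ethrex_trie::Nibbles;
use tokio_util::sync::CancellationToken;

const MINUMUM_STORAGES_IN_QUEUE: usize = 400;

// Stub for the elided healing round: the real code spawns up to
// MAX_PARALLEL_FETCHES fetch tasks, re-inserts unfinished paths into the
// queue, and reports whether the pivot went stale.
async fn heal_round(_pending: &mut BTreeMap<H256, Vec<Nibbles>>) -> bool {
    true
}

// Simplified lifecycle of the reworked storage healer.
async fn healer_skeleton(
    store: Store,
    cancel_token: CancellationToken,
) -> Result<bool, StoreError> {
    let mut pending = BTreeMap::<H256, Vec<Nibbles>>::new();
    let mut stale = false;
    while !(stale || cancel_token.is_cancelled()) {
        // Bounded refill keeps each store read short even when the backlog
        // of pending storages is huge (the problem this PR fixes).
        if pending.len() < MINUMUM_STORAGES_IN_QUEUE {
            pending.extend(store.take_storage_heal_paths(MINUMUM_STORAGES_IN_QUEUE).await?);
        }
        stale = heal_round(&mut pending).await;
    }
    let healing_complete = pending.is_empty();
    // Persist whatever is left so the next sync cycle can pick it up.
    store.set_storage_heal_paths(pending.into_iter().collect()).await?;
    Ok(healing_complete)
}
```
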
crates/networking/p2p/sync/trie_rebuild.rs

Lines changed: 4 additions & 1 deletion

```diff
@@ -8,7 +8,7 @@
 use ethrex_common::{BigEndianHash, H256, U256, U512};
 use ethrex_rlp::encode::RLPEncode;
 use ethrex_storage::{Store, MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS};
-use ethrex_trie::EMPTY_TRIE_HASH;
+use ethrex_trie::{Nibbles, EMPTY_TRIE_HASH};
 use std::array;
 use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
@@ -272,6 +272,9 @@ async fn rebuild_storage_trie(
    }
    if expected_root != REBUILDER_INCOMPLETE_STORAGE_ROOT && storage_trie.hash()? != expected_root {
        warn!("Mismatched storage root for account {account_hash}");
+        store
+            .set_storage_heal_paths(vec![(account_hash, vec![Nibbles::default()])])
+            .await?;
    }
    Ok(())
 }
```

crates/storage/api.rs

Lines changed: 7 additions & 2 deletions

```diff
@@ -273,15 +273,20 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
        &self,
    ) -> Result<Option<[H256; STATE_TRIE_SEGMENTS]>, StoreError>;

-    /// Sets the storage trie paths in need of healing, grouped by hashed address
+    /// Sets storage trie paths in need of healing, grouped by hashed address
+    /// This will overwite previously stored paths for the received storages but will not remove other storage's paths
    async fn set_storage_heal_paths(
        &self,
        accounts: Vec<(H256, Vec<Nibbles>)>,
    ) -> Result<(), StoreError>;

    /// Gets the storage trie paths in need of healing, grouped by hashed address
+    /// Gets paths from at most `limit` storage tries and removes them from the store
    #[allow(clippy::type_complexity)]
-    fn get_storage_heal_paths(&self) -> Result<Option<Vec<(H256, Vec<Nibbles>)>>, StoreError>;
+    async fn take_storage_heal_paths(
+        &self,
+        limit: usize,
+    ) -> Result<Vec<(H256, Vec<Nibbles>)>, StoreError>;

    /// Sets the state trie paths in need of healing
    async fn set_state_heal_paths(&self, paths: Vec<Nibbles>) -> Result<(), StoreError>;
```

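The new `take_storage_heal_paths` is a destructive read: it returns paths for at most `limit` storages and removes them in the same operation, per the doc comment above. The engine-specific table implementations live in the changed files not shown in this excerpt; as a reference point only, a naive in-memory engine might implement the pair like this (a sketch with stand-in types, not the actual ethrex engine code):

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Stand-ins for the real ethrex types, to keep the sketch self-contained.
type H256 = [u8; 32];
type Nibbles = Vec<u8>;

#[derive(Default)]
struct InMemoryEngine {
    // One entry per account storage in need of healing.
    storage_heal_paths: Mutex<BTreeMap<H256, Vec<Nibbles>>>,
}

impl InMemoryEngine {
    // Overwrites paths for the given storages, leaves other entries alone.
    fn set_storage_heal_paths(&self, paths: Vec<(H256, Vec<Nibbles>)>) {
        self.storage_heal_paths.lock().unwrap().extend(paths);
    }

    // Destructive read: pop up to `limit` storages from the table so the
    // same request is never handed out twice.
    fn take_storage_heal_paths(&self, limit: usize) -> Vec<(H256, Vec<Nibbles>)> {
        let mut table = self.storage_heal_paths.lock().unwrap();
        let keys: Vec<H256> = table.keys().take(limit).copied().collect();
        keys.into_iter()
            .filter_map(|k| table.remove(&k).map(|v| (k, v)))
            .collect()
    }
}
```

Making the read destructive pushes ownership of in-flight requests to the healer, which persists unfinished paths back on exit rather than relying on the table to still contain them.
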
crates/storage/rlp.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -9,6 +9,7 @@ use ethrex_common::{
     H256,
 };
 use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode};
+use ethrex_trie::Nibbles;
 #[cfg(feature = "libmdbx")]
 use libmdbx::orm::{Decodable, Encodable};
 #[cfg(feature = "redb")]
@@ -21,6 +22,7 @@ pub type AccountCodeHashRLP = Rlp<H256>;
 pub type AccountCodeRLP = Rlp<Bytes>;
 pub type AccountHashRLP = Rlp<H256>;
 pub type AccountStateRLP = Rlp<AccountState>;
+pub type TriePathsRLP = Rlp<Vec<Nibbles>>;

 // Block types
 pub type BlockHashRLP = Rlp<BlockHash>;
```

crates/storage/store.rs

Lines changed: 10 additions & 5 deletions

```diff
@@ -952,18 +952,23 @@ impl Store {
        self.engine.get_state_trie_key_checkpoint()
    }

-    /// Sets the storage trie paths in need of healing, grouped by hashed address
+    /// Sets storage trie paths in need of healing, grouped by hashed address
+    /// This will overwite previously stored paths for the received storages but will not remove other storage's paths
    pub async fn set_storage_heal_paths(
        &self,
-        accounts: Vec<(H256, Vec<Nibbles>)>,
+        paths: Vec<(H256, Vec<Nibbles>)>,
    ) -> Result<(), StoreError> {
-        self.engine.set_storage_heal_paths(accounts).await
+        self.engine.set_storage_heal_paths(paths).await
    }

    /// Gets the storage trie paths in need of healing, grouped by hashed address
+    /// Gets paths from at most `limit` storage tries and removes them from the Store
    #[allow(clippy::type_complexity)]
-    pub fn get_storage_heal_paths(&self) -> Result<Option<Vec<(H256, Vec<Nibbles>)>>, StoreError> {
-        self.engine.get_storage_heal_paths()
+    pub async fn take_storage_heal_paths(
+        &self,
+        limit: usize,
+    ) -> Result<Vec<(H256, Vec<Nibbles>)>, StoreError> {
+        self.engine.take_storage_heal_paths(limit).await
    }

    /// Sets the state trie paths in need of healing
```
