34 changes: 20 additions & 14 deletions crates/networking/p2p/metrics.rs
@@ -79,12 +79,10 @@ pub struct Metrics {
pub storage_tries_download_end_time: Arc<Mutex<Option<SystemTime>>>,

// Storage slots
pub downloaded_storage_slots: AtomicU64,
pub storage_accounts_initial: AtomicU64,
pub storage_accounts_healed: AtomicU64,
pub storage_leaves_downloaded: IntCounter,
pub storage_leaves_inserted: IntCounter,
pub storage_tries_insert_end_time: Arc<Mutex<Option<SystemTime>>>,
pub storage_tries_insert_start_time: Arc<Mutex<Option<SystemTime>>>,
pub storage_tries_state_roots_computed: IntCounter,

// Healing
pub healing_empty_try_recv: AtomicU64,
@@ -624,15 +622,25 @@ impl Default for Metrics {
.register(Box::new(pings_sent_rate.clone()))
.expect("Failed to register pings_sent_rate gauge");

let storage_tries_state_roots_computed = IntCounter::new(
"storage_tries_state_roots_computed",
"Total number of storage tries state roots computed",
let storage_leaves_inserted = IntCounter::new(
"storage_leaves_inserted",
"Total number of storage leaves inserted",
)
.expect("Failed to create storage_tries_state_roots_computed counter");
.expect("Failed to create storage_leaves_inserted counter");

registry
.register(Box::new(storage_tries_state_roots_computed.clone()))
.expect("Failed to register storage_tries_state_roots_computed counter");
.register(Box::new(storage_leaves_inserted.clone()))
.expect("Failed to register storage_leaves_inserted counter");

let storage_leaves_downloaded = IntCounter::new(
"storage_leaves_downloaded",
"Total number of storage leaves downloaded",
)
.expect("Failed to create storage_leaves_downloaded counter");

registry
.register(Box::new(storage_leaves_downloaded.clone()))
.expect("Failed to register storage_leaves_downloaded counter");

Metrics {
_registry: registry,
@@ -689,12 +697,10 @@ impl Default for Metrics {
storage_tries_download_end_time: Arc::new(Mutex::new(None)),

// Storage slots
downloaded_storage_slots: AtomicU64::new(0),
storage_leaves_downloaded,

// Storage tries state roots
storage_tries_state_roots_computed,
storage_accounts_initial: AtomicU64::new(0),
storage_accounts_healed: AtomicU64::new(0),
storage_leaves_inserted,
storage_tries_insert_end_time: Arc::new(Mutex::new(None)),
storage_tries_insert_start_time: Arc::new(Mutex::new(None)),

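The two counters follow the usual `prometheus` crate pattern: create the counter, register a clone, then bump the shared handle. A minimal standalone sketch of that pattern, assuming the `prometheus` crate and an illustrative local registry rather than this PR's `Metrics` struct:

```rust
use prometheus::{IntCounter, Registry};

fn main() {
    let registry = Registry::new();

    let leaves_inserted = IntCounter::new(
        "storage_leaves_inserted",
        "Total number of storage leaves inserted",
    )
    .expect("Failed to create storage_leaves_inserted counter");

    // Registering a clone is fine: IntCounter is a cheap handle around a
    // shared atomic, so the registered clone and this binding count together.
    registry
        .register(Box::new(leaves_inserted.clone()))
        .expect("Failed to register storage_leaves_inserted counter");

    leaves_inserted.inc();
    leaves_inserted.inc_by(41);
    assert_eq!(leaves_inserted.get(), 42);
}
```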
23 changes: 14 additions & 9 deletions crates/networking/p2p/network.rs
@@ -255,11 +255,16 @@ pub async fn periodically_show_peer_stats_during_syncing(
});

// Storage leaves metrics
let storage_leaves_downloaded =
METRICS.downloaded_storage_slots.load(Ordering::Relaxed);
let storage_accounts_inserted = METRICS.storage_tries_state_roots_computed.get();
let storage_accounts = METRICS.storage_accounts_initial.load(Ordering::Relaxed);
let storage_accounts_healed = METRICS.storage_accounts_healed.load(Ordering::Relaxed);
let storage_leaves_downloaded = METRICS.storage_leaves_downloaded.get();
let storage_leaves_inserted_percentage = if storage_leaves_downloaded != 0 {
METRICS.storage_leaves_inserted.get() as f64 / storage_leaves_downloaded as f64
* 100.0
} else {
0.0
};
// Round to one decimal place so accounts whose slots get downloaded but never used don't pin the figure just below 100%
let storage_leaves_inserted_percentage =
(storage_leaves_inserted_percentage * 10.0).round() / 10.0;
let storage_leaves_time = format_duration({
let end_time = METRICS
.storage_tries_download_end_time
@@ -324,9 +329,9 @@ pub async fn periodically_show_peer_stats_during_syncing(
.load(Ordering::Relaxed);
let heal_current_throttle =
if METRICS.healing_empty_try_recv.load(Ordering::Relaxed) == 0 {
"\x1b[31mDatabase\x1b[0m"
"Database"
} else {
"\x1b[32mPeers\x1b[0m"
"Peers"
};

// Bytecode metrics
@@ -361,8 +366,8 @@ Current Header Hash: {current_header_hash:x}
headers progress: {headers_download_progress} (total: {headers_to_download}, downloaded: {headers_downloaded}, remaining: {headers_remaining})
account leaves download: {account_leaves_downloaded}, elapsed: {account_leaves_time}
account leaves insertion: {account_leaves_inserted_percentage:.2}%, elapsed: {account_leaves_inserted_time}
storage leaves download: {storage_leaves_downloaded}, elapsed: {storage_leaves_time}, initially accounts with storage {storage_accounts}, healed accounts {storage_accounts_healed}
storage leaves insertion: {storage_accounts_inserted}, {storage_leaves_inserted_time}
storage leaves download: {storage_leaves_downloaded}, elapsed: {storage_leaves_time}
storage leaves insertion: {storage_leaves_inserted_percentage:.2}%, elapsed: {storage_leaves_inserted_time}
healing: global accounts healed {healed_accounts} global storage slots healed {healed_storages}, elapsed: {heal_time}, current throttle {heal_current_throttle}
bytecodes progress: downloaded: {bytecodes_downloaded}, elapsed: {bytecodes_download_time})"
);
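The stats line now derives the insertion percentage from the two counters, guarding the zero-download case and rounding to one decimal place. A self-contained sketch of that computation, with a hypothetical helper name and plain integers in place of `METRICS`:

```rust
/// Percentage of downloaded storage leaves that were inserted, rounded to
/// one decimal place; 0.0 when nothing has been downloaded yet.
fn inserted_percentage(inserted: u64, downloaded: u64) -> f64 {
    let pct = if downloaded != 0 {
        inserted as f64 / downloaded as f64 * 100.0
    } else {
        0.0 // avoid dividing by zero before the first download lands
    };
    (pct * 10.0).round() / 10.0
}

fn main() {
    assert_eq!(inserted_percentage(997, 1000), 99.7);
    assert_eq!(inserted_percentage(0, 0), 0.0);
}
```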
13 changes: 11 additions & 2 deletions crates/networking/p2p/peer_handler.rs
@@ -1535,9 +1535,18 @@ impl PeerHandler {
.map(|storage| storage.len())
.sum::<usize>();

// These count each downloaded slot once per account that shares it, since one download can serve several accounts with the same storage root
let effective_slots: usize = account_storages
.iter()
.enumerate()
.map(|(i, storages)| {
accounts_by_root_hash[start_index + i].1.len() * storages.len()
})
.sum();

METRICS
.downloaded_storage_slots
.fetch_add(n_slots as u64, Ordering::Relaxed);
.storage_leaves_downloaded
.inc_by(effective_slots as u64);

debug!("Downloaded {n_storages} storages ({n_slots} slots) from peer {peer_id}");
debug!(
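On the `effective_slots` accounting above: accounts are grouped by storage root, so one downloaded range can satisfy several accounts, and the new metric credits each slot once per account sharing it. A hypothetical, self-contained sketch of the same bookkeeping, with stand-in integer hashes instead of `H256` and invented names:

```rust
/// For each requested root, multiply the slots downloaded once by the number
/// of accounts that share that root, then sum.
fn effective_slots(
    accounts_by_root: &[(u64, Vec<u64>)], // (storage_root, accounts sharing it)
    slots_per_root: &[usize],             // slots downloaded for each root
) -> usize {
    accounts_by_root
        .iter()
        .zip(slots_per_root)
        .map(|((_root, accounts), n_slots)| accounts.len() * n_slots)
        .sum()
}

fn main() {
    // Two accounts share the first root (3 slots), one owns the second (5):
    // 2 * 3 + 1 * 5 = 11 effective slots from only 8 actually downloaded.
    let accounts = [(0xaa, vec![1, 2]), (0xbb, vec![3])];
    assert_eq!(effective_slots(&accounts, &[3, 5]), 11);
}
```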
33 changes: 6 additions & 27 deletions crates/networking/p2p/sync.rs
@@ -750,10 +750,6 @@ impl Syncer {
info!("Computed state root after request_account_rages: {computed_state_root:?}");

*METRICS.storage_tries_download_start_time.lock().await = Some(SystemTime::now());
METRICS.storage_accounts_initial.store(
storage_accounts.accounts_with_storage_root.len() as u64,
Ordering::Relaxed,
);
// We start downloading the storage leaves. To do so, we need to be sure that the storage root
// is correct, so we always heal the state trie before requesting storage ranges
let mut chunk_index = 0_u64;
@@ -836,10 +832,6 @@ impl Syncer {
info!("We stopped because of staleness, restarting loop");
}
info!("Finished request_storage_ranges");
METRICS.storage_accounts_healed.store(
storage_accounts.healed_accounts.len() as u64,
Ordering::Relaxed,
);
*METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now());

*METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now());
@@ -853,7 +845,6 @@
accounts_with_storage,
&account_storages_snapshots_dir,
&self.datadir,
&pivot_header,
)
.await?;

@@ -1017,7 +1008,6 @@ fn compute_storage_roots(
store: Store,
account_hash: H256,
key_value_pairs: &[(H256, U256)],
pivot_hash: H256,
) -> Result<StorageRoots, SyncError> {
use ethrex_trie::{Nibbles, Node};

@@ -1033,17 +1023,12 @@
warn!(
"Failed to insert hashed key {hashed_key:?} in account hash: {account_hash:?}, err={err:?}"
);
}
};
METRICS.storage_leaves_inserted.inc();
}

let (computed_storage_root, changes) = storage_trie.collect_changes_since_last_hash();
let (_, changes) = storage_trie.collect_changes_since_last_hash();

let account_state = store
.get_account_state_by_acc_hash(pivot_hash, account_hash)?
.ok_or(SyncError::AccountState(pivot_hash, account_hash))?;
if computed_storage_root == account_state.storage_root {
METRICS.storage_tries_state_roots_computed.inc();
}
Ok((account_hash, changes))
}

@@ -1355,7 +1340,6 @@ async fn insert_storages(
_: BTreeSet<H256>,
account_storages_snapshots_dir: &Path,
_: &Path,
pivot_header: &BlockHeader,
) -> Result<(), SyncError> {
use rayon::iter::IntoParallelIterator;

@@ -1386,7 +1370,6 @@ async fn insert_storages(
.map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?;

let store_clone = store.clone();
let pivot_hash_moved = pivot_header.hash();
info!("Starting compute of account_storages_snapshot");
let storage_trie_node_changes = tokio::task::spawn_blocking(move || {
let store: Store = store_clone;
@@ -1401,9 +1384,7 @@
// FIXME: we probably want to make storages an Arc
.map(move |account| (account, storages.clone()))
})
.map(|(account, storages)| {
compute_storage_roots(store.clone(), account, &storages, pivot_hash_moved)
})
.map(|(account, storages)| compute_storage_roots(store.clone(), account, &storages))
.collect::<Result<Vec<_>, SyncError>>()
})
.await??;
@@ -1492,7 +1473,6 @@ async fn insert_storages(
accounts_with_storage: BTreeSet<H256>,
account_storages_snapshots_dir: &Path,
datadir: &Path,
_: &BlockHeader,
) -> Result<(), SyncError> {
use crate::utils::get_rocksdb_temp_storage_dir;
use crossbeam::channel::{bounded, unbounded};
@@ -1592,14 +1572,14 @@
let mut buffer: [u8; 64] = [0_u8; 64];
buffer[..32].copy_from_slice(&account_hash.0);
iter.seek(buffer);
let mut iter = RocksDBIterator {
let iter = RocksDBIterator {
iter,
limit: *account_hash,
};

let _ = trie_from_sorted_accounts(
trie.db(),
&mut iter,
&mut iter.inspect(|_| METRICS.storage_leaves_inserted.inc()),
pool_clone,
buffer_sender,
buffer_receiver,
Expand All @@ -1610,7 +1590,6 @@ async fn insert_storages(
);
})
.map_err(SyncError::TrieGenerationError);
METRICS.storage_tries_state_roots_computed.inc();
let _ = sender.send(());
});
pool.execute(task);
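On the `.inspect(...)` hook passed into `trie_from_sorted_accounts`: it bumps the counter for every leaf that streams out of the iterator without buffering or altering the stream. A minimal sketch of the same pattern, with a plain `AtomicU64` standing in for `METRICS.storage_leaves_inserted`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let inserted = AtomicU64::new(0);

    // Pretend these are sorted (key, value) leaves coming out of RocksDB.
    let leaves = [(1u8, 10u8), (2, 20), (3, 30)];

    // `inspect` passes every item through unchanged; the consumer sees the
    // same stream while the counter is bumped as a side effect.
    let consumed: Vec<_> = leaves
        .iter()
        .inspect(|_| {
            inserted.fetch_add(1, Ordering::Relaxed);
        })
        .collect();

    assert_eq!(consumed.len(), 3);
    assert_eq!(inserted.load(Ordering::Relaxed), 3);
}
```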