Skip to content

Commit a2ca4a3

Browse files
committed
prewarm spans
1 parent 84cee75 commit a2ca4a3

File tree

2 files changed

+33
-5
lines changed

2 files changed

+33
-5
lines changed

crates/engine/tree/src/tree/cached_state.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use reth_trie::{
1818
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
1919
};
2020
use std::{sync::Arc, time::Duration};
21-
use tracing::trace;
21+
use tracing::{instrument, trace, trace_span};
2222

2323
pub(crate) type Cache<K, V> =
2424
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
@@ -354,6 +354,7 @@ impl ExecutionCache {
354354
}
355355

356356
/// Invalidates the storage for all addresses in the set
357+
#[instrument(target = "engine::tree", skip_all, fields(accounts = addresses.len()))]
357358
pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
358359
// NOTE: this must collect because the invalidate function should not be called while we
359360
// hold an iter for it
@@ -385,12 +386,22 @@ impl ExecutionCache {
385386
/// ## Error Handling
386387
///
387388
/// Returns an error if the state updates are inconsistent and should be discarded.
389+
#[instrument(target = "engine::tree", skip_all)]
388390
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
391+
let _enter = trace_span!("contracts", len = state_updates.contracts.len()).entered();
389392
// Insert bytecodes
390393
for (code_hash, bytecode) in &state_updates.contracts {
391394
self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
392395
}
393-
396+
drop(_enter);
397+
398+
let _enter = trace_span!(
399+
"accounts",
400+
accounts = state_updates.state.len(),
401+
storages =
402+
state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
403+
)
404+
.entered();
394405
let mut invalidated_accounts = HashSet::default();
395406
for (addr, account) in &state_updates.state {
396407
// If the account was not modified, as in not changed and not destroyed, then we have
@@ -434,6 +445,8 @@ impl ExecutionCache {
434445
// invalidate storage for all destroyed accounts
435446
self.invalidate_storages(invalidated_accounts);
436447

448+
drop(_enter);
449+
437450
Ok(())
438451
}
439452
}

crates/engine/tree/src/tree/payload_processor/prewarm.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::{
3939
},
4040
time::Instant,
4141
};
42-
use tracing::{debug, trace, warn};
42+
use tracing::{debug, instrument, trace, trace_span, warn};
4343

4444
/// A wrapper for transactions that includes their index in the block.
4545
#[derive(Clone)]
@@ -86,6 +86,8 @@ where
8686
to_multi_proof: Option<Sender<MultiProofMessage>>,
8787
/// Receiver for events produced by tx execution
8888
actions_rx: Receiver<PrewarmTaskEvent>,
89+
/// Tracing span associated with this prewarm task.
90+
span: tracing::Span,
8991
}
9092

9193
impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
@@ -121,6 +123,7 @@ where
121123
transaction_count_hint,
122124
to_multi_proof,
123125
actions_rx,
126+
span: tracing::Span::current(),
124127
},
125128
actions_tx,
126129
)
@@ -139,8 +142,11 @@ where
139142
let ctx = self.ctx.clone();
140143
let max_concurrency = self.max_concurrency;
141144
let transaction_count_hint = self.transaction_count_hint;
145+
let span = self.span.clone();
142146

143147
self.executor.spawn_blocking(move || {
148+
let _enter = span.enter();
149+
144150
let (done_tx, done_rx) = mpsc::channel();
145151
let mut executing = 0usize;
146152

@@ -157,8 +163,8 @@ where
157163
};
158164

159165
// Only spawn initial workers as needed
160-
for _ in 0..workers_needed {
161-
handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
166+
for i in 0..workers_needed {
167+
handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
162168
}
163169

164170
let mut tx_index = 0usize;
@@ -248,6 +254,7 @@ where
248254
/// the new, warmed cache to be inserted.
249255
///
250256
/// This method is called from `run()` only after all execution tasks are complete.
257+
#[instrument(target = "engine::tree", skip_all)]
251258
fn save_cache(self, state: BundleState) {
252259
let start = Instant::now();
253260

@@ -289,6 +296,8 @@ where
289296
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
290297
actions_tx: Sender<PrewarmTaskEvent>,
291298
) {
299+
let _enter = self.span.clone().entered();
300+
292301
// spawn execution tasks.
293302
self.spawn_all(pending, actions_tx);
294303

@@ -429,6 +438,7 @@ where
429438
///
430439
/// Note: There are no ordering guarantees; this does not reflect the state produced by
431440
/// sequential execution.
441+
#[instrument(target = "engine::tree", skip_all)]
432442
fn transact_batch<Tx>(
433443
self,
434444
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
@@ -440,6 +450,8 @@ where
440450
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
441451

442452
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
453+
let _enter = trace_span!("prewarm tx", index, tx_hash=%tx.tx().tx_hash()).entered();
454+
443455
// If the task was cancelled, stop execution, send an empty result to notify the task,
444456
// and exit.
445457
if terminate_execution.load(Ordering::Relaxed) {
@@ -485,6 +497,7 @@ where
485497
/// Spawns a worker task for transaction execution and returns its sender channel.
486498
fn spawn_worker<Tx>(
487499
&self,
500+
idx: usize,
488501
executor: &WorkloadExecutor,
489502
actions_tx: Sender<PrewarmTaskEvent>,
490503
done_tx: Sender<()>,
@@ -494,8 +507,10 @@ where
494507
{
495508
let (tx, rx) = mpsc::channel();
496509
let ctx = self.clone();
510+
let span = trace_span!("prewarm worker", idx);
497511

498512
executor.spawn_blocking(move || {
513+
let _enter = span.enter();
499514
ctx.transact_batch(rx, actions_tx, done_tx);
500515
});
501516

0 commit comments

Comments
 (0)