Skip to content

Commit 4a32bc0

Browse files
shekhirinclaudeRjected
authored
feat(engine): improve payload validator tracing spans (#18960)
Co-authored-by: Claude <[email protected]> Co-authored-by: Dan Cline <[email protected]>
1 parent a5618f5 commit 4a32bc0

File tree

29 files changed

+249
-101
lines changed

29 files changed

+249
-101
lines changed

crates/engine/tree/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where
7171
/// Internal function used to advance the chain.
7272
///
7373
/// Polls the `ChainOrchestrator` for the next event.
74-
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
74+
#[tracing::instrument(name = "ChainOrchestrator::poll", skip(self, cx))]
7575
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
7676
let this = self.get_mut();
7777

crates/engine/tree/src/tree/cached_state.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use reth_trie::{
1818
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
1919
};
2020
use std::{sync::Arc, time::Duration};
21-
use tracing::trace;
21+
use tracing::{debug_span, instrument, trace};
2222

2323
pub(crate) type Cache<K, V> =
2424
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
@@ -354,6 +354,7 @@ impl ExecutionCache {
354354
}
355355

356356
/// Invalidates the storage for all addresses in the set
357+
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(accounts = addresses.len()))]
357358
pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
358359
// NOTE: this must collect because the invalidate function should not be called while we
359360
// hold an iter for it
@@ -385,12 +386,25 @@ impl ExecutionCache {
385386
/// ## Error Handling
386387
///
387388
/// Returns an error if the state updates are inconsistent and should be discarded.
389+
#[instrument(level = "debug", target = "engine::tree", skip_all)]
388390
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
391+
let _enter =
392+
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
393+
.entered();
389394
// Insert bytecodes
390395
for (code_hash, bytecode) in &state_updates.contracts {
391396
self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
392397
}
393-
398+
drop(_enter);
399+
400+
let _enter = debug_span!(
401+
target: "engine::tree",
402+
"accounts",
403+
accounts = state_updates.state.len(),
404+
storages =
405+
state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
406+
)
407+
.entered();
394408
let mut invalidated_accounts = HashSet::default();
395409
for (addr, account) in &state_updates.state {
396410
// If the account was not modified, as in not changed and not destroyed, then we have

crates/engine/tree/src/tree/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl EngineApiMetrics {
7979
for tx in transactions {
8080
let tx = tx?;
8181
let span =
82-
debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
82+
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
8383
let _enter = span.enter();
8484
trace!(target: "engine::tree", "Executing transaction");
8585
executor.execute_transaction(tx)?;

crates/engine/tree/src/tree/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,12 @@ where
496496
///
497497
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
498498
/// returns an error if an internal error occurred.
499-
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
499+
#[instrument(
500+
level = "debug",
501+
target = "engine::tree",
502+
skip_all,
503+
fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
504+
)]
500505
fn on_new_payload(
501506
&mut self,
502507
payload: T::ExecutionData,
@@ -577,6 +582,7 @@ where
577582
/// - `Valid`: Payload successfully validated and inserted
578583
/// - `Syncing`: Parent missing, payload buffered for later
579584
/// - Error status: Payload is invalid
585+
#[instrument(level = "debug", target = "engine::tree", skip_all)]
580586
fn try_insert_payload(
581587
&mut self,
582588
payload: T::ExecutionData,
@@ -970,7 +976,7 @@ where
970976
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
971977
///
972978
/// Returns an error if an internal error occurred like a database error.
973-
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
979+
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
974980
fn on_forkchoice_updated(
975981
&mut self,
976982
state: ForkchoiceState,
@@ -1972,7 +1978,7 @@ where
19721978
}
19731979

19741980
/// Attempts to connect any buffered blocks that are connected to the given parent hash.
1975-
#[instrument(level = "trace", skip(self), target = "engine::tree")]
1981+
#[instrument(level = "debug", target = "engine::tree", skip(self))]
19761982
fn try_connect_buffered_blocks(
19771983
&mut self,
19781984
parent: BlockNumHash,
@@ -2281,7 +2287,7 @@ where
22812287
/// Returns an event with the appropriate action to take, such as:
22822288
/// - download more missing blocks
22832289
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
2284-
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2290+
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
22852291
fn on_downloaded_block(
22862292
&mut self,
22872293
block: RecoveredBlock<N::Block>,
@@ -2387,6 +2393,7 @@ where
23872393
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
23882394
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
23892395
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
2396+
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
23902397
fn insert_block_or_payload<Input, Err>(
23912398
&mut self,
23922399
block_id: BlockWithParent,

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use std::sync::{
4545
mpsc::{self, channel, Sender},
4646
Arc,
4747
};
48-
use tracing::{debug, instrument, warn};
48+
use tracing::{debug, debug_span, instrument, warn};
4949

5050
mod configured_sparse_trie;
5151
pub mod executor;
@@ -167,6 +167,12 @@ where
167167
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
168168
/// canceling)
169169
#[allow(clippy::type_complexity)]
170+
#[instrument(
171+
level = "debug",
172+
target = "engine::tree::payload_processor",
173+
name = "payload processor",
174+
skip_all
175+
)]
170176
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
171177
&mut self,
172178
env: ExecutionEnv<Evm>,
@@ -236,7 +242,9 @@ where
236242
);
237243

238244
// spawn multi-proof task
245+
let span = tracing::Span::current();
239246
self.executor.spawn_blocking(move || {
247+
let _enter = span.entered();
240248
multi_proof_task.run();
241249
});
242250

@@ -257,6 +265,7 @@ where
257265
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
258266
///
259267
/// Returns a [`PayloadHandle`] to communicate with the task.
268+
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
260269
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
261270
&self,
262271
env: ExecutionEnv<Evm>,
@@ -353,7 +362,9 @@ where
353362
// spawn pre-warm task
354363
{
355364
let to_prewarm_task = to_prewarm_task.clone();
365+
let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
356366
self.executor.spawn_blocking(move || {
367+
let _enter = span.entered();
357368
prewarm_task.run(transactions, to_prewarm_task);
358369
});
359370
}
@@ -370,7 +381,7 @@ where
370381
///
371382
/// If the given hash is different then what is recently cached, then this will create a new
372383
/// instance.
373-
#[instrument(target = "engine::caching", skip(self))]
384+
#[instrument(level = "debug", target = "engine::caching", skip(self))]
374385
fn cache_for(&self, parent_hash: B256) -> SavedCache {
375386
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
376387
debug!("reusing execution cache");
@@ -383,6 +394,7 @@ where
383394
}
384395

385396
/// Spawns the [`SparseTrieTask`] for this payload processor.
397+
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
386398
fn spawn_sparse_trie_task<BPF>(
387399
&self,
388400
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
@@ -421,13 +433,18 @@ where
421433
sparse_state_trie,
422434
);
423435

436+
let span = tracing::Span::current();
424437
self.executor.spawn_blocking(move || {
438+
let _enter = span.entered();
439+
425440
let (result, trie) = task.run();
426441
// Send state root computation result
427442
let _ = state_root_tx.send(result);
428443

429-
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
430-
// to the next step, so that time spent clearing doesn't block the step after this one.
444+
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending
445+
// results to the next step, so that time spent clearing doesn't block the step after
446+
// this one.
447+
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
431448
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
432449
});
433450
}
@@ -452,6 +469,7 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
452469
/// # Panics
453470
///
454471
/// If payload processing was started without background tasks.
472+
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
455473
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
456474
self.state_root
457475
.take()

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::{
3232
},
3333
time::{Duration, Instant},
3434
};
35-
use tracing::{debug, error, trace};
35+
use tracing::{debug, error, instrument, trace};
3636

3737
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
3838
/// state.
@@ -718,6 +718,7 @@ impl MultiProofTask {
718718
/// Handles request for proof prefetch.
719719
///
720720
/// Returns a number of proofs that were spawned.
721+
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, fields(accounts = targets.len()))]
721722
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
722723
let proof_targets = self.get_prefetch_proof_targets(targets);
723724
self.fetched_proof_targets.extend_ref(&proof_targets);
@@ -844,6 +845,7 @@ impl MultiProofTask {
844845
/// Handles state updates.
845846
///
846847
/// Returns a number of proofs that were spawned.
848+
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip(self, update), fields(accounts = update.len()))]
847849
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
848850
let hashed_state_update = evm_state_to_hashed_post_state(update);
849851

@@ -973,6 +975,7 @@ impl MultiProofTask {
973975
/// currently being calculated, or if there are any pending proofs in the proof sequencer
974976
/// left to be revealed by checking the pending tasks.
975977
/// 6. This task exits after all pending proofs are processed.
978+
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all)]
976979
pub(crate) fn run(mut self) {
977980
// TODO convert those into fields
978981
let mut prefetch_proofs_requested = 0;

crates/engine/tree/src/tree/payload_processor/prewarm.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::{
3939
},
4040
time::Instant,
4141
};
42-
use tracing::{debug, trace, warn};
42+
use tracing::{debug, debug_span, instrument, trace, warn};
4343

4444
/// A wrapper for transactions that includes their index in the block.
4545
#[derive(Clone)]
@@ -139,8 +139,11 @@ where
139139
let ctx = self.ctx.clone();
140140
let max_concurrency = self.max_concurrency;
141141
let transaction_count_hint = self.transaction_count_hint;
142+
let span = tracing::Span::current();
142143

143144
self.executor.spawn_blocking(move || {
145+
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
146+
144147
let (done_tx, done_rx) = mpsc::channel();
145148
let mut executing = 0usize;
146149

@@ -157,8 +160,8 @@ where
157160
};
158161

159162
// Only spawn initial workers as needed
160-
for _ in 0..workers_needed {
161-
handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
163+
for i in 0..workers_needed {
164+
handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
162165
}
163166

164167
let mut tx_index = 0usize;
@@ -248,6 +251,7 @@ where
248251
/// the new, warmed cache to be inserted.
249252
///
250253
/// This method is called from `run()` only after all execution tasks are complete.
254+
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
251255
fn save_cache(self, state: BundleState) {
252256
let start = Instant::now();
253257

@@ -284,6 +288,12 @@ where
284288
///
285289
/// This will execute the transactions until all transactions have been processed or the task
286290
/// was cancelled.
291+
#[instrument(
292+
level = "debug",
293+
target = "engine::tree::payload_processor::prewarm",
294+
name = "prewarm",
295+
skip_all
296+
)]
287297
pub(super) fn run(
288298
self,
289299
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
@@ -364,6 +374,7 @@ where
364374
{
365375
/// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
366376
/// execution.
377+
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
367378
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
368379
let Self {
369380
env,
@@ -380,7 +391,7 @@ where
380391
Ok(provider) => provider,
381392
Err(err) => {
382393
trace!(
383-
target: "engine::tree",
394+
target: "engine::tree::payload_processor::prewarm",
384395
%err,
385396
"Failed to build state provider in prewarm thread"
386397
);
@@ -429,6 +440,7 @@ where
429440
///
430441
/// Note: There are no ordering guarantees; this does not reflect the state produced by
431442
/// sequential execution.
443+
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
432444
fn transact_batch<Tx>(
433445
self,
434446
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
@@ -439,7 +451,15 @@ where
439451
{
440452
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
441453

442-
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
454+
while let Ok(IndexedTransaction { index, tx }) = {
455+
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
456+
.entered();
457+
txs.recv()
458+
} {
459+
let _enter =
460+
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
461+
.entered();
462+
443463
// If the task was cancelled, stop execution, send an empty result to notify the task,
444464
// and exit.
445465
if terminate_execution.load(Ordering::Relaxed) {
@@ -467,12 +487,18 @@ where
467487
};
468488
metrics.execution_duration.record(start.elapsed());
469489

490+
drop(_enter);
491+
470492
// Only send outcome for transactions after the first txn
471493
// as the main execution will be just as fast
472494
if index > 0 {
495+
let _enter =
496+
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
497+
.entered();
473498
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
474499
metrics.prefetch_storage_targets.record(storage_targets as f64);
475500
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
501+
drop(_enter);
476502
}
477503

478504
metrics.total_runtime.record(start.elapsed());
@@ -485,6 +511,7 @@ where
485511
/// Spawns a worker task for transaction execution and returns its sender channel.
486512
fn spawn_worker<Tx>(
487513
&self,
514+
idx: usize,
488515
executor: &WorkloadExecutor,
489516
actions_tx: Sender<PrewarmTaskEvent>,
490517
done_tx: Sender<()>,
@@ -494,8 +521,11 @@ where
494521
{
495522
let (tx, rx) = mpsc::channel();
496523
let ctx = self.clone();
524+
let span =
525+
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
497526

498527
executor.spawn_blocking(move || {
528+
let _enter = span.entered();
499529
ctx.transact_batch(rx, actions_tx, done_tx);
500530
});
501531

0 commit comments

Comments
 (0)