Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f212d70
feat(engine): improve payload validator tracing spans
shekhirin Oct 13, 2025
8e58f40
set trace levels
shekhirin Oct 14, 2025
80b4427
remove levels from instrument macro when no ret is present
shekhirin Oct 15, 2025
780e250
move target to the beginning of macro
shekhirin Oct 15, 2025
dd4eb83
improve in_scope for evm_env
shekhirin Oct 15, 2025
84cee75
Merge remote-tracking branch 'origin/main' into alexey/payload-valida…
shekhirin Oct 15, 2025
a2ca4a3
prewarm spans
shekhirin Oct 15, 2025
5c9f414
reorganize spans in prewarm
shekhirin Oct 15, 2025
32575f8
convert trace_span to info_span
shekhirin Oct 15, 2025
07302ba
Merge remote-tracking branch 'origin/main' into alexey/payload-valida…
shekhirin Oct 15, 2025
d2f02a5
spans!!
shekhirin Oct 15, 2025
74290f6
specify ret level
shekhirin Oct 15, 2025
413e7b6
Merge remote-tracking branch 'origin/main' into alexey/payload-valida…
shekhirin Oct 16, 2025
f076382
chore: remove unused Block import
shekhirin Oct 16, 2025
c33f51c
remove payload from try_insert_payload span
shekhirin Oct 16, 2025
071e028
spans.
shekhirin Oct 16, 2025
c7881f3
chore: add span names to payload processor instrumentation
shekhirin Oct 16, 2025
efea7e6
Merge main into alexey/payload-validator-spans
shekhirin Oct 16, 2025
c3e65d5
chore: update tracing spans to debug level
shekhirin Oct 16, 2025
79d4d7d
remove reth-bench binaries oops
shekhirin Oct 16, 2025
bc330b4
chore: set all tracing spans to debug level
shekhirin Oct 16, 2025
2cda69e
chore: fmt
Rjected Oct 17, 2025
3211876
Merge branch 'main' into HEAD
Rjected Oct 17, 2025
dd1c07d
chore: add attributes feature to reth-trie-sparse
Rjected Oct 17, 2025
6b3756f
fix: skip self on more spans
Rjected Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
/// Internal function used to advance the chain.
///
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
#[tracing::instrument(name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
let this = self.get_mut();

Expand Down
18 changes: 16 additions & 2 deletions crates/engine/tree/src/tree/cached_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_trie::{
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use std::{sync::Arc, time::Duration};
use tracing::trace;
use tracing::{debug_span, instrument, trace};

pub(crate) type Cache<K, V> =
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
Expand Down Expand Up @@ -354,6 +354,7 @@ impl ExecutionCache {
}

/// Invalidates the storage for all addresses in the set
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(accounts = addresses.len()))]
pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
// NOTE: this must collect because the invalidate function should not be called while we
// hold an iter for it
Expand Down Expand Up @@ -385,12 +386,25 @@ impl ExecutionCache {
/// ## Error Handling
///
/// Returns an error if the state updates are inconsistent and should be discarded.
#[instrument(level = "debug", target = "engine::tree", skip_all)]
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
let _enter =
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
.entered();
// Insert bytecodes
for (code_hash, bytecode) in &state_updates.contracts {
self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
}

drop(_enter);

let _enter = debug_span!(
target: "engine::tree",
"accounts",
accounts = state_updates.state.len(),
storages =
state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
)
.entered();
let mut invalidated_accounts = HashSet::default();
for (addr, account) in &state_updates.state {
// If the account was not modified, as in not changed and not destroyed, then we have
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl EngineApiMetrics {
for tx in transactions {
let tx = tx?;
let span =
debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
let _enter = span.enter();
trace!(target: "engine::tree", "Executing transaction");
executor.execute_transaction(tx)?;
Expand Down
15 changes: 11 additions & 4 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,12 @@ where
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
#[instrument(
level = "debug",
target = "engine::tree",
skip_all,
fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
)]
fn on_new_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -577,6 +582,7 @@ where
/// - `Valid`: Payload successfully validated and inserted
/// - `Syncing`: Parent missing, payload buffered for later
/// - Error status: Payload is invalid
#[instrument(level = "debug", target = "engine::tree", skip_all)]
fn try_insert_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -970,7 +976,7 @@ where
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
Expand Down Expand Up @@ -1972,7 +1978,7 @@ where
}

/// Attempts to connect any buffered blocks that are connected to the given parent hash.
#[instrument(level = "trace", skip(self), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip(self))]
fn try_connect_buffered_blocks(
&mut self,
parent: BlockNumHash,
Expand Down Expand Up @@ -2281,7 +2287,7 @@ where
/// Returns an event with the appropriate action to take, such as:
/// - download more missing blocks
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
fn on_downloaded_block(
&mut self,
block: RecoveredBlock<N::Block>,
Expand Down Expand Up @@ -2387,6 +2393,7 @@ where
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
fn insert_block_or_payload<Input, Err>(
&mut self,
block_id: BlockWithParent,
Expand Down
26 changes: 22 additions & 4 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, instrument, warn};
use tracing::{debug, debug_span, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
Expand Down Expand Up @@ -167,6 +167,12 @@ where
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
name = "payload processor",
skip_all
)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
Expand Down Expand Up @@ -236,7 +242,9 @@ where
);

// spawn multi-proof task
let span = tracing::Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();
multi_proof_task.run();
});

Expand All @@ -257,6 +265,7 @@ where
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
///
/// Returns a [`PayloadHandle`] to communicate with the task.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
&self,
env: ExecutionEnv<Evm>,
Expand Down Expand Up @@ -353,7 +362,9 @@ where
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
self.executor.spawn_blocking(move || {
let _enter = span.entered();
prewarm_task.run(transactions, to_prewarm_task);
});
}
Expand All @@ -370,7 +381,7 @@ where
///
/// If the given hash is different then what is recently cached, then this will create a new
/// instance.
#[instrument(target = "engine::caching", skip(self))]
#[instrument(level = "debug", target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
debug!("reusing execution cache");
Expand All @@ -383,6 +394,7 @@ where
}

/// Spawns the [`SparseTrieTask`] for this payload processor.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task<BPF>(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
Expand Down Expand Up @@ -421,13 +433,18 @@ where
sparse_state_trie,
);

let span = tracing::Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();

let (result, trie) = task.run();
// Send state root computation result
let _ = state_root_tx.send(result);

// Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
// to the next step, so that time spent clearing doesn't block the step after this one.
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
});
}
Expand All @@ -452,6 +469,7 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
/// # Panics
///
/// If payload processing was started without background tasks.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::{debug, error, trace};
use tracing::{debug, error, instrument, trace};

/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
Expand Down Expand Up @@ -718,6 +718,7 @@ impl MultiProofTask {
/// Handles request for proof prefetch.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, fields(accounts = targets.len()))]
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
let proof_targets = self.get_prefetch_proof_targets(targets);
self.fetched_proof_targets.extend_ref(&proof_targets);
Expand Down Expand Up @@ -844,6 +845,7 @@ impl MultiProofTask {
/// Handles state updates.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip(self, update), fields(accounts = update.len()))]
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);

Expand Down Expand Up @@ -973,6 +975,7 @@ impl MultiProofTask {
/// currently being calculated, or if there are any pending proofs in the proof sequencer
/// left to be revealed by checking the pending tasks.
/// 6. This task exits after all pending proofs are processed.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all)]
pub(crate) fn run(mut self) {
// TODO convert those into fields
let mut prefetch_proofs_requested = 0;
Expand Down
40 changes: 35 additions & 5 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
},
time::Instant,
};
use tracing::{debug, trace, warn};
use tracing::{debug, debug_span, instrument, trace, warn};

/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
Expand Down Expand Up @@ -139,8 +139,11 @@ where
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
let span = tracing::Span::current();

self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();

let (done_tx, done_rx) = mpsc::channel();
let mut executing = 0usize;

Expand All @@ -157,8 +160,8 @@ where
};

// Only spawn initial workers as needed
for _ in 0..workers_needed {
handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
for i in 0..workers_needed {
handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
}

let mut tx_index = 0usize;
Expand Down Expand Up @@ -248,6 +251,7 @@ where
/// the new, warmed cache to be inserted.
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn save_cache(self, state: BundleState) {
let start = Instant::now();

Expand Down Expand Up @@ -284,6 +288,12 @@ where
///
/// This will execute the transactions until all transactions have been processed or the task
/// was cancelled.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::prewarm",
name = "prewarm",
skip_all
)]
pub(super) fn run(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
Expand Down Expand Up @@ -364,6 +374,7 @@ where
{
/// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
/// execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
let Self {
env,
Expand All @@ -380,7 +391,7 @@ where
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree",
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in prewarm thread"
);
Expand Down Expand Up @@ -429,6 +440,7 @@ where
///
/// Note: There are no ordering guarantees; this does not reflect the state produced by
/// sequential execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
Expand All @@ -439,7 +451,15 @@ where
{
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };

while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
.entered();
txs.recv()
} {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
.entered();

// If the task was cancelled, stop execution, send an empty result to notify the task,
// and exit.
if terminate_execution.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -467,12 +487,18 @@ where
};
metrics.execution_duration.record(start.elapsed());

drop(_enter);

// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
drop(_enter);
}

metrics.total_runtime.record(start.elapsed());
Expand All @@ -485,6 +511,7 @@ where
/// Spawns a worker task for transaction execution and returns its sender channel.
fn spawn_worker<Tx>(
&self,
idx: usize,
executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent>,
done_tx: Sender<()>,
Expand All @@ -494,8 +521,11 @@ where
{
let (tx, rx) = mpsc::channel();
let ctx = self.clone();
let span =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);

executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.transact_batch(rx, actions_tx, done_tx);
});

Expand Down
Loading
Loading