Skip to content

Commit 11c9949

Browse files
yongkangcmediocregopherCopilotshekhirin
authored
refactor(trie): remove proof task manager (#18934)
Co-authored-by: Brian Picciano <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: Alexey Shekhirin <[email protected]>
1 parent 082b5da commit 11c9949

File tree

7 files changed

+265
-568
lines changed

7 files changed

+265
-568
lines changed

crates/engine/primitives/src/config.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
99
/// Default maximum concurrency for on-demand proof tasks (blinded nodes)
1010
pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;
1111

12+
/// Minimum number of workers we allow configuring explicitly.
13+
pub const MIN_WORKER_COUNT: usize = 32;
14+
1215
/// Returns the default number of storage worker threads based on available parallelism.
1316
fn default_storage_worker_count() -> usize {
1417
#[cfg(feature = "std")]
1518
{
16-
std::thread::available_parallelism().map(|n| (n.get() * 2).clamp(2, 64)).unwrap_or(8)
19+
std::thread::available_parallelism().map_or(8, |n| n.get() * 2).min(MIN_WORKER_COUNT)
1720
}
1821
#[cfg(not(feature = "std"))]
1922
{
@@ -491,8 +494,8 @@ impl TreeConfig {
491494
}
492495

493496
/// Setter for the number of storage proof worker threads.
494-
pub const fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
495-
self.storage_worker_count = storage_worker_count;
497+
pub fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
498+
self.storage_worker_count = storage_worker_count.max(MIN_WORKER_COUNT);
496499
self
497500
}
498501

@@ -502,8 +505,8 @@ impl TreeConfig {
502505
}
503506

504507
/// Setter for the number of account proof worker threads.
505-
pub const fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
506-
self.account_worker_count = account_worker_count;
508+
pub fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
509+
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
507510
self
508511
}
509512
}

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use reth_provider::{
3232
use reth_revm::{db::BundleState, state::EvmState};
3333
use reth_trie::TrieInput;
3434
use reth_trie_parallel::{
35-
proof_task::{ProofTaskCtx, ProofTaskManager},
35+
proof_task::{ProofTaskCtx, ProofWorkerHandle},
3636
root::ParallelStateRootError,
3737
};
3838
use reth_trie_sparse::{
@@ -167,8 +167,7 @@ where
167167
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
168168
/// canceling)
169169
///
170-
/// Returns an error with the original transactions iterator if the proof task manager fails to
171-
/// initialize.
170+
/// Returns an error with the original transactions iterator if proof worker spawning fails.
172171
#[allow(clippy::type_complexity)]
173172
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
174173
&mut self,
@@ -204,14 +203,14 @@ where
204203
let storage_worker_count = config.storage_worker_count();
205204
let account_worker_count = config.account_worker_count();
206205
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
207-
let proof_task = match ProofTaskManager::new(
206+
let proof_handle = match ProofWorkerHandle::new(
208207
self.executor.handle().clone(),
209208
consistent_view,
210209
task_ctx,
211210
storage_worker_count,
212211
account_worker_count,
213212
) {
214-
Ok(task) => task,
213+
Ok(handle) => handle,
215214
Err(error) => {
216215
return Err((error, transactions, env, provider_builder));
217216
}
@@ -223,7 +222,7 @@ where
223222
let multi_proof_task = MultiProofTask::new(
224223
state_root_config,
225224
self.executor.clone(),
226-
proof_task.handle(),
225+
proof_handle.clone(),
227226
to_sparse_trie,
228227
max_multi_proof_task_concurrency,
229228
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
@@ -252,19 +251,7 @@ where
252251
let (state_root_tx, state_root_rx) = channel();
253252

254253
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
255-
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
256-
257-
// spawn the proof task
258-
self.executor.spawn_blocking(move || {
259-
if let Err(err) = proof_task.run() {
260-
// At least log if there is an error at any point
261-
tracing::error!(
262-
target: "engine::root",
263-
?err,
264-
"Storage proof task returned an error"
265-
);
266-
}
267-
});
254+
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
268255

269256
Ok(PayloadHandle {
270257
to_multi_proof,
@@ -406,7 +393,7 @@ where
406393
fn spawn_sparse_trie_task<BPF>(
407394
&self,
408395
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
409-
proof_task_handle: BPF,
396+
proof_worker_handle: BPF,
410397
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
411398
) where
412399
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
@@ -436,7 +423,7 @@ where
436423
let task =
437424
SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
438425
sparse_trie_rx,
439-
proof_task_handle,
426+
proof_worker_handle,
440427
self.trie_metrics.clone(),
441428
sparse_state_trie,
442429
);

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use reth_trie::{
2020
};
2121
use reth_trie_parallel::{
2222
proof::ParallelProof,
23-
proof_task::{AccountMultiproofInput, ProofTaskKind, ProofTaskManagerHandle},
23+
proof_task::{AccountMultiproofInput, ProofWorkerHandle},
2424
root::ParallelStateRootError,
2525
};
2626
use std::{
@@ -346,11 +346,8 @@ pub struct MultiproofManager {
346346
pending: VecDeque<PendingMultiproofTask>,
347347
/// Executor for tasks
348348
executor: WorkloadExecutor,
349-
/// Handle to the proof task manager used for creating `ParallelProof` instances for storage
350-
/// proofs.
351-
storage_proof_task_handle: ProofTaskManagerHandle,
352-
/// Handle to the proof task manager used for account multiproofs.
353-
account_proof_task_handle: ProofTaskManagerHandle,
349+
/// Handle to the proof worker pools (storage and account).
350+
proof_worker_handle: ProofWorkerHandle,
354351
/// Cached storage proof roots for missed leaves; this maps
355352
/// hashed (missed) addresses to their storage proof roots.
356353
///
@@ -372,8 +369,7 @@ impl MultiproofManager {
372369
fn new(
373370
executor: WorkloadExecutor,
374371
metrics: MultiProofTaskMetrics,
375-
storage_proof_task_handle: ProofTaskManagerHandle,
376-
account_proof_task_handle: ProofTaskManagerHandle,
372+
proof_worker_handle: ProofWorkerHandle,
377373
max_concurrent: usize,
378374
) -> Self {
379375
Self {
@@ -382,8 +378,7 @@ impl MultiproofManager {
382378
executor,
383379
inflight: 0,
384380
metrics,
385-
storage_proof_task_handle,
386-
account_proof_task_handle,
381+
proof_worker_handle,
387382
missed_leaves_storage_roots: Default::default(),
388383
}
389384
}
@@ -452,7 +447,7 @@ impl MultiproofManager {
452447
multi_added_removed_keys,
453448
} = storage_multiproof_input;
454449

455-
let storage_proof_task_handle = self.storage_proof_task_handle.clone();
450+
let storage_proof_worker_handle = self.proof_worker_handle.clone();
456451
let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
457452

458453
self.executor.spawn_blocking(move || {
@@ -471,7 +466,7 @@ impl MultiproofManager {
471466
config.state_sorted,
472467
config.prefix_sets,
473468
missed_leaves_storage_roots,
474-
storage_proof_task_handle,
469+
storage_proof_worker_handle,
475470
)
476471
.with_branch_node_masks(true)
477472
.with_multi_added_removed_keys(Some(multi_added_removed_keys))
@@ -524,7 +519,7 @@ impl MultiproofManager {
524519
state_root_message_sender,
525520
multi_added_removed_keys,
526521
} = multiproof_input;
527-
let account_proof_task_handle = self.account_proof_task_handle.clone();
522+
let account_proof_worker_handle = self.proof_worker_handle.clone();
528523
let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
529524

530525
self.executor.spawn_blocking(move || {
@@ -556,15 +551,10 @@ impl MultiproofManager {
556551
missed_leaves_storage_roots,
557552
};
558553

559-
let (sender, receiver) = channel();
560554
let proof_result: Result<DecodedMultiProof, ParallelStateRootError> = (|| {
561-
account_proof_task_handle
562-
.queue_task(ProofTaskKind::AccountMultiproof(input, sender))
563-
.map_err(|_| {
564-
ParallelStateRootError::Other(
565-
"Failed to queue account multiproof to worker pool".into(),
566-
)
567-
})?;
555+
let receiver = account_proof_worker_handle
556+
.queue_account_multiproof(input)
557+
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
568558

569559
receiver
570560
.recv()
@@ -693,7 +683,7 @@ impl MultiProofTask {
693683
pub(super) fn new(
694684
config: MultiProofConfig,
695685
executor: WorkloadExecutor,
696-
proof_task_handle: ProofTaskManagerHandle,
686+
proof_worker_handle: ProofWorkerHandle,
697687
to_sparse_trie: Sender<SparseTrieUpdate>,
698688
max_concurrency: usize,
699689
chunk_size: Option<usize>,
@@ -713,8 +703,7 @@ impl MultiProofTask {
713703
multiproof_manager: MultiproofManager::new(
714704
executor,
715705
metrics.clone(),
716-
proof_task_handle.clone(), // handle for storage proof workers
717-
proof_task_handle, // handle for account proof workers
706+
proof_worker_handle,
718707
max_concurrency,
719708
),
720709
metrics,
@@ -1223,7 +1212,7 @@ mod tests {
12231212
DatabaseProviderFactory,
12241213
};
12251214
use reth_trie::{MultiProof, TrieInput};
1226-
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1215+
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
12271216
use revm_primitives::{B256, U256};
12281217

12291218
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1238,12 +1227,12 @@ mod tests {
12381227
config.prefix_sets.clone(),
12391228
);
12401229
let consistent_view = ConsistentDbView::new(factory, None);
1241-
let proof_task =
1242-
ProofTaskManager::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
1243-
.expect("Failed to create ProofTaskManager");
1230+
let proof_handle =
1231+
ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
1232+
.expect("Failed to spawn proof workers");
12441233
let channel = channel();
12451234

1246-
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
1235+
MultiProofTask::new(config, executor, proof_handle, channel.0, 1, None)
12471236
}
12481237

12491238
#[test]

crates/engine/tree/src/tree/payload_validator.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -892,13 +892,12 @@ where
892892
(handle, StateRootStrategy::StateRootTask)
893893
}
894894
Err((error, txs, env, provider_builder)) => {
895-
// Failed to initialize proof task manager, fallback to parallel state
896-
// root
895+
// Failed to spawn proof workers, fallback to parallel state root
897896
error!(
898897
target: "engine::tree",
899898
block=?block_num_hash,
900899
?error,
901-
"Failed to initialize proof task manager, falling back to parallel state root"
900+
"Failed to spawn proof workers, falling back to parallel state root"
902901
);
903902
(
904903
self.payload_processor.spawn_cache_exclusive(

0 commit comments

Comments
 (0)