Skip to content

Commit 4db576c

Browse files
committed
refactor: rm pending queue
1 parent e121bc2 commit 4db576c

File tree

1 file changed

+16
-50
lines changed

1 file changed

+16
-50
lines changed

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use reth_trie_parallel::{
2424
root::ParallelStateRootError,
2525
};
2626
use std::{
27-
collections::{BTreeMap, VecDeque},
27+
collections::BTreeMap,
2828
ops::DerefMut,
2929
sync::{
3030
mpsc::{channel, Receiver, Sender},
@@ -34,10 +34,6 @@ use std::{
3434
};
3535
use tracing::{debug, error, trace};
3636

37-
/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue
38-
/// waiting to be processed.
39-
const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 256;
40-
4137
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
4238
/// state.
4339
#[derive(Default, Debug)]
@@ -337,17 +333,10 @@ impl MultiproofInput {
337333
}
338334

339335
/// Manages concurrent multiproof calculations.
340-
/// Takes care of not having more calculations in flight than a given maximum
341-
/// concurrency, further calculation requests are queued and spawn later, after
342-
/// availability has been signaled.
343336
#[derive(Debug)]
344337
pub struct MultiproofManager {
345-
/// Maximum number of proof calculations allowed to be inflight at once.
346-
inflight_limit: usize,
347338
/// Currently running calculations.
348339
inflight: usize,
349-
/// Queued calculations.
350-
pending: VecDeque<PendingMultiproofTask>,
351340
/// Executor for tasks
352341
executor: WorkloadExecutor,
353342
/// Handle to the proof worker pools (storage and account).
@@ -376,22 +365,16 @@ impl MultiproofManager {
376365
proof_worker_handle: ProofWorkerHandle,
377366
) -> Self {
378367
Self {
379-
pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT),
380-
inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT,
381-
executor,
382368
inflight: 0,
369+
executor,
383370
metrics,
384371
proof_worker_handle,
385372
missed_leaves_storage_roots: Default::default(),
386373
}
387374
}
388375

389-
const fn is_full(&self) -> bool {
390-
self.inflight >= self.inflight_limit
391-
}
392-
393-
/// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached.
394-
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
376+
/// Spawns a new multiproof calculation.
377+
fn spawn(&mut self, input: PendingMultiproofTask) {
395378
// If there are no proof targets, we can just send an empty multiproof back immediately
396379
if input.proof_targets_is_empty() {
397380
debug!(
@@ -402,27 +385,9 @@ impl MultiproofManager {
402385
return
403386
}
404387

405-
if self.is_full() {
406-
self.pending.push_back(input);
407-
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408-
return;
409-
}
410-
411388
self.spawn_multiproof_task(input);
412389
}
413390

414-
/// Signals that a multiproof calculation has finished and there's room to
415-
/// spawn a new calculation if needed.
416-
fn on_calculation_complete(&mut self) {
417-
self.inflight = self.inflight.saturating_sub(1);
418-
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
419-
420-
if let Some(input) = self.pending.pop_front() {
421-
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
422-
self.spawn_multiproof_task(input);
423-
}
424-
}
425-
426391
/// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
427392
/// multiproof, and dispatching to `spawn_multiproof` otherwise.
428393
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) {
@@ -510,6 +475,12 @@ impl MultiproofManager {
510475
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
511476
}
512477

478+
/// Signals that a multiproof calculation has finished.
479+
fn on_calculation_complete(&mut self) {
480+
self.inflight = self.inflight.saturating_sub(1);
481+
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
482+
}
483+
513484
/// Spawns a single multiproof calculation task.
514485
fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) {
515486
let MultiproofInput {
@@ -606,8 +577,6 @@ impl MultiproofManager {
606577
pub(crate) struct MultiProofTaskMetrics {
607578
/// Histogram of inflight multiproofs.
608579
pub inflight_multiproofs_histogram: Histogram,
609-
/// Histogram of pending multiproofs.
610-
pub pending_multiproofs_histogram: Histogram,
611580

612581
/// Histogram of the number of prefetch proof target accounts.
613582
pub prefetch_proof_targets_accounts_histogram: Histogram,
@@ -657,8 +626,7 @@ pub(crate) struct MultiProofTaskMetrics {
657626
#[derive(Debug)]
658627
pub(super) struct MultiProofTask {
659628
/// The size of proof targets chunk to spawn in one calculation.
660-
///
661-
/// If [`None`], then chunking is disabled.
629+
/// If None, chunking is disabled and all targets are processed in a single proof.
662630
chunk_size: Option<usize>,
663631
/// Task configuration.
664632
config: MultiProofConfig,
@@ -737,10 +705,9 @@ impl MultiProofTask {
737705

738706
// Process proof targets in chunks.
739707
let mut chunks = 0;
740-
let should_chunk = !self.multiproof_manager.is_full();
741708

742709
let mut spawn = |proof_targets| {
743-
self.multiproof_manager.spawn_or_queue(
710+
self.multiproof_manager.spawn(
744711
MultiproofInput {
745712
config: self.config.clone(),
746713
source: None,
@@ -755,7 +722,7 @@ impl MultiProofTask {
755722
chunks += 1;
756723
};
757724

758-
if should_chunk && let Some(chunk_size) = self.chunk_size {
725+
if let Some(chunk_size) = self.chunk_size {
759726
for proof_targets_chunk in proof_targets.chunks(chunk_size) {
760727
spawn(proof_targets_chunk);
761728
}
@@ -871,7 +838,6 @@ impl MultiProofTask {
871838

872839
// Process state updates in chunks.
873840
let mut chunks = 0;
874-
let should_chunk = !self.multiproof_manager.is_full();
875841

876842
let mut spawned_proof_targets = MultiProofTargets::default();
877843

@@ -883,7 +849,7 @@ impl MultiProofTask {
883849
);
884850
spawned_proof_targets.extend_ref(&proof_targets);
885851

886-
self.multiproof_manager.spawn_or_queue(
852+
self.multiproof_manager.spawn(
887853
MultiproofInput {
888854
config: self.config.clone(),
889855
source: Some(source),
@@ -899,7 +865,7 @@ impl MultiProofTask {
899865
chunks += 1;
900866
};
901867

902-
if should_chunk && let Some(chunk_size) = self.chunk_size {
868+
if let Some(chunk_size) = self.chunk_size {
903869
for chunk in not_fetched_state_update.chunks(chunk_size) {
904870
spawn(chunk);
905871
}
@@ -952,7 +918,7 @@ impl MultiProofTask {
952918
/// so that the proofs for accounts and storage slots that were already fetched are not
953919
/// requested again.
954920
/// 2. Using the proof targets, a new multiproof is calculated using
955-
/// [`MultiproofManager::spawn_or_queue`].
921+
/// [`MultiproofManager::spawn`].
956922
/// * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
957923
/// sent back to this task along with the original state update.
958924
/// * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]

0 commit comments

Comments
 (0)