@@ -24,7 +24,7 @@ use reth_trie_parallel::{
2424 root:: ParallelStateRootError ,
2525} ;
2626use std:: {
27- collections:: BTreeMap ,
27+ collections:: { BTreeMap , VecDeque } ,
2828 ops:: DerefMut ,
2929 sync:: {
3030 mpsc:: { channel, Receiver , Sender } ,
@@ -34,6 +34,10 @@ use std::{
3434} ;
3535use tracing:: { debug, error, trace} ;
3636
37+ /// Default upper bound for inflight multiproof calculations. These would be sitting in the queue
38+ /// waiting to be processed.
39+ const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT : usize = 256 ;
40+
3741/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
3842/// state.
3943#[ derive( Default , Debug ) ]
@@ -333,11 +337,17 @@ impl MultiproofInput {
333337}
334338
335339/// Manages concurrent multiproof calculations.
336- /// Takes care of spawning multiproof calculations and tracking in-flight work.
340+ /// Takes care of not having more calculations in flight than a given maximum
341+ /// concurrency, further calculation requests are queued and spawn later, after
342+ /// availability has been signaled.
337343#[ derive( Debug ) ]
338344pub struct MultiproofManager {
345+ /// Maximum number of proof calculations allowed to be inflight at once.
346+ inflight_limit : usize ,
339347 /// Currently running calculations.
340348 inflight : usize ,
349+ /// Queued calculations.
350+ pending : VecDeque < PendingMultiproofTask > ,
341351 /// Executor for tasks
342352 executor : WorkloadExecutor ,
343353 /// Handle to the proof worker pools (storage and account).
@@ -366,6 +376,8 @@ impl MultiproofManager {
366376 proof_worker_handle : ProofWorkerHandle ,
367377 ) -> Self {
368378 Self {
379+ pending : VecDeque :: with_capacity ( DEFAULT_MULTIPROOF_INFLIGHT_LIMIT ) ,
380+ inflight_limit : DEFAULT_MULTIPROOF_INFLIGHT_LIMIT ,
369381 executor,
370382 inflight : 0 ,
371383 metrics,
@@ -374,7 +386,11 @@ impl MultiproofManager {
374386 }
375387 }
376388
377- /// Spawns a new multiproof calculation for the provided input.
389+ const fn is_full ( & self ) -> bool {
390+ self . inflight >= self . inflight_limit
391+ }
392+
393+ /// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached.
378394 fn spawn_or_queue ( & mut self , input : PendingMultiproofTask ) {
379395 // If there are no proof targets, we can just send an empty multiproof back immediately
380396 if input. proof_targets_is_empty ( ) {
@@ -386,6 +402,12 @@ impl MultiproofManager {
386402 return
387403 }
388404
405+ if self . is_full ( ) {
406+ self . pending . push_back ( input) ;
407+ self . metrics . pending_multiproofs_histogram . record ( self . pending . len ( ) as f64 ) ;
408+ return ;
409+ }
410+
389411 self . spawn_multiproof_task ( input) ;
390412 }
391413
@@ -394,6 +416,11 @@ impl MultiproofManager {
394416 fn on_calculation_complete ( & mut self ) {
395417 self . inflight = self . inflight . saturating_sub ( 1 ) ;
396418 self . metrics . inflight_multiproofs_histogram . record ( self . inflight as f64 ) ;
419+
420+ if let Some ( input) = self . pending . pop_front ( ) {
421+ self . metrics . pending_multiproofs_histogram . record ( self . pending . len ( ) as f64 ) ;
422+ self . spawn_multiproof_task ( input) ;
423+ }
397424 }
398425
399426 /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
@@ -579,6 +606,8 @@ impl MultiproofManager {
579606pub ( crate ) struct MultiProofTaskMetrics {
580607 /// Histogram of inflight multiproofs.
581608 pub inflight_multiproofs_histogram : Histogram ,
609+ /// Histogram of pending multiproofs.
610+ pub pending_multiproofs_histogram : Histogram ,
582611
583612 /// Histogram of the number of prefetch proof target accounts.
584613 pub prefetch_proof_targets_accounts_histogram : Histogram ,
0 commit comments