@@ -19,6 +19,7 @@ use itertools::Itertools;
1919use logging:: crit;
2020use logging:: TimeLatch ;
2121use slot_clock:: SlotClock ;
22+ use std:: collections:: hash_map:: Entry ;
2223use std:: collections:: { HashMap , HashSet } ;
2324use std:: future:: Future ;
2425use std:: pin:: Pin ;
@@ -54,6 +55,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
5455/// For how long to queue sampling requests for reprocessing.
5556pub const QUEUED_SAMPLING_REQUESTS_DELAY : Duration = Duration :: from_secs ( 12 ) ;
5657
58+ /// For how long to queue delayed column reconstruction.
59+ pub const QUEUED_RECONSTRUCTION_DELAY : Duration = Duration :: from_millis ( 150 ) ;
60+
5761/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
5862/// we signature-verify blocks before putting them in the queue *should* protect against this, but
5963/// it's nice to have extra protection.
@@ -109,6 +113,8 @@ pub enum ReprocessQueueMessage {
109113 UnknownBlockSamplingRequest ( QueuedSamplingRequest ) ,
110114 /// A new backfill batch that needs to be scheduled for processing.
111115 BackfillSync ( QueuedBackfillBatch ) ,
116+ /// A delayed column reconstruction that needs checking
117+ DelayColumnReconstruction ( QueuedColumnReconstruction ) ,
112118}
113119
114120/// Events sent by the scheduler once they are ready for re-processing.
@@ -121,6 +127,7 @@ pub enum ReadyWork {
121127 LightClientUpdate ( QueuedLightClientUpdate ) ,
122128 SamplingRequest ( QueuedSamplingRequest ) ,
123129 BackfillSync ( QueuedBackfillBatch ) ,
130+ ColumnReconstruction ( QueuedColumnReconstruction ) ,
124131}
125132
126133/// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -176,6 +183,11 @@ pub struct IgnoredRpcBlock {
176183/// A backfill batch work that has been queued for processing later.
177184pub struct QueuedBackfillBatch ( pub AsyncFn ) ;
178185
186+ pub struct QueuedColumnReconstruction {
187+ pub block_root : Hash256 ,
188+ pub process_fn : AsyncFn ,
189+ }
190+
179191impl < E : EthSpec > TryFrom < WorkEvent < E > > for QueuedBackfillBatch {
180192 type Error = WorkEvent < E > ;
181193
@@ -212,6 +224,8 @@ enum InboundEvent {
212224 ReadyLightClientUpdate ( QueuedLightClientUpdateId ) ,
213225 /// A backfill batch that was queued is ready for processing.
214226 ReadyBackfillSync ( QueuedBackfillBatch ) ,
227+ /// A column reconstruction that was queued is ready for processing.
228+ ReadyColumnReconstruction ( QueuedColumnReconstruction ) ,
215229 /// A message sent to the `ReprocessQueue`
216230 Msg ( ReprocessQueueMessage ) ,
217231}
@@ -234,6 +248,8 @@ struct ReprocessQueue<S> {
234248 lc_updates_delay_queue : DelayQueue < QueuedLightClientUpdateId > ,
235249 /// Queue to manage scheduled sampling requests
236250 sampling_requests_delay_queue : DelayQueue < QueuedSamplingRequestId > ,
251+ /// Queue to manage scheduled column reconstructions.
252+ column_reconstructions_delay_queue : DelayQueue < QueuedColumnReconstruction > ,
237253
238254 /* Queued items */
239255 /// Queued blocks.
@@ -252,6 +268,8 @@ struct ReprocessQueue<S> {
252268 queued_sampling_requests : FnvHashMap < usize , ( QueuedSamplingRequest , DelayKey ) > ,
253269 /// Sampling requests per block root.
254270 awaiting_sampling_requests_per_block_root : HashMap < Hash256 , Vec < QueuedSamplingRequestId > > ,
271+ /// Column reconstruction per block root.
272+ queued_column_reconstructions : HashMap < Hash256 , DelayKey > ,
255273 /// Queued backfill batches
256274 queued_backfill_batches : Vec < QueuedBackfillBatch > ,
257275
@@ -343,6 +361,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
343361 Poll :: Ready ( None ) | Poll :: Pending => ( ) ,
344362 }
345363
364+ match self . column_reconstructions_delay_queue . poll_expired ( cx) {
365+ Poll :: Ready ( Some ( reconstruction) ) => {
366+ return Poll :: Ready ( Some ( InboundEvent :: ReadyColumnReconstruction (
367+ reconstruction. into_inner ( ) ,
368+ ) ) ) ;
369+ }
370+ Poll :: Ready ( None ) | Poll :: Pending => ( ) ,
371+ }
372+
346373 if let Some ( next_backfill_batch_event) = self . next_backfill_batch_event . as_mut ( ) {
347374 match next_backfill_batch_event. as_mut ( ) . poll ( cx) {
348375 Poll :: Ready ( _) => {
@@ -410,6 +437,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
410437 attestations_delay_queue : DelayQueue :: new ( ) ,
411438 lc_updates_delay_queue : DelayQueue :: new ( ) ,
412439 sampling_requests_delay_queue : <_ >:: default ( ) ,
440+ column_reconstructions_delay_queue : DelayQueue :: new ( ) ,
413441 queued_gossip_block_roots : HashSet :: new ( ) ,
414442 queued_lc_updates : FnvHashMap :: default ( ) ,
415443 queued_aggregates : FnvHashMap :: default ( ) ,
@@ -419,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
419447 awaiting_lc_updates_per_parent_root : HashMap :: new ( ) ,
420448 awaiting_sampling_requests_per_block_root : <_ >:: default ( ) ,
421449 queued_backfill_batches : Vec :: new ( ) ,
450+ queued_column_reconstructions : HashMap :: new ( ) ,
422451 next_attestation : 0 ,
423452 next_lc_update : 0 ,
424453 next_sampling_request_update : 0 ,
@@ -817,6 +846,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
817846 self . recompute_next_backfill_batch_event ( ) ;
818847 }
819848 }
849+ InboundEvent :: Msg ( DelayColumnReconstruction ( request) ) => {
850+ match self . queued_column_reconstructions . entry ( request. block_root ) {
851+ Entry :: Occupied ( key) => {
852+ // Push back the reattempted reconstruction
853+ self . column_reconstructions_delay_queue
854+ . reset ( key. get ( ) , QUEUED_RECONSTRUCTION_DELAY )
855+ }
856+ Entry :: Vacant ( vacant) => {
857+ let delay_key = self
858+ . column_reconstructions_delay_queue
859+ . insert ( request, QUEUED_RECONSTRUCTION_DELAY ) ;
860+ vacant. insert ( delay_key) ;
861+ }
862+ }
863+ }
820864 // A block that was queued for later processing is now ready to be processed.
821865 InboundEvent :: ReadyGossipBlock ( ready_block) => {
822866 let block_root = ready_block. beacon_block_root ;
@@ -940,6 +984,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
940984 _ => crit ! ( "Unexpected return from try_send error" ) ,
941985 }
942986 }
987+ InboundEvent :: ReadyColumnReconstruction ( column_reconstruction) => {
988+ self . queued_column_reconstructions
989+ . remove ( & column_reconstruction. block_root ) ;
990+ if self
991+ . ready_work_tx
992+ . try_send ( ReadyWork :: ColumnReconstruction ( column_reconstruction) )
993+ . is_err ( )
994+ {
995+ error ! (
996+ hint = "system may be overloaded" ,
997+ "Ignored scheduled column reconstruction"
998+ ) ;
999+ }
1000+ }
9431001 }
9441002
9451003 metrics:: set_gauge_vec (
0 commit comments