@@ -80,6 +80,8 @@ mod worker;
8080use crate :: beacon_processor:: work_reprocessing_queue:: QueuedGossipBlock ;
8181pub use worker:: { ChainSegmentProcessId , GossipAggregatePackage , GossipAttestationPackage } ;
8282
83+ use self :: work_reprocessing_queue:: QueuedBlobsSidecar ;
84+
8385/// The maximum size of the channel for work events to the `BeaconProcessor`.
8486///
8587/// Setting this too low will cause consensus messages to be dropped.
@@ -116,6 +118,8 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
116118
117119//FIXME(sean) verify
118120const MAX_GOSSIP_BLOB_QUEUE_LEN : usize = 1_024 ;
121+ //FIXME(sean) verify
122+ const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN : usize = 1_024 ;
119123
120124/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
121125/// within acceptable clock disparity) that will be queued before we start dropping them.
@@ -206,6 +210,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
206210pub const BLOBS_BY_RANGE_REQUEST : & str = "blobs_by_range_request" ;
207211pub const UNKNOWN_BLOCK_ATTESTATION : & str = "unknown_block_attestation" ;
208212pub const UNKNOWN_BLOCK_AGGREGATE : & str = "unknown_block_aggregate" ;
213+ pub const UNKNOWN_BLOBS_SIDECAR : & str = "unknown_blobs_sidecar" ;
209214
210215/// A simple first-in-first-out queue with a maximum length.
211216struct FifoQueue < T > {
@@ -413,7 +418,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
413418 pub fn gossip_blobs_sidecar (
414419 message_id : MessageId ,
415420 peer_id : PeerId ,
416- peer_client : Client ,
421+ _peer_client : Client ,
417422 blobs : Arc < SignedBlobsSidecar < T :: EthSpec > > ,
418423 seen_timestamp : Duration ,
419424 ) -> Self {
@@ -422,7 +427,6 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
422427 work : Work :: GossipBlobsSidecar {
423428 message_id,
424429 peer_id,
425- peer_client,
426430 blobs,
427431 seen_timestamp,
428432 } ,
@@ -670,6 +674,20 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
670674 seen_timestamp,
671675 } ,
672676 } ,
677+ ReadyWork :: BlobsSidecar ( QueuedBlobsSidecar {
678+ peer_id,
679+ message_id,
680+ blobs_sidecar,
681+ seen_timestamp,
682+ } ) => Self {
683+ drop_during_sync : true ,
684+ work : Work :: UnknownBlobsSidecar {
685+ message_id,
686+ peer_id,
687+ blobs : blobs_sidecar,
688+ seen_timestamp,
689+ } ,
690+ } ,
673691 }
674692 }
675693}
@@ -722,7 +740,12 @@ pub enum Work<T: BeaconChainTypes> {
722740 GossipBlobsSidecar {
723741 message_id : MessageId ,
724742 peer_id : PeerId ,
725- peer_client : Client ,
743+ blobs : Arc < SignedBlobsSidecar < T :: EthSpec > > ,
744+ seen_timestamp : Duration ,
745+ } ,
746+ UnknownBlobsSidecar {
747+ message_id : MessageId ,
748+ peer_id : PeerId ,
726749 blobs : Arc < SignedBlobsSidecar < T :: EthSpec > > ,
727750 seen_timestamp : Duration ,
728751 } ,
@@ -815,6 +838,7 @@ impl<T: BeaconChainTypes> Work<T> {
815838 Work :: BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST ,
816839 Work :: UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION ,
817840 Work :: UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE ,
841+ Work :: UnknownBlobsSidecar { .. } => UNKNOWN_BLOBS_SIDECAR ,
818842 }
819843 }
820844}
@@ -931,6 +955,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
931955 LifoQueue :: new ( MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN ) ;
932956 let mut unknown_block_attestation_queue =
933957 LifoQueue :: new ( MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN ) ;
958+ let mut unknown_blobs_sidecar_queue = LifoQueue :: new ( MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN ) ;
934959
935960 let mut sync_message_queue = LifoQueue :: new ( MAX_SYNC_MESSAGE_QUEUE_LEN ) ;
936961 let mut sync_contribution_queue = LifoQueue :: new ( MAX_SYNC_CONTRIBUTION_QUEUE_LEN ) ;
@@ -1312,6 +1337,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
13121337 Work :: UnknownBlockAggregate { .. } => {
13131338 unknown_block_aggregate_queue. push ( work)
13141339 }
1340+ Work :: UnknownBlobsSidecar { .. } => {
1341+ unknown_blobs_sidecar_queue. push ( work)
1342+ }
13151343 }
13161344 }
13171345 }
@@ -1531,20 +1559,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
15311559 Work :: GossipBlobsSidecar {
15321560 message_id,
15331561 peer_id,
1534- peer_client,
15351562 blobs,
15361563 seen_timestamp,
15371564 } => task_spawner. spawn_async ( async move {
1538- worker
1539- . process_gossip_blob (
1540- message_id,
1541- peer_id,
1542- peer_client,
1543- blobs,
1544- work_reprocessing_tx,
1545- seen_timestamp,
1546- )
1547- . await
1565+ worker. process_gossip_blob (
1566+ message_id,
1567+ peer_id,
1568+ blobs,
1569+ Some ( work_reprocessing_tx) ,
1570+ seen_timestamp,
1571+ )
15481572 } ) ,
15491573 /*
15501574 * Import for blocks that we received earlier than their intended slot.
@@ -1731,6 +1755,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
17311755 seen_timestamp,
17321756 )
17331757 } ) ,
1758+ Work :: UnknownBlobsSidecar {
1759+ message_id,
1760+ peer_id,
1761+ blobs,
1762+ seen_timestamp,
1763+ } => task_spawner. spawn_blocking ( move || {
1764+ worker. process_gossip_blob ( message_id, peer_id, blobs, None , seen_timestamp)
1765+ } ) ,
17341766 } ;
17351767 }
17361768}
0 commit comments