diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index e864cb1fd91..3acc11b1d27 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -39,7 +39,7 @@ //! task. use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths { rpc_custody_column_queue: usize, rpc_verify_data_column_queue: usize, sampling_result_queue: usize, + column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, gossip_block_queue: usize, @@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths { rpc_verify_data_column_queue: 1000, unknown_block_sampling_request_queue: 16384, sampling_result_queue: 1000, + column_reconstruction_queue: 64, chain_segment_queue: 64, backfill_chain_segment: 64, gossip_block_queue: 1024, @@ -498,6 +500,12 @@ impl From for WorkEvent { drop_during_sync: false, work: Work::ChainSegmentBackfill(process_fn), }, + ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => { + Self { + drop_during_sync: true, + work: Work::ColumnReconstruction(process_fn), + } + } } } } @@ -619,6 +627,7 @@ pub enum Work { RpcCustodyColumn(AsyncFn), RpcVerifyDataColumn(AsyncFn), SamplingResult(AsyncFn), + ColumnReconstruction(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -674,6 +683,7 @@ pub enum WorkType { RpcCustodyColumn, RpcVerifyDataColumn, SamplingResult, + ColumnReconstruction, IgnoredRpcBlock, ChainSegment, ChainSegmentBackfill, @@ -725,6 +735,7 @@ impl Work { Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn, Work::SamplingResult { .. 
} => WorkType::SamplingResult, + Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill, @@ -891,6 +902,8 @@ impl BeaconProcessor { FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); // TODO(das): the sampling_request_queue is never read let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); + let mut column_reconstruction_queue = + FifoQueue::new(queue_lengths.column_reconstruction_queue); let mut unknown_block_sampling_request_queue = FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); @@ -1072,6 +1085,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = gossip_data_column_queue.pop() { Some(item) + } else if let Some(item) = column_reconstruction_queue.pop() { + Some(item) // Check the priority 0 API requests after blocks and blobs, but before attestations. } else if let Some(item) = api_request_p0_queue.pop() { Some(item) @@ -1371,6 +1386,9 @@ impl BeaconProcessor { rpc_verify_data_column_queue.push(work, work_id) } Work::SamplingResult(_) => sampling_result_queue.push(work, work_id), + Work::ColumnReconstruction(_) => { + column_reconstruction_queue.push(work, work_id) + } Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id), Work::ChainSegmentBackfill { .. 
} => { backfill_chain_segment.push(work, work_id) @@ -1460,6 +1478,7 @@ impl BeaconProcessor { WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(), WorkType::SamplingResult => sampling_result_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), WorkType::ChainSegment => chain_segment_queue.len(), WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), WorkType::Status => status_queue.len(), @@ -1602,7 +1621,8 @@ impl BeaconProcessor { | Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::RpcVerifyDataColumn(process_fn) - | Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn), + | Work::SamplingResult(process_fn) + | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 2b6e72ae0c3..855342d8bda 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use logging::crit; use logging::TimeLatch; use slot_clock::SlotClock; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; @@ -54,6 +55,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); /// For how long to queue sampling requests for reprocessing. pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue delayed column reconstruction. +pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. 
The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. @@ -109,6 +113,8 @@ pub enum ReprocessQueueMessage { UnknownBlockSamplingRequest(QueuedSamplingRequest), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), + /// A column reconstruction that needs to be scheduled for processing after a delay. + DelayColumnReconstruction(QueuedColumnReconstruction), } /// Events sent by the scheduler once they are ready for re-processing. @@ -121,6 +127,7 @@ pub enum ReadyWork { LightClientUpdate(QueuedLightClientUpdate), SamplingRequest(QueuedSamplingRequest), BackfillSync(QueuedBackfillBatch), + ColumnReconstruction(QueuedColumnReconstruction), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -176,6 +183,11 @@ pub struct IgnoredRpcBlock { /// A backfill batch work that has been queued for processing later. pub struct QueuedBackfillBatch(pub AsyncFn); +pub struct QueuedColumnReconstruction { + pub block_root: Hash256, + pub process_fn: AsyncFn, +} + impl TryFrom> for QueuedBackfillBatch { type Error = WorkEvent; @@ -212,6 +224,8 @@ enum InboundEvent { ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A backfill batch that was queued is ready for processing. ReadyBackfillSync(QueuedBackfillBatch), + /// A column reconstruction that was queued is ready for processing. + ReadyColumnReconstruction(QueuedColumnReconstruction), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -234,6 +248,8 @@ struct ReprocessQueue { lc_updates_delay_queue: DelayQueue, /// Queue to manage scheduled sampling requests sampling_requests_delay_queue: DelayQueue, + /// Queue to manage scheduled column reconstructions. + column_reconstructions_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks.
@@ -252,6 +268,8 @@ struct ReprocessQueue { queued_sampling_requests: FnvHashMap, /// Sampling requests per block root. awaiting_sampling_requests_per_block_root: HashMap>, + /// Column reconstruction per block root. + queued_column_reconstructions: HashMap, /// Queued backfill batches queued_backfill_batches: Vec, @@ -343,6 +361,15 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.column_reconstructions_delay_queue.poll_expired(cx) { + Poll::Ready(Some(reconstruction)) => { + return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction( + reconstruction.into_inner(), + ))); + } + Poll::Ready(None) | Poll::Pending => (), + } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { match next_backfill_batch_event.as_mut().poll(cx) { Poll::Ready(_) => { @@ -410,6 +437,7 @@ impl ReprocessQueue { attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), sampling_requests_delay_queue: <_>::default(), + column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), @@ -419,6 +447,7 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), awaiting_sampling_requests_per_block_root: <_>::default(), queued_backfill_batches: Vec::new(), + queued_column_reconstructions: HashMap::new(), next_attestation: 0, next_lc_update: 0, next_sampling_request_update: 0, @@ -817,6 +846,21 @@ impl ReprocessQueue { self.recompute_next_backfill_batch_event(); } } + InboundEvent::Msg(DelayColumnReconstruction(request)) => { + match self.queued_column_reconstructions.entry(request.block_root) { + Entry::Occupied(key) => { + // Push back the reattempted reconstruction + self.column_reconstructions_delay_queue + .reset(key.get(), QUEUED_RECONSTRUCTION_DELAY) + } + Entry::Vacant(vacant) => { + let delay_key = self + .column_reconstructions_delay_queue + 
.insert(request, QUEUED_RECONSTRUCTION_DELAY); + vacant.insert(delay_key); + } + } + } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.beacon_block_root; @@ -940,6 +984,20 @@ impl ReprocessQueue { _ => crit!("Unexpected return from try_send error"), } } + InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { + self.queued_column_reconstructions + .remove(&column_reconstruction.block_root); + if self + .ready_work_tx + .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)) + .is_err() + { + error!( + hint = "system may be overloaded", + "Ignored scheduled column reconstruction" + ); + } + } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 87f657f9352..d9aab07d5f6 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; use tracing::{debug, error, info, trace, warn}; use types::{ beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef, @@ -42,6 +43,7 @@ use types::{ SyncCommitteeMessage, SyncSubnetId, }; +use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ work_reprocessing_queue::{ QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, @@ -1173,8 +1175,31 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - self.attempt_data_column_reconstruction(block_root, true) + // Instead of triggering reconstruction immediately, schedule it to be run. 
If + // another column arrives it either completes availability or pushes + // reconstruction back a bit. + let cloned_self = Arc::clone(self); + let send_result = self + .reprocess_tx + .send(ReprocessQueueMessage::DelayColumnReconstruction( + QueuedColumnReconstruction { + block_root, + process_fn: Box::pin(async move { + cloned_self + .attempt_data_column_reconstruction(block_root, true) + .await; + }), + }, + )) .await; + if let Err(SendError(ReprocessQueueMessage::DelayColumnReconstruction( + reconstruction, + ))) = send_result + { + warn!("Unable to send reconstruction to reprocessing"); + // Execute it immediately instead. + reconstruction.process_fn.await; + } } }, Err(BlockError::DuplicateFullyImported(_)) => { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index f6a1069a7f4..9f133ea55ee 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -729,6 +729,10 @@ async fn import_gossip_block_acceptably_early() { rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) .await; } + if num_data_columns > 0 { + rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction]) + .await; + } // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // and check the head in the time between the block arrived early and when its due for