Merged. Showing changes from 15 commits.
22 changes: 20 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
@@ -39,7 +39,7 @@
//! task.

use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths {
rpc_custody_column_queue: usize,
rpc_verify_data_column_queue: usize,
sampling_result_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
@@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths {
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
column_reconstruction_queue: 64,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -498,6 +500,12 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
},
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
Self {
drop_during_sync: true,
Review comment (Member): Note that we will never perform reconstruction during sync (which is also an upcoming feature), but I think we can think about it a bit more.

work: Work::ColumnReconstruction(process_fn),
}
}
}
}
}
@@ -619,6 +627,7 @@ pub enum Work<E: EthSpec> {
RpcCustodyColumn(AsyncFn),
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
@@ -674,6 +683,7 @@ pub enum WorkType {
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
ColumnReconstruction,
IgnoredRpcBlock,
ChainSegment,
ChainSegmentBackfill,
@@ -725,6 +735,7 @@ impl<E: EthSpec> Work<E> {
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -891,6 +902,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut column_reconstruction_queue =
FifoQueue::new(queue_lengths.column_reconstruction_queue);
Review comment (Member): We never pop from this queue. It needs to be added somewhere in this block, based on priority (see `let work_event: Option<Work<E>> =`).

Reply (Member): Addressed in 719df16.
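A minimal sketch of how the new queue could be drained inside that priority chain; the exact position among the other queues is an assumption here, not the actual content of 719df16.

// Sketch only: reuses the FifoQueue handles declared earlier in the event loop.
let work_event: Option<Work<E>> = if let Some(item) = chain_segment_queue.pop() {
    Some(item)
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
    Some(item)
} else if let Some(item) = column_reconstruction_queue.pop() {
    // Reconstruction is best-effort, so it sits below column verification here
    // but above lower-priority work.
    Some(item)
} else {
    None
};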

let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
@@ -1371,6 +1384,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_verify_data_column_queue.push(work, work_id)
}
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
Work::ColumnReconstruction(_) => {
column_reconstruction_queue.push(work, work_id)
}
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
Work::ChainSegmentBackfill { .. } => {
backfill_chain_segment.push(work, work_id)
@@ -1460,6 +1476,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
@@ -1602,7 +1619,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
| Work::SamplingResult(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
58 changes: 58 additions & 0 deletions beacon_node/beacon_processor/src/work_reprocessing_queue.rs
@@ -19,6 +19,7 @@ use itertools::Itertools;
use logging::crit;
use logging::TimeLatch;
use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
@@ -54,6 +55,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
/// For how long to queue sampling requests for reprocessing.
pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);

/// For how long to queue delayed column reconstruction.
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
Review comment (Member): It might be worth making this configurable; we'll likely have some numbers from pandaops soon, now that we expose the data column event streams.
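A hedged sketch of what a configurable delay could look like; the struct and field below are hypothetical and not part of this PR or of Lighthouse's existing configuration.

use std::time::Duration;

/// Hypothetical configuration holder; only the 150 ms default comes from this
/// PR (QUEUED_RECONSTRUCTION_DELAY), everything else is illustrative.
pub struct ReprocessQueueConfig {
    /// How long to delay a scheduled column reconstruction.
    pub column_reconstruction_delay: Duration,
}

impl Default for ReprocessQueueConfig {
    fn default() -> Self {
        Self {
            column_reconstruction_delay: Duration::from_millis(150),
        }
    }
}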


/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
@@ -109,6 +113,8 @@ pub enum ReprocessQueueMessage {
UnknownBlockSamplingRequest(QueuedSamplingRequest),
/// A new backfill batch that needs to be scheduled for processing.
BackfillSync(QueuedBackfillBatch),
/// A delayed column reconstruction that needs checking.
DelayColumnReconstruction(QueuedColumnReconstruction),
}

/// Events sent by the scheduler once they are ready for re-processing.
@@ -121,6 +127,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate),
SamplingRequest(QueuedSamplingRequest),
BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction),
}

/// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -176,6 +183,11 @@ pub struct IgnoredRpcBlock {
/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub AsyncFn);

pub struct QueuedColumnReconstruction {
pub block_root: Hash256,
pub process_fn: AsyncFn,
}

impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
type Error = WorkEvent<E>;

@@ -212,6 +224,8 @@ enum InboundEvent {
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A backfill batch that was queued is ready for processing.
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
@@ -234,6 +248,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled sampling requests
sampling_requests_delay_queue: DelayQueue<QueuedSamplingRequestId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,

/* Queued items */
/// Queued blocks.
@@ -252,6 +268,8 @@ struct ReprocessQueue<S> {
queued_sampling_requests: FnvHashMap<usize, (QueuedSamplingRequest, DelayKey)>,
/// Sampling requests per block root.
awaiting_sampling_requests_per_block_root: HashMap<Hash256, Vec<QueuedSamplingRequestId>>,
/// Column reconstruction per block root.
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,

@@ -343,6 +361,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}

match self.column_reconstructions_delay_queue.poll_expired(cx) {
Poll::Ready(Some(reconstruction)) => {
return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction(
reconstruction.into_inner(),
)));
}
Poll::Ready(None) | Poll::Pending => (),
}

if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => {
@@ -410,6 +437,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
sampling_requests_delay_queue: <_>::default(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
@@ -419,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
awaiting_sampling_requests_per_block_root: <_>::default(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
next_attestation: 0,
next_lc_update: 0,
next_sampling_request_update: 0,
@@ -817,6 +846,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.recompute_next_backfill_batch_event();
}
}
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
match self.queued_column_reconstructions.entry(request.block_root) {
Entry::Occupied(key) => {
// Push back the reattempted reconstruction
self.column_reconstructions_delay_queue
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
}
Entry::Vacant(vacant) => {
let delay_key = self
.column_reconstructions_delay_queue
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
vacant.insert(delay_key);
}
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.beacon_block_root;
@@ -940,6 +984,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
_ => crit!("Unexpected return from try_send error"),
}
}
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
self.queued_column_reconstructions
.remove(&column_reconstruction.block_root);
if self
.ready_work_tx
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
.is_err()
{
error!(
hint = "system may be overloaded",
"Ignored scheduled column reconstruction"
);
}
}
}

metrics::set_gauge_vec(
@@ -32,6 +32,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tracing::{debug, error, info, trace, warn};
use types::{
beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef,
@@ -42,6 +43,7 @@ use types::{
SyncCommitteeMessage, SyncSubnetId,
};

use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
use beacon_processor::{
work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
@@ -1173,8 +1175,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Processed data column, waiting for other components"
);

self.attempt_data_column_reconstruction(block_root, true)
// Instead of triggering reconstruction immediately, schedule it to run. If
// another column arrives in the meantime, it either completes availability
// or pushes the reconstruction back a bit.
let cloned_self = Arc::clone(self);
let send_result = self
.reprocess_tx
.send(ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root, true)
.await;
}),
},
))
.await;
if let Err(SendError(ReprocessQueueMessage::DelayColumnReconstruction(
reconstruction,
))) = send_result
{
warn!("Unable to send reconstruction to reprocessing");
// Execute it immediately instead.
reconstruction.process_fn.await;
}
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
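Taken together with the reprocess-queue hunk above, this implements a per-block debounce: every column that arrives for a block reschedules the reconstruction, so it only fires once no column has arrived for QUEUED_RECONSTRUCTION_DELAY (or availability completes first). A standalone sketch of the same pattern, with illustrative names and u64 standing in for Hash256; this is not Lighthouse code.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use tokio_util::time::delay_queue::{DelayQueue, Key};

struct ReconstructionDebouncer {
    delay_queue: DelayQueue<u64>,
    keys: HashMap<u64, Key>,
    delay: Duration,
}

impl ReconstructionDebouncer {
    /// Called for every column that arrives for `block`.
    fn schedule(&mut self, block: u64) {
        match self.keys.entry(block) {
            // Already scheduled: push the timer back instead of queueing twice.
            Entry::Occupied(entry) => self.delay_queue.reset(entry.get(), self.delay),
            Entry::Vacant(vacant) => {
                let key = self.delay_queue.insert(block, self.delay);
                vacant.insert(key);
            }
        }
    }
}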
4 changes: 4 additions & 0 deletions beacon_node/network/src/network_beacon_processor/tests.rs
@@ -729,6 +729,10 @@ async fn import_gossip_block_acceptably_early() {
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
if num_data_columns > 0 {
rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction])
.await;
}

// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for