36 changes: 22 additions & 14 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -185,6 +185,15 @@ pub enum AvailabilityProcessingStatus {
Imported(Hash256),
}

pub enum ReconstructionOutcome<E: EthSpec> {
Reconstructed {
availability_processing_status: AvailabilityProcessingStatus,
data_columns_to_publish: DataColumnSidecarList<E>,
},
Delay,
NoReconstruction,
}

impl TryInto<SignedBeaconBlockHash> for AvailabilityProcessingStatus {
type Error = ();

@@ -3301,13 +3310,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn reconstruct_data_columns(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<
Option<(
AvailabilityProcessingStatus,
DataColumnSidecarList<T::EthSpec>,
)>,
BlockError,
> {
is_retry: bool,
) -> Result<ReconstructionOutcome<T::EthSpec>, BlockError> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
// all columns.
@@ -3316,15 +3320,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Ok(None);
return Ok(ReconstructionOutcome::NoReconstruction);
}

let data_availability_checker = self.data_availability_checker.clone();

let result = self
.task_executor
.spawn_blocking_handle(
move || data_availability_checker.reconstruct_data_columns(&block_root),
move || data_availability_checker.reconstruct_data_columns(&block_root, is_retry),
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
@@ -3335,25 +3339,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
// This should be unreachable because an empty result would return `RecoveredColumnsNotImported` instead of success.
return Ok(None);
return Err(BlockError::InternalError("should have columns".to_string()));
};

let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
.map(
|availability_processing_status| ReconstructionOutcome::Reconstructed {
availability_processing_status,
data_columns_to_publish,
},
)
}
DataColumnReconstructionResult::Reattempt => Ok(ReconstructionOutcome::Delay),
DataColumnReconstructionResult::NotStarted(reason)
| DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
// We use a metric here because logging this would be *very* noisy.
metrics::inc_counter_vec(
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
&[reason],
);
Ok(None)
Ok(ReconstructionOutcome::NoReconstruction)
}
}
}
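For orientation, here is a minimal, self-contained sketch of how a caller might act on the new three-way `ReconstructionOutcome`. The enum is simplified (the real `Reconstructed` variant carries an `AvailabilityProcessingStatus` and a `DataColumnSidecarList<E>`), and the publish/retry handling is hypothetical rather than the network-layer code in this PR:

```rust
// Simplified stand-in for the `ReconstructionOutcome` enum added in beacon_chain.rs.
#[allow(dead_code)]
enum ReconstructionOutcome {
    Reconstructed { data_columns_to_publish: Vec<u64> },
    Delay,
    NoReconstruction,
}

// Hypothetical caller: publish on success, schedule a retry when told to delay.
fn handle_outcome(outcome: ReconstructionOutcome) {
    match outcome {
        ReconstructionOutcome::Reconstructed { data_columns_to_publish } => {
            // In the real code these columns would be published to the gossip network.
            println!("publishing {} reconstructed columns", data_columns_to_publish.len());
        }
        ReconstructionOutcome::Delay => {
            // In the real code this would re-queue reconstruction with `is_retry = true`
            // (e.g. via the reprocessing queue) after a short delay.
            println!("delaying reconstruction, will retry later");
        }
        ReconstructionOutcome::NoReconstruction => {
            // Nothing to do: block already imported, not enough columns, etc.
        }
    }
}

fn main() {
    handle_outcome(ReconstructionOutcome::Delay);
}
```

The key point is the `Delay` arm: rather than reconstructing immediately, the caller is expected to try again later with `is_retry = true`, giving the remaining columns a chance to arrive over gossip first.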
7 changes: 6 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -83,6 +83,7 @@ pub type AvailabilityAndReconstructedColumns<E> = (Availability<E>, DataColumnSi
#[derive(Debug)]
pub enum DataColumnReconstructionResult<E: EthSpec> {
Success(AvailabilityAndReconstructedColumns<E>),
Reattempt,
NotStarted(&'static str),
RecoveredColumnsNotImported(&'static str),
}
@@ -511,12 +512,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn reconstruct_data_columns(
&self,
block_root: &Hash256,
is_retry: bool,
) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
let verified_data_columns = match self
.availability_cache
.check_and_set_reconstruction_started(block_root)
.check_and_set_reconstruction_started(block_root, is_retry)
{
ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns,
ReconstructColumnsDecision::Wait => {
return Ok(DataColumnReconstructionResult::Reattempt)
}
ReconstructColumnsDecision::No(reason) => {
return Ok(DataColumnReconstructionResult::NotStarted(reason));
}
@@ -30,7 +30,7 @@ pub struct PendingComponents<E: EthSpec> {
pub verified_blobs: RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
pub reconstruction_started: bool,
pub reconstruction_state: ReconstructionState,
}

impl<E: EthSpec> PendingComponents<E> {
@@ -278,7 +278,7 @@ impl<E: EthSpec> PendingComponents<E> {
verified_blobs: RuntimeFixedVector::new(vec![None; max_len]),
verified_data_columns: vec![],
executed_block: None,
reconstruction_started: false,
reconstruction_state: ReconstructionState::NotStarted,
}
}

@@ -340,6 +340,12 @@ impl<E: EthSpec> PendingComponents<E> {
}
}

pub enum ReconstructionState {
NotStarted,
WaitingForColumns { num_last: usize },
Started,
}

/// This is the main struct for this module. Outside methods should
/// interact with the cache through this.
pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
@@ -358,6 +364,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
#[allow(clippy::large_enum_variant)]
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>),
Wait,
No(&'static str),
}

@@ -561,6 +568,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn check_and_set_reconstruction_started(
&self,
block_root: &Hash256,
is_retry: bool,
) -> ReconstructColumnsDecision<T::EthSpec> {
let mut write_lock = self.critical.write();
let Some(pending_components) = write_lock.get_mut(block_root) else {
@@ -572,18 +580,48 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
let total_column_count = self.spec.number_of_columns as usize;
let received_column_count = pending_components.verified_data_columns.len();

if pending_components.reconstruction_started {
return ReconstructColumnsDecision::No("already started");
}
if received_column_count >= total_column_count {
return ReconstructColumnsDecision::No("all columns received");
}
if received_column_count < total_column_count / 2 {
return ReconstructColumnsDecision::No("not enough columns");
}

pending_components.reconstruction_started = true;
ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone())
let decision = match pending_components.reconstruction_state {
ReconstructionState::NotStarted => ReconstructColumnsDecision::Wait,
ReconstructionState::WaitingForColumns { num_last } => {
if !is_retry || num_last < received_column_count {
// Not a retry, or more columns arrived since the last check: keep waiting.
ReconstructColumnsDecision::Wait
} else {
// We made no progress waiting for columns; start reconstruction.
ReconstructColumnsDecision::Yes(
pending_components.verified_data_columns.clone(),
)
}
}
ReconstructionState::Started => ReconstructColumnsDecision::No("already started"),
};

match decision {
ReconstructColumnsDecision::Yes(_) => {
pending_components.reconstruction_state = ReconstructionState::Started;
debug!(%block_root, received_column_count, "Starting reconstruction");
}
ReconstructColumnsDecision::Wait => {
pending_components.reconstruction_state = ReconstructionState::WaitingForColumns {
num_last: received_column_count,
};
debug!(
%block_root,
received_column_count,
"Waiting for more columns to arrive before reconstruction"
);
}
ReconstructColumnsDecision::No(_) => {}
}

decision
}

/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
@@ -592,7 +630,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
pending_components_mut.verified_data_columns = vec![];
pending_components_mut.reconstruction_started = false;
pending_components_mut.reconstruction_state = ReconstructionState::NotStarted;
}
}
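To make the wait-then-retry behaviour concrete, here is a small self-contained sketch of the decision rules in `check_and_set_reconstruction_started` (simplified: no cache or locking, hypothetical names, 128 total columns assumed). The first trigger only records the current column count and waits; a retry starts reconstruction only if that count has not grown since the previous check:

```rust
enum State {
    NotStarted,
    WaitingForColumns { num_last: usize },
    Started,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Decision {
    Start,
    Wait,
    No(&'static str),
}

fn decide(state: &mut State, is_retry: bool, received: usize, total: usize) -> Decision {
    // Mirrors the early-exit checks before the state match.
    if received >= total {
        return Decision::No("all columns received");
    }
    if received < total / 2 {
        return Decision::No("not enough columns");
    }
    let decision = match *state {
        State::NotStarted => Decision::Wait,
        State::WaitingForColumns { num_last } => {
            if !is_retry || num_last < received {
                // Not a retry, or more columns arrived since last time: keep waiting.
                Decision::Wait
            } else {
                // No progress since the last check: reconstruct now.
                Decision::Start
            }
        }
        State::Started => Decision::No("already started"),
    };
    match decision {
        Decision::Start => *state = State::Started,
        Decision::Wait => *state = State::WaitingForColumns { num_last: received },
        Decision::No(_) => {}
    }
    decision
}

fn main() {
    let mut state = State::NotStarted;
    // 64 of 128 columns received: the first trigger waits.
    assert_eq!(decide(&mut state, false, 64, 128), Decision::Wait);
    // Retry after more columns arrived: keep waiting.
    assert_eq!(decide(&mut state, true, 80, 128), Decision::Wait);
    // Retry with no progress: start reconstruction.
    assert_eq!(decide(&mut state, true, 80, 128), Decision::Start);
    // Any further trigger is rejected.
    assert_eq!(decide(&mut state, true, 90, 128), Decision::No("already started"));
}
```

This is why the caller passes `is_retry`: a fresh trigger never reconstructs straight away, while a retry that finds no new columns concludes that the missing ones are unlikely to arrive soon and reconstructs from what it has.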

2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
@@ -68,7 +68,7 @@ pub use self::beacon_chain::{
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
ProduceBlockVerification, ReconstructionOutcome, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
22 changes: 20 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
@@ -39,7 +39,7 @@
//! task.

use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths {
rpc_custody_column_queue: usize,
rpc_verify_data_column_queue: usize,
sampling_result_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
@@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths {
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
column_reconstruction_queue: 64,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -498,6 +500,12 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
},
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
Self {
drop_during_sync: true,
Note that we will never perform reconstruction during sync (which is also an upcoming feature), but I think we can think about it a bit more.

work: Work::ColumnReconstruction(process_fn),
}
}
}
}
}
@@ -619,6 +627,7 @@ pub enum Work<E: EthSpec> {
RpcCustodyColumn(AsyncFn),
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
@@ -674,6 +683,7 @@ pub enum WorkType {
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
ColumnReconstruction,
IgnoredRpcBlock,
ChainSegment,
ChainSegmentBackfill,
@@ -725,6 +735,7 @@ impl<E: EthSpec> Work<E> {
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -891,6 +902,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut column_reconstruction_queue =
FifoQueue::new(queue_lengths.column_reconstruction_queue);
We never pop from this queue. Need to add it somewhere in this block based on priority.

let work_event: Option<Work<E>> =

Addressed in 719df16
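For illustration only, a minimal self-contained sketch of the kind of priority-ordered draining the comment above asks for. The queue names echo the real ones, but the contents, relative priorities, and selection code are hypothetical; the actual placement was chosen in the follow-up commit:

```rust
use std::collections::VecDeque;

// Hypothetical work items; the real queues hold `Work<E>` closures.
#[allow(dead_code)]
#[derive(Debug)]
enum Work {
    ChainSegment(&'static str),
    GossipBlock(&'static str),
    ColumnReconstruction(&'static str),
}

fn main() {
    // Higher-priority queues are checked first; column reconstruction is only
    // drained once the queues ahead of it are empty.
    let mut chain_segment_queue: VecDeque<Work> = VecDeque::new();
    let mut gossip_block_queue = VecDeque::from([Work::GossipBlock("block A")]);
    let mut column_reconstruction_queue =
        VecDeque::from([Work::ColumnReconstruction("block B")]);

    // One scheduling pass, standing in for the chained queue pops that
    // produce `work_event` in the real processor loop.
    let next_work = chain_segment_queue
        .pop_front()
        .or_else(|| gossip_block_queue.pop_front())
        .or_else(|| column_reconstruction_queue.pop_front());

    println!("next work item: {:?}", next_work); // GossipBlock("block A")
}
```

Reconstruction sits low in this sketch on the assumption that gossip and chain-segment work should not wait behind it; the real priority is whatever the follow-up commit chose.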

let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
@@ -1371,6 +1384,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_verify_data_column_queue.push(work, work_id)
}
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
Work::ColumnReconstruction(_) => {
column_reconstruction_queue.push(work, work_id)
}
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
Work::ChainSegmentBackfill { .. } => {
backfill_chain_segment.push(work, work_id)
@@ -1460,6 +1476,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
@@ -1602,7 +1619,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
| Work::SamplingResult(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)