diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 990f4b6099c..0c9eb335163 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -73,7 +73,9 @@ use crate::{ kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; -use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes}; +use eth2::types::{ + EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, +}; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -3087,6 +3089,11 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } + self.emit_sse_data_column_sidecar_events( + &block_root, + data_columns.iter().map(|column| column.as_data_column()), + ); + let r = self .check_gossip_data_columns_availability_and_import( slot, @@ -3158,10 +3165,16 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS - // consumers don't expect the blobs event to fire erratically. - if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { - self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + match &engine_get_blobs_output { + EngineGetBlobsOutput::Blobs(blobs) => { + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + } + EngineGetBlobsOutput::CustodyColumns(columns) => { + self.emit_sse_data_column_sidecar_events( + &block_root, + columns.iter().map(|column| column.as_data_column()), + ); + } } let r = self @@ -3191,6 +3204,31 @@ impl BeaconChain { } } + fn emit_sse_data_column_sidecar_events<'a, I>( + self: &Arc, + block_root: &Hash256, + data_columns_iter: I, + ) where + I: Iterator>, + { + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_data_column_sidecar_subscribers() { + let imported_data_columns = self + .data_availability_checker + .cached_data_column_indexes(block_root) + .unwrap_or_default(); + let new_data_columns = + data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index)); + + for data_column in new_data_columns { + event_handler.register(EventKind::DataColumnSidecar( + SseDataColumnSidecar::from_data_column_sidecar(data_column), + )); + } + } + } + } + /// Cache the columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. pub async fn process_rpc_custody_columns( @@ -3231,6 +3269,11 @@ impl BeaconChain { } } + self.emit_sse_data_column_sidecar_events( + &block_root, + custody_columns.iter().map(|column| column.as_ref()), + ); + let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 3e54ee91990..609e5bd796e 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -215,8 +215,7 @@ impl GossipVerifiedDataColumn } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. - #[cfg(test)] - pub(crate) fn __new_for_testing(column_sidecar: Arc>) -> Self { + pub fn __new_for_testing(column_sidecar: Arc>) -> Self { Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar), @@ -268,7 +267,6 @@ impl KzgVerifiedDataColumn { } /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. - #[cfg(test)] pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index d09b74e6452..94ebfb46557 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -11,6 +11,7 @@ pub struct ServerSentEventHandler { single_attestation_tx: Sender>, block_tx: Sender>, blob_sidecar_tx: Sender>, + data_column_sidecar_tx: Sender>, finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, @@ -37,6 +38,7 @@ impl ServerSentEventHandler { let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); + let (data_column_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); @@ -57,6 +59,7 @@ impl ServerSentEventHandler { single_attestation_tx, block_tx, blob_sidecar_tx, + data_column_sidecar_tx, finalized_tx, head_tx, exit_tx, @@ -99,6 +102,10 @@ impl ServerSentEventHandler { .blob_sidecar_tx .send(kind) .map(|count| log_count("blob sidecar", count)), + EventKind::DataColumnSidecar(_) => self + .data_column_sidecar_tx + .send(kind) + .map(|count| log_count("data_column_sidecar", count)), EventKind::FinalizedCheckpoint(_) => self .finalized_tx .send(kind) @@ -177,6 +184,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.subscribe() } + pub fn subscribe_data_column_sidecar(&self) -> Receiver> { + self.data_column_sidecar_tx.subscribe() + } + pub fn subscribe_finalized(&self) -> Receiver> { self.finalized_tx.subscribe() } @@ -249,6 +260,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.receiver_count() > 0 } + pub fn has_data_column_sidecar_subscribers(&self) -> bool { + self.data_column_sidecar_tx.receiver_count() > 0 + } + pub fn has_finalized_subscribers(&self) -> bool { self.finalized_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3ee8c7ae3f9..db6968b662a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -72,7 +72,7 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // Pre-computed data column sidecar using a single static blob from: // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` -const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = +pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); // Default target aggregators to set during testing, this ensures an aggregator at each slot. diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index c9bd55e0620..5d0f22e2528 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -1,11 +1,15 @@ use beacon_chain::blob_verification::GossipVerifiedBlob; -use beacon_chain::test_utils::BeaconChainHarness; -use eth2::types::{EventKind, SseBlobSidecar}; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; +use beacon_chain::test_utils::{BeaconChainHarness, TEST_DATA_COLUMN_SIDECARS_SSZ}; +use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar}; use rand::rngs::StdRng; use rand::SeedableRng; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec}; +use types::test_utils::TestRandom; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, +}; type E = MinimalEthSpec; @@ -43,6 +47,42 @@ async fn blob_sidecar_event_on_process_gossip_blob() { assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs)); } +/// Verifies that a data column event is emitted when a gossip verified data column is received via gossip or the publish block API. +#[tokio::test] +async fn data_column_sidecar_event_on_process_gossip_data_column() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // build and process a gossip verified data column + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng)); + let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar); + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar( + gossip_verified_data_column.as_data_column(), + ); + + let _ = harness + .chain + .process_gossip_data_columns(vec![gossip_verified_data_column], || Ok(())) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} + /// Verifies that a blob event is emitted when blobs are received via RPC. #[tokio::test] async fn blob_sidecar_event_on_process_rpc_blobs() { @@ -95,3 +135,41 @@ async fn blob_sidecar_event_on_process_rpc_blobs() { } assert_eq!(sse_blobs, expected_sse_blobs); } + +#[tokio::test] +async fn data_column_sidecar_event_on_process_rpc_columns() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // load the precomputed column sidecar to avoid computing them for every block in the tests. + let mut sidecar = RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + spec.number_of_columns as usize, + ) + .unwrap()[0] + .clone(); + let parent_root = harness.chain.head().head_block_root(); + sidecar.signed_block_header.message.parent_root = parent_root; + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(&sidecar); + + let _ = harness + .chain + .process_rpc_custody_columns(vec![Arc::new(sidecar)]) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2eaa33a9648..b220685b869 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4741,6 +4741,9 @@ pub fn serve( api_types::EventTopic::BlobSidecar => { event_handler.subscribe_blob_sidecar() } + api_types::EventTopic::DataColumnSidecar => { + event_handler.subscribe_data_column_sidecar() + } api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b8c74d4dcdc..f7bda17eb1a 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -960,6 +960,35 @@ impl SseBlobSidecar { } } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseDataColumnSidecar { + pub block_root: Hash256, + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + pub slot: Slot, + pub kzg_commitments: Vec, + pub versioned_hashes: Vec, +} + +impl SseDataColumnSidecar { + pub fn from_data_column_sidecar( + data_column_sidecar: &DataColumnSidecar, + ) -> SseDataColumnSidecar { + let kzg_commitments = data_column_sidecar.kzg_commitments.to_vec(); + let versioned_hashes = kzg_commitments + .iter() + .map(|c| c.calculate_versioned_hash()) + .collect(); + SseDataColumnSidecar { + block_root: data_column_sidecar.block_root(), + index: data_column_sidecar.index, + slot: data_column_sidecar.slot(), + kzg_commitments, + versioned_hashes, + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseFinalizedCheckpoint { pub block: Hash256, @@ -1110,6 +1139,7 @@ pub enum EventKind { SingleAttestation(Box), Block(SseBlock), BlobSidecar(SseBlobSidecar), + DataColumnSidecar(SseDataColumnSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), Head(SseHead), VoluntaryExit(SignedVoluntaryExit), @@ -1133,6 +1163,7 @@ impl EventKind { EventKind::Head(_) => "head", EventKind::Block(_) => "block", EventKind::BlobSidecar(_) => "blob_sidecar", + EventKind::DataColumnSidecar(_) => "data_column_sidecar", EventKind::Attestation(_) => "attestation", EventKind::SingleAttestation(_) => "single_attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", @@ -1168,6 +1199,11 @@ impl EventKind { "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), )?)), + "data_column_sidecar" => Ok(EventKind::DataColumnSidecar( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Data Column Sidecar: {:?}", e)) + })?, + )), "chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)), )?)), @@ -1257,6 +1293,7 @@ pub enum EventTopic { Head, Block, BlobSidecar, + DataColumnSidecar, Attestation, SingleAttestation, VoluntaryExit, @@ -1283,6 +1320,7 @@ impl FromStr for EventTopic { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), "blob_sidecar" => Ok(EventTopic::BlobSidecar), + "data_column_sidecar" => Ok(EventTopic::DataColumnSidecar), "attestation" => Ok(EventTopic::Attestation), "single_attestation" => Ok(EventTopic::SingleAttestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), @@ -1310,6 +1348,7 @@ impl fmt::Display for EventTopic { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), + EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"), EventTopic::Attestation => write!(f, "attestation"), EventTopic::SingleAttestation => write!(f, "single_attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),