Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes};
use eth2::types::{
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
};
use execution_layer::{
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
FailedCondition, PayloadAttributes, PayloadStatus,
Expand Down Expand Up @@ -3087,6 +3089,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

self.emit_sse_data_column_sidecar_events(
&block_root,
data_columns.iter().map(|column| column.as_data_column()),
);

let r = self
.check_gossip_data_columns_availability_and_import(
slot,
Expand Down Expand Up @@ -3158,10 +3165,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
match &engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
}
EngineGetBlobsOutput::CustodyColumns(columns) => {
self.emit_sse_data_column_sidecar_events(
&block_root,
columns.iter().map(|column| column.as_data_column()),
);
}
}

let r = self
Expand Down Expand Up @@ -3191,6 +3204,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

fn emit_sse_data_column_sidecar_events<'a, I>(
self: &Arc<Self>,
block_root: &Hash256,
data_columns_iter: I,
) where
I: Iterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
{
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_data_column_sidecar_subscribers() {
let imported_data_columns = self
.data_availability_checker
.cached_data_column_indexes(block_root)
.unwrap_or_default();
let new_data_columns =
data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index));

for data_column in new_data_columns {
event_handler.register(EventKind::DataColumnSidecar(
SseDataColumnSidecar::from_data_column_sidecar(data_column),
));
}
}
}
}

/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_custody_columns(
Expand Down Expand Up @@ -3231,6 +3269,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.emit_sse_data_column_sidecar_events(
&block_root,
custody_columns.iter().map(|column| column.as_ref()),
);

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
}

/// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
#[cfg(test)]
pub(crate) fn __new_for_testing(column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>) -> Self {
pub fn __new_for_testing(column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>) -> Self {
Self {
block_root: column_sidecar.block_root(),
data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar),
Expand Down Expand Up @@ -268,7 +267,6 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
}

/// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
#[cfg(test)]
pub(crate) fn __new_for_testing(data_column: Arc<DataColumnSidecar<E>>) -> Self {
Self { data: data_column }
}
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
single_attestation_tx: Sender<EventKind<E>>,
block_tx: Sender<EventKind<E>>,
blob_sidecar_tx: Sender<EventKind<E>>,
data_column_sidecar_tx: Sender<EventKind<E>>,
finalized_tx: Sender<EventKind<E>>,
head_tx: Sender<EventKind<E>>,
exit_tx: Sender<EventKind<E>>,
Expand All @@ -37,6 +38,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (single_attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (data_column_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
Expand All @@ -57,6 +59,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
single_attestation_tx,
block_tx,
blob_sidecar_tx,
data_column_sidecar_tx,
finalized_tx,
head_tx,
exit_tx,
Expand Down Expand Up @@ -99,6 +102,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.blob_sidecar_tx
.send(kind)
.map(|count| log_count("blob sidecar", count)),
EventKind::DataColumnSidecar(_) => self
.data_column_sidecar_tx
.send(kind)
.map(|count| log_count("data_column_sidecar", count)),
EventKind::FinalizedCheckpoint(_) => self
.finalized_tx
.send(kind)
Expand Down Expand Up @@ -177,6 +184,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.blob_sidecar_tx.subscribe()
}

pub fn subscribe_data_column_sidecar(&self) -> Receiver<EventKind<E>> {
self.data_column_sidecar_tx.subscribe()
}

pub fn subscribe_finalized(&self) -> Receiver<EventKind<E>> {
self.finalized_tx.subscribe()
}
Expand Down Expand Up @@ -249,6 +260,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.blob_sidecar_tx.receiver_count() > 0
}

pub fn has_data_column_sidecar_subscribers(&self) -> bool {
self.data_column_sidecar_tx.receiver_count() > 0
}

pub fn has_finalized_subscribers(&self) -> bool {
self.finalized_tx.receiver_count() > 0
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME";

// Pre-computed data column sidecar using a single static blob from:
// `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz`
const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] =
pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] =
include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz");

// Default target aggregators to set during testing, this ensures an aggregator at each slot.
Expand Down
84 changes: 81 additions & 3 deletions beacon_node/beacon_chain/tests/events.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::test_utils::BeaconChainHarness;
use eth2::types::{EventKind, SseBlobSidecar};
use beacon_chain::data_column_verification::GossipVerifiedDataColumn;
use beacon_chain::test_utils::{BeaconChainHarness, TEST_DATA_COLUMN_SIDECARS_SSZ};
use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec};
use types::test_utils::TestRandom;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList,
};

type E = MinimalEthSpec;

Expand Down Expand Up @@ -43,6 +47,42 @@ async fn blob_sidecar_event_on_process_gossip_blob() {
assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs));
}

/// Verifies that a data column event is emitted when a gossip verified data column is received via gossip or the publish block API.
#[tokio::test]
async fn data_column_sidecar_event_on_process_gossip_data_column() {
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar();

// build and process a gossip verified data column
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng));
let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar);
let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(
gossip_verified_data_column.as_data_column(),
);

let _ = harness
.chain
.process_gossip_data_columns(vec![gossip_verified_data_column], || Ok(()))
.await
.unwrap();

let sidecar_event = data_column_event_receiver.try_recv().unwrap();
assert_eq!(
sidecar_event,
EventKind::DataColumnSidecar(expected_sse_data_column)
);
}

/// Verifies that a blob event is emitted when blobs are received via RPC.
#[tokio::test]
async fn blob_sidecar_event_on_process_rpc_blobs() {
Expand Down Expand Up @@ -95,3 +135,41 @@ async fn blob_sidecar_event_on_process_rpc_blobs() {
}
assert_eq!(sse_blobs, expected_sse_blobs);
}

#[tokio::test]
async fn data_column_sidecar_event_on_process_rpc_columns() {
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let harness = BeaconChainHarness::builder(E::default())
.spec(spec.clone())
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar();

// load the precomputed column sidecar to avoid computing them for every block in the tests.
let mut sidecar = RuntimeVariableList::<DataColumnSidecar<E>>::from_ssz_bytes(
TEST_DATA_COLUMN_SIDECARS_SSZ,
spec.number_of_columns as usize,
)
.unwrap()[0]
.clone();
let parent_root = harness.chain.head().head_block_root();
sidecar.signed_block_header.message.parent_root = parent_root;
let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(&sidecar);

let _ = harness
.chain
.process_rpc_custody_columns(vec![Arc::new(sidecar)])
.await
.unwrap();

let sidecar_event = data_column_event_receiver.try_recv().unwrap();
assert_eq!(
sidecar_event,
EventKind::DataColumnSidecar(expected_sse_data_column)
);
}
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4741,6 +4741,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlobSidecar => {
event_handler.subscribe_blob_sidecar()
}
api_types::EventTopic::DataColumnSidecar => {
event_handler.subscribe_data_column_sidecar()
}
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
Expand Down
39 changes: 39 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,35 @@ impl SseBlobSidecar {
}
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseDataColumnSidecar {
pub block_root: Hash256,
#[serde(with = "serde_utils::quoted_u64")]
pub index: u64,
pub slot: Slot,
pub kzg_commitments: Vec<KzgCommitment>,
pub versioned_hashes: Vec<VersionedHash>,
}

impl SseDataColumnSidecar {
pub fn from_data_column_sidecar<E: EthSpec>(
data_column_sidecar: &DataColumnSidecar<E>,
) -> SseDataColumnSidecar {
let kzg_commitments = data_column_sidecar.kzg_commitments.to_vec();
let versioned_hashes = kzg_commitments
.iter()
.map(|c| c.calculate_versioned_hash())
.collect();
SseDataColumnSidecar {
block_root: data_column_sidecar.block_root(),
index: data_column_sidecar.index,
slot: data_column_sidecar.slot(),
kzg_commitments,
versioned_hashes,
}
}
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseFinalizedCheckpoint {
pub block: Hash256,
Expand Down Expand Up @@ -1110,6 +1139,7 @@ pub enum EventKind<E: EthSpec> {
SingleAttestation(Box<SingleAttestation>),
Block(SseBlock),
BlobSidecar(SseBlobSidecar),
DataColumnSidecar(SseDataColumnSidecar),
FinalizedCheckpoint(SseFinalizedCheckpoint),
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
Expand All @@ -1133,6 +1163,7 @@ impl<E: EthSpec> EventKind<E> {
EventKind::Head(_) => "head",
EventKind::Block(_) => "block",
EventKind::BlobSidecar(_) => "blob_sidecar",
EventKind::DataColumnSidecar(_) => "data_column_sidecar",
EventKind::Attestation(_) => "attestation",
EventKind::SingleAttestation(_) => "single_attestation",
EventKind::VoluntaryExit(_) => "voluntary_exit",
Expand Down Expand Up @@ -1168,6 +1199,11 @@ impl<E: EthSpec> EventKind<E> {
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
)?)),
"data_column_sidecar" => Ok(EventKind::DataColumnSidecar(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Data Column Sidecar: {:?}", e))
})?,
)),
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
)?)),
Expand Down Expand Up @@ -1257,6 +1293,7 @@ pub enum EventTopic {
Head,
Block,
BlobSidecar,
DataColumnSidecar,
Attestation,
SingleAttestation,
VoluntaryExit,
Expand All @@ -1283,6 +1320,7 @@ impl FromStr for EventTopic {
"head" => Ok(EventTopic::Head),
"block" => Ok(EventTopic::Block),
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
"data_column_sidecar" => Ok(EventTopic::DataColumnSidecar),
"attestation" => Ok(EventTopic::Attestation),
"single_attestation" => Ok(EventTopic::SingleAttestation),
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
Expand Down Expand Up @@ -1310,6 +1348,7 @@ impl fmt::Display for EventTopic {
EventTopic::Head => write!(f, "head"),
EventTopic::Block => write!(f, "block"),
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"),
EventTopic::Attestation => write!(f, "attestation"),
EventTopic::SingleAttestation => write!(f, "single_attestation"),
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
Expand Down