@@ -56,12 +56,14 @@ use crate::observed_data_sidecars::ObservedDataSidecars;
5656use crate :: observed_operations:: { ObservationOutcome , ObservedOperations } ;
5757use crate :: observed_slashable:: ObservedSlashable ;
5858use crate :: persisted_beacon_chain:: PersistedBeaconChain ;
59+ use crate :: persisted_custody:: persist_custody_context;
5960use crate :: persisted_fork_choice:: PersistedForkChoice ;
6061use crate :: pre_finalization_cache:: PreFinalizationBlockCache ;
6162use crate :: shuffling_cache:: { BlockShufflingIds , ShufflingCache } ;
6263use crate :: sync_committee_verification:: {
6364 Error as SyncCommitteeError , VerifiedSyncCommitteeMessage , VerifiedSyncContribution ,
6465} ;
66+ use crate :: validator_custody:: CustodyContextSsz ;
6567use crate :: validator_monitor:: {
6668 get_slot_delay_ms, timestamp_now, ValidatorMonitor ,
6769 HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS ,
@@ -71,7 +73,9 @@ use crate::{
7173 kzg_utils, metrics, AvailabilityPendingExecutedBlock , BeaconChainError , BeaconForkChoiceStore ,
7274 BeaconSnapshot , CachedHead ,
7375} ;
74- use eth2:: types:: { EventKind , SseBlobSidecar , SseBlock , SseExtendedPayloadAttributes } ;
76+ use eth2:: types:: {
77+ EventKind , SseBlobSidecar , SseBlock , SseDataColumnSidecar , SseExtendedPayloadAttributes ,
78+ } ;
7579use execution_layer:: {
7680 BlockProposalContents , BlockProposalContentsType , BuilderParams , ChainHealth , ExecutionLayer ,
7781 FailedCondition , PayloadAttributes , PayloadStatus ,
@@ -648,6 +652,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
648652 Ok ( ( ) )
649653 }
650654
655+ /// Persists the custody information to disk.
656+ pub fn persist_custody_context ( & self ) -> Result < ( ) , Error > {
657+ let custody_context: CustodyContextSsz = self
658+ . data_availability_checker
659+ . custody_context ( )
660+ . as_ref ( )
661+ . into ( ) ;
662+ debug ! ( ?custody_context, "Persisting custody context to store" ) ;
663+
664+ persist_custody_context :: < T :: EthSpec , T :: HotStore , T :: ColdStore > (
665+ self . store . clone ( ) ,
666+ custody_context,
667+ ) ?;
668+
669+ Ok ( ( ) )
670+ }
671+
651672 /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is
652673 /// unavailable.
653674 ///
@@ -2960,7 +2981,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29602981 pub async fn verify_block_for_gossip (
29612982 self : & Arc < Self > ,
29622983 block : Arc < SignedBeaconBlock < T :: EthSpec > > ,
2963- custody_columns_count : usize ,
29642984 ) -> Result < GossipVerifiedBlock < T > , BlockError > {
29652985 let chain = self . clone ( ) ;
29662986 self . task_executor
@@ -2970,7 +2990,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29702990 let slot = block. slot ( ) ;
29712991 let graffiti_string = block. message ( ) . body ( ) . graffiti ( ) . as_utf8_lossy ( ) ;
29722992
2973- match GossipVerifiedBlock :: new ( block, & chain, custody_columns_count ) {
2993+ match GossipVerifiedBlock :: new ( block, & chain) {
29742994 Ok ( verified) => {
29752995 let commitments_formatted = verified. block . commitments_formatted ( ) ;
29762996 debug ! (
@@ -3059,6 +3079,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30593079 return Err ( BlockError :: DuplicateFullyImported ( block_root) ) ;
30603080 }
30613081
3082+ self . emit_sse_data_column_sidecar_events (
3083+ & block_root,
3084+ data_columns. iter ( ) . map ( |column| column. as_data_column ( ) ) ,
3085+ ) ;
3086+
30623087 let r = self
30633088 . check_gossip_data_columns_availability_and_import (
30643089 slot,
@@ -3130,10 +3155,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31303155 return Err ( BlockError :: DuplicateFullyImported ( block_root) ) ;
31313156 }
31323157
3133- // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
3134- // consumers don't expect the blobs event to fire erratically.
3135- if let EngineGetBlobsOutput :: Blobs ( blobs) = & engine_get_blobs_output {
3136- self . emit_sse_blob_sidecar_events ( & block_root, blobs. iter ( ) . map ( |b| b. as_blob ( ) ) ) ;
3158+ match & engine_get_blobs_output {
3159+ EngineGetBlobsOutput :: Blobs ( blobs) => {
3160+ self . emit_sse_blob_sidecar_events ( & block_root, blobs. iter ( ) . map ( |b| b. as_blob ( ) ) ) ;
3161+ }
3162+ EngineGetBlobsOutput :: CustodyColumns ( columns) => {
3163+ self . emit_sse_data_column_sidecar_events (
3164+ & block_root,
3165+ columns. iter ( ) . map ( |column| column. as_data_column ( ) ) ,
3166+ ) ;
3167+ }
31373168 }
31383169
31393170 let r = self
@@ -3163,6 +3194,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31633194 }
31643195 }
31653196
3197+ fn emit_sse_data_column_sidecar_events < ' a , I > (
3198+ self : & Arc < Self > ,
3199+ block_root : & Hash256 ,
3200+ data_columns_iter : I ,
3201+ ) where
3202+ I : Iterator < Item = & ' a DataColumnSidecar < T :: EthSpec > > ,
3203+ {
3204+ if let Some ( event_handler) = self . event_handler . as_ref ( ) {
3205+ if event_handler. has_data_column_sidecar_subscribers ( ) {
3206+ let imported_data_columns = self
3207+ . data_availability_checker
3208+ . cached_data_column_indexes ( block_root)
3209+ . unwrap_or_default ( ) ;
3210+ let new_data_columns =
3211+ data_columns_iter. filter ( |b| !imported_data_columns. contains ( & b. index ) ) ;
3212+
3213+ for data_column in new_data_columns {
3214+ event_handler. register ( EventKind :: DataColumnSidecar (
3215+ SseDataColumnSidecar :: from_data_column_sidecar ( data_column) ,
3216+ ) ) ;
3217+ }
3218+ }
3219+ }
3220+ }
3221+
31663222 /// Cache the columns in the processing cache, process it, then evict it from the cache if it was
31673223 /// imported or errors.
31683224 pub async fn process_rpc_custody_columns (
@@ -3203,6 +3259,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32033259 }
32043260 }
32053261
3262+ self . emit_sse_data_column_sidecar_events (
3263+ & block_root,
3264+ custody_columns. iter ( ) . map ( |column| column. as_ref ( ) ) ,
3265+ ) ;
3266+
32063267 let r = self
32073268 . check_rpc_custody_columns_availability_and_import ( slot, block_root, custody_columns)
32083269 . await ;
@@ -3610,7 +3671,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36103671 data_columns. iter ( ) . map ( |c| c. as_data_column ( ) ) ,
36113672 ) ?;
36123673 self . data_availability_checker
3613- . put_gossip_verified_data_columns ( block_root, data_columns) ?
3674+ . put_kzg_verified_custody_data_columns ( block_root, data_columns) ?
36143675 }
36153676 } ;
36163677
@@ -7076,7 +7137,8 @@ impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
70767137 fn drop ( & mut self ) {
70777138 let drop = || -> Result < ( ) , Error > {
70787139 self . persist_fork_choice ( ) ?;
7079- self . persist_op_pool ( )
7140+ self . persist_op_pool ( ) ?;
7141+ self . persist_custody_context ( )
70807142 } ;
70817143
70827144 if let Err ( e) = drop ( ) {
0 commit comments