27 changes: 16 additions & 11 deletions beacon_node/beacon_chain/src/fetch_blobs/mod.rs
@@ -264,6 +264,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
return Ok(None);
}

debug!(num_fetched_blobs, "All expected blobs received from the EL");
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);

if chain_adapter.fork_choice_contains_block(&block_root) {
@@ -276,37 +277,45 @@ }
}

let chain_adapter = Arc::new(chain_adapter);
let custody_columns = compute_and_publish_data_columns(
let custody_columns_to_import = compute_custody_columns_to_import(
&chain_adapter,
block.clone(),
blobs,
proofs,
custody_columns_indices,
publish_fn,
)
.await?;

debug!(num_fetched_blobs, "Processing engine blobs");
if custody_columns_to_import.is_empty() {
debug!(
info = "No new data columns to import",
"Ignoring EL blobs response"
);
return Ok(None);
}

publish_fn(EngineGetBlobsOutput::CustodyColumns(
custody_columns_to_import.clone(),
));

let availability_processing_status = chain_adapter
.process_engine_blobs(
block.slot(),
block_root,
EngineGetBlobsOutput::CustodyColumns(custody_columns),
EngineGetBlobsOutput::CustodyColumns(custody_columns_to_import),
)
.await?;

Ok(Some(availability_processing_status))
}

/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
let kzg = chain_adapter.kzg().clone();
let spec = chain_adapter.spec().clone();
@@ -380,13 +389,9 @@ async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipDataColumn)?;

publish_fn(EngineGetBlobsOutput::CustodyColumns(
columns_to_import_and_publish.clone(),
));

Ok(columns_to_import_and_publish)
},
"compute_and_publish_data_columns",
"compute_custody_columns_to_import",
)
.ok_or(FetchEngineBlobError::RuntimeShutdown)?
.await
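For orientation outside the diff itself: the hunk above moves gossip publishing out of the blocking column-computation task and into the caller, and short-circuits when every column has already been seen on gossip. A minimal sketch of that control flow, assuming simplified stand-in types (`CustodyColumn`, `ProcessingStatus`, `process_fetched_columns` are hypothetical, not the real Lighthouse items):

```rust
// Hypothetical placeholder types; only the control flow of the refactor is shown.
#[derive(Clone, Debug)]
struct CustodyColumn(u64);

#[derive(Debug)]
enum EngineGetBlobsOutput {
    CustodyColumns(Vec<CustodyColumn>),
}

#[derive(Debug, PartialEq)]
enum ProcessingStatus {
    Imported,
}

#[derive(Debug)]
enum FetchEngineBlobError {
    RuntimeShutdown,
}

/// After the refactor, the caller computes the columns to import, returns early
/// when the gossip-deduplicated set is empty, and only then publishes and imports.
fn process_fetched_columns(
    columns_to_import: Vec<CustodyColumn>,
    publish_fn: impl Fn(EngineGetBlobsOutput),
    process_fn: impl Fn(EngineGetBlobsOutput) -> Result<ProcessingStatus, FetchEngineBlobError>,
) -> Result<Option<ProcessingStatus>, FetchEngineBlobError> {
    // Every column already seen on gossip: nothing to publish or import.
    if columns_to_import.is_empty() {
        return Ok(None);
    }

    // Publishing now happens here in the caller, not inside the blocking
    // column-computation task.
    publish_fn(EngineGetBlobsOutput::CustodyColumns(columns_to_import.clone()));

    let status = process_fn(EngineGetBlobsOutput::CustodyColumns(columns_to_import))?;
    Ok(Some(status))
}

fn main() {
    // With an empty set, neither publish nor import is invoked.
    let published = std::cell::RefCell::new(0u32);
    let result = process_fetched_columns(
        vec![],
        |_| *published.borrow_mut() += 1,
        |_| Ok(ProcessingStatus::Imported),
    );
    assert!(matches!(result, Ok(None)));
    assert_eq!(*published.borrow(), 0);
}
```

The design point is that an empty `custody_columns_to_import` now exits before any gossip publication, which is what the new test in `tests.rs` below asserts.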
48 changes: 47 additions & 1 deletion beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -1,4 +1,4 @@
use crate::data_column_verification::GossipVerifiedDataColumn;
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
use crate::fetch_blobs::{
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
@@ -139,6 +139,52 @@ async fn test_fetch_blobs_v2_block_imported_after_el_response() {
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();

// **GIVEN**:
// All blobs returned
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
// block not yet imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All data columns already seen on gossip
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| {
Err(GossipDataColumnError::PriorKnown {
proposer: c.block_proposer_index(),
slot: c.slot(),
index: c.index,
})
});
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);

// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");

// **THEN**: Should NOT be processed and no columns should be published.
assert_eq!(processing_status, None);
assert_eq!(
publish_fn_args.lock().unwrap().len(),
0,
"no columns should be published"
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_success() {
let mut mock_adapter = mock_beacon_adapter();
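The new test asserts on `publish_fn_args` returned by `mock_publish_fn()`, a helper defined outside this hunk. A plausible sketch of such a capture helper, assuming a generic `capture_publish_fn` (hypothetical name and signature, not the actual helper):

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical capture helper: returns a publish closure plus a shared buffer
/// that records every call, so a test can assert how many times (if at all)
/// anything was published.
fn capture_publish_fn<T: Send + 'static>() -> (Box<dyn Fn(T) + Send>, Arc<Mutex<Vec<T>>>) {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let captured_clone = Arc::clone(&captured);
    let publish_fn: Box<dyn Fn(T) + Send> =
        Box::new(move |output: T| captured_clone.lock().unwrap().push(output));
    (publish_fn, captured)
}

fn main() {
    // Plain strings stand in for `EngineGetBlobsOutput` here.
    let (publish_fn, publish_fn_args) = capture_publish_fn::<String>();
    publish_fn("CustodyColumns([0, 1, 2])".to_string());
    assert_eq!(publish_fn_args.lock().unwrap().len(), 1);
}
```

With such a helper, the `assert_eq!(publish_fn_args.lock().unwrap().len(), 0, ...)` check in the test confirms that the early return happens before any columns are published.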