Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-stages-api.workspace = true
## async
tokio.workspace = true
tokio-util.workspace = true
futures.workspace = true

## misc
eyre.workspace = true
Expand Down
58 changes: 55 additions & 3 deletions crates/exex/exex/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::BackFillJobStream;
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
Expand Down Expand Up @@ -198,6 +199,15 @@ impl<E, P> BackfillJob<E, P> {
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
self.into()
}

/// Converts the backfill job into a backfill job stream.
pub fn into_stream(self) -> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static,
{
BackFillJobStream::new(self.into_single_blocks())
}
}

impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
Expand All @@ -210,11 +220,11 @@ impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SingleBlockBackfillJob<E, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
pub(crate) range: RangeInclusive<BlockNumber>,
}

impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
Expand All @@ -234,7 +244,7 @@ where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
fn execute_block(
pub(crate) fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
Expand Down Expand Up @@ -266,6 +276,7 @@ where
mod tests {
use crate::BackfillJobFactory;
use eyre::OptionExt;
use futures::StreamExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
Expand Down Expand Up @@ -519,4 +530,45 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_async_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;

// Create first 2 blocks
let blocks_and_execution_outcomes =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

// Backfill the first block
let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
let mut backfill_stream = factory.backfill(1..=1).into_stream();

// execute first block
let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
execution_output.state.reverts.sort();
let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone();
let expected_block = sealed_block_with_senders.unseal();
let expected_output = &blocks_and_execution_outcomes[0].1;
assert_eq!(block, expected_block);
assert_eq!(&execution_output, expected_output);

// expect no more blocks
assert!(backfill_stream.next().await.is_none());

Ok(())
}
}
76 changes: 76 additions & 0 deletions crates/exex/exex/src/backfill_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::SingleBlockBackfillJob;
use futures::{
stream::{FuturesOrdered, Stream},
StreamExt,
};
use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory};
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of `SingleBlockBackfillJob` tasks, allowing blocks to be
/// processed asynchronously within a specified range.
#[derive(Debug)]
pub struct BackFillJobStream<E, P> {
job: SingleBlockBackfillJob<E, P>,
tasks: FuturesOrdered<
JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>,
>,
range: RangeInclusive<BlockNumber>,
}

impl<E, P> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static,
{
/// Creates a new `BackFillJobStream`.
///
/// # Parameters
/// - `job`: The `SingleBlockBackfillJob` to be executed asynchronously.
///
/// # Returns
/// A new instance of `BackFillJobStream`.
pub fn new(job: SingleBlockBackfillJob<E, P>) -> Self {
let range = job.range.clone();
Self { job, tasks: FuturesOrdered::new(), range }
}

fn spawn_task(
&self,
block_number: BlockNumber,
) -> JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>
{
let job = self.job.clone();
tokio::task::spawn_blocking(move || job.execute_block(block_number))
}
}

impl<E, P> Stream for BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(block_number) = this.range.next() {
let task = this.spawn_task(block_number);
this.tasks.push_back(task);
}
Comment thread
loocapro marked this conversation as resolved.
Outdated

match ready!(this.tasks.poll_next_unpin(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(|e| BlockExecutionError::Other(e.into()))?)),
None => Poll::Ready(None),
}
}
}
3 changes: 3 additions & 0 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
mod backfill;
pub use backfill::*;

mod backfill_stream;
pub use backfill_stream::*;

mod context;
pub use context::*;

Expand Down