Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-stages-api.workspace = true
## async
tokio.workspace = true
tokio-util.workspace = true
futures.workspace = true

## misc
eyre.workspace = true
Expand Down
74 changes: 74 additions & 0 deletions crates/exex/exex/src/backfill/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use reth_evm::execute::{
BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{
BlockReader, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_tracing::tracing::trace;
use std::ops::RangeInclusive;

use crate::BackfillJob;

/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug, Clone)]
pub struct SingleBlockBackfillJob<E, P> {
executor: E,
provider: P,
pub(crate) range: RangeInclusive<BlockNumber>,
}

impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}

impl<E, P> SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
pub(crate) fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));

trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

let block_execution_output = executor.execute((&block_with_senders, td).into())?;

Ok((block_with_senders, block_execution_output))
}
}

impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
fn from(value: BackfillJob<E, P>) -> Self {
Self { executor: value.executor, provider: value.provider, range: value.range }
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use job::SingleBlockBackfillJob;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt};
use reth_primitives::{Block, BlockNumber};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
Expand All @@ -15,6 +14,10 @@ use std::{
ops::RangeInclusive,
time::{Duration, Instant},
};
use stream::BackFillJobStream;

mod job;
mod stream;

/// Factory for creating new backfill jobs.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -198,74 +201,22 @@ impl<E, P> BackfillJob<E, P> {
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
self.into()
}
}

impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
fn from(value: BackfillJob<E, P>) -> Self {
Self { executor: value.executor, provider: value.provider, range: value.range }
}
}

/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
pub struct SingleBlockBackfillJob<E, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
}

impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}

impl<E, P> SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));

trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

let block_execution_output = executor.execute((&block_with_senders, td).into())?;

Ok((block_with_senders, block_execution_output))
/// Converts the backfill job into a backfill job stream.
pub fn into_stream(self) -> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static,
{
BackFillJobStream::new(self.into_single_blocks())
}
}

#[cfg(test)]
mod tests {
use crate::BackfillJobFactory;
use eyre::OptionExt;
use futures::StreamExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
Expand Down Expand Up @@ -519,4 +470,45 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_async_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;

// Create first 2 blocks
let blocks_and_execution_outcomes =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

// Backfill the first block
let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
let mut backfill_stream = factory.backfill(1..=1).into_stream();

// execute first block
let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
execution_output.state.reverts.sort();
let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone();
let expected_block = sealed_block_with_senders.unseal();
let expected_output = &blocks_and_execution_outcomes[0].1;
assert_eq!(block, expected_block);
assert_eq!(&execution_output, expected_output);

// expect no more blocks
assert!(backfill_stream.next().await.is_none());

Ok(())
}
}
99 changes: 99 additions & 0 deletions crates/exex/exex/src/backfill/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use super::job::SingleBlockBackfillJob;
use futures::{
stream::{FuturesOrdered, Stream},
StreamExt,
};
use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory};
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

type BackfillTasks = FuturesOrdered<
JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>,
>;

/// The default parallelism for active tasks in [`BackFillJobStream`].
const DEFAULT_PARALLELISM: usize = 4;

/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be
/// processed asynchronously but in order within a specified range.
#[derive(Debug)]
pub struct BackFillJobStream<E, P> {
job: SingleBlockBackfillJob<E, P>,
tasks: BackfillTasks,
range: RangeInclusive<BlockNumber>,
parallelism: usize,
}

impl<E, P> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static,
{
/// Creates a new [`BackFillJobStream`] with the default parallelism.
///
/// # Parameters
/// - `job`: The [`SingleBlockBackfillJob`] to be executed asynchronously.
///
/// # Returns
/// A new instance of [`BackFillJobStream`] with the default parallelism.
pub fn new(job: SingleBlockBackfillJob<E, P>) -> Self {
let range = job.range.clone();
Self { job, tasks: FuturesOrdered::new(), range, parallelism: DEFAULT_PARALLELISM }
}

/// Configures the parallelism of the [`BackFillJobStream`] to handle active tasks.
///
/// # Parameters
/// - `parallelism`: The parallelism to handle active tasks.
///
/// # Returns
/// The modified instance of [`BackFillJobStream`] with the specified parallelism.
pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
self.parallelism = parallelism;
self
}

fn spawn_task(
&self,
block_number: BlockNumber,
) -> JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>
{
let job = self.job.clone();
tokio::task::spawn_blocking(move || job.execute_block(block_number))
}
}

impl<E, P> Stream for BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
if let Some(block_number) = this.range.next() {
let task = this.spawn_task(block_number);
this.tasks.push_back(task);
} else {
break;
}
}

match ready!(this.tasks.poll_next_unpin(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(|e| BlockExecutionError::Other(e.into()))?)),
Comment thread
shekhirin marked this conversation as resolved.
None => Poll::Ready(None),
}
}
}