Merged
Changes from 7 commits
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -152,6 +152,7 @@ reth-payload-primitives = { git = "https://github.com/scroll-tech/reth.git", def
reth-primitives = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-primitives-traits = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-provider = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-builder = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-server-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tasks = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tokio-util = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
2 changes: 1 addition & 1 deletion crates/database/db/src/db.rs
@@ -449,7 +449,7 @@ mod test {

// Should return the highest safe block (block 201)
let latest_safe = db.get_latest_safe_l2_block().await.unwrap();
assert_eq!(latest_safe, Some(safe_block_2));
assert_eq!(latest_safe, Some((safe_block_2, batch_info)));
}

#[tokio::test]
1 change: 1 addition & 0 deletions crates/database/db/src/models/batch_commit.rs
@@ -65,6 +65,7 @@ impl From<Model> for BatchCommitData {
blob_versioned_hash: value
.blob_hash
.map(|b| b.as_slice().try_into().expect("data persisted in database is valid")),
finalized_block_number: value.finalized_block_number.map(|b| b as u64),
}
}
}
8 changes: 8 additions & 0 deletions crates/database/db/src/models/l2_block.rs
@@ -17,6 +17,14 @@ impl Model {
pub(crate) fn block_info(&self) -> BlockInfo {
BlockInfo { number: self.block_number as u64, hash: B256::from_slice(&self.block_hash) }
}

pub(crate) fn batch_info(&self) -> Option<BatchInfo> {
self.batch_hash.as_ref().map(|hash| BatchInfo {
index: self.batch_index.expect("batch index must be present if batch hash is present")
as u64,
hash: B256::from_slice(hash),
})
}
}

/// The relation for the batch input model.
60 changes: 55 additions & 5 deletions crates/database/db/src/operations.rs
@@ -20,8 +20,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database.");
let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
batch_commit.insert(self.get_connection()).await?;
Ok(())
match models::batch_commit::Entity::insert(batch_commit)
.on_conflict(
OnConflict::column(models::batch_commit::Column::Index).do_nothing().to_owned(),
)
.exec(self.get_connection())
.await
{
Err(sea_orm::DbErr::RecordNotInserted) => Ok(()),
x => Ok(x.map(|_| ())?),
}
}

/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
@@ -114,7 +122,12 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", queue_index = l1_message.transaction.queue_index, "Inserting L1 message into database.");
let l1_message: models::l1_message::ActiveModel = l1_message.into();
l1_message.insert(self.get_connection()).await?;
models::l1_message::Entity::insert(l1_message)
.on_conflict(
OnConflict::column(models::l1_message::Column::QueueIndex).do_nothing().to_owned(),
)
.exec(self.get_connection())
.await?;
Ok(())
}

@@ -217,14 +230,24 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}

/// Get the latest safe L2 [`BlockInfo`] from the database.
async fn get_latest_safe_l2_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
async fn get_latest_safe_l2_block(
&self,
) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching latest safe L2 block from database.");
Ok(models::l2_block::Entity::find()
.filter(models::l2_block::Column::BatchIndex.is_not_null())
.order_by_desc(models::l2_block::Column::BlockNumber)
.one(self.get_connection())
.await
.map(|x| x.map(|x| x.block_info()))?)
.map(|x| {
x.map(|x| {
(
x.block_info(),
x.batch_info()
.expect("Batch info must be present due to database query arguments"),
)
})
})?)
}

/// Get the latest L2 [`BlockInfo`] from the database.
@@ -237,6 +260,33 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(|x| x.block_info()))?)
}

/// Get the safe block for startup from the database.
///
/// This method fetches the batch info for the latest safe L2 block, then retrieves the
/// latest block of the previous batch (i.e., the batch preceding the one that contains the
/// latest safe block) and returns that block's info.
async fn get_startup_safe_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
let safe = if let Some(batch_info) = self
.get_latest_safe_l2_block()
.await?
.map(|(_, batch_info)| batch_info)
.filter(|b| b.index > 1)
{
let batch = self
.get_batch_by_index(batch_info.index - 1)
.await?
.expect("Batch info must be present due to database query arguments");
let block = self.get_highest_block_for_batch(batch.hash).await?;
tracing::info!(target:"test", "{:?}", block);
block
} else {
None
};

Ok(safe)
}

/// Delete all L2 blocks with a block number greater than the provided block number.
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
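
To make the `get_startup_safe_block` flow above easier to follow, here is a minimal, self-contained sketch of the same selection logic. The `BlockInfo`/`BatchInfo` structs and the index-keyed lookup closure are simplified stand-ins for illustration only; the real method goes through `get_latest_safe_l2_block`, `get_batch_by_index`, and `get_highest_block_for_batch` as shown in the diff.

```rust
// Simplified stand-ins for the crate's BlockInfo / BatchInfo (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq)]
struct BlockInfo {
    number: u64,
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct BatchInfo {
    index: u64,
}

/// Mirrors the selection in `get_startup_safe_block`: take the batch of the
/// latest safe block, step back one batch, and restart from that batch's
/// highest block. Batches at index <= 1 yield no startup safe block.
fn startup_safe_block(
    latest_safe: Option<(BlockInfo, BatchInfo)>,
    highest_block_for_batch_index: impl Fn(u64) -> Option<BlockInfo>,
) -> Option<BlockInfo> {
    latest_safe
        .map(|(_, batch)| batch)
        .filter(|batch| batch.index > 1)
        .and_then(|batch| highest_block_for_batch_index(batch.index - 1))
}

fn main() {
    // Latest safe block 201 belongs to batch 7; batch 6 ends at block 150,
    // so the node restarts with block 150 as its safe head.
    let safe = startup_safe_block(
        Some((BlockInfo { number: 201 }, BatchInfo { index: 7 })),
        |index| (index == 6).then_some(BlockInfo { number: 150 }),
    );
    assert_eq!(safe, Some(BlockInfo { number: 150 }));
}
```

Stepping back one batch presumably means the most recently derived batch is re-derived after a restart rather than resumed part-way through.
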
10 changes: 10 additions & 0 deletions crates/derivation-pipeline/Cargo.toml
@@ -26,6 +26,7 @@ scroll-codec.workspace = true
scroll-db.workspace = true

# misc
async-trait = { workspace = true, optional = true }
futures.workspace = true
thiserror.workspace = true
tracing.workspace = true
@@ -49,3 +50,12 @@
"scroll-alloy-rpc-types-engine/std",
"futures/std",
]
test-utils = [
"async-trait",
"rollup-node-providers/test-utils",
"scroll-codec/test-utils",
"scroll-db/test-utils",
"rollup-node-providers/test-utils",
"scroll-codec/test-utils",
"scroll-db/test-utils",
]
2 changes: 2 additions & 0 deletions crates/derivation-pipeline/src/lib.rs
@@ -359,6 +359,7 @@ mod tests {
block_timestamp: 1696935971,
calldata: Arc::new(raw_calldata),
blob_versioned_hash: None,
finalized_block_number: None,
};
db.insert_batch(batch_data).await?;
// load messages in db.
@@ -440,6 +441,7 @@ mod tests {
block_timestamp: 1696935971,
calldata: Arc::new(raw_calldata),
blob_versioned_hash: None,
finalized_block_number: None,
};

let l1_messages = vec![
22 changes: 8 additions & 14 deletions crates/engine/src/driver.rs
@@ -175,20 +175,20 @@ where
tracing::info!(target: "scroll::engine", ?result, "handling L1 consolidation result");

match result {
Ok((block_info, reorg, batch_info)) => {
Ok(consolidation_outcome) => {
let block_info = consolidation_outcome.block_info();

// Update the safe block info and return the block info
tracing::trace!(target: "scroll::engine", ?block_info, "updating safe block info from block derived from L1");
self.fcs.update_safe_block_info(block_info.block_info);

// If we reorged, update the head block info
if reorg {
if consolidation_outcome.is_reorg() {
tracing::warn!(target: "scroll::engine", ?block_info, "reorging head to l1 derived block");
self.fcs.update_head_block_info(block_info.block_info);
}

return Some(EngineDriverEvent::L1BlockConsolidated((
block_info, batch_info,
)))
return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome))
}
Err(err) => {
tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1")
@@ -314,18 +314,12 @@ where
}

if let Some(payload_attributes) = this.l1_payload_attributes.pop_front() {
let safe_block_info = *this.fcs.safe_block_info();
let fcs = this.alloy_forkchoice_state();
let fcs = this.fcs.clone();
let client = this.client.clone();

if let Some(provider) = this.provider.clone() {
this.engine_future = Some(EngineFuture::l1_consolidation(
client,
provider,
safe_block_info,
fcs,
payload_attributes,
));
this.engine_future =
Some(EngineFuture::l1_consolidation(client, provider, fcs, payload_attributes));
this.waker.wake();
} else {
tracing::error!(target: "scroll::engine", "l1 consolidation requires an execution payload provider");
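
The driver now consumes a `ConsolidationOutcome` through `block_info()` and `is_reorg()` instead of destructuring a `(block_info, reorg, batch_info)` tuple, but the type's definition is not part of this diff. Below is a hypothetical sketch, for illustration only, of a shape that would satisfy the accessors used above; the `BlockData`/`BatchData` placeholders stand in for the crate's real `L2BlockInfoWithL1Messages` and `BatchInfo`.

```rust
// Placeholder payload types (hypothetical): the real outcome carries the
// consolidated L2 block info with its L1 messages plus the batch info.
#[derive(Clone, Debug)]
struct BlockData;

#[derive(Clone, Debug)]
struct BatchData;

/// Hypothetical shape inferred only from the accessors used in the driver.
#[derive(Clone, Debug)]
enum ConsolidationOutcome {
    /// The derived block matched the chain; only the safe head advances.
    Consolidation(BlockData, BatchData),
    /// The derived block replaced the current head, so the head is rewound too.
    Reorg(BlockData, BatchData),
}

impl ConsolidationOutcome {
    fn block_info(&self) -> &BlockData {
        match self {
            Self::Consolidation(block, _) | Self::Reorg(block, _) => block,
        }
    }

    fn is_reorg(&self) -> bool {
        matches!(self, Self::Reorg(..))
    }
}

fn main() {
    let outcome = ConsolidationOutcome::Reorg(BlockData, BatchData);
    // The driver always updates the safe head from `block_info()`, and only
    // moves the unsafe head as well when `is_reorg()` is true.
    let _safe_head = outcome.block_info();
    assert!(outcome.is_reorg());
}
```

Whatever the real definition looks like, bundling the reorg flag with the block and batch data keeps the event payload a single value, which is what the `EngineDriverEvent::L1BlockConsolidated(ConsolidationOutcome)` change in event.rs below reflects.
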
4 changes: 2 additions & 2 deletions crates/engine/src/event.rs
@@ -1,5 +1,5 @@
use super::ConsolidationOutcome;
use reth_scroll_primitives::ScrollBlock;
use rollup_node_primitives::{BatchInfo, L2BlockInfoWithL1Messages};
use scroll_network::BlockImportOutcome;

/// An enum representing the events that can be emitted by the engine driver.
@@ -10,5 +10,5 @@ pub enum EngineDriverEvent {
/// The result of attempting a block import.
BlockImportOutcome(BlockImportOutcome),
/// A block derived from L1 has been consolidated.
L1BlockConsolidated((L2BlockInfoWithL1Messages, BatchInfo)),
L1BlockConsolidated(ConsolidationOutcome),
}