Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ reth-payload-primitives = { git = "https://github.com/scroll-tech/reth.git", def
reth-primitives = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-primitives-traits = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-provider = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-builder = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-rpc-server-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tasks = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
reth-tokio-util = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ mod test {

// Should return the highest safe block (block 201)
let latest_safe = db.get_latest_safe_l2_block().await.unwrap();
assert_eq!(latest_safe, Some(safe_block_2));
assert_eq!(latest_safe, Some((safe_block_2, batch_info)));
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions crates/database/db/src/models/batch_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl From<Model> for BatchCommitData {
blob_versioned_hash: value
.blob_hash
.map(|b| b.as_slice().try_into().expect("data persisted in database is valid")),
finalized_block_number: value.finalized_block_number.map(|b| b as u64),
}
}
}
8 changes: 8 additions & 0 deletions crates/database/db/src/models/l2_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ impl Model {
pub(crate) fn block_info(&self) -> BlockInfo {
BlockInfo { number: self.block_number as u64, hash: B256::from_slice(&self.block_hash) }
}

pub(crate) fn batch_info(&self) -> Option<BatchInfo> {
self.batch_hash.as_ref().map(|hash| BatchInfo {
index: self.batch_index.expect("batch index must be present if batch hash is present")
as u64,
hash: B256::from_slice(hash),
})
}
}

/// The relation for the batch input model.
Expand Down
36 changes: 36 additions & 0 deletions crates/database/db/src/models/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use rollup_node_primitives::Metadata;
use sea_orm::{entity::prelude::*, ActiveValue};

/// A database model that represents the metadata for the rollup node.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "metadata")]
pub struct Model {
/// The metadata key.
#[sea_orm(primary_key)]
pub key: String,
/// The metadata value.
pub value: String,
}

/// The relation for the metadata model.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

/// The active model behavior for the metadata model.
impl ActiveModelBehavior for ActiveModel {}

impl From<Metadata> for ActiveModel {
fn from(metadata: Metadata) -> Self {
Self {
key: ActiveValue::Set("l1_finalized_block".to_owned()),
value: ActiveValue::Set(metadata.l1_finalized_block.to_string()),
}
}
}

impl From<Model> for Metadata {
fn from(value: Model) -> Self {
debug_assert!(value.key == "l1_finalized_block");
Self { l1_finalized_block: value.value.parse().expect("invalid value") }
}
}
3 changes: 3 additions & 0 deletions crates/database/db/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ pub mod block_data;

/// This module contains the L1 message database model.
pub mod l1_message;

/// This module contains the metadata model.
pub mod metadata;
108 changes: 102 additions & 6 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::DatabaseConnectionProvider;
use alloy_primitives::B256;
use futures::{Stream, StreamExt};
use rollup_node_primitives::{
BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages,
BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata,
};
use scroll_alloy_rpc_types_engine::BlockDataHint;
use sea_orm::{
Expand All @@ -19,8 +19,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database.");
let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
batch_commit.insert(self.get_connection()).await?;
Ok(())
match models::batch_commit::Entity::insert(batch_commit)
.on_conflict(
OnConflict::column(models::batch_commit::Column::Index).do_nothing().to_owned(),
)
.exec(self.get_connection())
.await
{
Err(sea_orm::DbErr::RecordNotInserted) => Ok(()),
x => Ok(x.map(|_| ())?),
}
}

/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
Expand Down Expand Up @@ -68,6 +76,37 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(Into::into))?)
}

/// Set the latest finalized L1 block number.
async fn set_latest_finalized_l1_block_number(
&self,
block_number: u64,
) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Updating the latest finalized L1 block number in the database.");
let metadata: models::metadata::ActiveModel =
Metadata { l1_finalized_block: block_number }.into();
Ok(models::metadata::Entity::insert(metadata)
.on_conflict(
OnConflict::column(models::metadata::Column::Key)
.update_column(models::metadata::Column::Value)
.to_owned(),
)
.exec(self.get_connection())
.await
.map(|_| ())?)
}

/// Get the finalized L1 block number from the database.
async fn get_finalized_l1_block_number(&self) -> Result<Option<u64>, DatabaseError> {
Ok(models::metadata::Entity::find()
.filter(models::metadata::Column::Key.eq("l1_finalized_block"))
.select_only()
.column(models::metadata::Column::Value)
.into_tuple::<String>()
.one(self.get_connection())
.await
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
}

/// Get the newest finalized batch hash up to or at the provided height.
async fn get_finalized_batch_hash_at_height(
&self,
Expand Down Expand Up @@ -113,7 +152,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", queue_index = l1_message.transaction.queue_index, "Inserting L1 message into database.");
let l1_message: models::l1_message::ActiveModel = l1_message.into();
l1_message.insert(self.get_connection()).await?;
models::l1_message::Entity::insert(l1_message)
.on_conflict(
OnConflict::column(models::l1_message::Column::QueueIndex)
.update_columns(vec![
models::l1_message::Column::QueueHash,
models::l1_message::Column::Hash,
models::l1_message::Column::L1BlockNumber,
models::l1_message::Column::GasLimit,
models::l1_message::Column::To,
models::l1_message::Column::Value,
models::l1_message::Column::Sender,
models::l1_message::Column::Input,
])
.to_owned(),
)
.exec(self.get_connection())
.await?;
Ok(())
}

Expand Down Expand Up @@ -209,14 +264,24 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}

/// Get the latest safe L2 [`BlockInfo`] from the database.
async fn get_latest_safe_l2_block(&self) -> Result<Option<BlockInfo>, DatabaseError> {
async fn get_latest_safe_l2_block(
&self,
) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching latest safe L2 block from database.");
Ok(models::l2_block::Entity::find()
.filter(models::l2_block::Column::BatchIndex.is_not_null())
.order_by_desc(models::l2_block::Column::BlockNumber)
.one(self.get_connection())
.await
.map(|x| x.map(|x| x.block_info()))?)
.map(|x| {
x.map(|x| {
(
x.block_info(),
x.batch_info()
.expect("Batch info must be present due to database query arguments"),
)
})
})?)
}

/// Get the latest L2 [`BlockInfo`] from the database.
Expand All @@ -229,6 +294,37 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(|x| x.block_info()))?)
}

/// Get the safe block for startup from the database.
///
/// This method fetches the batch info for the latest safe L2 block, it then retrieves the
/// latest block for the previous batch (i.e., the batch before the latest safe block) and
/// returns the block info.
async fn get_startup_data(&self) -> Result<(Option<BlockInfo>, Option<u64>), DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
let safe = if let Some(batch_info) = self
.get_latest_safe_l2_block()
.await?
.map(|(_, batch_info)| batch_info)
.filter(|b| b.index > 1)
{
let batch = self
.get_batch_by_index(batch_info.index)
.await?
.expect("Batch info must be present due to database query arguments");
let previous_batch = self
.get_batch_by_index(batch_info.index - 1)
.await?
.expect("Batch info must be present due to database query arguments");
let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?;
tracing::info!(target:"test", "{:?}", l2_block);
(l2_block, Some(batch.block_number))
} else {
(None, None)
};

Ok(safe)
}

/// Delete all L2 blocks with a block number greater than the provided block number.
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
Expand Down
22 changes: 15 additions & 7 deletions crates/database/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod m20250304_125946_add_l1_msg_table;
mod m20250408_132123_add_header_metadata;
mod m20250408_150338_load_header_metadata;
mod m20250411_072004_add_l2_block;
mod m20250616_223947_add_metadata;
mod migration_info;
pub use migration_info::{MigrationInfo, ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo};

Expand All @@ -19,12 +20,16 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20250408_132123_add_header_metadata::Migration),
Box::new(m20250408_150338_load_header_metadata::Migration::<MI>(Default::default())),
Box::new(m20250411_072004_add_l2_block::Migration),
Box::new(m20250616_223947_add_metadata::Migration),
]
}
}

pub mod traits {
use crate::{ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo};
use crate::{
migration_info::ScrollMainnetTestMigrationInfo, ScrollMainnetMigrationInfo,
ScrollSepoliaMigrationInfo,
};
use reth_chainspec::NamedChain;
use sea_orm::{prelude::async_trait::async_trait, DatabaseConnection, DbErr};
use sea_orm_migration::MigratorTrait;
Expand All @@ -33,20 +38,23 @@ pub mod traits {
#[async_trait]
pub trait ScrollMigrator {
/// Migrates the tables.
async fn migrate(&self, conn: &DatabaseConnection) -> Result<(), DbErr>;
async fn migrate(&self, conn: &DatabaseConnection, test: bool) -> Result<(), DbErr>;
}

#[async_trait]
impl ScrollMigrator for NamedChain {
async fn migrate(&self, conn: &DatabaseConnection) -> Result<(), DbErr> {
match self {
NamedChain::Scroll => {
async fn migrate(&self, conn: &DatabaseConnection, test: bool) -> Result<(), DbErr> {
match (self, test) {
(NamedChain::Scroll, false) => {
Ok(super::Migrator::<ScrollMainnetMigrationInfo>::up(conn, None))
}
NamedChain::ScrollSepolia => {
(NamedChain::Scroll, true) => {
Ok(super::Migrator::<ScrollMainnetTestMigrationInfo>::up(conn, None))
}
(NamedChain::ScrollSepolia, _) => {
Ok(super::Migrator::<ScrollSepoliaMigrationInfo>::up(conn, None))
}
NamedChain::Dev => Ok(super::Migrator::<()>::up(conn, None)),
(NamedChain::Dev, _) => Ok(super::Migrator::<()>::up(conn, None)),
_ => Err(DbErr::Custom("expected Scroll Mainnet, Sepolia or Dev".into())),
}?
.await
Expand Down
Loading