Merged
Changes from all commits
283 changes: 137 additions & 146 deletions crates/l2/sequencer/l1_committer.rs
@@ -180,7 +180,7 @@ impl L1Committer {
Self::get_checkpoint_from_path(
genesis.clone(),
blockchain.options.clone(),
&checkpoints_dir.join(format!("checkpoint_batch_{}", last_committed_batch)),
&checkpoints_dir.join(batch_checkpoint_name(last_committed_batch)),
&rollup_store,
)
.await?;
@@ -261,114 +261,14 @@ impl L1Committer {
let batch = match self.rollup_store.get_batch(batch_to_commit).await? {
Some(batch) => batch,
None => {
let last_committed_blocks = self
.rollup_store
.get_block_numbers_by_batch(last_committed_batch_number)
.await?
.ok_or(
CommitterError::RetrievalError(format!("Failed to get batch with batch number {last_committed_batch_number}. Batch is missing when it should be present. This is a bug"))
)?;
let last_block = last_committed_blocks
.last()
.ok_or(
CommitterError::RetrievalError(format!("Last committed batch ({last_committed_batch_number}) doesn't have any blocks. This is probably a bug."))
)?;
let first_block_to_commit = last_block + 1;

// We need to guarantee that the checkpoint path is new
// to avoid causing a lock error under rocksdb feature.
let rand_suffix: u32 = rand::thread_rng().r#gen();
let one_time_checkpoint_path = self
.checkpoints_dir
.join(format!("temp_checkpoint_{batch_to_commit}_{rand_suffix}"));

// For re-execution we need to use a checkpoint to the previous state
// (i.e. checkpoint of the state to the latest block from the previous
// batch, or the state of the genesis if this is the first batch).
// We already have this initial checkpoint as part of the L1Committer
// struct, but we need to create a one-time copy of it because
// we still need to use the current checkpoint store later for witness
// generation.

let (one_time_checkpoint_store, one_time_checkpoint_blockchain) = self
.create_checkpoint(
&self.current_checkpoint_store,
&one_time_checkpoint_path,
&self.rollup_store,
)
.await?;

// Try to prepare batch
let result = self
.prepare_batch_from_block(
*last_block,
batch_to_commit,
one_time_checkpoint_store,
one_time_checkpoint_blockchain,
)
.await;

if one_time_checkpoint_path.exists() {
let _ = remove_dir_all(&one_time_checkpoint_path).inspect_err(|e| {
error!(
"Failed to remove one-time checkpoint directory at path {one_time_checkpoint_path:?}. Should be removed manually. Error: {}", e.to_string()
)
});
}

let (
blobs_bundle,
new_state_root,
message_hashes,
privileged_transactions_hash,
last_block_of_batch,
) = result?;

if *last_block == last_block_of_batch {
debug!("No new blocks to commit, skipping");
let Some(batch) = self.produce_batch(batch_to_commit).await? else {
                    // The batch is empty (there are no new blocks since the last batch)
return Ok(());
}

let batch = Batch {
number: batch_to_commit,
first_block: first_block_to_commit,
last_block: last_block_of_batch,
state_root: new_state_root,
privileged_transactions_hash,
message_hashes,
blobs_bundle,
commit_tx: None,
verify_tx: None,
};

self.rollup_store.seal_batch(batch.clone()).await?;

debug!(
first_block = batch.first_block,
last_block = batch.last_block,
"Batch {} stored in database",
batch.number
);

batch
}
};

info!(
first_block = batch.first_block,
last_block = batch.last_block,
"Generating and storing witness for batch {}",
batch.number,
);

self.generate_and_store_batch_prover_input(&batch).await?;

// We need to update the current checkpoint after generating the witness
// with it, and before sending the commitment.
// The actual checkpoint store directory is not pruned until the batch
// it served in is verified on L1.
self.update_current_checkpoint(&batch).await?;

info!(
first_block = batch.first_block,
last_block = batch.last_block,
@@ -409,12 +309,116 @@ impl L1Committer {
}
}

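/// Prepares and seals the next batch, generates and stores its prover input,
/// and advances the current checkpoint to the sealed batch's state.
/// Returns `Ok(None)` when there are no new blocks since the last committed batch.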
async fn produce_batch(&mut self, batch_number: u64) -> Result<Option<Batch>, CommitterError> {
let last_committed_blocks = self
.rollup_store
.get_block_numbers_by_batch(batch_number-1)
Copilot AI (Nov 1, 2025):
Missing spaces around the subtraction operator. Should be `batch_number - 1` for consistency with Rust formatting conventions.

Suggested change:
.get_block_numbers_by_batch(batch_number-1)
.get_block_numbers_by_batch(batch_number - 1)
.await?
.ok_or(
CommitterError::RetrievalError(format!("Failed to get batch with batch number {}. Batch is missing when it should be present. This is a bug", batch_number))
Copilot AI (Nov 1, 2025):
The error message refers to getting a batch but is actually fetching block numbers by batch. The message should say 'Failed to get blocks for batch number {}' to accurately reflect what failed.

Suggested change:
CommitterError::RetrievalError(format!("Failed to get batch with batch number {}. Batch is missing when it should be present. This is a bug", batch_number))
CommitterError::RetrievalError(format!("Failed to get blocks for batch number {}. Block numbers are missing when they should be present. This is a bug", batch_number))
)?;
let last_block = last_committed_blocks
.last()
.ok_or(CommitterError::RetrievalError(format!(
"Last committed batch ({}) doesn't have any blocks. This is probably a bug.",
batch_number
)))?;
let first_block_to_commit = last_block + 1;

// We need to guarantee that the checkpoint path is new
// to avoid causing a lock error under rocksdb feature.
let new_checkpoint_path = self
.checkpoints_dir
.join(batch_checkpoint_name(batch_number));

// For re-execution we need to use a checkpoint to the previous state
// (i.e. checkpoint of the state to the latest block from the previous
// batch, or the state of the genesis if this is the first batch).
// We already have this initial checkpoint as part of the L1Committer
// struct, but we need to create a one-time copy of it because
// we still need to use the current checkpoint store later for witness
// generation.

let (new_checkpoint_store, new_checkpoint_blockchain) = self
.create_checkpoint(
&self.current_checkpoint_store,
&new_checkpoint_path,
&self.rollup_store,
)
.await?;

// Try to prepare batch
let result = self
.prepare_batch_from_block(
*last_block,
batch_number,
new_checkpoint_store.clone(),
new_checkpoint_blockchain.clone(),
)
.await;

let (
blobs_bundle,
new_state_root,
message_hashes,
privileged_transactions_hash,
last_block_of_batch,
) = result?;

if *last_block == last_block_of_batch {
debug!("No new blocks to commit, skipping");
return Ok(None);
}

let batch = Batch {
number: batch_number,
first_block: first_block_to_commit,
last_block: last_block_of_batch,
state_root: new_state_root,
privileged_transactions_hash,
message_hashes,
blobs_bundle,
commit_tx: None,
verify_tx: None,
};

self.rollup_store.seal_batch(batch.clone()).await?;

debug!(
first_block = batch.first_block,
last_block = batch.last_block,
"Batch {} stored in database",
batch.number
);

info!(
first_block = batch.first_block,
last_block = batch.last_block,
"Generating and storing witness for batch {}",
batch.number,
);

self.generate_and_store_batch_prover_input(&batch).await?;

// We need to update the current checkpoint after generating the witness
// with it, and before sending the commitment.
// The reference to the previous checkpoint is lost after this operation,
// but its directory is not pruned until the batch it served in is verified
// on L1.
self.current_checkpoint_store = new_checkpoint_store;
self.current_checkpoint_blockchain = new_checkpoint_blockchain;

Ok(Some(batch))
}

async fn prepare_batch_from_block(
&mut self,
mut last_added_block_number: BlockNumber,
batch_number: u64,
one_time_checkpoint_store: Store,
one_time_checkpoint_blockchain: Arc<Blockchain>,
checkpoint_store: Store,
checkpoint_blockchain: Arc<Blockchain>,
) -> Result<(BlobsBundle, H256, Vec<H256>, H256, BlockNumber), CommitterError> {
let first_block_of_batch = last_added_block_number + 1;
let mut blobs_bundle = BlobsBundle::default();
@@ -426,6 +430,7 @@ impl L1Committer {
let mut privileged_transactions_hashes = vec![];
let mut new_state_root = H256::default();
let mut acc_gas_used = 0_u64;
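// Collects (block number, block hash) pairs for every block added to the batch,
// used for the checkpoint store's forkchoice update once the batch is assembled.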
let mut blocks = vec![];

#[cfg(feature = "metrics")]
let mut tx_count = 0_u64;
Expand Down Expand Up @@ -521,7 +526,7 @@ impl L1Committer {
// Here we use the checkpoint store because we need the previous
// state available (i.e. not pruned) for re-execution.
let vm_db = StoreVmDatabase::new(
one_time_checkpoint_store.clone(),
checkpoint_store.clone(),
potential_batch_block.header.parent_hash,
);

Expand All @@ -545,7 +550,7 @@ impl L1Committer {
// account updates of each block as we go, to be able to continue
// re-executing the next blocks in the batch.
{
let account_updates_list = one_time_checkpoint_store
let account_updates_list = checkpoint_store
.apply_account_updates_batch(
potential_batch_block.header.parent_hash,
&account_updates,
@@ -554,7 +559,7 @@ impl L1Committer {
"no account updated".to_owned(),
))?;

one_time_checkpoint_blockchain.store_block(
checkpoint_blockchain.store_block(
potential_batch_block.clone(),
account_updates_list,
BlockExecutionResult {
Expand Down Expand Up @@ -588,8 +593,7 @@ impl L1Committer {

// Again, here the VM database should be instantiated from the checkpoint
// store to have access to the previous state
let parent_db =
StoreVmDatabase::new(one_time_checkpoint_store.clone(), parent_block_hash);
let parent_db = StoreVmDatabase::new(checkpoint_store.clone(), parent_block_hash);

let acc_privileged_txs_len: u64 = acc_privileged_txs.len().try_into()?;
if acc_privileged_txs_len > PRIVILEGED_TX_BUDGET {
Expand Down Expand Up @@ -645,7 +649,7 @@ impl L1Committer {

message_hashes.extend(messages.iter().map(get_l1_message_hash));

new_state_root = one_time_checkpoint_store
new_state_root = checkpoint_store
.state_trie(potential_batch_block.hash())?
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"Failed to get state root from storage".to_owned(),
Expand All @@ -654,6 +658,7 @@ impl L1Committer {

last_added_block_number += 1;
acc_gas_used += current_block_gas_used;
blocks.push((last_added_block_number, potential_batch_block.hash()));
} // end loop

metrics!(if let (Ok(privileged_transaction_count), Ok(messages_count)) = (
Expand Down Expand Up @@ -687,6 +692,23 @@ impl L1Committer {
let privileged_transactions_hash =
compute_privileged_transactions_hash(privileged_transactions_hashes)?;

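// Make the re-executed batch blocks canonical in the checkpoint store and
// advance its head to the last block of the batch, so subsequent reads from
// the checkpoint see the post-batch state.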
let last_block_hash = blocks
.last()
.ok_or(CommitterError::Unreachable(
"There should always be blocks".to_string(),
))?
.1;

checkpoint_store
.forkchoice_update(
Some(blocks),
last_added_block_number,
last_block_hash,
None,
None,
)
.await?;

Ok((
blobs_bundle,
new_state_root,
Expand Down Expand Up @@ -794,41 +816,6 @@ impl L1Committer {
Ok(())
}

/// Updates the current checkpoint store and blockchain to the state at the
/// given latest batch.
///
/// The reference to the previous checkpoint is lost after this operation,
/// but the directory is not deleted until the batch it serves in is verified
/// on L1.
async fn update_current_checkpoint(
&mut self,
latest_batch: &Batch,
) -> Result<(), CommitterError> {
let new_checkpoint_path = self
.checkpoints_dir
.join(format!("checkpoint_batch_{}", latest_batch.number));

// CAUTION
// We need to skip checkpoint creation if the directory already exists.
// Sometimes the commit_next_batch task is retried after a failure, and in
// that case we would try to create a checkpoint again at the same path,
// causing a lock error under rocksdb feature.
if new_checkpoint_path.exists() {
// TODO: we should validate that the existing checkpoint is correct. otherwise we may want to update our `current_checkpoint_store` to point to the correct one.
return Ok(());
}

let (new_checkpoint_store, new_checkpoint_blockchain) = self
.create_checkpoint(&self.store, &new_checkpoint_path, &self.rollup_store)
.await?;

self.current_checkpoint_store = new_checkpoint_store;

self.current_checkpoint_blockchain = new_checkpoint_blockchain;

Ok(())
}

/// Creates a checkpoint of the given store at the specified path.
///
/// This function performs the following steps:
Expand Down Expand Up @@ -1326,3 +1313,7 @@ pub async fn regenerate_head_state(

Ok(())
}

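/// Returns the directory name used for a batch's checkpoint, e.g. `checkpoint_batch_42`.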
fn batch_checkpoint_name(batch_number: u64) -> String {
format!("checkpoint_batch_{batch_number}")
}