diff --git a/cmd/ethrex/Cargo.toml b/cmd/ethrex/Cargo.toml index e91f6dc5304..b31aac85e6e 100644 --- a/cmd/ethrex/Cargo.toml +++ b/cmd/ethrex/Cargo.toml @@ -83,7 +83,7 @@ c-kzg = [ "ethrex-crypto/c-kzg", ] metrics = ["ethrex-blockchain/metrics", "ethrex-l2?/metrics"] -rocksdb = ["ethrex-storage/rocksdb", "ethrex-p2p/rocksdb"] +rocksdb = ["ethrex-storage/rocksdb", "ethrex-p2p/rocksdb", "ethrex-l2?/rocksdb"] jemalloc = ["dep:tikv-jemallocator"] jemalloc_profiling = [ "jemalloc", diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index 2682c249a97..138b97f9d6b 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -7,7 +7,9 @@ use crate::l2::{L2Options, SequencerOptions}; use crate::utils::{ NodeConfigFile, get_client_version, init_datadir, read_jwtsecret_file, store_node_config_file, }; -use ethrex_blockchain::{Blockchain, BlockchainType, L2Config}; +use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType, L2Config}; +use ethrex_common::fd_limit::raise_fd_limit; +use ethrex_common::types::Genesis; use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig}; use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL}; use ethrex_l2::SequencerConfig; @@ -18,6 +20,8 @@ use ethrex_p2p::{ sync_manager::SyncManager, types::{Node, NodeRecord}, }; +#[cfg(feature = "rocksdb")] +use ethrex_storage::EngineType; use ethrex_storage::Store; use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup}; use secp256k1::SecretKey; @@ -146,14 +150,19 @@ pub async fn init_l2( opts: L2Options, log_filter_handler: Option>, ) -> eyre::Result<()> { + raise_fd_limit()?; + let datadir = opts.node_opts.datadir.clone(); init_datadir(&opts.node_opts.datadir); let rollup_store_dir = datadir.join("rollup_store"); + // Checkpoints are stored in the main datadir + let checkpoints_dir = datadir.clone(); + let network = get_network(&opts.node_opts); let genesis = network.get_genesis()?; - let store = init_store(&datadir, genesis).await; + let store = init_store(&datadir, genesis.clone()).await; let rollup_store = init_rollup_store(&rollup_store_dir).await; let operator_fee_config = get_operator_fee_config(&opts.sequencer_opts).await?; @@ -180,10 +189,18 @@ pub async fn init_l2( perf_logs_enabled: true, }; - let blockchain = init_blockchain(store.clone(), blockchain_opts); + let blockchain = init_blockchain(store.clone(), blockchain_opts.clone()); regenerate_head_state(&store, &blockchain).await?; + let (initial_checkpoint_store, initial_checkpoint_blockchain) = initialize_checkpoint( + &store, + &checkpoints_dir.join("initial_checkpoint"), + genesis.clone(), + blockchain_opts, + ) + .await?; + let signer = get_signer(&datadir); let local_p2p_node = get_local_p2p_node(&opts.node_opts, &signer); @@ -277,6 +294,10 @@ pub async fn init_l2( "http://{}:{}", opts.node_opts.http_addr, opts.node_opts.http_port ), + initial_checkpoint_store, + initial_checkpoint_blockchain, + genesis, + checkpoints_dir, ) .into_future(); @@ -344,3 +365,92 @@ pub async fn get_operator_fee_config( }; Ok(operator_fee_config) } + +/// Initializes a checkpoint of the given store at the specified path. +/// +/// If there's no previous checkpoint, it creates one from the current store state. +/// +/// This function performs the following steps: +/// 1. Creates a checkpoint of the provided store at the specified path. +/// 2. Initializes a new store and blockchain for the checkpoint. +/// 3. Regenerates the head state in the checkpoint store. +/// 4. 
Validates that the checkpoint store's head block number and latest block match those of the original store. +async fn initialize_checkpoint( + store: &Store, + path: &Path, + genesis: Genesis, + blockchain_opts: BlockchainOptions, +) -> eyre::Result<(Store, Arc<Blockchain>)> { + // If the checkpoint is not present, create it + if !path.exists() { + store.create_checkpoint(path).await?; + } + + // We now load the checkpoint, validate it, and regenerate its state. + + #[cfg(feature = "rocksdb")] + let engine_type = EngineType::RocksDB; + #[cfg(not(feature = "rocksdb"))] + let engine_type = EngineType::InMemory; + + let checkpoint_store = { + let checkpoint_store_inner = Store::new(path, engine_type)?; + + checkpoint_store_inner + .add_initial_state(genesis.clone()) + .await?; + + checkpoint_store_inner + }; + + let checkpoint_blockchain = + Arc::new(Blockchain::new(checkpoint_store.clone(), blockchain_opts)); + + let checkpoint_head_block_number = checkpoint_store.get_latest_block_number().await?; + + let db_head_block_number = store.get_latest_block_number().await?; + + if checkpoint_head_block_number != db_head_block_number { + return Err(eyre::eyre!( + "checkpoint store head block number does not match main store head block number before regeneration" + )); + } + + regenerate_head_state(&checkpoint_store, &checkpoint_blockchain).await?; + + let checkpoint_latest_block_number = checkpoint_store.get_latest_block_number().await?; + + let db_latest_block_number = store.get_latest_block_number().await?; + + let checkpoint_latest_block_header = checkpoint_store + .get_block_header(checkpoint_latest_block_number)? + .ok_or(eyre::eyre!( + "latest block header ({checkpoint_latest_block_number}) not found in checkpoint store" + ))?; + + let db_latest_block_header = store + .get_block_header(db_latest_block_number)? + .ok_or(eyre::eyre!("latest block not found in main store"))?; + + // Final sanity check + + if !checkpoint_store.has_state_root(checkpoint_latest_block_header.state_root)? { + return Err(eyre::eyre!( + "checkpoint store state is not regenerated properly" + )); + } + + if checkpoint_latest_block_number != db_head_block_number { + return Err(eyre::eyre!( + "checkpoint store latest block number does not match main store head block number after regeneration" + )); + } + + if checkpoint_latest_block_header.state_root != db_latest_block_header.state_root { + return Err(eyre::eyre!( + "checkpoint store latest state root does not match main store latest state root after regeneration" + )); + } + + Ok((checkpoint_store, checkpoint_blockchain)) +} diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 383a9b2f891..e5fdcc11310 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -201,13 +201,25 @@ impl Blockchain { blocks: &[Block], fee_configs: Option<&[FeeConfig]>, ) -> Result { - let first_block_header = blocks + let first_block_header = &blocks .first() .ok_or(ChainError::WitnessGeneration( "Empty block batch".to_string(), ))? - .header - .clone(); + .header; + + // Later on, we need to access block hashes by number. To avoid needing + // to apply fork choice for each block, we cache them here. 
+ let mut block_hashes_map: BTreeMap<BlockNumber, BlockHash> = blocks + .iter() + .map(|block| (block.header.number, block.hash())) + .collect(); + + block_hashes_map.insert( + first_block_header.number.saturating_sub(1), + first_block_header.parent_hash, + ); // Get state at previous block let trie = self @@ -216,7 +228,17 @@ .map_err(|_| ChainError::ParentStateNotFound)? .ok_or(ChainError::ParentStateNotFound)?; - let (state_trie_witness, mut trie) = TrieLogger::open_trie(trie); + let (mut current_trie_witness, mut trie) = TrieLogger::open_trie(trie); + + // For each block, a new TrieLogger will be opened, each containing the + // trie nodes accessed during that block's execution. We need to accumulate + // all the nodes accessed during the entire batch execution. + let mut accumulated_state_trie_witness = current_trie_witness + .lock() + .map_err(|_| { + ChainError::WitnessGeneration("Failed to lock state trie witness".to_string()) + })? + .clone(); let mut touched_account_storage_slots = BTreeMap::new(); // This will become the state trie + storage trie @@ -232,10 +254,18 @@ for (i, block) in blocks.iter().enumerate() { let parent_hash = block.header.parent_hash; + + // This assumes that the user has the necessary state stored already, + // so if the user only has the state prior to the first block, it + // will fail on the second iteration of this for loop. To ensure this + // doesn't fail, later in this function we store the new state after + // re-execution. let vm_db: DynVmDatabase = Box::new(StoreVmDatabase::new(self.storage.clone(), parent_hash)); + let logger = Arc::new(DatabaseLogger::new(Arc::new(Mutex::new(Box::new(vm_db))))); - let mut vm = match &self.options.r#type { + + let mut vm = match self.options.r#type { BlockchainType::L1 => Evm::new_from_db_for_l1(logger.clone()), BlockchainType::L2(_) => { let l2_config = match fee_configs { @@ -253,7 +283,8 @@ }; // Re-execute block with logger - vm.execute_block(block)?; + let execution_result = vm.execute_block(block)?; + + // Gather account updates let account_updates = vm.get_state_transitions()?; @@ -276,7 +307,9 @@ ChainError::WitnessGeneration("Failed to get block hashes".to_string()) })? 
.clone(); + block_hashes.extend(logger_block_hashes); + // Access all the accounts needed for withdrawals if let Some(withdrawals) = block.body.withdrawals.as_ref() { for withdrawal in withdrawals { @@ -320,6 +353,7 @@ used_storage_tries.insert(*account, (storage_trie_witness, storage_trie)); } } + // Store all the accessed evm bytecodes for code_hash in logger .code_accessed @@ -342,7 +376,7 @@ } // Apply account updates to the trie recording all the necessary nodes to do so - let (updated_trie, storage_tries_after_update) = self + let (storage_tries_after_update, account_updates_list) = self .storage .apply_account_updates_from_trie_with_witness( trie, @@ -350,6 +384,13 @@ used_storage_tries, ) .await?; + + // We cannot ensure that callers of this function have the necessary + // state stored, so to avoid assuming anything, we update the storage + // with the new state after re-execution. + self.store_block(block.clone(), account_updates_list, execution_result) + .await?; + for (address, (witness, _storage_trie)) in storage_tries_after_update { let mut witness = witness.lock().map_err(|_| { ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string()) @@ -359,15 +400,40 @@ used_trie_nodes.extend_from_slice(&witness); touched_account_storage_slots.entry(address).or_default(); } + + let (new_state_trie_witness, updated_trie) = TrieLogger::open_trie( + self.storage + .state_trie( + block_hashes_map + .get(&block.header.number) + .ok_or(ChainError::WitnessGeneration( + "Block hash not found for witness generation".to_string(), + ))? + .to_owned(), + ) + .map_err(|_| ChainError::ParentStateNotFound)? + .ok_or(ChainError::ParentStateNotFound)?, + ); + + // Use the state trie opened at this block's new root for the next block trie = updated_trie; + + for state_trie_witness in current_trie_witness + .lock() + .map_err(|_| { + ChainError::WitnessGeneration("Failed to lock state trie witness".to_string()) + })? 
+ .iter() + { + accumulated_state_trie_witness.insert(state_trie_witness.clone()); + } + + current_trie_witness = new_state_trie_witness; } - // Get the witness for the state trie - let mut state_trie_witness = state_trie_witness.lock().map_err(|_| { - ChainError::WitnessGeneration("Failed to lock state trie witness".to_string()) - })?; - let state_trie_witness = std::mem::take(&mut *state_trie_witness); - used_trie_nodes.extend_from_slice(&Vec::from_iter(state_trie_witness.into_iter())); + used_trie_nodes + .extend_from_slice(&Vec::from_iter(accumulated_state_trie_witness.into_iter())); + // If the witness is empty, at least try to store the root if used_trie_nodes.is_empty() && let Some(root) = root_node @@ -376,6 +442,7 @@ } let mut needed_block_numbers = block_hashes.keys().collect::<Vec<_>>(); + needed_block_numbers.sort(); // Last needed block header for the witness is the parent of the last block we need to execute @@ -385,17 +452,26 @@ .header .number .saturating_sub(1); + // The first block number we need is either the parent of the first block or the earliest block number used by BLOCKHASH let mut first_needed_block_number = first_block_header.number.saturating_sub(1); + if let Some(block_number_from_logger) = needed_block_numbers.first() && **block_number_from_logger < first_needed_block_number { first_needed_block_number = **block_number_from_logger; } + let mut block_headers_bytes = Vec::new(); + for block_number in first_needed_block_number..=last_needed_block_number { - let header = self.storage.get_block_header(block_number)?.ok_or( - ChainError::WitnessGeneration("Failed to get block header".to_string()), + let hash = block_hashes_map + .get(&block_number) + .ok_or(ChainError::WitnessGeneration(format!( + "Failed to get block {block_number} hash" + )))?; + let header = self.storage.get_block_header_by_hash(*hash)?.ok_or( + ChainError::WitnessGeneration(format!("Failed to get block {block_number} header")), )?; block_headers_bytes.push(header.encode_to_vec()); } diff --git a/crates/l2/Cargo.toml b/crates/l2/Cargo.toml index da3558612df..41a79612e81 100644 --- a/crates/l2/Cargo.toml +++ b/crates/l2/Cargo.toml @@ -80,5 +80,6 @@ panic = "deny" [features] default = ["l2"] +rocksdb = [] metrics = ["ethrex-blockchain/metrics"] l2 = ["guest_program/l2"] diff --git a/crates/l2/sequencer/errors.rs b/crates/l2/sequencer/errors.rs index 884f953c545..644551607c8 100644 --- a/crates/l2/sequencer/errors.rs +++ b/crates/l2/sequencer/errors.rs @@ -225,7 +225,7 @@ pub enum CommitterError { FailedToRetrieveDataFromStorage, #[error("Committer failed to generate blobs bundle: {0}")] FailedToGenerateBlobsBundle(#[from] BlobsBundleError), - #[error("Committer failed to get information from storage")] + #[error("Committer failed to get information from storage: {0}")] FailedToGetInformationFromStorage(String), #[error("Committer failed to encode state diff: {0}")] FailedToEncodeStateDiff(#[from] StateDiffError), @@ -265,6 +265,12 @@ pub enum CommitterError { Unreachable(String), #[error("Failed to generate batch witness: {0}")] FailedToGenerateBatchWitness(#[source] ChainError), + #[error("Failed to create checkpoint: {0}")] + FailedToCreateCheckpoint(String), + #[error("Chain error: {0}")] + ChainError(#[from] ChainError), + #[error("Failed due to invalid fork choice: {0}")] + InvalidForkChoice(#[from] InvalidForkChoice), } #[derive(Debug, thiserror::Error)] diff --git a/crates/l2/sequencer/l1_committer.rs b/crates/l2/sequencer/l1_committer.rs index 
6fc1c0fef2d..dd3ae3b2648 100644 --- a/crates/l2/sequencer/l1_committer.rs +++ b/crates/l2/sequencer/l1_committer.rs @@ -14,7 +14,7 @@ use ethrex_blockchain::{Blockchain, vm::StoreVmDatabase}; use ethrex_common::{ Address, H256, U256, types::{ - AccountUpdate, BLOB_BASE_FEE_UPDATE_FRACTION, BlobsBundle, Block, BlockNumber, + AccountUpdate, BLOB_BASE_FEE_UPDATE_FRACTION, BlobsBundle, Block, BlockNumber, Genesis, MIN_BASE_FEE_PER_BLOB_GAS, TxType, batch::Batch, blobs_bundle, fake_exponential_checked, }, }; @@ -42,11 +42,15 @@ use ethrex_rpc::{ clients::eth::{EthClient, Overrides}, types::block_identifier::{BlockIdentifier, BlockTag}, }; +use ethrex_storage::EngineType; use ethrex_storage::Store; use ethrex_storage_rollup::StoreRollup; +use ethrex_vm::{BlockExecutionResult, Evm}; use serde::Serialize; use std::{ collections::{BTreeMap, HashMap}, + fs::remove_dir_all, + path::{Path, PathBuf}, sync::Arc, }; use tokio_util::sync::CancellationToken; @@ -110,6 +114,21 @@ pub struct L1Committer { elasticity_multiplier: u64, /// Git commit hash of the build git_commit_hash: String, + /// Store containing the state checkpoint at the last committed batch. + /// + /// It is used to ensure state availability for batch preparation and + /// witness generation. + current_checkpoint_store: Store, + /// Blockchain instance using the current checkpoint store. + /// + /// It is used for witness generation. + current_checkpoint_blockchain: Arc<Blockchain>, + /// Network genesis. + /// + /// It is used for creating checkpoints. + genesis: Genesis, + /// Directory where checkpoints are stored. + checkpoints_dir: PathBuf, } #[derive(Clone, Serialize)] @@ -129,7 +148,7 @@ pub struct L1CommitterHealth { } impl L1Committer { - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub async fn new( committer_config: &CommitterConfig, proposer_config: &BlockProducerConfig, @@ -139,6 +158,10 @@ impl L1Committer { rollup_store: StoreRollup, based: bool, sequencer_state: SequencerState, + initial_checkpoint_store: Store, + initial_checkpoint_blockchain: Arc<Blockchain>, + genesis: Genesis, + checkpoints_dir: PathBuf, ) -> Result { let eth_client = EthClient::new_with_config( eth_config.rpc_url.iter().map(AsRef::as_ref).collect(), @@ -173,15 +196,24 @@ impl L1Committer { cancellation_token: None, elasticity_multiplier: proposer_config.elasticity_multiplier, git_commit_hash: get_git_commit_hash(), + current_checkpoint_store: initial_checkpoint_store, + current_checkpoint_blockchain: initial_checkpoint_blockchain, + genesis, + checkpoints_dir, }) } + #[expect(clippy::too_many_arguments)] pub async fn spawn( store: Store, blockchain: Arc<Blockchain>, rollup_store: StoreRollup, cfg: SequencerConfig, sequencer_state: SequencerState, + initial_checkpoint_store: Store, + initial_checkpoint_blockchain: Arc<Blockchain>, + genesis: Genesis, + checkpoints_dir: PathBuf, ) -> Result, CommitterError> { let state = Self::new( &cfg.l1_committer, @@ -192,6 +224,10 @@ impl L1Committer { rollup_store.clone(), cfg.based.enabled, sequencer_state, + initial_checkpoint_store, + initial_checkpoint_blockchain, + genesis, + checkpoints_dir, ) .await?; // NOTE: we spawn as blocking due to `generate_blobs_bundle` and @@ -284,6 +320,12 @@ impl L1Committer { self.generate_and_store_batch_prover_input(&batch).await?; + // We need to update the current checkpoint after generating the witness + // with it, and before sending the commitment. + // The checkpoint store directory itself is not pruned until the batch + // it was used for is verified on L1. 
+ self.update_current_checkpoint(&batch).await?; + info!( first_block = batch.first_block, last_block = batch.last_block, @@ -349,27 +391,52 @@ info!("Preparing state diff from block {first_block_of_batch}, {batch_number}"); + let one_time_checkpoint_path = self .checkpoints_dir .join(format!("temp_checkpoint_batch_{batch_number}")); + + // For re-execution we need a checkpoint of the previous state + // (i.e. the state at the latest block of the previous batch, or the + // genesis state if this is the first batch). + // We already have this initial checkpoint as part of the L1Committer + // struct, but we need to create a one-time copy of it because + // we still need to use the current checkpoint store later for witness + // generation. + let (one_time_checkpoint_store, one_time_checkpoint_blockchain) = self + .create_checkpoint(&self.current_checkpoint_store, &one_time_checkpoint_path) + .await?; + loop { let block_to_commit_number = last_added_block_number + 1; - // Get a block to add to the batch - let Some(block_to_commit_body) = self - .store - .get_block_body(block_to_commit_number) - .await - .map_err(CommitterError::from)? - else { - debug!("No new block to commit, skipping.."); - break; + + // Get the potential block to include in the batch. + // Here it is ok to fetch the blocks from the main store and not from + // the checkpoint because the blocks are always available there. We + // only need the checkpoint for re-execution: during witness generation + // in generate_and_store_batch_prover_input and later in this + // function. + let potential_batch_block = { + let Some(block_to_commit_body) = self + .store + .get_block_body(block_to_commit_number) + .await + .map_err(CommitterError::from)? + else { + debug!("No new block to commit, skipping.."); + break; + }; + let block_to_commit_header = self + .store + .get_block_header(block_to_commit_number) + .map_err(CommitterError::from)? + .ok_or(CommitterError::FailedToGetInformationFromStorage( + "Failed to get_block_header() after get_block_body()".to_owned(), + ))?; + + Block::new(block_to_commit_header, block_to_commit_body) }; - let block_to_commit_header = self - .store - .get_block_header(block_to_commit_number) - .map_err(CommitterError::from)? - .ok_or(CommitterError::FailedToGetInformationFromStorage( - "Failed to get_block_header() after get_block_body()".to_owned(), - ))?; - let current_block_gas_used = block_to_commit_header.gas_used; + let current_block_gas_used = potential_batch_block.header.gas_used; // Check if adding this block would exceed the batch gas limit if self.batch_gas_limit.is_some_and(|batch_gas_limit| { @@ -384,7 +451,7 @@ // Get block transactions and receipts let mut txs = vec![]; let mut receipts = vec![]; - for (index, tx) in block_to_commit_body.transactions.iter().enumerate() { + for (index, tx) in potential_batch_block.body.transactions.iter().enumerate() { let receipt = self .store .get_receipt(block_to_commit_number, index.try_into()?) @@ -402,14 +469,13 @@ .try_into() .inspect_err(|_| tracing::error!("Failed to collect metric tx count")) .unwrap_or(0); - batch_gas_used += block_to_commit_header.gas_used; + batch_gas_used += potential_batch_block.header.gas_used; ); // Get block messages and privileged transactions let messages = get_block_l1_messages(&receipts); let privileged_transactions = get_block_privileged_transactions(&txs); // Get block account updates. 
- let block_to_commit = Block::new(block_to_commit_header.clone(), block_to_commit_body); let account_updates = if let Some(account_updates) = self .rollup_store .get_account_updates_by_block_number(block_to_commit_number) .await? @@ -422,13 +488,55 @@ last_added_block_number + 1 ); - let vm_db = - StoreVmDatabase::new(self.store.clone(), block_to_commit.header.parent_hash); - let mut vm = self.blockchain.new_evm(vm_db).await?; - vm.execute_block(&block_to_commit)?; + // Here we use the checkpoint store because we need the previous + // state available (i.e. not pruned) for re-execution. + let vm_db = StoreVmDatabase::new( + one_time_checkpoint_store.clone(), + potential_batch_block.header.parent_hash, + ); + + let fee_config = self + .rollup_store + .get_fee_config_by_block(block_to_commit_number) + .await? + .ok_or(CommitterError::FailedToGetInformationFromStorage( + "Failed to get fee config for re-execution".to_owned(), + ))?; + + let mut vm = Evm::new_for_l2(vm_db, fee_config)?; + + vm.execute_block(&potential_batch_block)?; + vm.get_state_transitions()? }; + // The checkpoint store's state corresponds to the parent state of + // the first block of the batch. Therefore, we need to apply the + // account updates of each block as we go, to be able to continue + // re-executing the next blocks in the batch. + { + let account_updates_list = one_time_checkpoint_store + .apply_account_updates_batch( + potential_batch_block.header.parent_hash, + &account_updates, + ) + .await? + .ok_or(CommitterError::FailedToGetInformationFromStorage( + "no account updates applied".to_owned(), + ))?; + + one_time_checkpoint_blockchain + .store_block( + potential_batch_block.clone(), + account_updates_list, + BlockExecutionResult { + receipts, + requests: vec![], + }, + ) + .await?; + } + // Accumulate block data with the rest of the batch. acc_messages.extend(messages.clone()); acc_privileged_txs.extend(privileged_transactions.clone()); @@ -441,6 +549,8 @@ } } + // It is safe to retrieve this from the main store because blocks + // are available there. What's not available is the state. let parent_block_hash = self .store .get_block_header(first_block_of_batch)? .ok_or(CommitterError::FailedToGetInformationFromStorage( "Failed to get_block_header() of the last added block".to_owned(), ))? .parent_hash; + + // Again, here the VM database should be instantiated from the checkpoint + // store to have access to the previous state. + let parent_db = + StoreVmDatabase::new(one_time_checkpoint_store.clone(), parent_block_hash); let acc_privileged_txs_len: u64 = acc_privileged_txs.len().try_into()?; if acc_privileged_txs_len > PRIVILEGED_TX_BUDGET { @@ -461,8 +575,8 @@ let result = if !self.validium { // Prepare current state diff. - let state_diff = prepare_state_diff( - block_to_commit_header, + let state_diff: StateDiff = prepare_state_diff( + potential_batch_block.header.clone(), &parent_db, &acc_messages, &acc_privileged_txs, @@ -504,9 +618,8 @@ message_hashes.extend(messages.iter().map(get_l1_message_hash)); - new_state_root = self - .store - .state_trie(block_to_commit.hash())? + new_state_root = one_time_checkpoint_store + .state_trie(potential_batch_block.hash())? + .ok_or(CommitterError::FailedToGetInformationFromStorage( "Failed to get state root from storage".to_owned(), ))? 
@@ -547,6 +660,12 @@ let privileged_transactions_hash = compute_privileged_transactions_hash(privileged_transactions_hashes)?; + remove_dir_all(&one_time_checkpoint_path).map_err(|e| { + CommitterError::FailedToCreateCheckpoint(format!( + "Failed to remove one-time checkpoint directory {one_time_checkpoint_path:?}: {e}" + )) + })?; + Ok(( blobs_bundle, new_state_root, @@ -568,7 +687,7 @@ .await?; let batch_witness = self - .blockchain + .current_checkpoint_blockchain .generate_witness_for_blocks_with_fee_configs(&blocks, Some(&fee_configs)) .await .map_err(CommitterError::FailedToGenerateBatchWitness)?; @@ -618,6 +737,126 @@ Ok(()) } + /// Updates the current checkpoint store and blockchain to the state at the + /// given latest batch. + /// + /// The reference to the previous checkpoint is lost after this operation, + /// but the directory is not deleted until the batch it was used for is + /// verified on L1. + async fn update_current_checkpoint( + &mut self, + latest_batch: &Batch, + ) -> Result<(), CommitterError> { + let new_checkpoint_path = self + .checkpoints_dir + .join(format!("checkpoint_batch_{}", latest_batch.number)); + + // CAUTION + // We need to skip checkpoint creation if the directory already exists. + // Sometimes the commit_next_batch task is retried after a failure, and in + // that case we would try to create a checkpoint again at the same path, + // causing a lock error under the rocksdb feature. + if new_checkpoint_path.exists() { + debug!("Checkpoint at path {new_checkpoint_path:?} already exists, skipping creation"); + return Ok(()); + } + + let (new_checkpoint_store, new_checkpoint_blockchain) = self + .create_checkpoint(&self.store, &new_checkpoint_path) + .await?; + + self.current_checkpoint_store = new_checkpoint_store; + + self.current_checkpoint_blockchain = new_checkpoint_blockchain; + + Ok(()) + } + + /// Creates a checkpoint of the given store at the specified path. + /// + /// This function performs the following steps: + /// 1. Creates a checkpoint of the provided store at the specified path. + /// 2. Initializes a new store and blockchain for the checkpoint. + /// 3. Regenerates the head state in the checkpoint store. + /// 4. Validates that the checkpoint store's head block number and latest block match those of the original store. 
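+ /// + /// A sketch of the intended call pattern (the batch number shown is hypothetical): + /// + /// ```ignore + /// let path = self.checkpoints_dir.join("checkpoint_batch_42"); + /// let (ckpt_store, ckpt_chain) = self.create_checkpoint(&self.store, &path).await?; + /// ``` 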
+ async fn create_checkpoint( + &self, + checkpointee: &Store, + path: &Path, + ) -> Result<(Store, Arc<Blockchain>), CommitterError> { + checkpointee.create_checkpoint(path).await?; + + #[cfg(feature = "rocksdb")] + let engine_type = EngineType::RocksDB; + #[cfg(not(feature = "rocksdb"))] + let engine_type = EngineType::InMemory; + + let checkpoint_store = { + let checkpoint_store_inner = Store::new(path, engine_type)?; + + checkpoint_store_inner + .add_initial_state(self.genesis.clone()) + .await?; + + checkpoint_store_inner + }; + + let checkpoint_blockchain = Arc::new(Blockchain::new( + checkpoint_store.clone(), + self.blockchain.options.clone(), + )); + + let checkpoint_head_block_number = checkpoint_store.get_latest_block_number().await?; + + let db_head_block_number = checkpointee.get_latest_block_number().await?; + + if checkpoint_head_block_number != db_head_block_number { + return Err(CommitterError::FailedToCreateCheckpoint( + "checkpoint store head block number does not match source store head block number before regeneration".to_string(), + )); + } + + regenerate_head_state(&checkpoint_store, &checkpoint_blockchain).await?; + + let checkpoint_latest_block_number = checkpoint_store.get_latest_block_number().await?; + + let db_latest_block_number = checkpointee.get_latest_block_number().await?; + + let checkpoint_latest_block = checkpoint_store + .get_block_by_number(checkpoint_latest_block_number) + .await? + .ok_or(CommitterError::FailedToCreateCheckpoint( + "latest block not found in checkpoint store".to_string(), + ))?; + + let db_latest_block = checkpointee + .get_block_by_number(db_latest_block_number) + .await? + .ok_or(CommitterError::FailedToCreateCheckpoint( + "latest block not found in source store".to_string(), + ))?; + + if !checkpoint_store.has_state_root(checkpoint_latest_block.header.state_root)? { + return Err(CommitterError::FailedToCreateCheckpoint( + "checkpoint store state is not regenerated properly".to_string(), + )); + } + + if checkpoint_latest_block_number != db_head_block_number { + return Err(CommitterError::FailedToCreateCheckpoint( + "checkpoint store latest block number does not match source store head block number after regeneration".to_string(), + )); + } + + if checkpoint_latest_block.hash() != db_latest_block.hash() { + return Err(CommitterError::FailedToCreateCheckpoint( + "checkpoint store latest block hash does not match source store latest block hash after regeneration".to_string(), + )); + } + + Ok((checkpoint_store, checkpoint_blockchain)) + } + async fn send_commitment(&mut self, batch: &Batch) -> Result { let messages_merkle_root = compute_merkle_root(&batch.message_hashes); let last_block_hash = get_last_block_hash(&self.store, batch.last_block)?; @@ -952,3 +1191,78 @@ Ok(blob_gas) } + +/// Regenerates the state up to the head block by re-applying blocks from the +/// last known state root. +/// +/// Since the path-based feature was added, the database stores the state 128 +/// blocks behind the head block, while the state of the blocks in between is +/// kept in in-memory diff layers. +/// +/// After the node is shut down, those in-memory layers are lost, and the database +/// won't have the state for those blocks. It will still have the blocks, though. +/// +/// When the node is started again, the state needs to be regenerated by +/// re-applying the blocks from the last known state root up to the head block. +/// +/// This function performs that regeneration. 
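+/// +/// A minimal call sketch (assumes an initialized `store` and an `Arc<Blockchain>` built on it; both names are hypothetical): +/// +/// ```ignore +/// regenerate_head_state(&store, &blockchain).await?; +/// ``` 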
+pub async fn regenerate_head_state( + store: &Store, + blockchain: &Arc<Blockchain>, +) -> Result<(), CommitterError> { + let head_block_number = store.get_latest_block_number().await?; + + let Some(last_header) = store.get_block_header(head_block_number)? else { + unreachable!("Database is empty, genesis block should be present"); + }; + + let mut current_last_header = last_header; + + // Find the last block with a known state root + while !store.has_state_root(current_last_header.state_root)? { + if current_last_header.number == 0 { + return Err(CommitterError::FailedToCreateCheckpoint( + "unknown state found in DB. Please run `ethrex removedb` and restart node" + .to_string(), + )); + } + let parent_number = current_last_header.number - 1; + + debug!("Need to regenerate state for block {parent_number}"); + + let Some(parent_header) = store.get_block_header(parent_number)? else { + return Err(CommitterError::FailedToCreateCheckpoint(format!( + "parent header for block {parent_number} not found" + ))); + }; + + current_last_header = parent_header; + } + + let last_state_number = current_last_header.number; + + if last_state_number == head_block_number { + debug!("State is already up to date"); + return Ok(()); + } + + info!("Regenerating state from block {last_state_number} to {head_block_number}"); + + // Re-apply blocks from the last known state root to the head block + for i in (last_state_number + 1)..=head_block_number { + debug!("Re-applying block {i} to regenerate state"); + + let block = store.get_block_by_number(i).await?.ok_or_else(|| { + CommitterError::FailedToCreateCheckpoint(format!("Block {i} not found")) + })?; + + blockchain + .add_block(block) + .await + .map_err(|err| CommitterError::FailedToCreateCheckpoint(err.to_string()))?; + } + + info!("Finished regenerating state"); + + Ok(()) +} diff --git a/crates/l2/sequencer/mod.rs b/crates/l2/sequencer/mod.rs index 9cf172aa779..25483efe96c 100644 --- a/crates/l2/sequencer/mod.rs +++ b/crates/l2/sequencer/mod.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use std::sync::Arc; use crate::based::sequencer_state::SequencerState; @@ -8,6 +9,7 @@ use crate::sequencer::errors::SequencerError; use crate::{BlockFetcher, SequencerConfig, StateUpdater}; use block_producer::BlockProducer; use ethrex_blockchain::Blockchain; +use ethrex_common::types::Genesis; use ethrex_l2_common::prover::ProverType; use ethrex_storage::Store; use ethrex_storage_rollup::StoreRollup; @@ -36,6 +38,7 @@ pub mod errors; pub mod setup; pub mod utils; +#[expect(clippy::too_many_arguments)] pub async fn start_l2( store: Store, rollup_store: StoreRollup, @@ -43,6 +46,10 @@ pub async fn start_l2( cfg: SequencerConfig, cancellation_token: CancellationToken, #[cfg(feature = "metrics")] l2_url: String, + initial_checkpoint_store: Store, + initial_checkpoint_blockchain: Arc<Blockchain>, + genesis: Genesis, + checkpoints_dir: PathBuf, ) -> Result<(), errors::SequencerError> { let initial_status = if cfg.based.enabled { SequencerStatus::default() @@ -104,6 +111,10 @@ pub async fn start_l2( rollup_store.clone(), cfg.clone(), shared_state.clone(), + initial_checkpoint_store, + initial_checkpoint_blockchain, + genesis, + checkpoints_dir, ) .await .inspect_err(|err| { diff --git a/crates/storage/api.rs b/crates/storage/api.rs index d3246a167c1..abc7b9c6cf5 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -3,6 +3,7 @@ use ethrex_common::types::{ Block, BlockBody, BlockHash, BlockHeader, BlockNumber, ChainConfig, Code, Index, Receipt, Transaction, }; +use std::path::Path; use 
std::{fmt::Debug, panic::RefUnwindSafe}; use crate::UpdateBatch; @@ -377,4 +378,6 @@ async fn clear_fullsync_headers(&self) -> Result<(), StoreError>; fn generate_flatkeyvalue(&self) -> Result<(), StoreError>; + + async fn create_checkpoint(&self, path: &Path) -> Result<(), StoreError>; } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 080f39768f0..cc58e02321c 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -17,7 +17,7 @@ use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, NodeRLP, Trie, TrieLogger, TrieNode, TrieWitness}; use sha3::{Digest as _, Keccak256}; -use std::sync::Arc; +use std::{collections::hash_map::Entry, sync::Arc}; use std::{ collections::{BTreeMap, HashMap}, sync::RwLock, @@ -447,61 +447,92 @@ impl Store { mut state_trie: Trie, account_updates: &[AccountUpdate], mut storage_tries: HashMap<Address, (TrieWitness, Trie)>, - ) -> Result<(Trie, HashMap<Address, (TrieWitness, Trie)>), StoreError> { + ) -> Result<(HashMap<Address, (TrieWitness, Trie)>, AccountUpdatesList), StoreError> { + let mut ret_storage_updates = Vec::new(); + + let mut code_updates = Vec::new(); + let state_root = state_trie.hash_no_commit(); + for update in account_updates.iter() { let hashed_address = hash_address(&update.address); + if update.removed { // Remove account from trie state_trie.remove(&hashed_address)?; - } else { - // Add or update AccountState in the trie - // Fetch current state or create a new state to be inserted - let mut account_state = match state_trie.get(&hashed_address)? { - Some(encoded_state) => AccountState::decode(&encoded_state)?, - None => AccountState::default(), - }; - if let Some(info) = &update.info { - account_state.nonce = info.nonce; - account_state.balance = info.balance; - account_state.code_hash = info.code_hash; - // Store updated code in DB - if let Some(code) = &update.code { - self.add_account_code(code.clone()).await?; - } - } - if update.removed_storage { - account_state.storage_root = *EMPTY_TRIE_HASH; + + continue; + } + + // Add or update AccountState in the trie + // Fetch current state or create a new state to be inserted + let mut account_state = match state_trie.get(&hashed_address)? 
{ + Some(encoded_state) => AccountState::decode(&encoded_state)?, + None => AccountState::default(), + }; + + if update.removed_storage { + account_state.storage_root = *EMPTY_TRIE_HASH; + } + + if let Some(info) = &update.info { + account_state.nonce = info.nonce; + + account_state.balance = info.balance; + + account_state.code_hash = info.code_hash; + + // Store updated code in DB + if let Some(code) = &update.code { + code_updates.push((info.code_hash, code.clone())); } - // Store the added storage in the account's storage trie and compute its new root - if !update.added_storage.is_empty() { - let (_witness, storage_trie) = match storage_tries.entry(update.address) { - std::collections::hash_map::Entry::Occupied(value) => value.into_mut(), - std::collections::hash_map::Entry::Vacant(vacant) => { - let trie = self.engine.open_storage_trie( - H256::from_slice(&hashed_address), - account_state.storage_root, - state_root, - )?; - vacant.insert(TrieLogger::open_trie(trie)) - } - }; - - for (storage_key, storage_value) in &update.added_storage { - let hashed_key = hash_key(storage_key); - if storage_value.is_zero() { - storage_trie.remove(&hashed_key)?; - } else { - storage_trie.insert(hashed_key, storage_value.encode_to_vec())?; - } + } + + // Store the added storage in the account's storage trie and compute its new root + if !update.added_storage.is_empty() { + let (_witness, storage_trie) = match storage_tries.entry(update.address) { + Entry::Occupied(value) => value.into_mut(), + Entry::Vacant(vacant) => { + let trie = self.engine.open_storage_trie( + H256::from_slice(&hashed_address), + account_state.storage_root, + state_root, + )?; + vacant.insert(TrieLogger::open_trie(trie)) + } + }; + + for (storage_key, storage_value) in &update.added_storage { + let hashed_key = hash_key(storage_key); + + if storage_value.is_zero() { + storage_trie.remove(&hashed_key)?; + } else { + storage_trie.insert(hashed_key, storage_value.encode_to_vec())?; } - account_state.storage_root = storage_trie.hash_no_commit(); } - state_trie.insert(hashed_address, account_state.encode_to_vec())?; + + let (storage_hash, storage_updates) = + storage_trie.collect_changes_since_last_hash(); + + account_state.storage_root = storage_hash; + + ret_storage_updates.push((H256::from_slice(&hashed_address), storage_updates)); } + + state_trie.insert(hashed_address, account_state.encode_to_vec())?; } - Ok((state_trie, storage_tries)) + let (state_trie_hash, state_updates) = state_trie.collect_changes_since_last_hash(); + + let account_updates_list = AccountUpdatesList { + state_trie_hash, + state_updates, + storage_updates: ret_storage_updates, + code_updates, + }; + + Ok((storage_tries, account_updates_list)) } /// Adds all genesis accounts and returns the genesis block's state_root @@ -1362,6 +1393,10 @@ impl Store { pub fn generate_flatkeyvalue(&self) -> Result<(), StoreError> { self.engine.generate_flatkeyvalue() } + + pub async fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<(), StoreError> { + self.engine.create_checkpoint(path.as_ref()).await + } } pub struct AccountProof { diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 4557ec2c02e..17263f90c9b 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -14,6 +14,7 @@ use ethrex_trie::{InMemoryTrieDB, Nibbles, Trie, db::NodeMap}; use std::{ collections::HashMap, fmt::Debug, + path::Path, sync::{Arc, Mutex, MutexGuard, RwLock}, }; @@ -733,6 +734,12 @@ impl StoreEngine for Store { 
// Silently ignoring the request to build the FlatKeyValue is harmless Ok(()) } + + async fn create_checkpoint(&self, _path: &Path) -> Result<(), StoreError> { + // Checkpoints are not supported for the InMemory DB + // Silently ignoring the request to create a checkpoint is harmless + Ok(()) + } } impl Debug for Store { diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index b9f71221d98..bc4cd516e5b 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -16,7 +16,7 @@ use ethrex_common::{ use ethrex_trie::{Nibbles, Node, Trie}; use rocksdb::{ BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, - Options, WriteBatch, + Options, WriteBatch, checkpoint::Checkpoint, }; use std::{ collections::HashSet, @@ -1763,6 +1763,19 @@ impl StoreEngine for Store { .send(FKVGeneratorControlMessage::Continue) .map_err(|_| StoreError::Custom("FlatKeyValue thread disconnected.".to_string())) } + + async fn create_checkpoint(&self, path: &Path) -> Result<(), StoreError> { + let checkpoint = Checkpoint::new(&self.db) + .map_err(|e| StoreError::Custom(format!("Failed to create checkpoint: {e}")))?; + + checkpoint.create_checkpoint(path).map_err(|e| { + StoreError::Custom(format!( + "Failed to create RocksDB checkpoint at {path:?}: {e}" + )) + })?; + + Ok(()) + } } /// Open column families
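Taken together, the changes above implement the following checkpoint lifecycle. A minimal sketch, assuming a RocksDB-backed store, with simplified eyre error handling; the function name, datadir path, and the regenerate_head_state import path are illustrative, not part of the diff:

use std::{path::Path, sync::Arc};

use ethrex_blockchain::{Blockchain, BlockchainOptions};
use ethrex_common::types::Genesis;
use ethrex_storage::{EngineType, Store};
// `regenerate_head_state` is the helper added in l1_committer.rs above;
// the exact import path depends on crate layout.
use ethrex_l2::sequencer::l1_committer::regenerate_head_state;

/// Snapshots `store`, reopens the snapshot, and rebuilds its head state.
async fn checkpoint_roundtrip(
    store: &Store,
    genesis: Genesis,
    opts: BlockchainOptions,
) -> eyre::Result<()> {
    let path = Path::new("datadir/example_checkpoint"); // illustrative path

    // 1. Snapshot the source store (a RocksDB checkpoint; a silent no-op
    //    for the in-memory backend).
    store.create_checkpoint(path).await?;

    // 2. Reopen the snapshot as an independent Store seeded with genesis.
    let checkpoint = Store::new(path, EngineType::RocksDB)?;
    checkpoint.add_initial_state(genesis).await?;
    let chain = Arc::new(Blockchain::new(checkpoint.clone(), opts));

    // 3. Rebuild the head state that lived in the lost in-memory diff layers.
    regenerate_head_state(&checkpoint, &chain).await?;

    // 4. Sanity check: the snapshot head must agree with the source head.
    assert_eq!(
        checkpoint.get_latest_block_number().await?,
        store.get_latest_block_number().await?
    );

    Ok(())
}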