diff --git a/.github/workflows/pr_perf_blocks_exec.yaml b/.github/workflows/pr_perf_blocks_exec.yaml index 6354655dd1a..23bf7122456 100644 --- a/.github/workflows/pr_perf_blocks_exec.yaml +++ b/.github/workflows/pr_perf_blocks_exec.yaml @@ -83,7 +83,7 @@ jobs: BINS="base,head" hyperfine --setup "./bin/ethrex-base removedb" -w 5 -N -r 10 --show-output --export-markdown "bench_pr_comparison.md" \ -L bin "$BINS" -n "{bin}" \ - "./bin/ethrex-{bin} --network test_data/genesis-l2-ci.json import ./test_data/l2-1k-erc20.rlp --removedb --force" + "./bin/ethrex-{bin} --network test_data/genesis-l2-ci.json --force import ./test_data/l2-1k-erc20.rlp --removedb" echo -e "## Benchmark Block Execution Results Comparison Against Main\n\n$(cat bench_pr_comparison.md)" > bench_pr_comparison.md - name: Upload PR results uses: actions/upload-artifact@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 07710d0ae56..f93d2fb5533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Perf +### 2025-04-11 + +- Asyncify some DB read APIs, as well as its users [#2430](https://github.com/lambdaclass/ethrex/pull/2430) + ### 2025-04-09 - Fix an issue where the table was locked for up to 20 sec when performing a ping: [2368](https://github.com/lambdaclass/ethrex/pull/2368) diff --git a/cmd/ef_tests/blockchain/test_runner.rs b/cmd/ef_tests/blockchain/test_runner.rs index 2c715719215..a9a04a6f3e7 100644 --- a/cmd/ef_tests/blockchain/test_runner.rs +++ b/cmd/ef_tests/blockchain/test_runner.rs @@ -54,7 +54,7 @@ pub async fn run_ef_test(test_key: &str, test: &TestUnit) { } } } - check_poststate_against_db(test_key, test, &store) + check_poststate_against_db(test_key, test, &store).await } /// Tests the rlp decoding of a block @@ -136,13 +136,14 @@ fn check_prestate_against_db(test_key: &str, test: &TestUnit, db: &Store) { /// Checks that all accounts in the post-state are present and have the correct values in the DB /// Panics if any comparison fails /// Tests that previously failed the validation stage shouldn't be executed with this function. 
-fn check_poststate_against_db(test_key: &str, test: &TestUnit, db: &Store) { - let latest_block_number = db.get_latest_block_number().unwrap(); +async fn check_poststate_against_db(test_key: &str, test: &TestUnit, db: &Store) { + let latest_block_number = db.get_latest_block_number().await.unwrap(); for (addr, account) in &test.post_state { let expected_account: CoreAccount = account.clone().into(); // Check info let db_account_info = db .get_account_info(latest_block_number, *addr) + .await .expect("Failed to read from DB") .unwrap_or_else(|| { panic!("Account info for address {addr} not found in DB, test:{test_key}") @@ -167,6 +168,7 @@ fn check_poststate_against_db(test_key: &str, test: &TestUnit, db: &Store) { for (key, value) in expected_account.storage { let db_storage_value = db .get_storage_at(latest_block_number, *addr, key) + .await .expect("Failed to read from DB") .unwrap_or_else(|| { panic!("Storage missing for address {addr} key {key} in DB test:{test_key}") @@ -178,7 +180,7 @@ fn check_poststate_against_db(test_key: &str, test: &TestUnit, db: &Store) { } } // Check lastblockhash is in store - let last_block_number = db.get_latest_block_number().unwrap(); + let last_block_number = db.get_latest_block_number().await.unwrap(); let last_block_hash = db .get_block_header(last_block_number) .unwrap() diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index d5100f13779..d2492595a20 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -55,7 +55,8 @@ async fn main() -> eyre::Result<()> { blockchain.clone(), cancel_token.clone(), tracker.clone(), - ); + ) + .await; init_metrics(&opts, tracker.clone()); @@ -63,7 +64,7 @@ async fn main() -> eyre::Result<()> { if #[cfg(feature = "dev")] { use ethrex::initializers::init_dev_network; - init_dev_network(&opts, &store, tracker.clone()); + init_dev_network(&opts, &store, tracker.clone()).await; } else { use ethrex::initializers::init_network; diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 942c48aa968..0730ca42b4c 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -94,7 +94,7 @@ pub fn init_blockchain(evm_engine: EvmEngine, store: Store) -> Arc { } #[allow(clippy::too_many_arguments)] -pub fn init_rpc_api( +pub async fn init_rpc_api( opts: &Options, #[cfg(feature = "l2")] l2_opts: &L2Options, signer: &SigningKey, @@ -119,7 +119,8 @@ pub fn init_rpc_api( cancel_token, blockchain.clone(), store.clone(), - ); + ) + .await; let rpc_api = ethrex_rpc::start_api( get_http_socket_addr(opts), @@ -208,14 +209,15 @@ pub async fn init_network( } #[cfg(feature = "dev")] -pub fn init_dev_network(opts: &Options, store: &Store, tracker: TaskTracker) { +pub async fn init_dev_network(opts: &Options, store: &Store, tracker: TaskTracker) { if opts.dev { info!("Running in DEV_MODE"); let head_block_hash = { - let current_block_number = store.get_latest_block_number().unwrap(); + let current_block_number = store.get_latest_block_number().await.unwrap(); store .get_canonical_block_hash(current_block_number) + .await .unwrap() .unwrap() }; diff --git a/cmd/ethrex/l2.rs b/cmd/ethrex/l2.rs index 0e0650c13da..53d317fd04f 100644 --- a/cmd/ethrex/l2.rs +++ b/cmd/ethrex/l2.rs @@ -149,7 +149,8 @@ impl Command { blockchain.clone(), cancel_token.clone(), tracker.clone(), - ); + ) + .await; // TODO: Add a --metrics flag to enable metrics. 
init_metrics(&opts.node_opts, tracker.clone()); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 17a672511a1..ee242837022 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -351,7 +351,7 @@ impl Blockchain { /// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid #[cfg(feature = "c-kzg")] - pub fn add_blob_transaction_to_pool( + pub async fn add_blob_transaction_to_pool( &self, transaction: EIP4844Transaction, blobs_bundle: BlobsBundle, @@ -364,7 +364,7 @@ impl Blockchain { let sender = transaction.sender(); // Validate transaction - self.validate_transaction(&transaction, sender)?; + self.validate_transaction(&transaction, sender).await?; // Add transaction and blobs bundle to storage let hash = transaction.compute_hash(); @@ -375,14 +375,17 @@ impl Blockchain { } /// Add a transaction to the mempool checking that the transaction is valid - pub fn add_transaction_to_pool(&self, transaction: Transaction) -> Result { + pub async fn add_transaction_to_pool( + &self, + transaction: Transaction, + ) -> Result { // Blob transactions should be submitted via add_blob_transaction along with the corresponding blobs bundle if matches!(transaction, Transaction::EIP4844Transaction(_)) { return Err(MempoolError::BlobTxNoBlobsBundle); } let sender = transaction.sender(); // Validate transaction - self.validate_transaction(&transaction, sender)?; + self.validate_transaction(&transaction, sender).await?; let hash = transaction.compute_hash(); @@ -430,14 +433,14 @@ impl Blockchain { */ - pub fn validate_transaction( + pub async fn validate_transaction( &self, tx: &Transaction, sender: Address, ) -> Result<(), MempoolError> { // TODO: Add validations here - let header_no = self.storage.get_latest_block_number()?; + let header_no = self.storage.get_latest_block_number().await?; let header = self .storage .get_block_header(header_no)? @@ -477,7 +480,7 @@ impl Blockchain { } }; - let maybe_sender_acc_info = self.storage.get_account_info(header_no, sender)?; + let maybe_sender_acc_info = self.storage.get_account_info(header_no, sender).await?; if let Some(sender_acc_info) = maybe_sender_acc_info { if tx.nonce() < sender_acc_info.nonce { @@ -562,8 +565,8 @@ pub fn validate_receipts_root( } // Returns the hash of the head of the canonical chain (the latest valid hash). -pub fn latest_canonical_block_hash(storage: &Store) -> Result { - let latest_block_number = storage.get_latest_block_number()?; +pub async fn latest_canonical_block_hash(storage: &Store) -> Result { + let latest_block_number = storage.get_latest_block_number().await?; if let Some(latest_valid_header) = storage.get_block_header(latest_block_number)? { let latest_valid_hash = latest_valid_header.compute_block_hash(); return Ok(latest_valid_hash); @@ -611,12 +614,12 @@ pub fn validate_block( Ok(()) } -pub fn is_canonical( +pub async fn is_canonical( store: &Store, block_number: BlockNumber, block_hash: BlockHash, ) -> Result { - match store.get_canonical_block_hash(block_number)? { + match store.get_canonical_block_hash(block_number).await? 
{ Some(hash) if hash == block_hash => Ok(true), _ => Ok(false), } diff --git a/crates/blockchain/fork_choice.rs b/crates/blockchain/fork_choice.rs index f1013b922c3..257183ffc9a 100644 --- a/crates/blockchain/fork_choice.rs +++ b/crates/blockchain/fork_choice.rs @@ -54,15 +54,15 @@ pub async fn apply_fork_choice( return Err(InvalidForkChoice::Syncing); }; - let latest = store.get_latest_block_number()?; + let latest = store.get_latest_block_number().await?; // If the head block is an already present head ancestor, skip the update. - if is_canonical(store, head.number, head_hash)? && head.number < latest { + if is_canonical(store, head.number, head_hash).await? && head.number < latest { return Err(InvalidForkChoice::NewHeadAlreadyCanonical); } // Find blocks that will be part of the new canonical chain. - let Some(new_canonical_blocks) = find_link_with_canonical_chain(store, &head)? else { + let Some(new_canonical_blocks) = find_link_with_canonical_chain(store, &head).await? else { return Err(InvalidForkChoice::Disconnected( error::ForkChoiceElement::Head, error::ForkChoiceElement::Safe, @@ -76,7 +76,7 @@ pub async fn apply_fork_choice( // Check that finalized and safe blocks are part of the new canonical chain. if let Some(ref finalized) = finalized_res { - if !((is_canonical(store, finalized.number, finalized_hash)? + if !((is_canonical(store, finalized.number, finalized_hash).await? && finalized.number <= link_block_number) || (finalized.number == head.number && finalized_hash == head_hash) || new_canonical_blocks.contains(&(finalized.number, finalized_hash))) @@ -89,7 +89,8 @@ pub async fn apply_fork_choice( } if let Some(ref safe) = safe_res { - if !((is_canonical(store, safe.number, safe_hash)? && safe.number <= link_block_number) + if !((is_canonical(store, safe.number, safe_hash).await? + && safe.number <= link_block_number) || (safe.number == head.number && safe_hash == head_hash) || new_canonical_blocks.contains(&(safe.number, safe_hash))) { @@ -159,7 +160,7 @@ fn check_order( // - Ok(Some([])): the block is already canonical. // - Ok(Some(branch)): the "branch" is a sequence of blocks that connects the ancestor and the // descendant. -fn find_link_with_canonical_chain( +async fn find_link_with_canonical_chain( store: &Store, block: &BlockHeader, ) -> Result>, StoreError> { @@ -168,11 +169,11 @@ fn find_link_with_canonical_chain( let mut header = block.clone(); let mut branch = Vec::new(); - if is_canonical(store, block_number, block_hash)? { + if is_canonical(store, block_number, block_hash).await? { return Ok(Some(branch)); } - let genesis_number = store.get_earliest_block_number()?; + let genesis_number = store.get_earliest_block_number().await?; while block_number > genesis_number { block_number -= 1; @@ -185,7 +186,7 @@ fn find_link_with_canonical_chain( Err(error) => return Err(error), }; - if is_canonical(store, block_number, parent_hash)? { + if is_canonical(store, block_number, parent_hash).await? 
{ return Ok(Some(branch)); } else { branch.push((block_number, parent_hash)); diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index b2145533730..3bfb822bff2 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -290,18 +290,15 @@ mod tests { use ethrex_storage::EngineType; use ethrex_storage::{error::StoreError, Store}; - fn setup_storage(config: ChainConfig, header: BlockHeader) -> Result { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let store = Store::new("test", EngineType::InMemory)?; - let block_number = header.number; - let block_hash = header.compute_block_hash(); - store.add_block_header(block_hash, header).await?; - store.set_canonical_block(block_number, block_hash).await?; - store.update_latest_block_number(block_number).await?; - store.set_chain_config(&config).await?; - Ok(store) - }) + async fn setup_storage(config: ChainConfig, header: BlockHeader) -> Result { + let store = Store::new("test", EngineType::InMemory)?; + let block_number = header.number; + let block_hash = header.compute_block_hash(); + store.add_block_header(block_hash, header).await?; + store.set_canonical_block(block_number, block_hash).await?; + store.update_latest_block_number(block_number).await?; + store.set_chain_config(&config).await?; + Ok(store) } fn build_basic_config_and_header( @@ -502,11 +499,11 @@ mod tests { assert_eq!(intrinsic_gas, expected_gas_cost); } - #[test] - fn transaction_with_big_init_code_in_shanghai_fails() { + #[tokio::test] + async fn transaction_with_big_init_code_in_shanghai_fails() { let (config, header) = build_basic_config_and_header(false, true); - let store = setup_storage(config, header).expect("Storage setup"); + let store = setup_storage(config, header).await.expect("Storage setup"); let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { @@ -524,16 +521,16 @@ mod tests { let tx = Transaction::EIP1559Transaction(tx); let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( - validation, + validation.await, Err(MempoolError::TxMaxInitCodeSizeError) )); } - #[test] - fn transaction_with_gas_limit_higher_than_of_the_block_should_fail() { + #[tokio::test] + async fn transaction_with_gas_limit_higher_than_of_the_block_should_fail() { let (config, header) = build_basic_config_and_header(false, false); - let store = setup_storage(config, header).expect("Storage setup"); + let store = setup_storage(config, header).await.expect("Storage setup"); let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { @@ -551,16 +548,16 @@ mod tests { let tx = Transaction::EIP1559Transaction(tx); let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( - validation, + validation.await, Err(MempoolError::TxGasLimitExceededError) )); } - #[test] - fn transaction_with_priority_fee_higher_than_gas_fee_should_fail() { + #[tokio::test] + async fn transaction_with_priority_fee_higher_than_gas_fee_should_fail() { let (config, header) = build_basic_config_and_header(false, false); - let store = setup_storage(config, header).expect("Storage setup"); + let store = setup_storage(config, header).await.expect("Storage setup"); let blockchain = Blockchain::default_with_store(store); let tx = EIP1559Transaction { @@ -578,15 +575,15 @@ mod tests { let tx = Transaction::EIP1559Transaction(tx); let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( - validation, + 
validation.await, Err(MempoolError::TxTipAboveFeeCapError) )); } - #[test] - fn transaction_with_gas_limit_lower_than_intrinsic_gas_should_fail() { + #[tokio::test] + async fn transaction_with_gas_limit_lower_than_intrinsic_gas_should_fail() { let (config, header) = build_basic_config_and_header(false, false); - let store = setup_storage(config, header).expect("Storage setup"); + let store = setup_storage(config, header).await.expect("Storage setup"); let blockchain = Blockchain::default_with_store(store); let intrinsic_gas_cost = TX_GAS_COST; @@ -605,15 +602,15 @@ mod tests { let tx = Transaction::EIP1559Transaction(tx); let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( - validation, + validation.await, Err(MempoolError::TxIntrinsicGasCostAboveLimitError) )); } - #[test] - fn transaction_with_blob_base_fee_below_min_should_fail() { + #[tokio::test] + async fn transaction_with_blob_base_fee_below_min_should_fail() { let (config, header) = build_basic_config_and_header(false, false); - let store = setup_storage(config, header).expect("Storage setup"); + let store = setup_storage(config, header).await.expect("Storage setup"); let blockchain = Blockchain::default_with_store(store); let tx = EIP4844Transaction { @@ -632,7 +629,7 @@ mod tests { let tx = Transaction::EIP4844Transaction(tx); let validation = blockchain.validate_transaction(&tx, Address::random()); assert!(matches!( - validation, + validation.await, Err(MempoolError::TxBlobBaseFeeTooLowError) )); } diff --git a/crates/blockchain/smoke_test.rs b/crates/blockchain/smoke_test.rs index e2ab55d3ae3..c6611d9f0d0 100644 --- a/crates/blockchain/smoke_test.rs +++ b/crates/blockchain/smoke_test.rs @@ -34,7 +34,7 @@ mod blockchain_integration_test { let retrieved_1a = store.get_block_header(1).unwrap().unwrap(); assert_eq!(retrieved_1a, block_1a.header); - assert!(is_canonical(&store, 1, hash_1a).unwrap()); + assert!(is_canonical(&store, 1, hash_1a).await.unwrap()); // Add second block at height 1. Will not be canonical. let block_1b = new_block(&store, &genesis_header).await; @@ -46,7 +46,7 @@ mod blockchain_integration_test { let retrieved_1b = store.get_block_header_by_hash(hash_1b).unwrap().unwrap(); assert_ne!(retrieved_1a, retrieved_1b); - assert!(!is_canonical(&store, 1, hash_1b).unwrap()); + assert!(!is_canonical(&store, 1, hash_1b).await.unwrap()); // Add a third block at height 2, child to the non canonical block. let block_2 = new_block(&store, &block_1b.header).await; @@ -58,7 +58,7 @@ mod blockchain_integration_test { let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap(); assert!(retrieved_2.is_some()); - assert!(store.get_canonical_block_hash(2).unwrap().is_none()); + assert!(store.get_canonical_block_hash(2).await.unwrap().is_none()); // Receive block 2 as new head. apply_fork_choice( @@ -71,10 +71,10 @@ mod blockchain_integration_test { .unwrap(); // Check that canonical blocks changed to the new branch. 
- assert!(is_canonical(&store, 0, genesis_hash).unwrap()); - assert!(is_canonical(&store, 1, hash_1b).unwrap()); - assert!(is_canonical(&store, 2, hash_2).unwrap()); - assert!(!is_canonical(&store, 1, hash_1a).unwrap()); + assert!(is_canonical(&store, 0, genesis_hash).await.unwrap()); + assert!(is_canonical(&store, 1, hash_1b).await.unwrap()); + assert!(is_canonical(&store, 2, hash_2).await.unwrap()); + assert!(!is_canonical(&store, 1, hash_1a).await.unwrap()); } #[tokio::test] @@ -101,13 +101,13 @@ mod blockchain_integration_test { assert!(matches!(result, Err(ChainError::ParentNotFound))); // block 2 should now be pending. - assert!(store.get_pending_block(hash_2).unwrap().is_some()); + assert!(store.get_pending_block(hash_2).await.unwrap().is_some()); let fc_result = apply_fork_choice(&store, hash_2, H256::zero(), H256::zero()).await; assert!(matches!(fc_result, Err(InvalidForkChoice::Syncing))); // block 2 should still be pending. - assert!(store.get_pending_block(hash_2).unwrap().is_some()); + assert!(store.get_pending_block(hash_2).await.unwrap().is_some()); } #[tokio::test] @@ -126,7 +126,7 @@ mod blockchain_integration_test { blockchain.add_block(&block_1a).await.unwrap(); let retrieved_1a = store.get_block_header_by_hash(hash_1a).unwrap().unwrap(); - assert!(!is_canonical(&store, 1, hash_1a).unwrap()); + assert!(!is_canonical(&store, 1, hash_1a).await.unwrap()); // Add second block at height 1. Canonical. let block_1b = new_block(&store, &genesis_header).await; @@ -142,8 +142,8 @@ mod blockchain_integration_test { assert_ne!(retrieved_1a, retrieved_1b); assert_eq!(retrieved_1b, block_1b.header); - assert!(is_canonical(&store, 1, hash_1b).unwrap()); - assert_eq!(latest_canonical_block_hash(&store).unwrap(), hash_1b); + assert!(is_canonical(&store, 1, hash_1b).await.unwrap()); + assert_eq!(latest_canonical_block_hash(&store).await.unwrap(), hash_1b); // Add a third block at height 2, child to the canonical one. let block_2 = new_block(&store, &block_1b.header).await; @@ -156,11 +156,14 @@ mod blockchain_integration_test { .await .unwrap(); let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap(); - assert_eq!(latest_canonical_block_hash(&store).unwrap(), hash_2); + assert_eq!(latest_canonical_block_hash(&store).await.unwrap(), hash_2); assert!(retrieved_2.is_some()); - assert!(is_canonical(&store, 2, hash_2).unwrap()); - assert_eq!(store.get_canonical_block_hash(2).unwrap().unwrap(), hash_2); + assert!(is_canonical(&store, 2, hash_2).await.unwrap()); + assert_eq!( + store.get_canonical_block_hash(2).await.unwrap().unwrap(), + hash_2 + ); // Receive block 1a as new head. apply_fork_choice( @@ -173,10 +176,10 @@ mod blockchain_integration_test { .unwrap(); // Check that canonical blocks changed to the new branch. 
- assert!(is_canonical(&store, 0, genesis_hash).unwrap()); - assert!(is_canonical(&store, 1, hash_1a).unwrap()); - assert!(!is_canonical(&store, 1, hash_1b).unwrap()); - assert!(!is_canonical(&store, 2, hash_2).unwrap()); + assert!(is_canonical(&store, 0, genesis_hash).await.unwrap()); + assert!(is_canonical(&store, 1, hash_1a).await.unwrap()); + assert!(!is_canonical(&store, 1, hash_1b).await.unwrap()); + assert!(!is_canonical(&store, 2, hash_2).await.unwrap()); } #[tokio::test] @@ -205,16 +208,16 @@ mod blockchain_integration_test { .await .expect("Could not add block 2."); - assert!(!is_canonical(&store, 1, hash_1).unwrap()); - assert!(!is_canonical(&store, 2, hash_2).unwrap()); + assert!(!is_canonical(&store, 1, hash_1).await.unwrap()); + assert!(!is_canonical(&store, 2, hash_2).await.unwrap()); // Make that chain the canonical one. apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash) .await .unwrap(); - assert!(is_canonical(&store, 1, hash_1).unwrap()); - assert!(is_canonical(&store, 2, hash_2).unwrap()); + assert!(is_canonical(&store, 1, hash_1).await.unwrap()); + assert!(is_canonical(&store, 2, hash_2).await.unwrap()); let result = apply_fork_choice(&store, hash_1, hash_1, hash_1).await; @@ -224,9 +227,9 @@ mod blockchain_integration_test { )); // Important blocks should still be the same as before. - assert!(store.get_finalized_block_number().unwrap() == Some(0)); - assert!(store.get_safe_block_number().unwrap() == Some(0)); - assert!(store.get_latest_block_number().unwrap() == 2); + assert!(store.get_finalized_block_number().await.unwrap() == Some(0)); + assert!(store.get_safe_block_number().await.unwrap() == Some(0)); + assert!(store.get_latest_block_number().await.unwrap() == 2); } #[tokio::test] @@ -257,14 +260,17 @@ mod blockchain_integration_test { .await .expect("Could not add block 2."); - assert_eq!(latest_canonical_block_hash(&store).unwrap(), genesis_hash); + assert_eq!( + latest_canonical_block_hash(&store).await.unwrap(), + genesis_hash + ); // Make that chain the canonical one. apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash) .await .unwrap(); - assert_eq!(latest_canonical_block_hash(&store).unwrap(), hash_2); + assert_eq!(latest_canonical_block_hash(&store).await.unwrap(), hash_2); // Add a new, non canonical block, starting from genesis. let block_1b = new_block(&store, &genesis_header).await; @@ -275,7 +281,7 @@ mod blockchain_integration_test { .expect("Could not add block b."); // The latest block should be the same. - assert_eq!(latest_canonical_block_hash(&store).unwrap(), hash_2); + assert_eq!(latest_canonical_block_hash(&store).await.unwrap(), hash_2); // if we apply fork choice to the new one, then we should apply_fork_choice(&store, hash_b, genesis_hash, genesis_hash) @@ -283,7 +289,7 @@ mod blockchain_integration_test { .unwrap(); // The latest block should now be the new head. 
- assert_eq!(latest_canonical_block_hash(&store).unwrap(), hash_b); + assert_eq!(latest_canonical_block_hash(&store).await.unwrap(), hash_b); } async fn new_block(store: &Store, parent: &BlockHeader) -> Block { diff --git a/crates/l2/sequencer/block_producer.rs b/crates/l2/sequencer/block_producer.rs index 67590845de5..69123d65fe0 100644 --- a/crates/l2/sequencer/block_producer.rs +++ b/crates/l2/sequencer/block_producer.rs @@ -76,7 +76,7 @@ impl BlockProducer { ) -> Result<(), BlockProducerError> { let version = 3; let head_header = { - let current_block_number = store.get_latest_block_number()?; + let current_block_number = store.get_latest_block_number().await?; store .get_block_header(current_block_number)? .ok_or(BlockProducerError::StorageDataIsNone)? diff --git a/crates/l2/sequencer/l1_committer.rs b/crates/l2/sequencer/l1_committer.rs index d6ac1365766..ccf6754232c 100644 --- a/crates/l2/sequencer/l1_committer.rs +++ b/crates/l2/sequencer/l1_committer.rs @@ -93,6 +93,7 @@ impl Committer { let Some(block_to_commit_body) = self .store .get_block_body(block_number) + .await .map_err(CommitterError::from)? else { debug!("No new block to commit, skipping.."); @@ -111,7 +112,8 @@ impl Committer { for (index, tx) in block_to_commit_body.transactions.iter().enumerate() { let receipt = self .store - .get_receipt(block_number, index.try_into()?)? + .get_receipt(block_number, index.try_into()?) + .await? .ok_or(CommitterError::InternalError( "Transactions in a block should have a receipt".to_owned(), ))?; @@ -151,13 +153,15 @@ impl Committer { } }; - let state_diff = self.prepare_state_diff( - &block_to_commit, - self.store.clone(), - withdrawals, - deposits, - &account_updates, - )?; + let state_diff = self + .prepare_state_diff( + &block_to_commit, + self.store.clone(), + withdrawals, + deposits, + &account_updates, + ) + .await?; let blobs_bundle = self.generate_blobs_bundle(&state_diff)?; @@ -262,7 +266,7 @@ impl Committer { } /// Prepare the state diff for the block. - fn prepare_state_diff( + async fn prepare_state_diff( &self, block: &Block, store: Store, @@ -279,6 +283,7 @@ impl Committer { // and we may have to keep track of the latestCommittedBlock (last block of the batch), // the batch_size and the latestCommittedBatch in the contract. .get_account_info(block.header.number - 1, account_update.address) + .await .map_err(StoreError::from)? 
{ Some(acc) => acc.nonce, diff --git a/crates/l2/sequencer/l1_watcher.rs b/crates/l2/sequencer/l1_watcher.rs index 65d84866ccc..014f0b85d79 100644 --- a/crates/l2/sequencer/l1_watcher.rs +++ b/crates/l2/sequencer/l1_watcher.rs @@ -277,6 +277,7 @@ impl L1Watcher { match blockchain .add_transaction_to_pool(Transaction::PrivilegedL2Transaction(mint_transaction)) + .await { Ok(hash) => { info!("Mint transaction added to mempool {hash:#x}",); diff --git a/crates/l2/sequencer/prover_server.rs b/crates/l2/sequencer/prover_server.rs index d18a13d3309..d0284301b37 100644 --- a/crates/l2/sequencer/prover_server.rs +++ b/crates/l2/sequencer/prover_server.rs @@ -397,7 +397,7 @@ impl ProverServer { ) -> Result<(), ProverServerError> { debug!("Request received"); - let latest_block_number = self.store.get_latest_block_number()?; + let latest_block_number = self.store.get_latest_block_number().await?; let response = if block_number > latest_block_number { let response = ProofData::response(None, None); @@ -408,7 +408,7 @@ impl ProverServer { debug!("Block: {block_number} has been submitted."); response } else { - let input = self.create_prover_input(block_number)?; + let input = self.create_prover_input(block_number).await?; let response = ProofData::response(Some(block_number), Some(input)); info!("Sent Response for block_number: {block_number}"); response @@ -439,14 +439,18 @@ impl ProverServer { Ok(()) } - fn create_prover_input(&self, block_number: u64) -> Result { + async fn create_prover_input( + &self, + block_number: u64, + ) -> Result { let header = self .store .get_block_header(block_number)? .ok_or(ProverServerError::StorageDataIsNone)?; let body = self .store - .get_block_body(block_number)? + .get_block_body(block_number) + .await? .ok_or(ProverServerError::StorageDataIsNone)?; let block = Block::new(header, body); diff --git a/crates/l2/utils/test_data_io.rs b/crates/l2/utils/test_data_io.rs index e4150056bf5..75aa8692869 100644 --- a/crates/l2/utils/test_data_io.rs +++ b/crates/l2/utils/test_data_io.rs @@ -33,11 +33,11 @@ pub fn read_genesis_file(genesis_file_path: &str) -> Genesis { /// Place this in the `proposer/mod.rs` file, /// specifically in the `start` function, /// before calling `send_commitment()` to send the block commitment. -pub fn generate_rlp( +pub async fn generate_rlp( up_to_block_number: u64, store: &Store, ) -> Result<(), Box> { - if store.get_latest_block_number()? == up_to_block_number { + if store.get_latest_block_number().await? 
== up_to_block_number { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let file_name = "l2-test.rlp"; @@ -45,7 +45,7 @@ pub fn generate_rlp( let mut file = std::fs::File::create(path.to_str().unwrap())?; for i in 1..up_to_block_number { - let body = store.get_block_body(i)?.unwrap(); + let body = store.get_block_body(i).await?.unwrap(); let header = store.get_block_header(i)?.unwrap(); let block = Block::new(header, body); diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 52ec860f499..1b0b776b307 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -423,7 +423,7 @@ impl RLPxConnection { sender: mpsc::Sender, ) -> Result<(), RLPxError> { let peer_supports_eth = self.negotiated_eth_version != 0; - let is_synced = self.storage.is_synced()?; + let is_synced = self.storage.is_synced().await?; match message { Message::Disconnect(msg_data) => { log_peer_debug( @@ -440,11 +440,14 @@ impl RLPxConnection { Message::Pong(_) => { // We ignore received Pong messages } - Message::Status(msg_data) if !peer_supports_eth => backend::validate_status( - msg_data, - &self.storage, - self.negotiated_eth_version as u32, - )?, + Message::Status(msg_data) if !peer_supports_eth => { + backend::validate_status( + msg_data, + &self.storage, + self.negotiated_eth_version as u32, + ) + .await? + } Message::GetAccountRange(req) => { let response = process_account_range_request(req, self.storage.clone())?; self.send(Message::AccountRange(response)).await? @@ -454,7 +457,7 @@ impl RLPxConnection { if is_synced { let mut valid_txs = vec![]; for tx in &txs.transactions { - if let Err(e) = self.blockchain.add_transaction_to_pool(tx.clone()) { + if let Err(e) = self.blockchain.add_transaction_to_pool(tx.clone()).await { log_peer_warn(&self.node, &format!("Error adding transaction: {}", e)); continue; } @@ -466,26 +469,23 @@ impl RLPxConnection { Message::GetBlockHeaders(msg_data) if peer_supports_eth => { let response = BlockHeaders { id: msg_data.id, - block_headers: msg_data.fetch_headers(&self.storage), + block_headers: msg_data.fetch_headers(&self.storage).await, }; self.send(Message::BlockHeaders(response)).await?; } Message::GetBlockBodies(msg_data) if peer_supports_eth => { let response = BlockBodies { id: msg_data.id, - block_bodies: msg_data.fetch_blocks(&self.storage), + block_bodies: msg_data.fetch_blocks(&self.storage).await, }; self.send(Message::BlockBodies(response)).await?; } Message::GetReceipts(GetReceipts { id, block_hashes }) if peer_supports_eth => { - let receipts: Result<_, _> = block_hashes - .iter() - .map(|hash| self.storage.get_receipts_for_block(hash)) - .collect(); - let response = Receipts { - id, - receipts: receipts?, - }; + let mut receipts = Vec::new(); + for hash in block_hashes.iter() { + receipts.push(self.storage.get_receipts_for_block(hash)?); + } + let response = Receipts { id, receipts }; self.send(Message::Receipts(response)).await?; } Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) @@ -564,7 +564,8 @@ impl RLPxConnection { .capabilities .contains(&(Capability::Eth, self.negotiated_eth_version)) { - let status = backend::get_status(&self.storage, self.negotiated_eth_version as u32)?; + let status = + backend::get_status(&self.storage, self.negotiated_eth_version as u32).await?; log_peer_debug(&self.node, "Sending status"); self.send(Message::Status(status)).await?; // The next immediate message in the ETH protocol is the @@ -581,7 +582,8 @@ impl 
RLPxConnection { msg_data, &self.storage, self.negotiated_eth_version as u32, - )? + ) + .await? } Message::Disconnect(disconnect) => { return Err(RLPxError::HandshakeError(format!( diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index 5e812fe7333..1ca1af8c032 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -5,7 +5,7 @@ use crate::rlpx::error::RLPxError; use super::status::StatusMessage; -pub fn get_status(storage: &Store, eth_version: u32) -> Result { +pub async fn get_status(storage: &Store, eth_version: u32) -> Result { let chain_config = storage.get_chain_config()?; let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); let network_id = chain_config.chain_id; @@ -14,7 +14,7 @@ pub fn get_status(storage: &Store, eth_version: u32) -> Result Result Vec { + pub async fn fetch_headers(&self, storage: &Store) -> Vec { let start_block = match self.startblock { // Check we have the given block hash and fetch its number HashOrNumber::Hash(block_hash) => { // TODO(#1073) // Research what we should do when an error is found in a P2P request. - if let Ok(Some(block_number)) = storage.get_block_number(block_hash) { + if let Ok(Some(block_number)) = storage.get_block_number(block_hash).await { block_number } else { error!("Could not fetch block number for hash {block_hash}"); @@ -221,10 +221,10 @@ impl GetBlockBodies { pub fn new(id: u64, block_hashes: Vec) -> Self { Self { block_hashes, id } } - pub fn fetch_blocks(&self, storage: &Store) -> Vec { + pub async fn fetch_blocks(&self, storage: &Store) -> Vec { let mut block_bodies = vec![]; for block_hash in &self.block_hashes { - match storage.get_block_body_by_hash(*block_hash) { + match storage.get_block_body_by_hash(*block_hash).await { Ok(Some(block)) => { block_bodies.push(block); if block_bodies.len() >= BLOCK_BODY_LIMIT { diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 691236cd879..002b2cb832c 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -275,7 +275,10 @@ impl PooledTransactions { pub async fn handle(self, node: &Node, blockchain: &Blockchain) -> Result<(), MempoolError> { for tx in self.pooled_transactions { if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx { - if let Err(e) = blockchain.add_blob_transaction_to_pool(itx.tx, itx.blobs_bundle) { + if let Err(e) = blockchain + .add_blob_transaction_to_pool(itx.tx, itx.blobs_bundle) + .await + { log_peer_warn(node, &format!("Error adding transaction: {}", e)); continue; } @@ -283,7 +286,7 @@ impl PooledTransactions { let regular_tx = tx .try_into() .map_err(|error| MempoolError::StoreError(StoreError::Custom(error)))?; - if let Err(e) = blockchain.add_transaction_to_pool(regular_tx) { + if let Err(e) = blockchain.add_transaction_to_pool(regular_tx).await { log_peer_warn(node, &format!("Error adding transaction: {}", e)); continue; } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index db03501a59c..7273d7e5de1 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -173,13 +173,13 @@ impl Syncer { // This applies only to snap sync—full sync always starts fetching headers // from the canonical block, which updates as new block headers are fetched. 
if matches!(sync_mode, SyncMode::Snap) { - if let Some(last_header) = store.get_header_download_checkpoint()? { + if let Some(last_header) = store.get_header_download_checkpoint().await? { // Set latest downloaded header as current head for header fetching current_head = last_header; } } - let pending_block = match store.get_pending_block(sync_head) { + let pending_block = match store.get_pending_block(sync_head).await { Ok(res) => res, Err(e) => return Err(e.into()), }; @@ -263,7 +263,7 @@ impl Syncer { // If the sync head is less than 64 blocks away from our current head switch to full-sync if sync_mode == SyncMode::Snap { - let latest_block_number = store.get_latest_block_number()?; + let latest_block_number = store.get_latest_block_number().await?; if last_block_header.number.saturating_sub(latest_block_number) < MIN_FULL_BLOCKS as u64 { @@ -337,7 +337,8 @@ impl Syncer { // For all blocks after the pivot: Process them fully for hash in &all_block_hashes[pivot_idx + 1..] { let block = store - .get_block_by_hash(*hash)? + .get_block_by_hash(*hash) + .await? .ok_or(SyncError::CorruptDB)?; let block_number = block.header.number; self.blockchain.add_block(&block).await?; @@ -607,7 +608,7 @@ impl Syncer { )); // Perform state sync if it was not already completed on a previous cycle // Retrieve storage data to check which snap sync phase we are in - let key_checkpoints = store.get_state_trie_key_checkpoint()?; + let key_checkpoints = store.get_state_trie_key_checkpoint().await?; // If we have no key checkpoints or if the key checkpoints are lower than the segment boundaries we are in state sync phase if key_checkpoints.is_none() || key_checkpoints.is_some_and(|ch| { diff --git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs index 8828ba94ac6..1fa11923356 100644 --- a/crates/networking/p2p/sync/state_healing.rs +++ b/crates/networking/p2p/sync/state_healing.rs @@ -37,7 +37,7 @@ pub(crate) async fn heal_state_trie( store: Store, peers: PeerHandler, ) -> Result { - let mut paths = store.get_state_heal_paths()?.unwrap_or_default(); + let mut paths = store.get_state_heal_paths().await?.unwrap_or_default(); // Spawn a bytecode fetcher for this block let (bytecode_sender, bytecode_receiver) = channel::>(MAX_CHANNEL_MESSAGES); let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher( diff --git a/crates/networking/p2p/sync/trie_rebuild.rs b/crates/networking/p2p/sync/trie_rebuild.rs index f960b091c23..7ae1dcfffba 100644 --- a/crates/networking/p2p/sync/trie_rebuild.rs +++ b/crates/networking/p2p/sync/trie_rebuild.rs @@ -96,7 +96,7 @@ async fn rebuild_state_trie_in_backgound( cancel_token: CancellationToken, ) -> Result<(), SyncError> { // Get initial status from checkpoint if available (aka node restart) - let checkpoint = store.get_state_trie_rebuild_checkpoint()?; + let checkpoint = store.get_state_trie_rebuild_checkpoint().await?; let mut rebuild_status = array::from_fn(|i| SegmentStatus { current: checkpoint .map(|(_, ch)| ch[i]) @@ -180,7 +180,8 @@ async fn rebuild_state_trie_segment( // Return if we have no more snapshot accounts to process for this segemnt if unfilled_batch { let state_sync_complete = store - .get_state_trie_key_checkpoint()? + .get_state_trie_key_checkpoint() + .await? 
.is_some_and(|ch| ch[segment_number] == STATE_TRIE_SEGMENTS_END[segment_number]); // Mark segment as finished if state sync is complete if state_sync_complete { @@ -202,7 +203,8 @@ async fn rebuild_storage_trie_in_background( ) -> Result<(), SyncError> { // (AccountHash, ExpectedRoot) let mut pending_storages = store - .get_storage_trie_rebuild_pending()? + .get_storage_trie_rebuild_pending() + .await? .unwrap_or_default(); let mut incoming = true; while incoming || !pending_storages.is_empty() { @@ -252,7 +254,7 @@ async fn rebuild_storage_trie( let mut start = H256::zero(); let mut storage_trie = store.open_storage_trie(account_hash, *EMPTY_TRIE_HASH); loop { - let batch = store.read_storage_snapshot(account_hash, start)?; + let batch = store.read_storage_snapshot(account_hash, start).await?; let unfilled_batch = batch.len() < MAX_SNAPSHOT_READS; // Update start if let Some(last) = batch.last() { diff --git a/crates/networking/p2p/sync_manager.rs b/crates/networking/p2p/sync_manager.rs index 5a641d9e029..b2c2409fed5 100644 --- a/crates/networking/p2p/sync_manager.rs +++ b/crates/networking/p2p/sync_manager.rs @@ -35,7 +35,7 @@ pub struct SyncManager { } impl SyncManager { - pub fn new( + pub async fn new( peer_table: Arc>, sync_mode: SyncMode, cancel_token: CancellationToken, @@ -59,9 +59,10 @@ impl SyncManager { // Otherwise we will incorreclty assume the node is already synced and work on invalid state if store .get_header_download_checkpoint() + .await .is_ok_and(|res| res.is_some()) { - sync_manager.start_sync(); + sync_manager.start_sync().await; } sync_manager } @@ -99,10 +100,10 @@ impl SyncManager { /// Attempts to sync to the last received fcu head /// Will do nothing if the syncer is already involved in a sync process /// If the sync process would require multiple sync cycles (such as snap sync), starts all required sync cycles until the sync is complete - pub fn start_sync(&self) { + pub async fn start_sync(&self) { let syncer = self.syncer.clone(); let store = self.store.clone(); - let Ok(Some(current_head)) = self.store.get_latest_canonical_block_hash() else { + let Ok(Some(current_head)) = self.store.get_latest_canonical_block_hash().await else { tracing::error!("Failed to fecth latest canonical block, unable to sync"); return; }; @@ -134,6 +135,7 @@ impl SyncManager { // Continue to the next sync cycle if we have an ongoing snap sync (aka if we still have snap sync checkpoints stored) if store .get_header_download_checkpoint() + .await .ok() .flatten() .is_none() diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index a1f4f8578be..fed5bd907ab 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -216,7 +216,8 @@ async fn handle_forkchoice( let fork_choice_res = if let Some(latest_valid_hash) = context .storage - .get_latest_valid_ancestor(fork_choice_state.head_block_hash)? + .get_latest_valid_ancestor(fork_choice_state.head_block_hash) + .await? { warn!( "Invalid fork choice state. Reason: Invalid ancestor {:#x}", @@ -225,19 +226,23 @@ async fn handle_forkchoice( Err(InvalidForkChoice::InvalidAncestor(latest_valid_hash)) } else { // Check parent block hash in invalid_ancestors (if head block exists) - let check_parent = context + let head_block = context .storage - .get_block_header_by_hash(fork_choice_state.head_block_hash)? 
- .and_then(|head_block| { - debug!( - "Checking parent for invalid ancestor {}", - head_block.parent_hash - ); - context - .storage - .get_latest_valid_ancestor(head_block.parent_hash) - .ok()? - }); + .get_block_header_by_hash(fork_choice_state.head_block_hash)?; + let check_parent = if let Some(head_block) = head_block { + debug!( + "Checking parent for invalid ancestor {}", + head_block.parent_hash + ); + context + .storage + .get_latest_valid_ancestor(head_block.parent_hash) + .await + .ok() + .flatten() + } else { + None + }; // Check head block hash in invalid_ancestors if let Some(latest_valid_hash) = check_parent { @@ -266,7 +271,11 @@ async fn handle_forkchoice( Ok(head) => { // Remove included transactions from the mempool after we accept the fork choice // TODO(#797): The remove of transactions from the mempool could be incomplete (i.e. REORGS) - match context.storage.get_block_by_hash(head.compute_block_hash()) { + match context + .storage + .get_block_by_hash(head.compute_block_hash()) + .await + { Ok(Some(block)) => { for tx in &block.body.transactions { context @@ -297,7 +306,9 @@ async fn handle_forkchoice( let forkchoice_response = match forkchoice_error { InvalidForkChoice::NewHeadAlreadyCanonical => { ForkChoiceResponse::from(PayloadStatus::valid_with_hash( - latest_canonical_block_hash(&context.storage).unwrap(), + latest_canonical_block_hash(&context.storage) + .await + .map_err(|e| RpcErr::Internal(e.to_string()))?, )) } InvalidForkChoice::Syncing => { @@ -307,7 +318,7 @@ async fn handle_forkchoice( .update_sync_status(false) .await .map_err(|e| RpcErr::Internal(e.to_string()))?; - context.syncer.start_sync(); + context.syncer.start_sync().await; ForkChoiceResponse::from(PayloadStatus::syncing()) } InvalidForkChoice::Disconnected(_, _) | InvalidForkChoice::ElementNotFound(_) => { @@ -325,10 +336,13 @@ async fn handle_forkchoice( "Invalid fork choice payload. Reason: {}", reason.to_string() ); - let latest_valid_hash = - context.storage.get_latest_canonical_block_hash()?.ok_or( - RpcErr::Internal("Missing latest canonical block".to_owned()), - )?; + let latest_valid_hash = context + .storage + .get_latest_canonical_block_hash() + .await? + .ok_or(RpcErr::Internal( + "Missing latest canonical block".to_owned(), + ))?; ForkChoiceResponse::from(PayloadStatus::invalid_with( latest_valid_hash, reason.to_string(), diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index af04f7f0121..b1545d70eea 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -225,7 +225,7 @@ impl RpcHandler for GetPayloadV1Request { } async fn handle(&self, context: RpcApiContext) -> Result { - let payload = get_payload(self.payload_id, &context)?; + let payload = get_payload(self.payload_id, &context).await?; // NOTE: This validation is actually not required to run Hive tests. 
Not sure if it's // necessary validate_payload_v1_v2(&payload.block, &context)?; @@ -248,7 +248,7 @@ impl RpcHandler for GetPayloadV2Request { } async fn handle(&self, context: RpcApiContext) -> Result { - let payload = get_payload(self.payload_id, &context)?; + let payload = get_payload(self.payload_id, &context).await?; validate_payload_v1_v2(&payload.block, &context)?; let payload_bundle = build_payload_if_necessary(self.payload_id, payload, context).await?; @@ -320,7 +320,7 @@ impl RpcHandler for GetPayloadV3Request { } async fn handle(&self, context: RpcApiContext) -> Result { - let payload = get_payload(self.payload_id, &context)?; + let payload = get_payload(self.payload_id, &context).await?; validate_fork(&payload.block, Fork::Cancun, &context)?; let payload_bundle = build_payload_if_necessary(self.payload_id, payload, context).await?; @@ -357,7 +357,7 @@ impl RpcHandler for GetPayloadV4Request { } async fn handle(&self, context: RpcApiContext) -> Result { - let payload = get_payload(self.payload_id, &context)?; + let payload = get_payload(self.payload_id, &context).await?; let chain_config = &context.storage.get_chain_config()?; if !chain_config.is_prague_activated(payload.block.header.timestamp) { @@ -409,11 +409,10 @@ impl RpcHandler for GetPayloadBodiesByHashV1Request { if self.hashes.len() >= GET_PAYLOAD_BODIES_REQUEST_MAX_SIZE { return Err(RpcErr::TooLargeRequest); } - let bodies = self - .hashes - .iter() - .map(|hash| context.storage.get_block_body_by_hash(*hash)) - .collect::>, _>>()?; + let mut bodies = Vec::new(); + for hash in self.hashes.iter() { + bodies.push(context.storage.get_block_body_by_hash(*hash).await?) + } build_payload_body_response(bodies) } } @@ -446,12 +445,10 @@ impl RpcHandler for GetPayloadBodiesByRangeV1Request { if self.count as usize >= GET_PAYLOAD_BODIES_REQUEST_MAX_SIZE { return Err(RpcErr::TooLargeRequest); } - let latest_block_number = context.storage.get_latest_block_number()?; + let latest_block_number = context.storage.get_latest_block_number().await?; let last = latest_block_number.min(self.start + self.count - 1); - let bodies = (self.start..=last) - .map(|block_num| context.storage.get_block_body(block_num)) - .collect::>, _>>()?; - build_payload_body_response(bodies) + let bodies = context.storage.get_block_bodies(self.start, last).await?; + build_payload_body_response(bodies.into_iter().map(Some).collect()) } } @@ -529,12 +526,16 @@ fn validate_payload_v1_v2(block: &Block, context: &RpcApiContext) -> Result<(), } // This function is used to make sure neither the current block nor its parent have been invalidated -fn validate_ancestors( +async fn validate_ancestors( block: &Block, context: &RpcApiContext, ) -> Result, RpcErr> { // Check if the block has already been invalidated - if let Some(latest_valid_hash) = context.storage.get_latest_valid_ancestor(block.hash())? { + if let Some(latest_valid_hash) = context + .storage + .get_latest_valid_ancestor(block.hash()) + .await? + { return Ok(Some(PayloadStatus::invalid_with( latest_valid_hash, "Header has been previously invalidated.".into(), @@ -544,7 +545,8 @@ fn validate_ancestors( // Check if the parent block has already been invalidated if let Some(latest_valid_hash) = context .storage - .get_latest_valid_ancestor(block.header.parent_hash)? + .get_latest_valid_ancestor(block.header.parent_hash) + .await? 
{ return Ok(Some(PayloadStatus::invalid_with( latest_valid_hash, @@ -578,7 +580,7 @@ async fn handle_new_payload_v1_v2( } // Check for invalid ancestors - if let Some(status) = validate_ancestors(&block, &context)? { + if let Some(status) = validate_ancestors(&block, &context).await? { return serde_json::to_value(status).map_err(|error| RpcErr::Internal(error.to_string())); } @@ -606,7 +608,7 @@ async fn handle_new_payload_v3( } // Check for invalid ancestors - if let Some(status) = validate_ancestors(&block, &context)? { + if let Some(status) = validate_ancestors(&block, &context).await? { return Ok(status); } @@ -675,19 +677,19 @@ async fn execute_payload(block: &Block, context: &RpcApiContext) -> Result Ok(PayloadStatus::syncing()), @@ -760,9 +762,9 @@ fn parse_get_payload_request(params: &Option>) -> Result Ok(payload_id) } -fn get_payload(payload_id: u64, context: &RpcApiContext) -> Result { +async fn get_payload(payload_id: u64, context: &RpcApiContext) -> Result { info!("Requested payload with id: {:#018x}", payload_id); - let Some(payload) = context.storage.get_payload(payload_id)? else { + let Some(payload) = context.storage.get_payload(payload_id).await? else { return Err(RpcErr::UnknownPayload(format!( "Payload with id {:#018x} not found", payload_id diff --git a/crates/networking/rpc/eth/account.rs b/crates/networking/rpc/eth/account.rs index 2ac08cd0ef3..fa2e47ba246 100644 --- a/crates/networking/rpc/eth/account.rs +++ b/crates/networking/rpc/eth/account.rs @@ -53,7 +53,7 @@ impl RpcHandler for GetBalanceRequest { self.address, self.block ); - let Some(block_number) = self.block.resolve_block_number(&context.storage)? else { + let Some(block_number) = self.block.resolve_block_number(&context.storage).await? else { return Err(RpcErr::Internal( "Could not resolve block number".to_owned(), )); // Should we return Null here? @@ -61,7 +61,8 @@ impl RpcHandler for GetBalanceRequest { let account = context .storage - .get_account_info(block_number, self.address)?; + .get_account_info(block_number, self.address) + .await?; let balance = account.map(|acc| acc.balance).unwrap_or_default(); serde_json::to_value(format!("{:#x}", balance)) @@ -88,7 +89,7 @@ impl RpcHandler for GetCodeRequest { self.address, self.block ); - let Some(block_number) = self.block.resolve_block_number(&context.storage)? else { + let Some(block_number) = self.block.resolve_block_number(&context.storage).await? else { return Err(RpcErr::Internal( "Could not resolve block number".to_owned(), )); // Should we return Null here? @@ -96,7 +97,8 @@ impl RpcHandler for GetCodeRequest { let code = context .storage - .get_code_by_account_address(block_number, self.address)? + .get_code_by_account_address(block_number, self.address) + .await? .unwrap_or_default(); serde_json::to_value(format!("0x{:x}", code)) @@ -124,7 +126,7 @@ impl RpcHandler for GetStorageAtRequest { self.storage_slot, self.address, self.block ); - let Some(block_number) = self.block.resolve_block_number(&context.storage)? else { + let Some(block_number) = self.block.resolve_block_number(&context.storage).await? else { return Err(RpcErr::Internal( "Could not resolve block number".to_owned(), )); // Should we return Null here? @@ -132,7 +134,8 @@ impl RpcHandler for GetStorageAtRequest { let storage_value = context .storage - .get_storage_at(block_number, self.address, self.storage_slot)? + .get_storage_at(block_number, self.address, self.storage_slot) + .await? 
.unwrap_or_default(); let storage_value = H256::from_uint(&storage_value); serde_json::to_value(format!("{:#x}", storage_value)) @@ -169,14 +172,16 @@ impl RpcHandler for GetTransactionCountRequest { let nonce = match pending_nonce { Some(nonce) => nonce, None => { - let Some(block_number) = self.block.resolve_block_number(&context.storage)? else { + let Some(block_number) = self.block.resolve_block_number(&context.storage).await? + else { return serde_json::to_value("0x0") .map_err(|error| RpcErr::Internal(error.to_string())); }; context .storage - .get_nonce_by_account_address(block_number, self.address)? + .get_nonce_by_account_address(block_number, self.address) + .await? .unwrap_or_default() } }; @@ -209,21 +214,28 @@ impl RpcHandler for GetProofRequest { "Requested proof for account {} at block {} with storage keys: {:?}", self.address, self.block, self.storage_keys ); - let Some(block_number) = self.block.resolve_block_number(storage)? else { + let Some(block_number) = self.block.resolve_block_number(storage).await? else { return Ok(Value::Null); }; // Create account proof - let Some(account) = storage.get_account_state(block_number, self.address)? else { + let Some(account) = storage + .get_account_state(block_number, self.address) + .await? + else { return Ok(Value::Null); }; - let Some(account_proof) = storage.get_account_proof(block_number, &self.address)? else { + let Some(account_proof) = storage + .get_account_proof(block_number, &self.address) + .await? + else { return Err(RpcErr::Internal("Could not get account proof".to_owned())); }; // Create storage proofs for all provided storage keys let mut storage_proofs = Vec::new(); for storage_key in self.storage_keys.iter() { let value = storage - .get_storage_at(block_number, self.address, *storage_key)? + .get_storage_at(block_number, self.address, *storage_key) + .await? .unwrap_or_default(); let proof = storage.get_storage_proof(self.address, account.storage_root, storage_key)?; diff --git a/crates/networking/rpc/eth/block.rs b/crates/networking/rpc/eth/block.rs index 109a4e8da7c..307cb3f720a 100644 --- a/crates/networking/rpc/eth/block.rs +++ b/crates/networking/rpc/eth/block.rs @@ -67,12 +67,12 @@ impl RpcHandler for GetBlockByNumberRequest { async fn handle(&self, context: RpcApiContext) -> Result { let storage = &context.storage; info!("Requested block with number: {}", self.block); - let block_number = match self.block.resolve_block_number(storage)? { + let block_number = match self.block.resolve_block_number(storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; let header = storage.get_block_header(block_number)?; - let body = storage.get_block_body(block_number)?; + let body = storage.get_block_body(block_number).await?; let (header, body) = match (header, body) { (Some(header), Some(body)) => (header, body), // Block not found @@ -101,12 +101,12 @@ impl RpcHandler for GetBlockByHashRequest { async fn handle(&self, context: RpcApiContext) -> Result { let storage = &context.storage; info!("Requested block with hash: {:#x}", self.block); - let block_number = match storage.get_block_number(self.block)? { + let block_number = match storage.get_block_number(self.block).await? 
{ Some(number) => number, _ => return Ok(Value::Null), }; let header = storage.get_block_header(block_number)?; - let body = storage.get_block_body(block_number)?; + let body = storage.get_block_body(block_number).await?; let (header, body) = match (header, body) { (Some(header), Some(body)) => (header, body), // Block not found @@ -136,11 +136,11 @@ impl RpcHandler for GetBlockTransactionCountRequest { "Requested transaction count for block with number: {}", self.block ); - let block_number = match self.block.resolve_block_number(&context.storage)? { + let block_number = match self.block.resolve_block_number(&context.storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; - let block_body = match context.storage.get_block_body(block_number)? { + let block_body = match context.storage.get_block_body(block_number).await? { Some(block_body) => block_body, _ => return Ok(Value::Null), }; @@ -167,18 +167,18 @@ impl RpcHandler for GetBlockReceiptsRequest { async fn handle(&self, context: RpcApiContext) -> Result { let storage = &context.storage; info!("Requested receipts for block with number: {}", self.block); - let block_number = match self.block.resolve_block_number(storage)? { + let block_number = match self.block.resolve_block_number(storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; let header = storage.get_block_header(block_number)?; - let body = storage.get_block_body(block_number)?; + let body = storage.get_block_body(block_number).await?; let (header, body) = match (header, body) { (Some(header), Some(body)) => (header, body), // Block not found _ => return Ok(Value::Null), }; - let receipts = get_all_block_rpc_receipts(block_number, header, body, storage)?; + let receipts = get_all_block_rpc_receipts(block_number, header, body, storage).await?; serde_json::to_value(&receipts).map_err(|error| RpcErr::Internal(error.to_string())) } @@ -202,7 +202,7 @@ impl RpcHandler for GetRawHeaderRequest { "Requested raw header for block with identifier: {}", self.block ); - let block_number = match self.block.resolve_block_number(&context.storage)? { + let block_number = match self.block.resolve_block_number(&context.storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; @@ -232,12 +232,12 @@ impl RpcHandler for GetRawBlockRequest { async fn handle(&self, context: RpcApiContext) -> Result { info!("Requested raw block: {}", self.block); - let block_number = match self.block.resolve_block_number(&context.storage)? { + let block_number = match self.block.resolve_block_number(&context.storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; let header = context.storage.get_block_header(block_number)?; - let body = context.storage.get_block_body(block_number)?; + let body = context.storage.get_block_body(block_number).await?; let (header, body) = match (header, body) { (Some(header), Some(body)) => (header, body), _ => return Ok(Value::Null), @@ -265,17 +265,18 @@ impl RpcHandler for GetRawReceipts { async fn handle(&self, context: RpcApiContext) -> Result { let storage = &context.storage; - let block_number = match self.block.resolve_block_number(storage)? { + let block_number = match self.block.resolve_block_number(storage).await? 
{ Some(block_number) => block_number, _ => return Ok(Value::Null), }; let header = storage.get_block_header(block_number)?; - let body = storage.get_block_body(block_number)?; + let body = storage.get_block_body(block_number).await?; let (header, body) = match (header, body) { (Some(header), Some(body)) => (header, body), _ => return Ok(Value::Null), }; - let receipts: Vec = get_all_block_receipts(block_number, header, body, storage)? + let receipts: Vec = get_all_block_receipts(block_number, header, body, storage) + .await? .iter() .map(|receipt| format!("0x{}", hex::encode(receipt.encode_inner()))) .collect(); @@ -290,8 +291,11 @@ impl RpcHandler for BlockNumberRequest { async fn handle(&self, context: RpcApiContext) -> Result { info!("Requested latest block number"); - serde_json::to_value(format!("{:#x}", context.storage.get_latest_block_number()?)) - .map_err(|error| RpcErr::Internal(error.to_string())) + serde_json::to_value(format!( + "{:#x}", + context.storage.get_latest_block_number().await? + )) + .map_err(|error| RpcErr::Internal(error.to_string())) } } @@ -302,7 +306,7 @@ impl RpcHandler for GetBlobBaseFee { async fn handle(&self, context: RpcApiContext) -> Result { info!("Requested blob gas price"); - let block_number = context.storage.get_latest_block_number()?; + let block_number = context.storage.get_latest_block_number().await?; let header = match context.storage.get_block_header(block_number)? { Some(header) => header, _ => return Err(RpcErr::Internal("Could not get block header".to_owned())), @@ -326,7 +330,7 @@ impl RpcHandler for GetBlobBaseFee { } } -pub fn get_all_block_rpc_receipts( +pub async fn get_all_block_rpc_receipts( block_number: BlockNumber, header: BlockHeader, body: BlockBody, @@ -355,7 +359,7 @@ pub fn get_all_block_rpc_receipts( let mut current_log_index = 0; for (index, tx) in body.transactions.iter().enumerate() { let index = index as u64; - let receipt = match storage.get_receipt(block_number, index)? { + let receipt = match storage.get_receipt(block_number, index).await? { Some(receipt) => receipt, _ => return Err(RpcErr::Internal("Could not get receipt".to_owned())), }; @@ -375,7 +379,7 @@ pub fn get_all_block_rpc_receipts( Ok(receipts) } -pub fn get_all_block_receipts( +pub async fn get_all_block_receipts( block_number: BlockNumber, header: BlockHeader, body: BlockBody, @@ -388,7 +392,7 @@ pub fn get_all_block_receipts( } for (index, _) in body.transactions.iter().enumerate() { let index = index as u64; - let receipt = match storage.get_receipt(block_number, index)? { + let receipt = match storage.get_receipt(block_number, index).await? { Some(receipt) => receipt, _ => return Err(RpcErr::Internal("Could not get receipt".to_owned())), }; diff --git a/crates/networking/rpc/eth/client.rs b/crates/networking/rpc/eth/client.rs index 615c43918ed..f31837b55e5 100644 --- a/crates/networking/rpc/eth/client.rs +++ b/crates/networking/rpc/eth/client.rs @@ -30,7 +30,7 @@ impl RpcHandler for Syncing { } async fn handle(&self, context: RpcApiContext) -> Result { - let is_synced = context.storage.is_synced()?; + let is_synced = context.storage.is_synced().await?; Ok(Value::Bool(!is_synced)) } } diff --git a/crates/networking/rpc/eth/fee_calculator.rs b/crates/networking/rpc/eth/fee_calculator.rs index 54e07c12ced..6b5d113fa82 100644 --- a/crates/networking/rpc/eth/fee_calculator.rs +++ b/crates/networking/rpc/eth/fee_calculator.rs @@ -27,8 +27,8 @@ const BLOCK_RANGE_LOWER_BOUND_DEC: u64 = 20; // we can look into more sophisticated estimation methods, if needed. 
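For context on the handler-side changes above: the RPC handlers were already `async fn handle`, so the asyncified store only changes the call shape of each read. A minimal sketch of that shape, using hypothetical stand-in types rather than the real `Store` and `RpcErr`:

```rust
// Minimal sketch (hypothetical stand-ins, not ethrex types) of the only change
// these handlers see: storage reads that used to be plain `?` calls are now
// awaited first, while the "missing block maps to JSON null" shape stays the same.
use serde_json::Value;

struct ToyStore;

impl ToyStore {
    async fn get_latest_block_number(&self) -> Result<u64, String> {
        Ok(42)
    }
    async fn get_block_body(&self, number: u64) -> Result<Option<String>, String> {
        Ok(Some(format!("body of block {number}")))
    }
}

async fn handle(storage: &ToyStore) -> Result<Value, String> {
    let block_number = storage.get_latest_block_number().await?;
    let body = match storage.get_block_body(block_number).await? {
        Some(body) => body,
        None => return Ok(Value::Null), // block not found
    };
    Ok(Value::String(body))
}

#[tokio::main]
async fn main() {
    println!("{:?}", handle(&ToyStore).await);
}
```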
/// Estimate Gas Price based on already accepted transactions, /// as per the spec, this will be returned in wei. -pub fn estimate_gas_tip(storage: &Store) -> Result, RpcErr> { - let latest_block_number = storage.get_latest_block_number()?; +pub async fn estimate_gas_tip(storage: &Store) -> Result, RpcErr> { + let latest_block_number = storage.get_latest_block_number().await?; let block_range_lower_bound = latest_block_number.saturating_sub(BLOCK_RANGE_LOWER_BOUND_DEC); // These are the blocks we'll use to estimate the price. let block_range = block_range_lower_bound..=latest_block_number; @@ -46,7 +46,7 @@ pub fn estimate_gas_tip(storage: &Store) -> Result, RpcErr> { // caching this result, also we can have a specific DB method // that returns a block range to not query them one-by-one. for block_num in block_range { - let Some(block_body) = storage.get_block_body(block_num)? else { + let Some(block_body) = storage.get_block_body(block_num).await? else { error!("Block body for block number {block_num} is missing but is below the latest known block!"); return Err(RpcErr::Internal( "Error calculating gas price: missing data".to_string(), @@ -91,7 +91,7 @@ mod tests { async fn test_for_legacy_txs() { let storage = setup_store().await; add_legacy_tx_blocks(&storage, 20, 10).await; - let gas_tip = estimate_gas_tip(&storage).unwrap().unwrap(); + let gas_tip = estimate_gas_tip(&storage).await.unwrap().unwrap(); assert_eq!(gas_tip, BASE_PRICE_IN_WEI); } @@ -99,7 +99,7 @@ mod tests { async fn test_for_eip1559_txs() { let storage = setup_store().await; add_eip1559_tx_blocks(&storage, 20, 10).await; - let gas_tip = estimate_gas_tip(&storage).unwrap().unwrap(); + let gas_tip = estimate_gas_tip(&storage).await.unwrap().unwrap(); assert_eq!(gas_tip, BASE_PRICE_IN_WEI); } @@ -107,14 +107,14 @@ mod tests { async fn test_for_mixed_txs() { let storage = setup_store().await; add_mixed_tx_blocks(&storage, 20, 10).await; - let gas_tip = estimate_gas_tip(&storage).unwrap().unwrap(); + let gas_tip = estimate_gas_tip(&storage).await.unwrap().unwrap(); assert_eq!(gas_tip, BASE_PRICE_IN_WEI); } #[tokio::test] async fn test_for_empty_blocks() { let storage = setup_store().await; - let gas_tip = estimate_gas_tip(&storage).unwrap(); + let gas_tip = estimate_gas_tip(&storage).await.unwrap(); assert_eq!(gas_tip, None); } } diff --git a/crates/networking/rpc/eth/fee_market.rs b/crates/networking/rpc/eth/fee_market.rs index 0443d5c61c9..c27e3f74cd1 100644 --- a/crates/networking/rpc/eth/fee_market.rs +++ b/crates/networking/rpc/eth/fee_market.rs @@ -97,7 +97,8 @@ impl RpcHandler for FeeHistoryRequest { .map_err(|error| RpcErr::Internal(error.to_string())); } - let (start_block, end_block) = get_range(storage, self.block_count, &self.newest_block)?; + let (start_block, end_block) = + get_range(storage, self.block_count, &self.newest_block).await?; let oldest_block = start_block; let block_count = (end_block - start_block + 1) as usize; let mut base_fee_per_gas = vec![0_u64; block_count + 1]; @@ -114,7 +115,8 @@ impl RpcHandler for FeeHistoryRequest { "Could not get header for block {block_number}" )))?; let body = storage - .get_block_body(block_number)? + .get_block_body(block_number) + .await? 
.ok_or(RpcErr::Internal(format!( "Could not get body for block {block_number}" )))?; @@ -212,7 +214,7 @@ fn project_next_block_base_fee_values( (base_fee_per_gas, base_fee_per_blob) } -fn get_range( +async fn get_range( storage: &Store, block_count: u64, expected_finish_block: &BlockIdentifier, @@ -220,16 +222,16 @@ fn get_range( // NOTE: The amount of blocks to retrieve is capped by MAX_BLOCK_COUNT // Get earliest block - let earliest_block_num = storage.get_earliest_block_number()?; + let earliest_block_num = storage.get_earliest_block_number().await?; // Get latest block - let latest_block_num = storage.get_latest_block_number()?; + let latest_block_num = storage.get_latest_block_number().await?; // Get the expected finish block number from the parameter - let expected_finish_block_num = - expected_finish_block - .resolve_block_number(storage)? - .ok_or(RpcErr::Internal( - "Could not resolve block number".to_owned(), - ))?; + let expected_finish_block_num = expected_finish_block + .resolve_block_number(storage) + .await? + .ok_or(RpcErr::Internal( + "Could not resolve block number".to_owned(), + ))?; // Calculate start and finish block numbers, considering finish block inclusion let finish_block_num = expected_finish_block_num.min(latest_block_num); let expected_start_block_num = (finish_block_num + 1).saturating_sub(block_count); diff --git a/crates/networking/rpc/eth/filter.rs b/crates/networking/rpc/eth/filter.rs index 026dddadf4d..842fd322f1a 100644 --- a/crates/networking/rpc/eth/filter.rs +++ b/crates/networking/rpc/eth/filter.rs @@ -64,7 +64,7 @@ impl NewFilterRequest { }) } - pub fn handle( + pub async fn handle( &self, storage: ethrex_storage::Store, filters: ActiveFilters, @@ -72,19 +72,21 @@ impl NewFilterRequest { let from = self .request_data .from_block - .resolve_block_number(&storage)? + .resolve_block_number(&storage) + .await? .ok_or(RpcErr::WrongParam("fromBlock".to_string()))?; let to = self .request_data .to_block - .resolve_block_number(&storage)? + .resolve_block_number(&storage) + .await? 
.ok_or(RpcErr::WrongParam("toBlock".to_string()))?; if (from..=to).is_empty() { return Err(RpcErr::BadParams("Invalid block range".to_string())); } - let last_block_number = storage.get_latest_block_number()?; + let last_block_number = storage.get_latest_block_number().await?; let id: u64 = rand::random(); let timestamp = Instant::now(); let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| { @@ -107,13 +109,13 @@ impl NewFilterRequest { Ok(as_hex) } - pub fn stateful_call( + pub async fn stateful_call( req: &RpcRequest, storage: Store, state: ActiveFilters, ) -> Result { let request = Self::parse(&req.params)?; - request.handle(storage, state) + request.handle(storage, state).await } } @@ -179,18 +181,21 @@ impl FilterChangesRequest { None => Err(RpcErr::MissingParam("0".to_string())), } } - pub fn handle( + pub async fn handle( &self, storage: ethrex_storage::Store, filters: ActiveFilters, ) -> Result { - let latest_block_num = storage.get_latest_block_number()?; - let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| { - error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE"); - **poisoned_guard.get_mut() = HashMap::new(); - filters.clear_poison(); - poisoned_guard.into_inner() - }); + let latest_block_num = storage.get_latest_block_number().await?; + // Box needed to keep the future Sync + // https://github.com/rust-lang/rust/issues/128095 + let mut active_filters_guard = + Box::new(filters.lock().unwrap_or_else(|mut poisoned_guard| { + error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE"); + **poisoned_guard.get_mut() = HashMap::new(); + filters.clear_poison(); + poisoned_guard.into_inner() + })); if let Some((timestamp, filter)) = active_filters_guard.get_mut(&self.id) { // We'll only get changes for a filter that either has a block // range for upcoming blocks, or for the 'latest' tag. @@ -216,7 +221,7 @@ impl FilterChangesRequest { // Drop the lock early to process this filter's query // and not keep the lock more than we should. 
drop(active_filters_guard); - let logs = fetch_logs_with_filter(&filter.filter_data, storage)?; + let logs = fetch_logs_with_filter(&filter.filter_data, storage).await?; serde_json::to_value(logs).map_err(|error| { tracing::error!("Log filtering request failed with: {error}"); RpcErr::Internal("Failed to filter logs".to_string()) @@ -233,13 +238,13 @@ impl FilterChangesRequest { )) } } - pub fn stateful_call( + pub async fn stateful_call( req: &RpcRequest, storage: ethrex_storage::Store, filters: ActiveFilters, ) -> Result { let request = Self::parse(&req.params)?; - request.handle(storage, filters) + request.handle(storage, filters).await } } diff --git a/crates/networking/rpc/eth/gas_price.rs b/crates/networking/rpc/eth/gas_price.rs index e576c7ddbd0..98bba2fc44e 100644 --- a/crates/networking/rpc/eth/gas_price.rs +++ b/crates/networking/rpc/eth/gas_price.rs @@ -18,9 +18,9 @@ impl RpcHandler for GasPrice { } async fn handle(&self, context: RpcApiContext) -> Result { - let latest_block_number = context.storage.get_latest_block_number()?; + let latest_block_number = context.storage.get_latest_block_number().await?; - let estimated_gas_tip = estimate_gas_tip(&context.storage)?; + let estimated_gas_tip = estimate_gas_tip(&context.storage).await?; let base_fee = context .storage diff --git a/crates/networking/rpc/eth/logs.rs b/crates/networking/rpc/eth/logs.rs index 1fae0a0b51b..289e26fff7f 100644 --- a/crates/networking/rpc/eth/logs.rs +++ b/crates/networking/rpc/eth/logs.rs @@ -86,7 +86,7 @@ impl RpcHandler for LogsFilter { } } async fn handle(&self, context: RpcApiContext) -> Result { - let filtered_logs = fetch_logs_with_filter(self, context.storage)?; + let filtered_logs = fetch_logs_with_filter(self, context.storage).await?; serde_json::to_value(filtered_logs).map_err(|error| { tracing::error!("Log filtering request failed with: {error}"); RpcErr::Internal("Failed to filter logs".to_string()) @@ -103,17 +103,19 @@ impl RpcHandler for LogsFilter { // then we simply could retrieve each log from the receipt and add the info // needed for the RPCLog struct. -pub(crate) fn fetch_logs_with_filter( +pub(crate) async fn fetch_logs_with_filter( filter: &LogsFilter, storage: Store, ) -> Result, RpcErr> { let from = filter .from_block - .resolve_block_number(&storage)? + .resolve_block_number(&storage) + .await? .ok_or(RpcErr::WrongParam("fromBlock".to_string()))?; let to = filter .to_block - .resolve_block_number(&storage)? + .resolve_block_number(&storage) + .await? .ok_or(RpcErr::WrongParam("toBlock".to_string()))?; if (from..=to).is_empty() { return Err(RpcErr::BadParams("Empty range".to_string())); @@ -133,7 +135,8 @@ pub(crate) fn fetch_logs_with_filter( // Take the header of the block, we // will use it to access the transactions. let block_body = storage - .get_block_body(block_num)? + .get_block_body(block_num) + .await? .ok_or(RpcErr::Internal(format!( "Could not get body for block {block_num}" )))?; @@ -151,7 +154,8 @@ pub(crate) fn fetch_logs_with_filter( for (tx_index, tx) in block_body.transactions.iter().enumerate() { let tx_hash = tx.compute_hash(); let receipt = storage - .get_receipt(block_num, tx_index as u64)? + .get_receipt(block_num, tx_index as u64) + .await? 
.ok_or(RpcErr::Internal("Could not get receipt".to_owned()))?; if receipt.succeeded { diff --git a/crates/networking/rpc/eth/max_priority_fee.rs b/crates/networking/rpc/eth/max_priority_fee.rs index 93c677b1963..67c0eaffe5f 100644 --- a/crates/networking/rpc/eth/max_priority_fee.rs +++ b/crates/networking/rpc/eth/max_priority_fee.rs @@ -18,7 +18,7 @@ impl RpcHandler for MaxPriorityFee { } async fn handle(&self, context: RpcApiContext) -> Result { - let estimated_gas_tip = estimate_gas_tip(&context.storage)?; + let estimated_gas_tip = estimate_gas_tip(&context.storage).await?; let gas_tip = match estimated_gas_tip { Some(gas_tip) => gas_tip, diff --git a/crates/networking/rpc/eth/transaction.rs b/crates/networking/rpc/eth/transaction.rs index 9ab5bd2aa05..91ff7e9c123 100644 --- a/crates/networking/rpc/eth/transaction.rs +++ b/crates/networking/rpc/eth/transaction.rs @@ -105,7 +105,7 @@ impl RpcHandler for CallRequest { async fn handle(&self, context: RpcApiContext) -> Result { let block = self.block.clone().unwrap_or_default(); info!("Requested call on block: {}", block); - let header = match block.resolve_block_header(&context.storage)? { + let header = match block.resolve_block_header(&context.storage).await? { Some(header) => header, // Block not found _ => return Ok(Value::Null), @@ -151,11 +151,11 @@ impl RpcHandler for GetTransactionByBlockNumberAndIndexRequest { "Requested transaction at index: {} of block with number: {}", self.transaction_index, self.block, ); - let block_number = match self.block.resolve_block_number(&context.storage)? { + let block_number = match self.block.resolve_block_number(&context.storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; - let block_body = match context.storage.get_block_body(block_number)? { + let block_body = match context.storage.get_block_body(block_number).await? { Some(block_body) => block_body, _ => return Ok(Value::Null), }; @@ -202,11 +202,11 @@ impl RpcHandler for GetTransactionByBlockHashAndIndexRequest { "Requested transaction at index: {} of block with hash: {:#x}", self.transaction_index, self.block, ); - let block_number = match context.storage.get_block_number(self.block)? { + let block_number = match context.storage.get_block_number(self.block).await? { Some(number) => number, _ => return Ok(Value::Null), }; - let block_body = match context.storage.get_block_body(block_number)? { + let block_body = match context.storage.get_block_body(block_number).await? { Some(block_body) => block_body, _ => return Ok(Value::Null), }; @@ -241,17 +241,21 @@ impl RpcHandler for GetTransactionByHashRequest { "Requested transaction with hash: {:#x}", self.transaction_hash, ); - let (block_number, block_hash, index) = - match storage.get_transaction_location(self.transaction_hash)? { - Some(location) => location, - _ => return Ok(Value::Null), - }; + let (block_number, block_hash, index) = match storage + .get_transaction_location(self.transaction_hash) + .await? + { + Some(location) => location, + _ => return Ok(Value::Null), + }; - let transaction: ethrex_common::types::Transaction = - match storage.get_transaction_by_location(block_hash, index)? { - Some(transaction) => transaction, - _ => return Ok(Value::Null), - }; + let transaction: ethrex_common::types::Transaction = match storage + .get_transaction_by_location(block_hash, index) + .await? 
+ { + Some(transaction) => transaction, + _ => return Ok(Value::Null), + }; let transaction = RpcTransaction::build(transaction, block_number, block_hash, index as usize); @@ -280,17 +284,20 @@ impl RpcHandler for GetTransactionReceiptRequest { "Requested receipt for transaction {:#x}", self.transaction_hash, ); - let (block_number, block_hash, index) = - match storage.get_transaction_location(self.transaction_hash)? { - Some(location) => location, - _ => return Ok(Value::Null), - }; - let block = match storage.get_block_by_hash(block_hash)? { + let (block_number, block_hash, index) = match storage + .get_transaction_location(self.transaction_hash) + .await? + { + Some(location) => location, + _ => return Ok(Value::Null), + }; + let block = match storage.get_block_by_hash(block_hash).await? { Some(block) => block, None => return Ok(Value::Null), }; let receipts = - block::get_all_block_rpc_receipts(block_number, block.header, block.body, storage)?; + block::get_all_block_rpc_receipts(block_number, block.header, block.body, storage) + .await?; serde_json::to_value(receipts.get(index as usize)) .map_err(|error| RpcErr::Internal(error.to_string())) @@ -324,7 +331,7 @@ impl RpcHandler for CreateAccessListRequest { async fn handle(&self, context: RpcApiContext) -> Result { let block = self.block.clone().unwrap_or_default(); info!("Requested access list creation for tx on block: {}", block); - let block_number = match block.resolve_block_number(&context.storage)? { + let block_number = match block.resolve_block_number(&context.storage).await? { Some(block_number) => block_number, _ => return Ok(Value::Null), }; @@ -386,7 +393,8 @@ impl RpcHandler for GetRawTransaction { async fn handle(&self, context: RpcApiContext) -> Result { let tx = context .storage - .get_transaction_by_hash(self.transaction_hash)?; + .get_transaction_by_hash(self.transaction_hash) + .await?; let tx = match tx { Some(tx) => tx, @@ -427,7 +435,7 @@ impl RpcHandler for EstimateGasRequest { let blockchain = &context.blockchain; let block = self.block.clone().unwrap_or_default(); info!("Requested estimate on block: {}", block); - let block_header = match block.resolve_block_header(storage)? { + let block_header = match block.resolve_block_header(storage).await? { Some(header) => header, // Block not found _ => return Ok(Value::Null), @@ -437,7 +445,8 @@ impl RpcHandler for EstimateGasRequest { Some(_nonce) => self.transaction.clone(), None => { let transaction_nonce = storage - .get_nonce_by_account_address(block_header.number, self.transaction.from)?; + .get_nonce_by_account_address(block_header.number, self.transaction.from) + .await?; let mut cloned_transaction = self.transaction.clone(); cloned_transaction.nonce = transaction_nonce; @@ -450,7 +459,9 @@ impl RpcHandler for EstimateGasRequest { // If the transaction is a plain value transfer, short circuit estimation. 
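The transaction-by-hash and receipt handlers above now chain two awaited reads: the transaction's location first, then the transaction (or its block) from that location, with each miss mapping to JSON null. A rough sketch of that shape with stand-in types:

```rust
// Minimal sketch (hypothetical stand-ins, not ethrex code) of the two-step
// lookup `eth_getTransactionByHash` performs after asyncification: first the
// transaction location (block number, block hash, index), then the transaction
// itself from that location.
use serde_json::Value;

struct ToyStore;

impl ToyStore {
    async fn get_transaction_location(&self, _hash: u64) -> Option<(u64, u64, u64)> {
        Some((100, 0xabc, 2)) // (block_number, block_hash, index)
    }
    async fn get_transaction_by_location(&self, _block_hash: u64, index: u64) -> Option<String> {
        Some(format!("tx #{index}"))
    }
}

async fn get_transaction_by_hash(storage: &ToyStore, hash: u64) -> Value {
    let (_number, block_hash, index) = match storage.get_transaction_location(hash).await {
        Some(location) => location,
        None => return Value::Null,
    };
    match storage.get_transaction_by_location(block_hash, index).await {
        Some(tx) => Value::String(tx),
        None => Value::Null,
    }
}

#[tokio::main]
async fn main() {
    println!("{}", get_transaction_by_hash(&ToyStore, 1).await);
}
```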
if let TxKind::Call(address) = transaction.to { - let account_info = storage.get_account_info(block_header.number, address)?; + let account_info = storage + .get_account_info(block_header.number, address) + .await?; let code = account_info.map(|info| storage.get_account_code(info.code_hash)); if code.is_none() { let mut value_transfer_transaction = transaction.clone(); @@ -481,7 +492,8 @@ impl RpcHandler for EstimateGasRequest { &transaction, storage, block_header.number, - )?; + ) + .await?; } // Check whether the execution is possible @@ -536,14 +548,15 @@ impl RpcHandler for EstimateGasRequest { } } -fn recap_with_account_balances( +async fn recap_with_account_balances( highest_gas_limit: u64, transaction: &GenericTransaction, storage: &Store, block_number: BlockNumber, ) -> Result { let account_balance = storage - .get_account_info(block_number, transaction.from)? + .get_account_info(block_number, transaction.from) + .await? .map(|acc| acc.balance) .unwrap_or_default(); let account_gas = @@ -630,14 +643,18 @@ impl RpcHandler for SendRawTransactionRequest { async fn handle(&self, context: RpcApiContext) -> Result { let hash = if let SendRawTransactionRequest::EIP4844(wrapped_blob_tx) = self { - context.blockchain.add_blob_transaction_to_pool( - wrapped_blob_tx.tx.clone(), - wrapped_blob_tx.blobs_bundle.clone(), - ) + context + .blockchain + .add_blob_transaction_to_pool( + wrapped_blob_tx.tx.clone(), + wrapped_blob_tx.blobs_bundle.clone(), + ) + .await } else { context .blockchain .add_transaction_to_pool(self.to_transaction()) + .await }?; serde_json::to_value(format!("{:#x}", hash)) .map_err(|error| RpcErr::Internal(error.to_string())) diff --git a/crates/networking/rpc/l2/transaction.rs b/crates/networking/rpc/l2/transaction.rs index f84c5e184f8..b1d44f61e45 100644 --- a/crates/networking/rpc/l2/transaction.rs +++ b/crates/networking/rpc/l2/transaction.rs @@ -85,9 +85,11 @@ impl RpcHandler for SponsoredTx { context .storage .get_latest_block_number() + .await .map_err(RpcErr::from)?, self.to, ) + .await .map_err(RpcErr::from)? .unwrap_or_default(); let code = context @@ -121,15 +123,18 @@ impl RpcHandler for SponsoredTx { let latest_block_number = context .storage .get_latest_block_number() + .await .map_err(RpcErr::from)?; let chain_config = context.storage.get_chain_config().map_err(RpcErr::from)?; let chain_id = chain_config.chain_id; let nonce = context .storage .get_nonce_by_account_address(latest_block_number, sponsor_address) + .await .map_err(RpcErr::from)? .ok_or(RpcErr::InvalidEthrexL2Message("Invalid nonce".to_string()))?; let max_priority_fee_per_gas = estimate_gas_tip(&context.storage) + .await .map_err(RpcErr::from)? 
.unwrap_or_default(); let gas_price_request = GasPrice {}.handle(context.clone()).await?; diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 17e41a6e641..2a4f7bb33f7 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -344,13 +344,13 @@ pub async fn map_eth_requests(req: &RpcRequest, context: RpcApiContext) -> Resul "eth_estimateGas" => EstimateGasRequest::call(req, context).await, "eth_getLogs" => LogsFilter::call(req, context).await, "eth_newFilter" => { - NewFilterRequest::stateful_call(req, context.storage, context.active_filters) + NewFilterRequest::stateful_call(req, context.storage, context.active_filters).await } "eth_uninstallFilter" => { DeleteFilterRequest::stateful_call(req, context.storage, context.active_filters) } "eth_getFilterChanges" => { - FilterChangesRequest::stateful_call(req, context.storage, context.active_filters) + FilterChangesRequest::stateful_call(req, context.storage, context.active_filters).await } "eth_sendRawTransaction" => { cfg_if::cfg_if! { diff --git a/crates/networking/rpc/types/block_identifier.rs b/crates/networking/rpc/types/block_identifier.rs index 9c816435c55..108efa334fb 100644 --- a/crates/networking/rpc/types/block_identifier.rs +++ b/crates/networking/rpc/types/block_identifier.rs @@ -31,24 +31,26 @@ pub enum BlockTag { } impl BlockIdentifier { - pub fn resolve_block_number(&self, storage: &Store) -> Result, StoreError> { + pub async fn resolve_block_number( + &self, + storage: &Store, + ) -> Result, StoreError> { match self { BlockIdentifier::Number(num) => Ok(Some(*num)), BlockIdentifier::Tag(tag) => match tag { - BlockTag::Earliest => Ok(Some(storage.get_earliest_block_number()?)), - BlockTag::Finalized => storage.get_finalized_block_number(), - BlockTag::Safe => storage.get_safe_block_number(), - BlockTag::Latest => Ok(Some(storage.get_latest_block_number()?)), + BlockTag::Earliest => Ok(Some(storage.get_earliest_block_number().await?)), + BlockTag::Finalized => storage.get_finalized_block_number().await, + BlockTag::Safe => storage.get_safe_block_number().await, + BlockTag::Latest => Ok(Some(storage.get_latest_block_number().await?)), BlockTag::Pending => { // TODO(#1112): We need to check individual intrincacies of the pending tag for // each RPC method that uses it. - storage - .get_pending_block_number() + if let Some(pending_block_number) = storage.get_pending_block_number().await? { + Ok(Some(pending_block_number)) + } else { // If there are no pending blocks, we return the latest block number - .and_then(|pending_block_number| match pending_block_number { - Some(block_number) => Ok(Some(block_number)), - None => Ok(Some(storage.get_latest_block_number()?)), - }) + Ok(Some(storage.get_latest_block_number().await?)) + } } }, } @@ -76,8 +78,11 @@ impl BlockIdentifier { Ok(BlockIdentifier::Number(block_number)) } - pub fn resolve_block_header(&self, storage: &Store) -> Result, StoreError> { - match self.resolve_block_number(storage)? { + pub async fn resolve_block_header( + &self, + storage: &Store, + ) -> Result, StoreError> { + match self.resolve_block_number(storage).await? 
{ Some(block_number) => storage.get_block_header(block_number), _ => Ok(None), } @@ -86,10 +91,13 @@ impl BlockIdentifier { impl BlockIdentifierOrHash { #[allow(unused)] - pub fn resolve_block_number(&self, storage: &Store) -> Result, StoreError> { + pub async fn resolve_block_number( + &self, + storage: &Store, + ) -> Result, StoreError> { match self { - BlockIdentifierOrHash::Identifier(id) => id.resolve_block_number(storage), - BlockIdentifierOrHash::Hash(block_hash) => storage.get_block_number(*block_hash), + BlockIdentifierOrHash::Identifier(id) => id.resolve_block_number(storage).await, + BlockIdentifierOrHash::Hash(block_hash) => storage.get_block_number(*block_hash).await, } } @@ -107,13 +115,13 @@ impl BlockIdentifierOrHash { } #[allow(unused)] - pub fn is_latest(&self, storage: &Store) -> Result { + pub async fn is_latest(&self, storage: &Store) -> Result { if self == &BlockTag::Latest { return Ok(true); } - let result = self.resolve_block_number(storage)?; - let latest = storage.get_latest_block_number()?; + let result = self.resolve_block_number(storage).await?; + let latest = storage.get_latest_block_number().await?; Ok(result.is_some_and(|res| res == latest)) } diff --git a/crates/storage/api.rs b/crates/storage/api.rs index a0e263d6d84..ff17dc17c06 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -48,10 +48,26 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain canonical block body - fn get_block_body(&self, block_number: BlockNumber) -> Result, StoreError>; + async fn get_block_body( + &self, + block_number: BlockNumber, + ) -> Result, StoreError>; + + /// Obtain canonical block bodies in from..=to + async fn get_block_bodies( + &self, + from: BlockNumber, + to: BlockNumber, + ) -> Result, StoreError>; + + /// Obtain block bodies from a list of hashes + async fn get_block_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result, StoreError>; /// Obtain any block body using the hash - fn get_block_body_by_hash( + async fn get_block_body_by_hash( &self, block_hash: BlockHash, ) -> Result, StoreError>; @@ -62,7 +78,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result, StoreError>; async fn add_pending_block(&self, block: Block) -> Result<(), StoreError>; - fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError>; + async fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError>; /// Add block number for a given hash async fn add_block_number( @@ -72,7 +88,10 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain block number for a given hash - fn get_block_number(&self, block_hash: BlockHash) -> Result, StoreError>; + async fn get_block_number( + &self, + block_hash: BlockHash, + ) -> Result, StoreError>; /// Store transaction location (block number and index of the transaction within the block) async fn add_transaction_location( @@ -90,7 +109,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain transaction location (block hash and index) - fn get_transaction_location( + async fn get_transaction_location( &self, transaction_hash: H256, ) -> Result, StoreError>; @@ -117,7 +136,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain receipt for a canonical block represented by the block number. 
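The `BlockIdentifier::resolve_block_number` rewrite above replaces the old `and_then` closure chain, which cannot contain `.await`, with a plain `if let`, so the pending tag can still fall back to the latest block. A simplified sketch of that resolution logic, with a hypothetical stand-in store:

```rust
// Minimal sketch (hypothetical stand-in store) of the block-tag resolution the
// diff rewrites: with async reads, the closure chain becomes an `if let`, and
// `Pending` still falls back to the latest block when no pending block exists.
enum BlockTag {
    Latest,
    Pending,
}

struct ToyStore {
    latest: u64,
    pending: Option<u64>,
}

impl ToyStore {
    async fn get_latest_block_number(&self) -> Result<u64, String> {
        Ok(self.latest)
    }
    async fn get_pending_block_number(&self) -> Result<Option<u64>, String> {
        Ok(self.pending)
    }
}

async fn resolve_block_number(tag: BlockTag, storage: &ToyStore) -> Result<Option<u64>, String> {
    match tag {
        BlockTag::Latest => Ok(Some(storage.get_latest_block_number().await?)),
        BlockTag::Pending => {
            if let Some(pending) = storage.get_pending_block_number().await? {
                Ok(Some(pending))
            } else {
                // No pending block tracked: fall back to the latest block number.
                Ok(Some(storage.get_latest_block_number().await?))
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let store = ToyStore { latest: 10, pending: None };
    println!("{:?}", resolve_block_number(BlockTag::Latest, &store).await);
    println!("{:?}", resolve_block_number(BlockTag::Pending, &store).await);
}
```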
- fn get_receipt( + async fn get_receipt( &self, block_number: BlockNumber, index: Index, @@ -129,24 +148,24 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Obtain account code via code hash fn get_account_code(&self, code_hash: H256) -> Result, StoreError>; - fn get_transaction_by_hash( + async fn get_transaction_by_hash( &self, transaction_hash: H256, ) -> Result, StoreError> { let (_block_number, block_hash, index) = - match self.get_transaction_location(transaction_hash)? { + match self.get_transaction_location(transaction_hash).await? { Some(location) => location, None => return Ok(None), }; - self.get_transaction_by_location(block_hash, index) + self.get_transaction_by_location(block_hash, index).await } - fn get_transaction_by_location( + async fn get_transaction_by_location( &self, block_hash: H256, index: u64, ) -> Result, StoreError> { - let block_body = match self.get_block_body_by_hash(block_hash)? { + let block_body = match self.get_block_body_by_hash(block_hash).await? { Some(body) => body, None => return Ok(None), }; @@ -156,12 +175,12 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { .and_then(|index: usize| block_body.transactions.get(index).cloned())) } - fn get_block_by_hash(&self, block_hash: BlockHash) -> Result, StoreError> { + async fn get_block_by_hash(&self, block_hash: BlockHash) -> Result, StoreError> { let header = match self.get_block_header_by_hash(block_hash)? { Some(header) => header, None => return Ok(None), }; - let body = match self.get_block_body_by_hash(block_hash)? { + let body = match self.get_block_body_by_hash(block_hash).await? { Some(body) => body, None => return Ok(None), }; @@ -169,7 +188,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { } // Get the canonical block hash for a given block number. 
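The `StoreEngine` trait above keeps its provided methods, which now have to await the required reads they compose. A reduced sketch of that trait shape, assuming native `async fn` in traits (Rust 1.75+) and toy types in place of the real blocks and errors:

```rust
// Minimal sketch (assumed simplification, not the real `StoreEngine`): required
// reads become `async fn`, and a provided method such as the transaction lookup
// is composed out of them with `.await?`.
trait ToyStoreEngine {
    async fn get_location(&self, tx_hash: u64) -> Result<Option<(u64, u64)>, String>;
    async fn get_body(&self, block_hash: u64) -> Result<Option<Vec<u64>>, String>;

    // Provided method: chains the required async reads.
    async fn get_transaction(&self, tx_hash: u64) -> Result<Option<u64>, String> {
        let Some((block_hash, index)) = self.get_location(tx_hash).await? else {
            return Ok(None);
        };
        let Some(body) = self.get_body(block_hash).await? else {
            return Ok(None);
        };
        Ok(body.get(index as usize).copied())
    }
}

struct InMemory;

impl ToyStoreEngine for InMemory {
    async fn get_location(&self, _tx_hash: u64) -> Result<Option<(u64, u64)>, String> {
        Ok(Some((7, 1)))
    }
    async fn get_body(&self, _block_hash: u64) -> Result<Option<Vec<u64>>, String> {
        Ok(Some(vec![10, 11, 12]))
    }
}

#[tokio::main]
async fn main() {
    println!("{:?}", InMemory.get_transaction(42).await); // Ok(Some(11))
}
```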
- fn get_canonical_block_hash( + async fn get_canonical_block_hash( &self, block_number: BlockNumber, ) -> Result, StoreError>; @@ -188,7 +207,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain earliest block number - fn get_earliest_block_number(&self) -> Result, StoreError>; + async fn get_earliest_block_number(&self) -> Result, StoreError>; /// Update finalized block number async fn update_finalized_block_number( @@ -197,20 +216,20 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain finalized block number - fn get_finalized_block_number(&self) -> Result, StoreError>; + async fn get_finalized_block_number(&self) -> Result, StoreError>; /// Update safe block number async fn update_safe_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; /// Obtain safe block number - fn get_safe_block_number(&self) -> Result, StoreError>; + async fn get_safe_block_number(&self) -> Result, StoreError>; /// Update latest block number async fn update_latest_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; /// Obtain latest block number - fn get_latest_block_number(&self) -> Result, StoreError>; + async fn get_latest_block_number(&self) -> Result, StoreError>; /// Update pending block number async fn update_pending_block_number( @@ -219,7 +238,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Obtain pending block number - fn get_pending_block_number(&self) -> Result, StoreError>; + async fn get_pending_block_number(&self) -> Result, StoreError>; /// Obtain a storage trie from the given address and storage_root /// Doesn't check if the account is stored @@ -243,7 +262,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { async fn add_payload(&self, payload_id: u64, block: Block) -> Result<(), StoreError>; - fn get_payload(&self, payload_id: u64) -> Result, StoreError>; + async fn get_payload(&self, payload_id: u64) -> Result, StoreError>; async fn update_payload( &self, @@ -260,7 +279,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { -> Result<(), StoreError>; /// Gets the hash of the last header downloaded during a snap sync - fn get_header_download_checkpoint(&self) -> Result, StoreError>; + async fn get_header_download_checkpoint(&self) -> Result, StoreError>; /// Sets the last key fetched from the state trie being fetched during snap sync async fn set_state_trie_key_checkpoint( @@ -269,7 +288,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Gets the last key fetched from the state trie being fetched during snap sync - fn get_state_trie_key_checkpoint( + async fn get_state_trie_key_checkpoint( &self, ) -> Result, StoreError>; @@ -292,12 +311,12 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { async fn set_state_heal_paths(&self, paths: Vec) -> Result<(), StoreError>; /// Gets the state trie paths in need of healing - fn get_state_heal_paths(&self) -> Result>, StoreError>; + async fn get_state_heal_paths(&self) -> Result>, StoreError>; /// Clears all checkpoint data created during the last snap sync async fn clear_snap_state(&self) -> Result<(), StoreError>; - fn is_synced(&self) -> Result; + async fn is_synced(&self) -> Result; async fn update_sync_status(&self, status: bool) -> Result<(), StoreError>; @@ -331,7 +350,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// 
Get the latest root of the rebuilt state trie and the last downloaded hashes from each segment - fn get_state_trie_rebuild_checkpoint( + async fn get_state_trie_rebuild_checkpoint( &self, ) -> Result, StoreError>; @@ -342,7 +361,9 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { ) -> Result<(), StoreError>; /// Get the accont hashes and roots of the storage tries awaiting rebuild - fn get_storage_trie_rebuild_pending(&self) -> Result>, StoreError>; + async fn get_storage_trie_rebuild_pending( + &self, + ) -> Result>, StoreError>; /// Clears the state and storage snapshots async fn clear_snapshot(&self) -> Result<(), StoreError>; @@ -351,7 +372,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { fn read_account_snapshot(&self, start: H256) -> Result, StoreError>; /// Reads the next `MAX_SNAPSHOT_READS` elements from the storage snapshot as from the `start` storage key - fn read_storage_snapshot( + async fn read_storage_snapshot( &self, start: H256, account_hash: H256, @@ -370,5 +391,8 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Returns the latest valid ancestor hash for a given invalid block hash. /// Used to provide `latest_valid_hash` in the Engine API when processing invalid payloads. - fn get_latest_valid_ancestor(&self, block: BlockHash) -> Result, StoreError>; + async fn get_latest_valid_ancestor( + &self, + block: BlockHash, + ) -> Result, StoreError>; } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index d1192d8ad3b..e664fbe32f1 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -109,12 +109,12 @@ impl Store { Ok(store) } - pub fn get_account_info( + pub async fn get_account_info( &self, block_number: BlockNumber, address: Address, ) -> Result, StoreError> { - match self.get_canonical_block_hash(block_number)? { + match self.get_canonical_block_hash(block_number).await? 
{ Some(block_hash) => self.get_account_info_by_hash(block_hash, address), None => Ok(None), } @@ -172,11 +172,11 @@ impl Store { self.engine.get_block_header_by_hash(block_hash) } - pub fn get_block_body_by_hash( + pub async fn get_block_body_by_hash( &self, block_hash: BlockHash, ) -> Result, StoreError> { - self.engine.get_block_body_by_hash(block_hash) + self.engine.get_block_body_by_hash(block_hash).await } pub async fn add_block_body( @@ -187,11 +187,26 @@ impl Store { self.engine.add_block_body(block_hash, block_body).await } - pub fn get_block_body( + pub async fn get_block_body( &self, block_number: BlockNumber, ) -> Result, StoreError> { - self.engine.get_block_body(block_number) + self.engine.get_block_body(block_number).await + } + + pub async fn get_block_bodies( + &self, + from: BlockNumber, + to: BlockNumber, + ) -> Result, StoreError> { + self.engine.get_block_bodies(from, to).await + } + + pub async fn get_block_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result, StoreError> { + self.engine.get_block_bodies_by_hash(hashes).await } pub async fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { @@ -202,9 +217,12 @@ impl Store { self.engine.add_pending_block(block).await } - pub fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> { + pub async fn get_pending_block( + &self, + block_hash: BlockHash, + ) -> Result, StoreError> { info!("get pending: {}", block_hash); - self.engine.get_pending_block(block_hash) + self.engine.get_pending_block(block_hash).await } pub async fn add_block_number( @@ -218,11 +236,11 @@ impl Store { .await } - pub fn get_block_number( + pub async fn get_block_number( &self, block_hash: BlockHash, ) -> Result, StoreError> { - self.engine.get_block_number(block_hash) + self.engine.get_block_number(block_hash).await } pub async fn add_transaction_location( @@ -257,11 +275,11 @@ impl Store { self.engine.add_transaction_locations(locations).await } - pub fn get_transaction_location( + pub async fn get_transaction_location( &self, transaction_hash: H256, ) -> Result, StoreError> { - self.engine.get_transaction_location(transaction_hash) + self.engine.get_transaction_location(transaction_hash).await } pub async fn add_account_code(&self, code_hash: H256, code: Bytes) -> Result<(), StoreError> { @@ -272,12 +290,12 @@ impl Store { self.engine.get_account_code(code_hash) } - pub fn get_code_by_account_address( + pub async fn get_code_by_account_address( &self, block_number: BlockNumber, address: Address, ) -> Result, StoreError> { - let Some(block_hash) = self.engine.get_canonical_block_hash(block_number)? else { + let Some(block_hash) = self.engine.get_canonical_block_hash(block_number).await? else { return Ok(None); }; let Some(state_trie) = self.state_trie(block_hash)? else { @@ -291,12 +309,12 @@ impl Store { self.get_account_code(account_state.code_hash) } - pub fn get_nonce_by_account_address( + pub async fn get_nonce_by_account_address( &self, block_number: BlockNumber, address: Address, ) -> Result, StoreError> { - let Some(block_hash) = self.engine.get_canonical_block_hash(block_number)? else { + let Some(block_hash) = self.engine.get_canonical_block_hash(block_number).await? else { return Ok(None); }; let Some(state_trie) = self.state_trie(block_hash)? else { @@ -407,7 +425,7 @@ impl Store { }; genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?; } - Ok(genesis_state_trie.hash()?) 
+ genesis_state_trie.hash().map_err(StoreError::Trie) } pub async fn add_receipt( @@ -434,12 +452,12 @@ impl Store { self.engine.add_receipts_for_blocks(receipts).await } - pub fn get_receipt( + pub async fn get_receipt( &self, block_number: BlockNumber, index: Index, ) -> Result, StoreError> { - self.engine.get_receipt(block_number, index) + self.engine.get_receipt(block_number, index).await } pub async fn add_block(&self, block: Block) -> Result<(), StoreError> { @@ -497,32 +515,34 @@ impl Store { self.set_chain_config(&genesis.config).await } - pub fn get_transaction_by_hash( + pub async fn get_transaction_by_hash( &self, transaction_hash: H256, ) -> Result, StoreError> { - self.engine.get_transaction_by_hash(transaction_hash) + self.engine.get_transaction_by_hash(transaction_hash).await } - pub fn get_transaction_by_location( + pub async fn get_transaction_by_location( &self, block_hash: BlockHash, index: u64, ) -> Result, StoreError> { - self.engine.get_transaction_by_location(block_hash, index) + self.engine + .get_transaction_by_location(block_hash, index) + .await } - pub fn get_block_by_hash(&self, block_hash: H256) -> Result, StoreError> { - self.engine.get_block_by_hash(block_hash) + pub async fn get_block_by_hash(&self, block_hash: H256) -> Result, StoreError> { + self.engine.get_block_by_hash(block_hash).await } - pub fn get_storage_at( + pub async fn get_storage_at( &self, block_number: BlockNumber, address: Address, storage_key: H256, ) -> Result, StoreError> { - match self.get_canonical_block_hash(block_number)? { + match self.get_canonical_block_hash(block_number).await? { Some(block_hash) => self.get_storage_at_hash(block_hash, address, storage_key), None => Ok(None), } @@ -559,9 +579,10 @@ impl Store { self.engine.update_earliest_block_number(block_number).await } - pub fn get_earliest_block_number(&self) -> Result { + pub async fn get_earliest_block_number(&self) -> Result { self.engine - .get_earliest_block_number()? + .get_earliest_block_number() + .await? .ok_or(StoreError::MissingEarliestBlockNumber) } @@ -574,8 +595,8 @@ impl Store { .await } - pub fn get_finalized_block_number(&self) -> Result, StoreError> { - self.engine.get_finalized_block_number() + pub async fn get_finalized_block_number(&self) -> Result, StoreError> { + self.engine.get_finalized_block_number().await } pub async fn update_safe_block_number( @@ -585,8 +606,8 @@ impl Store { self.engine.update_safe_block_number(block_number).await } - pub fn get_safe_block_number(&self) -> Result, StoreError> { - self.engine.get_safe_block_number() + pub async fn get_safe_block_number(&self) -> Result, StoreError> { + self.engine.get_safe_block_number().await } pub async fn update_latest_block_number( @@ -596,9 +617,10 @@ impl Store { self.engine.update_latest_block_number(block_number).await } - pub fn get_latest_block_number(&self) -> Result { + pub async fn get_latest_block_number(&self) -> Result { self.engine - .get_latest_block_number()? + .get_latest_block_number() + .await? 
.ok_or(StoreError::MissingLatestBlockNumber) } @@ -609,8 +631,8 @@ impl Store { self.engine.update_pending_block_number(block_number).await } - pub fn get_pending_block_number(&self) -> Result, StoreError> { - self.engine.get_pending_block_number() + pub async fn get_pending_block_number(&self) -> Result, StoreError> { + self.engine.get_pending_block_number().await } pub async fn set_canonical_block( @@ -621,19 +643,19 @@ impl Store { self.engine.set_canonical_block(number, hash).await } - pub fn get_canonical_block_hash( + pub async fn get_canonical_block_hash( &self, block_number: BlockNumber, ) -> Result, StoreError> { - self.engine.get_canonical_block_hash(block_number) + self.engine.get_canonical_block_hash(block_number).await } - pub fn get_latest_canonical_block_hash(&self) -> Result, StoreError> { - let latest_block_number = match self.engine.get_latest_block_number() { + pub async fn get_latest_canonical_block_hash(&self) -> Result, StoreError> { + let latest_block_number = match self.engine.get_latest_block_number().await { Ok(n) => n.ok_or(StoreError::MissingLatestBlockNumber)?, Err(e) => return Err(e), }; - self.get_canonical_block_hash(latest_block_number) + self.get_canonical_block_hash(latest_block_number).await } /// Marks a block number as not having any canonical blocks associated with it. @@ -674,12 +696,12 @@ impl Store { ))) } - pub fn get_account_state( + pub async fn get_account_state( &self, block_number: BlockNumber, address: Address, ) -> Result, StoreError> { - let Some(block_hash) = self.engine.get_canonical_block_hash(block_number)? else { + let Some(block_hash) = self.engine.get_canonical_block_hash(block_number).await? else { return Ok(None); }; let Some(state_trie) = self.state_trie(block_hash)? else { @@ -711,12 +733,12 @@ impl Store { Ok(Some(AccountState::decode(&encoded_state)?)) } - pub fn get_account_proof( + pub async fn get_account_proof( &self, block_number: BlockNumber, address: &Address, ) -> Result>>, StoreError> { - let Some(block_hash) = self.engine.get_canonical_block_hash(block_number)? else { + let Some(block_hash) = self.engine.get_canonical_block_hash(block_number).await? else { return Ok(None); }; let Some(state_trie) = self.state_trie(block_hash)? 
else { @@ -862,8 +884,8 @@ impl Store { self.engine.add_payload(payload_id, block).await } - pub fn get_payload(&self, payload_id: u64) -> Result, StoreError> { - self.engine.get_payload(payload_id) + pub async fn get_payload(&self, payload_id: u64) -> Result, StoreError> { + self.engine.get_payload(payload_id).await } pub async fn update_payload( @@ -933,8 +955,8 @@ impl Store { } /// Gets the hash of the last header downloaded during a snap sync - pub fn get_header_download_checkpoint(&self) -> Result, StoreError> { - self.engine.get_header_download_checkpoint() + pub async fn get_header_download_checkpoint(&self) -> Result, StoreError> { + self.engine.get_header_download_checkpoint().await } /// Sets the last key fetched from the state trie being fetched during snap sync @@ -946,10 +968,10 @@ impl Store { } /// Gets the last key fetched from the state trie being fetched during snap sync - pub fn get_state_trie_key_checkpoint( + pub async fn get_state_trie_key_checkpoint( &self, ) -> Result, StoreError> { - self.engine.get_state_trie_key_checkpoint() + self.engine.get_state_trie_key_checkpoint().await } /// Sets storage trie paths in need of healing, grouped by hashed address @@ -977,12 +999,12 @@ impl Store { } /// Gets the state trie paths in need of healing - pub fn get_state_heal_paths(&self) -> Result>, StoreError> { - self.engine.get_state_heal_paths() + pub async fn get_state_heal_paths(&self) -> Result>, StoreError> { + self.engine.get_state_heal_paths().await } - pub fn is_synced(&self) -> Result { - self.engine.is_synced() + pub async fn is_synced(&self) -> Result { + self.engine.is_synced().await } pub async fn update_sync_status(&self, status: bool) -> Result<(), StoreError> { self.engine.update_sync_status(status).await @@ -1039,10 +1061,10 @@ impl Store { } /// Get the latest root of the rebuilt state trie and the last downloaded hashes from each segment - pub fn get_state_trie_rebuild_checkpoint( + pub async fn get_state_trie_rebuild_checkpoint( &self, ) -> Result, StoreError> { - self.engine.get_state_trie_rebuild_checkpoint() + self.engine.get_state_trie_rebuild_checkpoint().await } /// Set the accont hashes and roots of the storage tries awaiting rebuild @@ -1054,10 +1076,10 @@ impl Store { } /// Get the accont hashes and roots of the storage tries awaiting rebuild - pub fn get_storage_trie_rebuild_pending( + pub async fn get_storage_trie_rebuild_pending( &self, ) -> Result>, StoreError> { - self.engine.get_storage_trie_rebuild_pending() + self.engine.get_storage_trie_rebuild_pending().await } /// Clears the state and storage snapshots @@ -1074,19 +1096,19 @@ impl Store { } /// Reads the next `MAX_SNAPSHOT_READS` elements from the storage snapshot as from the `start` storage key - pub fn read_storage_snapshot( + pub async fn read_storage_snapshot( &self, account_hash: H256, start: H256, ) -> Result, StoreError> { - self.engine.read_storage_snapshot(account_hash, start) + self.engine.read_storage_snapshot(account_hash, start).await } - pub fn get_latest_valid_ancestor( + pub async fn get_latest_valid_ancestor( &self, block: BlockHash, ) -> Result, StoreError> { - self.engine.get_latest_valid_ancestor(block) + self.engine.get_latest_valid_ancestor(block).await } pub async fn set_latest_valid_ancestor( @@ -1227,7 +1249,7 @@ mod tests { store.set_canonical_block(block_number, hash).await.unwrap(); let stored_header = store.get_block_header(block_number).unwrap().unwrap(); - let stored_body = store.get_block_body(block_number).unwrap().unwrap(); + let stored_body = 
store.get_block_body(block_number).await.unwrap().unwrap(); assert_eq!(stored_header, block_header); assert_eq!(stored_body, block_body); @@ -1295,7 +1317,7 @@ mod tests { .await .unwrap(); - let stored_number = store.get_block_number(block_hash).unwrap().unwrap(); + let stored_number = store.get_block_number(block_hash).await.unwrap().unwrap(); assert_eq!(stored_number, block_number); } @@ -1318,6 +1340,7 @@ mod tests { let stored_location = store .get_transaction_location(transaction_hash) + .await .unwrap() .unwrap(); @@ -1341,7 +1364,10 @@ mod tests { .unwrap(); assert_eq!( - store.get_transaction_location(transaction_hash).unwrap(), + store + .get_transaction_location(transaction_hash) + .await + .unwrap(), None ) } @@ -1368,7 +1394,11 @@ mod tests { .await .unwrap(); - let stored_receipt = store.get_receipt(block_number, index).unwrap().unwrap(); + let stored_receipt = store + .get_receipt(block_number, index) + .await + .unwrap() + .unwrap(); assert_eq!(stored_receipt, receipt); } @@ -1415,11 +1445,12 @@ mod tests { .await .unwrap(); - let stored_earliest_block_number = store.get_earliest_block_number().unwrap(); - let stored_finalized_block_number = store.get_finalized_block_number().unwrap().unwrap(); - let stored_safe_block_number = store.get_safe_block_number().unwrap().unwrap(); - let stored_latest_block_number = store.get_latest_block_number().unwrap(); - let stored_pending_block_number = store.get_pending_block_number().unwrap().unwrap(); + let stored_earliest_block_number = store.get_earliest_block_number().await.unwrap(); + let stored_finalized_block_number = + store.get_finalized_block_number().await.unwrap().unwrap(); + let stored_safe_block_number = store.get_safe_block_number().await.unwrap().unwrap(); + let stored_latest_block_number = store.get_latest_block_number().await.unwrap(); + let stored_pending_block_number = store.get_pending_block_number().await.unwrap().unwrap(); assert_eq!(earliest_block_number, stored_earliest_block_number); assert_eq!(finalized_block_number, stored_finalized_block_number); diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index a5fd4d5b233..cd0974c57fc 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -97,7 +97,7 @@ impl StoreEngine for Store { } } - fn get_block_body(&self, block_number: u64) -> Result, StoreError> { + async fn get_block_body(&self, block_number: u64) -> Result, StoreError> { let store = self.inner(); if let Some(hash) = store.canonical_hashes.get(&block_number) { Ok(store.bodies.get(hash).cloned()) @@ -106,6 +106,37 @@ impl StoreEngine for Store { } } + async fn get_block_bodies( + &self, + from: BlockNumber, + to: BlockNumber, + ) -> Result, StoreError> { + let store = self.inner(); + let mut res = Vec::new(); + for block_number in from..=to { + if let Some(hash) = store.canonical_hashes.get(&block_number) { + if let Some(block) = store.bodies.get(hash).cloned() { + res.push(block); + } + } + } + Ok(res) + } + + async fn get_block_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result, StoreError> { + let store = self.inner(); + let mut res = Vec::new(); + for hash in hashes { + if let Some(block) = store.bodies.get(&hash).cloned() { + res.push(block); + } + } + Ok(res) + } + async fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { self.inner() .pending_blocks @@ -113,7 +144,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> { + async fn 
get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> { Ok(self.inner().pending_blocks.get(&block_hash).cloned()) } @@ -186,7 +217,10 @@ impl StoreEngine for Store { Ok(()) } - fn get_block_number(&self, block_hash: BlockHash) -> Result, StoreError> { + async fn get_block_number( + &self, + block_hash: BlockHash, + ) -> Result, StoreError> { Ok(self.inner().block_numbers.get(&block_hash).copied()) } @@ -205,7 +239,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_transaction_location( + async fn get_transaction_location( &self, transaction_hash: H256, ) -> Result, StoreError> { @@ -232,7 +266,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_receipt( + async fn get_receipt( &self, block_number: BlockNumber, index: Index, @@ -279,7 +313,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_earliest_block_number(&self) -> Result, StoreError> { + async fn get_earliest_block_number(&self) -> Result, StoreError> { Ok(self.inner().chain_data.earliest_block_number) } @@ -294,7 +328,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_finalized_block_number(&self) -> Result, StoreError> { + async fn get_finalized_block_number(&self) -> Result, StoreError> { Ok(self.inner().chain_data.finalized_block_number) } @@ -306,7 +340,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_safe_block_number(&self) -> Result, StoreError> { + async fn get_safe_block_number(&self) -> Result, StoreError> { Ok(self.inner().chain_data.safe_block_number) } @@ -320,7 +354,7 @@ impl StoreEngine for Store { .replace(block_number); Ok(()) } - fn get_latest_block_number(&self) -> Result, StoreError> { + async fn get_latest_block_number(&self) -> Result, StoreError> { Ok(self.inner().chain_data.latest_block_number) } @@ -335,7 +369,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_pending_block_number(&self) -> Result, StoreError> { + async fn get_pending_block_number(&self) -> Result, StoreError> { Ok(self.inner().chain_data.pending_block_number) } @@ -352,7 +386,7 @@ impl StoreEngine for Store { Trie::open(db, state_root) } - fn get_block_body_by_hash( + async fn get_block_body_by_hash( &self, block_hash: BlockHash, ) -> Result, StoreError> { @@ -375,7 +409,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_canonical_block_hash( + async fn get_canonical_block_hash( &self, block_number: BlockNumber, ) -> Result, StoreError> { @@ -394,7 +428,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_payload(&self, payload_id: u64) -> Result, StoreError> { + async fn get_payload(&self, payload_id: u64) -> Result, StoreError> { Ok(self.inner().payloads.get(&payload_id).cloned()) } @@ -471,7 +505,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_header_download_checkpoint(&self) -> Result, StoreError> { + async fn get_header_download_checkpoint(&self) -> Result, StoreError> { Ok(self.inner().snap_state.header_download_checkpoint) } @@ -483,7 +517,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_state_trie_key_checkpoint( + async fn get_state_trie_key_checkpoint( &self, ) -> Result, StoreError> { Ok(self.inner().snap_state.state_trie_key_checkpoint) @@ -519,7 +553,7 @@ impl StoreEngine for Store { Ok(()) } - fn is_synced(&self) -> Result { + async fn is_synced(&self) -> Result { Ok(self.inner().chain_data.is_synced) } @@ -533,7 +567,7 @@ impl StoreEngine for Store { Ok(()) } - fn get_state_heal_paths(&self) -> Result>, StoreError> { + async fn get_state_heal_paths(&self) -> Result>, StoreError> { Ok(self.inner().snap_state.state_heal_paths.clone()) } @@ -588,7 +622,7 @@ impl StoreEngine for Store { Ok(()) } - 
fn get_state_trie_rebuild_checkpoint( + async fn get_state_trie_rebuild_checkpoint( &self, ) -> Result, StoreError> { Ok(self.inner().snap_state.state_trie_rebuild_checkpoint) @@ -614,7 +648,7 @@ impl StoreEngine for Store { .collect()) } - fn read_storage_snapshot( + async fn read_storage_snapshot( &self, start: H256, account_hash: H256, @@ -639,11 +673,16 @@ impl StoreEngine for Store { Ok(()) } - fn get_storage_trie_rebuild_pending(&self) -> Result>, StoreError> { + async fn get_storage_trie_rebuild_pending( + &self, + ) -> Result>, StoreError> { Ok(self.inner().snap_state.storage_trie_rebuild_pending.clone()) } - fn get_latest_valid_ancestor(&self, block: BlockHash) -> Result, StoreError> { + async fn get_latest_valid_ancestor( + &self, + block: BlockHash, + ) -> Result, StoreError> { Ok(self.inner().invalid_ancestors.get(&block).cloned()) } diff --git a/crates/storage/store_db/libmdbx.rs b/crates/storage/store_db/libmdbx.rs index 17a879523ec..4e5fb878b4c 100644 --- a/crates/storage/store_db/libmdbx.rs +++ b/crates/storage/store_db/libmdbx.rs @@ -78,7 +78,37 @@ impl Store { } // Helper method to read from a libmdbx table - fn read(&self, key: T::Key) -> Result, StoreError> { + async fn read(&self, key: T::Key) -> Result, StoreError> { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let txn = db.begin_read().map_err(StoreError::LibmdbxError)?; + txn.get::(key).map_err(StoreError::LibmdbxError) + }) + .await + .map_err(|e| StoreError::Custom(format!("task panicked: {e}")))? + } + + // Helper method to read from a libmdbx table + async fn read_bulk(&self, keys: Vec) -> Result, StoreError> { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let mut res = Vec::new(); + let txn = db.begin_read().map_err(StoreError::LibmdbxError)?; + for key in keys { + let val = txn.get::(key).map_err(StoreError::LibmdbxError)?; + match val { + Some(val) => res.push(val), + None => Err(StoreError::ReadError)?, + } + } + Ok(res) + }) + .await + .map_err(|e| StoreError::Custom(format!("task panicked: {e}")))? + } + + // Helper method to read from a libmdbx table + fn read_sync(&self, key: T::Key) -> Result, StoreError> { let txn = self.db.begin_read().map_err(StoreError::LibmdbxError)?; txn.get::(key).map_err(StoreError::LibmdbxError) } @@ -87,7 +117,9 @@ impl Store { &self, number: BlockNumber, ) -> Result, StoreError> { - Ok(self.read::(number)?.map(|a| a.to())) + Ok(self + .read_sync::(number)? + .map(|a| a.to())) } } @@ -120,7 +152,7 @@ impl StoreEngine for Store { block_number: BlockNumber, ) -> Result, StoreError> { if let Some(hash) = self.get_block_hash_by_block_number(block_number)? { - Ok(self.read::(hash.into())?.map(|b| b.to())) + Ok(self.read_sync::(hash.into())?.map(|b| b.to())) } else { Ok(None) } @@ -183,26 +215,54 @@ impl StoreEngine for Store { self.write_batch::(key_values).await } - fn get_block_body(&self, block_number: BlockNumber) -> Result, StoreError> { + async fn get_block_body( + &self, + block_number: BlockNumber, + ) -> Result, StoreError> { if let Some(hash) = self.get_block_hash_by_block_number(block_number)? 
-            self.get_block_body_by_hash(hash)
+            self.get_block_body_by_hash(hash).await
         } else {
             Ok(None)
         }
     }
 
-    fn get_block_body_by_hash(
+    async fn get_block_bodies(
+        &self,
+        from: BlockNumber,
+        to: BlockNumber,
+    ) -> Result, StoreError> {
+        let numbers = (from..=to).collect();
+        let hashes = self.read_bulk::(numbers).await?;
+        let blocks = self.read_bulk::(hashes).await?;
+        Ok(blocks.into_iter().map(|b| b.to()).collect())
+    }
+
+    async fn get_block_bodies_by_hash(
+        &self,
+        hashes: Vec,
+    ) -> Result, StoreError> {
+        let hashes = hashes.into_iter().map(|h| h.into()).collect();
+        let blocks = self.read_bulk::(hashes).await?;
+        Ok(blocks.into_iter().map(|b| b.to()).collect())
+    }
+
+    async fn get_block_body_by_hash(
         &self,
         block_hash: BlockHash,
     ) -> Result, StoreError> {
-        Ok(self.read::(block_hash.into())?.map(|b| b.to()))
+        Ok(self
+            .read::(block_hash.into())
+            .await?
+            .map(|b| b.to()))
     }
 
     fn get_block_header_by_hash(
         &self,
         block_hash: BlockHash,
     ) -> Result, StoreError> {
-        Ok(self.read::(block_hash.into())?.map(|b| b.to()))
+        Ok(self
+            .read_sync::(block_hash.into())?
+            .map(|b| b.to()))
     }
 
@@ -214,8 +274,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_block_number(&self, block_hash: BlockHash) -> Result, StoreError> {
-        self.read::(block_hash.into())
+    async fn get_block_number(
+        &self,
+        block_hash: BlockHash,
+    ) -> Result, StoreError> {
+        self.read::(block_hash.into()).await
     }
 
     async fn add_account_code(&self, code_hash: H256, code: Bytes) -> Result<(), StoreError> {
@@ -224,7 +287,9 @@ impl StoreEngine for Store {
     }
 
     fn get_account_code(&self, code_hash: H256) -> Result, StoreError> {
-        Ok(self.read::(code_hash.into())?.map(|b| b.to()))
+        Ok(self
+            .read_sync::(code_hash.into())?
+            .map(|b| b.to()))
     }
 
     async fn add_receipt(
@@ -240,7 +305,7 @@ impl StoreEngine for Store {
         self.write_batch::(entries).await
     }
 
-    fn get_receipt(
+    async fn get_receipt(
         &self,
         block_number: BlockNumber,
         index: Index,
@@ -269,7 +334,7 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_transaction_location(
+    async fn get_transaction_location(
         &self,
         transaction_hash: H256,
     ) -> Result, StoreError> {
@@ -298,7 +363,7 @@ impl StoreEngine for Store {
     }
 
     fn get_chain_config(&self) -> Result {
-        match self.read::(ChainDataIndex::ChainConfig)? {
+        match self.read_sync::(ChainDataIndex::ChainConfig)? {
             None => Err(StoreError::Custom("Chain config not found".to_string())),
             Some(bytes) => {
                 let json = String::from_utf8(bytes).map_err(|_| StoreError::DecodeError)?;
@@ -320,8 +385,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_earliest_block_number(&self) -> Result, StoreError> {
-        match self.read::(ChainDataIndex::EarliestBlockNumber)? {
+    async fn get_earliest_block_number(&self) -> Result, StoreError> {
+        match self
+            .read::(ChainDataIndex::EarliestBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(rlp)
                 .map(Some)
@@ -340,8 +408,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_finalized_block_number(&self) -> Result, StoreError> {
-        match self.read::(ChainDataIndex::FinalizedBlockNumber)? {
+    async fn get_finalized_block_number(&self) -> Result, StoreError> {
+        match self
+            .read::(ChainDataIndex::FinalizedBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(rlp)
                 .map(Some)
@@ -357,8 +428,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_safe_block_number(&self) -> Result, StoreError> {
-        match self.read::(ChainDataIndex::SafeBlockNumber)? {
+    async fn get_safe_block_number(&self) -> Result, StoreError> {
+        match self
+            .read::(ChainDataIndex::SafeBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(rlp)
                 .map(Some)
@@ -377,8 +451,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_latest_block_number(&self) -> Result, StoreError> {
-        match self.read::(ChainDataIndex::LatestBlockNumber)? {
+    async fn get_latest_block_number(&self) -> Result, StoreError> {
+        match self
+            .read::(ChainDataIndex::LatestBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(rlp)
                 .map(Some)
@@ -397,8 +474,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_pending_block_number(&self) -> Result, StoreError> {
-        match self.read::(ChainDataIndex::PendingBlockNumber)? {
+    async fn get_pending_block_number(&self) -> Result, StoreError> {
+        match self
+            .read::(ChainDataIndex::PendingBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(rlp)
                 .map(Some)
@@ -428,11 +508,12 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_canonical_block_hash(
+    async fn get_canonical_block_hash(
         &self,
         number: BlockNumber,
     ) -> Result, StoreError> {
         self.read::(number)
+            .await
             .map(|o| o.map(|hash_rlp| hash_rlp.to()))
     }
 
@@ -441,8 +522,8 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_payload(&self, payload_id: u64) -> Result, StoreError> {
-        let r = self.read::(payload_id)?;
+    async fn get_payload(&self, payload_id: u64) -> Result, StoreError> {
+        let r = self.read::(payload_id).await?;
         Ok(r.map(|b| b.to()))
     }
 
@@ -454,24 +535,24 @@ impl StoreEngine for Store {
         self.write::(payload_id, payload.into()).await
     }
 
-    fn get_transaction_by_hash(
+    async fn get_transaction_by_hash(
         &self,
         transaction_hash: H256,
     ) -> Result, StoreError> {
         let (_block_number, block_hash, index) =
-            match self.get_transaction_location(transaction_hash)? {
+            match self.get_transaction_location(transaction_hash).await? {
                 Some(location) => location,
                 None => return Ok(None),
             };
-        self.get_transaction_by_location(block_hash, index)
+        self.get_transaction_by_location(block_hash, index).await
     }
 
-    fn get_transaction_by_location(
+    async fn get_transaction_by_location(
         &self,
         block_hash: H256,
         index: u64,
     ) -> Result, StoreError> {
-        let block_body = match self.get_block_body_by_hash(block_hash)? {
+        let block_body = match self.get_block_body_by_hash(block_hash).await? {
             Some(body) => body,
             None => return Ok(None),
         };
@@ -481,12 +562,12 @@ impl StoreEngine for Store {
             .and_then(|index: usize| block_body.transactions.get(index).cloned()))
     }
 
-    fn get_block_by_hash(&self, block_hash: BlockHash) -> Result, StoreError> {
+    async fn get_block_by_hash(&self, block_hash: BlockHash) -> Result, StoreError> {
         let header = match self.get_block_header_by_hash(block_hash)? {
             Some(header) => header,
             None => return Ok(None),
         };
-        let body = match self.get_block_body_by_hash(block_hash)? {
+        let body = match self.get_block_body_by_hash(block_hash).await? {
             Some(body) => body,
             None => return Ok(None),
         };
@@ -511,9 +592,10 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> {
+    async fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> {
         Ok(self
-            .read::(block_hash.into())?
+            .read::(block_hash.into())
+            .await?
             .map(|b| b.to()))
     }
 
@@ -607,8 +689,9 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_header_download_checkpoint(&self) -> Result, StoreError> {
-        self.read::(SnapStateIndex::HeaderDownloadCheckpoint)?
+    async fn get_header_download_checkpoint(&self) -> Result, StoreError> {
+        self.read::(SnapStateIndex::HeaderDownloadCheckpoint)
+            .await?
             .map(|ref h| BlockHash::decode(h))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -625,10 +708,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_state_trie_key_checkpoint(
+    async fn get_state_trie_key_checkpoint(
         &self,
     ) -> Result, StoreError> {
-        self.read::(SnapStateIndex::StateTrieKeyCheckpoint)?
+        self.read::(SnapStateIndex::StateTrieKeyCheckpoint)
+            .await?
             .map(|ref c| {
                 >::decode(c)?
                     .try_into()
@@ -677,8 +761,8 @@ impl StoreEngine for Store {
         Ok(res)
     }
 
-    fn is_synced(&self) -> Result {
-        match self.read::(ChainDataIndex::IsSynced)? {
+    async fn is_synced(&self) -> Result {
+        match self.read::(ChainDataIndex::IsSynced).await? {
             None => Err(StoreError::Custom("Sync status not found".to_string())),
             Some(ref rlp) => RLPDecode::decode(rlp).map_err(|_| StoreError::DecodeError),
         }
@@ -694,8 +778,9 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_state_heal_paths(&self) -> Result>, StoreError> {
-        self.read::(SnapStateIndex::StateHealPaths)?
+    async fn get_state_heal_paths(&self) -> Result>, StoreError> {
+        self.read::(SnapStateIndex::StateHealPaths)
+            .await?
             .map(|ref h| >::decode(h))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -786,11 +871,12 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_state_trie_rebuild_checkpoint(
+    async fn get_state_trie_rebuild_checkpoint(
         &self,
     ) -> Result, StoreError> {
         let Some((root, checkpoints)) = self
-            .read::(SnapStateIndex::StateTrieRebuildCheckpoint)?
+            .read::(SnapStateIndex::StateTrieRebuildCheckpoint)
+            .await?
             .map(|ref c| <(H256, Vec)>::decode(c))
             .transpose()?
         else {
@@ -815,8 +901,11 @@ impl StoreEngine for Store {
             .await
     }
 
-    fn get_storage_trie_rebuild_pending(&self) -> Result>, StoreError> {
-        self.read::(SnapStateIndex::StorageTrieRebuildPending)?
+    async fn get_storage_trie_rebuild_pending(
+        &self,
+    ) -> Result>, StoreError> {
+        self.read::(SnapStateIndex::StorageTrieRebuildPending)
+            .await?
             .map(|ref h| >::decode(h))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -849,7 +938,7 @@ impl StoreEngine for Store {
         Ok(iter.collect::>())
     }
 
-    fn read_storage_snapshot(
+    async fn read_storage_snapshot(
         &self,
         account_hash: H256,
         start: H256,
@@ -868,8 +957,12 @@ impl StoreEngine for Store {
         Ok(iter.collect::>())
     }
 
-    fn get_latest_valid_ancestor(&self, block: BlockHash) -> Result, StoreError> {
+    async fn get_latest_valid_ancestor(
+        &self,
+        block: BlockHash,
+    ) -> Result, StoreError> {
         self.read::(block.into())
+            .await
             .map(|o| o.map(|a| a.to()))
     }
diff --git a/crates/storage/store_db/redb.rs b/crates/storage/store_db/redb.rs
index 7fa19be1a17..416546097af 100644
--- a/crates/storage/store_db/redb.rs
+++ b/crates/storage/store_db/redb.rs
@@ -203,7 +203,32 @@ impl RedBStore {
     }
 
     // Helper method to read from a redb table
-    fn read<'k, 'a, K, V>(
+    async fn read<'k, 'a, K, V>(
+        &self,
+        table: TableDefinition<'a, K, V>,
+        key: K::SelfType<'k>,
+    ) -> Result>, StoreError>
+    where
+        K: Key + Send + 'static,
+        V: Value + Send,
+        K::SelfType<'k>: Send,
+        'a: 'static,
+        'k: 'static,
+    {
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let read_txn = db.begin_read()?;
+            let table = read_txn.open_table(table)?;
+            let result = table.get(key)?;
+
+            Ok(result)
+        })
+        .await
+        .map_err(|e| StoreError::Custom(format!("task panicked: {e}")))?
+    }
+
+    // Helper method to read from a redb table
+    fn read_sync<'k, 'a, K, V>(
         &self,
         table: TableDefinition<'a, K, V>,
         key: impl Borrow>,
     ) -> Result>, StoreError>
     {
         let read_txn = self.db.begin_read()?;
         let table = read_txn.open_table(table)?;
         let result = table.get(key)?;
 
         Ok(result)
     }
 
+    // Helper method to read in bulk from a redb table
+    async fn read_bulk<'k, 'a, K, V>(
+        &self,
+        table: TableDefinition<'a, K, V>,
+        keys: Vec>,
+    ) -> Result>, StoreError>
+    where
+        K: Key + Send + 'static,
+        V: Value + Send,
+        K::SelfType<'k>: Send,
+        'a: 'static,
+        'k: 'static,
+    {
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let read_txn = db.begin_read()?;
+            let table = read_txn.open_table(table)?;
+            let mut result = Vec::new();
+            for key in keys {
+                if let Some(val) = table.get(key)? {
+                    result.push(val);
+                }
+            }
+            Ok(result)
+        })
+        .await
+        .map_err(|e| StoreError::Custom(format!("task panicked: {e}")))?
+    }
+
     // Helper method to delete from a redb table
     fn delete<'k, 'v, 'a, K, V>(
         &self,
@@ -241,7 +295,7 @@ impl RedBStore {
         number: BlockNumber,
     ) -> Result, StoreError> {
         Ok(self
-            .read(CANONICAL_BLOCK_HASHES_TABLE, number)?
+            .read_sync(CANONICAL_BLOCK_HASHES_TABLE, number)?
             .map(|a| a.value().to()))
     }
 }
@@ -285,7 +339,7 @@ impl StoreEngine for RedBStore {
     ) -> Result, StoreError> {
         if let Some(hash) = self.get_block_hash_by_block_number(block_number)? {
             Ok(self
-                .read(HEADERS_TABLE, >::into(hash))?
+                .read_sync(HEADERS_TABLE, >::into(hash))?
                 .map(|b| b.value().to()))
         } else {
             Ok(None)
@@ -370,15 +424,44 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_block_body(&self, block_number: BlockNumber) -> Result, StoreError> {
+    async fn get_block_body(
+        &self,
+        block_number: BlockNumber,
+    ) -> Result, StoreError> {
         if let Some(hash) = self.get_block_hash_by_block_number(block_number)? {
-            self.get_block_body_by_hash(hash)
+            self.get_block_body_by_hash(hash).await
         } else {
             Ok(None)
         }
     }
 
-    fn get_block_body_by_hash(
+    async fn get_block_bodies(
+        &self,
+        from: BlockNumber,
+        to: BlockNumber,
+    ) -> Result, StoreError> {
+        let numbers = (from..=to).collect();
+        let hashes = self
+            .read_bulk(CANONICAL_BLOCK_HASHES_TABLE, numbers)
+            .await?;
+        let hashes: Vec = hashes.into_iter().map(|v| v.value()).collect();
+        let blocks = self.read_bulk(BLOCK_BODIES_TABLE, hashes).await?;
+        Ok(blocks.into_iter().map(|b| b.value().to()).collect())
+    }
+
+    async fn get_block_bodies_by_hash(
+        &self,
+        hashes: Vec,
+    ) -> Result, StoreError> {
+        let hashes = hashes
+            .into_iter()
+            .map(>::into)
+            .collect();
+        let blocks = self.read_bulk(BLOCK_BODIES_TABLE, hashes).await?;
+        Ok(blocks.into_iter().map(|b| b.value().to()).collect())
+    }
+
+    async fn get_block_body_by_hash(
         &self,
         block_hash: BlockHash,
     ) -> Result, StoreError> {
@@ -386,7 +469,8 @@ impl StoreEngine for RedBStore {
             .read(
                 BLOCK_BODIES_TABLE,
                 >::into(block_hash),
-            )?
+            )
+            .await?
             .map(|b| b.value().to()))
     }
 
     fn get_block_header_by_hash(
         &self,
         block_hash: BlockHash,
     ) -> Result, StoreError> {
         Ok(self
-            .read(
+            .read_sync(
                 HEADERS_TABLE,
                 >::into(block_hash),
             )?
             .map(|b| b.value().to()))
     }
@@ -411,12 +495,13 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> {
+    async fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> {
         Ok(self
             .read(
                 PENDING_BLOCKS_TABLE,
                 >::into(block_hash),
-            )?
+            )
+            .await?
             .map(|b| b.value().to()))
     }
 
@@ -433,12 +518,16 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_block_number(&self, block_hash: BlockHash) -> Result, StoreError> {
+    async fn get_block_number(
+        &self,
+        block_hash: BlockHash,
+    ) -> Result, StoreError> {
         Ok(self
             .read(
                 BLOCK_NUMBERS_TABLE,
                 >::into(block_hash),
-            )?
+            )
+            .await?
             .map(|b| b.value()))
     }
 
@@ -461,7 +550,7 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_transaction_location(
+    async fn get_transaction_location(
         &self,
         transaction_hash: ethrex_common::H256,
     ) -> Result, StoreError> {
@@ -518,7 +607,7 @@ impl StoreEngine for RedBStore {
         self.write_batch(RECEIPTS_TABLE, key_values).await
     }
 
-    fn get_receipt(
+    async fn get_receipt(
         &self,
         block_number: BlockNumber,
         index: Index,
@@ -528,7 +617,8 @@ impl StoreEngine for RedBStore {
                 .read(
                     RECEIPTS_TABLE,
                     <(H256, u64) as Into>>::into((hash, index)),
-                )?
+                )
+                .await?
                 .map(|b| b.value().to()))
         } else {
             Ok(None)
         }
@@ -553,18 +643,19 @@ impl StoreEngine for RedBStore {
         code_hash: ethrex_common::H256,
     ) -> Result, StoreError> {
         Ok(self
-            .read(
+            .read_sync(
                 ACCOUNT_CODES_TABLE,
                 >::into(code_hash),
             )?
             .map(|b| b.value().to()))
     }
 
-    fn get_canonical_block_hash(
+    async fn get_canonical_block_hash(
         &self,
         block_number: BlockNumber,
     ) -> Result, StoreError> {
         self.read(CANONICAL_BLOCK_HASHES_TABLE, block_number)
+            .await
             .map(|o| o.map(|hash_rlp| hash_rlp.value().to()))
     }
 
@@ -580,7 +671,7 @@ impl StoreEngine for RedBStore {
     }
 
     fn get_chain_config(&self) -> Result {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::ChainConfig)? {
+        match self.read_sync(CHAIN_DATA_TABLE, ChainDataIndex::ChainConfig)? {
             None => Err(StoreError::Custom("Chain config not found".to_string())),
             Some(bytes) => {
                 let json = String::from_utf8(bytes.value()).map_err(|_| StoreError::DecodeError)?;
@@ -603,8 +694,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_earliest_block_number(&self) -> Result, StoreError> {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::EarliestBlockNumber)? {
+    async fn get_earliest_block_number(&self) -> Result, StoreError> {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::EarliestBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(&rlp.value())
                 .map(Some)
@@ -624,8 +718,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_finalized_block_number(&self) -> Result, StoreError> {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::FinalizedBlockNumber)? {
+    async fn get_finalized_block_number(&self) -> Result, StoreError> {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::FinalizedBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(&rlp.value())
                 .map(Some)
@@ -642,8 +739,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_safe_block_number(&self) -> Result, StoreError> {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::SafeBlockNumber)? {
+    async fn get_safe_block_number(&self) -> Result, StoreError> {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::SafeBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(&rlp.value())
                 .map(Some)
@@ -663,8 +763,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_latest_block_number(&self) -> Result, StoreError> {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::LatestBlockNumber)? {
+    async fn get_latest_block_number(&self) -> Result, StoreError> {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::LatestBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(&rlp.value())
                 .map(Some)
@@ -684,8 +787,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_pending_block_number(&self) -> Result, StoreError> {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::PendingBlockNumber)? {
+    async fn get_pending_block_number(&self) -> Result, StoreError> {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::PendingBlockNumber)
+            .await?
+        {
             None => Ok(None),
             Some(ref rlp) => RLPDecode::decode(&rlp.value())
                 .map(Some)
@@ -733,9 +839,10 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_payload(&self, payload_id: u64) -> Result, StoreError> {
+    async fn get_payload(&self, payload_id: u64) -> Result, StoreError> {
         Ok(self
-            .read(PAYLOADS_TABLE, payload_id)?
+            .read(PAYLOADS_TABLE, payload_id)
+            .await?
             .map(|b| b.value().to()))
     }
 
@@ -838,8 +945,9 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_header_download_checkpoint(&self) -> Result, StoreError> {
-        self.read(SNAP_STATE_TABLE, SnapStateIndex::HeaderDownloadCheckpoint)?
+    async fn get_header_download_checkpoint(&self) -> Result, StoreError> {
+        self.read(SNAP_STATE_TABLE, SnapStateIndex::HeaderDownloadCheckpoint)
+            .await?
             .map(|rlp| RLPDecode::decode(&rlp.value()))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -854,8 +962,9 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_state_trie_key_checkpoint(&self) -> Result, StoreError> {
-        self.read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieKeyCheckpoint)?
+    async fn get_state_trie_key_checkpoint(&self) -> Result, StoreError> {
+        self.read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieKeyCheckpoint)
+            .await?
             .map(|rlp| {
                 >::decode(&rlp.value())?
                     .try_into()
@@ -909,8 +1018,11 @@ impl StoreEngine for RedBStore {
         Ok(res)
     }
 
-    fn is_synced(&self) -> Result {
-        match self.read(CHAIN_DATA_TABLE, ChainDataIndex::IsSynced)? {
+    async fn is_synced(&self) -> Result {
+        match self
+            .read(CHAIN_DATA_TABLE, ChainDataIndex::IsSynced)
+            .await?
+        {
             None => Err(StoreError::Custom("Sync status not found".to_string())),
             Some(ref rlp) => RLPDecode::decode(&rlp.value()).map_err(|_| StoreError::DecodeError),
         }
@@ -934,8 +1046,9 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_state_heal_paths(&self) -> Result>, StoreError> {
-        self.read(SNAP_STATE_TABLE, SnapStateIndex::StateHealPaths)?
+    async fn get_state_heal_paths(&self) -> Result>, StoreError> {
+        self.read(SNAP_STATE_TABLE, SnapStateIndex::StateHealPaths)
+            .await?
             .map(|rlp| RLPDecode::decode(&rlp.value()))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -1025,11 +1138,12 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_state_trie_rebuild_checkpoint(
+    async fn get_state_trie_rebuild_checkpoint(
         &self,
     ) -> Result, StoreError> {
         let Some((root, checkpoints)) = self
-            .read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieRebuildCheckpoint)?
+            .read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieRebuildCheckpoint)
+            .await?
             .map(|ref rlp| <(H256, Vec)>::decode(&rlp.value()))
             .transpose()?
         else {
@@ -1055,8 +1169,11 @@ impl StoreEngine for RedBStore {
             .await
     }
 
-    fn get_storage_trie_rebuild_pending(&self) -> Result>, StoreError> {
-        self.read(SNAP_STATE_TABLE, SnapStateIndex::StorageTrieRebuildPending)?
+    async fn get_storage_trie_rebuild_pending(
+        &self,
+    ) -> Result>, StoreError> {
+        self.read(SNAP_STATE_TABLE, SnapStateIndex::StorageTrieRebuildPending)
+            .await?
             .map(|p| RLPDecode::decode(&p.value()))
             .transpose()
             .map_err(StoreError::RLPDecode)
@@ -1086,7 +1203,7 @@ impl StoreEngine for RedBStore {
             .collect())
     }
 
-    fn read_storage_snapshot(
+    async fn read_storage_snapshot(
         &self,
         start: H256,
         account_hash: H256,
@@ -1109,12 +1226,16 @@ impl StoreEngine for RedBStore {
             .collect())
     }
 
-    fn get_latest_valid_ancestor(&self, block: BlockHash) -> Result, StoreError> {
+    async fn get_latest_valid_ancestor(
+        &self,
+        block: BlockHash,
+    ) -> Result, StoreError> {
         Ok(self
             .read(
                 INVALID_ANCESOTRS_TABLE,
                 >::into(block),
-            )?
+            )
+            .await?
             .map(|b| b.value().to()))
     }