diff --git a/Cargo.lock b/Cargo.lock index a04a2eb123..121792ad66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1605,6 +1605,7 @@ dependencies = [ "sp-runtime", "sp-storage", "sp-transaction-pool", + "tokio", ] [[package]] @@ -7790,9 +7791,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.10.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cf844b23c6131f624accf65ce0e4e9956a8bb329400ea5bcc26ae3a5c20b0b" +checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144" dependencies = [ "autocfg", "bytes 1.0.1", diff --git a/client/rpc-core/src/eth.rs b/client/rpc-core/src/eth.rs index 9f171523b7..2256c8ef57 100644 --- a/client/rpc-core/src/eth.rs +++ b/client/rpc-core/src/eth.rs @@ -80,11 +80,11 @@ pub trait EthApi { /// Returns block with given hash. #[rpc(name = "eth_getBlockByHash")] - fn block_by_hash(&self, _: H256, _: bool) -> Result>; + fn block_by_hash(&self, _: H256, _: bool) -> BoxFuture>>; /// Returns block with given number. #[rpc(name = "eth_getBlockByNumber")] - fn block_by_number(&self, _: BlockNumber, _: bool) -> Result>; + fn block_by_number(&self, _: BlockNumber, _: bool) -> BoxFuture>>; /// Returns the number of transactions sent from given address at given time (block number). #[rpc(name = "eth_getTransactionCount")] @@ -125,16 +125,19 @@ pub trait EthApi { /// Estimate gas needed for execution of given contract. #[rpc(name = "eth_estimateGas")] - fn estimate_gas(&self, _: CallRequest, _: Option) -> Result; + fn estimate_gas(&self, _: CallRequest, _: Option) -> BoxFuture>; /// Get transaction by its hash. #[rpc(name = "eth_getTransactionByHash")] - fn transaction_by_hash(&self, _: H256) -> Result>; + fn transaction_by_hash(&self, _: H256) -> BoxFuture>>; /// Returns transaction at given block hash and index. #[rpc(name = "eth_getTransactionByBlockHashAndIndex")] - fn transaction_by_block_hash_and_index(&self, _: H256, _: Index) - -> Result>; + fn transaction_by_block_hash_and_index( + &self, + _: H256, + _: Index, + ) -> BoxFuture>>; /// Returns transaction by given block number and index. #[rpc(name = "eth_getTransactionByBlockNumberAndIndex")] @@ -142,11 +145,11 @@ pub trait EthApi { &self, _: BlockNumber, _: Index, - ) -> Result>; + ) -> BoxFuture>>; /// Returns transaction receipt by transaction hash. #[rpc(name = "eth_getTransactionReceipt")] - fn transaction_receipt(&self, _: H256) -> Result>; + fn transaction_receipt(&self, _: H256) -> BoxFuture>>; /// Returns an uncles at given block and index. #[rpc(name = "eth_getUncleByBlockHashAndIndex")] @@ -162,7 +165,7 @@ pub trait EthApi { /// Returns logs matching given filter object. #[rpc(name = "eth_getLogs")] - fn logs(&self, _: Filter) -> Result>; + fn logs(&self, _: Filter) -> BoxFuture>>; /// Returns the hash of the current block, the seedHash, and the boundary condition to be met. #[rpc(name = "eth_getWork")] @@ -203,11 +206,11 @@ pub trait EthFilterApi { /// Returns filter changes since last poll. #[rpc(name = "eth_getFilterChanges")] - fn filter_changes(&self, _: Index) -> Result; + fn filter_changes(&self, _: Index) -> BoxFuture>; /// Returns all logs matching given filter (in a range 'from' - 'to'). #[rpc(name = "eth_getFilterLogs")] - fn filter_logs(&self, _: Index) -> Result>; + fn filter_logs(&self, _: Index) -> BoxFuture>>; /// Uninstalls filter. #[rpc(name = "eth_uninstallFilter")] diff --git a/client/rpc/Cargo.toml b/client/rpc/Cargo.toml index 10f664e933..fc982458de 100644 --- a/client/rpc/Cargo.toml +++ b/client/rpc/Cargo.toml @@ -46,6 +46,7 @@ libsecp256k1 = "0.3" rand = "0.7" lru = "0.6.6" parking_lot = "0.11.1" +tokio = { version = "1.14", features = [ "sync" ] } [features] rpc_binary_search_estimate = [] diff --git a/client/rpc/src/eth.rs b/client/rpc/src/eth.rs index eff9885eb0..72b8f22072 100644 --- a/client/rpc/src/eth.rs +++ b/client/rpc/src/eth.rs @@ -40,6 +40,7 @@ use sc_client_api::{ client::BlockchainEvents, }; use sc_network::{ExHashT, NetworkService}; +use sc_service::SpawnTaskHandle; use sc_transaction_pool::{ChainApi, Pool}; use sc_transaction_pool_api::{InPoolTransaction, TransactionPool}; use sha3::{Digest, Keccak256}; @@ -51,11 +52,12 @@ use sp_runtime::{ transaction_validity::TransactionSource, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, marker::PhantomData, sync::{Arc, Mutex}, time, }; +use tokio::sync::{mpsc, oneshot}; use crate::overrides::OverrideHandle; use codec::{self, Decode, Encode}; @@ -315,10 +317,9 @@ where Ok(api) } -fn filter_range_logs( +async fn filter_range_logs( client: &C, backend: &fc_db::Backend, - overrides: &OverrideHandle, block_data_cache: &EthBlockDataCache, ret: &mut Vec, max_past_logs: u32, @@ -397,19 +398,16 @@ where } } }; - let handler = overrides - .schemas - .get(&schema) - .unwrap_or(&overrides.fallback); - let block = block_data_cache.current_block(handler, substrate_hash); + let block = block_data_cache.current_block(schema, substrate_hash).await; if let Some(block) = block { if FilteredParams::address_in_bloom(block.header.logs_bloom, &address_bloom_filter) && FilteredParams::topics_in_bloom(block.header.logs_bloom, &topics_bloom_filter) { - let statuses = - block_data_cache.current_transaction_statuses(handler, substrate_hash); + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; if let Some(statuses) = statuses { filter_block_logs(ret, filter, block, statuses); } @@ -689,93 +687,107 @@ where } } - fn block_by_hash(&self, hash: H256, full: bool) -> Result> { - let id = match frontier_backend_client::load_hash::(self.backend.as_ref(), hash) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some(hash) => hash, - _ => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); - - let base_fee = handler.base_fee(&id); - let is_eip1559 = handler.is_eip1559(&id); - - match (block, statuses) { - (Some(block), Some(statuses)) => Ok(Some(rich_block_build( - block, - statuses.into_iter().map(|s| Some(s)).collect(), - Some(hash), - full, - base_fee, - is_eip1559, - ))), - _ => Ok(None), - } - } - - fn block_by_number(&self, number: BlockNumber, full: bool) -> Result> { - let id = match frontier_backend_client::native_block_id::( - self.client.as_ref(), - self.backend.as_ref(), - Some(number), - )? { - Some(id) => id, - None => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; + fn block_by_hash(&self, hash: H256, full: bool) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); + Box::pin(async move { + let id = match frontier_backend_client::load_hash::(backend.as_ref(), hash) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some(hash) => hash, + _ => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); - let base_fee = handler.base_fee(&id); - let is_eip1559 = handler.is_eip1559(&id); + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; - match (block, statuses) { - (Some(block), Some(statuses)) => { - let hash = - H256::from_slice(Keccak256::digest(&rlp::encode(&block.header)).as_slice()); + let base_fee = handler.base_fee(&id); + let is_eip1559 = handler.is_eip1559(&id); - Ok(Some(rich_block_build( + match (block, statuses) { + (Some(block), Some(statuses)) => Ok(Some(rich_block_build( block, statuses.into_iter().map(|s| Some(s)).collect(), Some(hash), full, base_fee, is_eip1559, - ))) + ))), + _ => Ok(None), } - _ => Ok(None), - } + }) + } + + fn block_by_number( + &self, + number: BlockNumber, + full: bool, + ) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + + Box::pin(async move { + let id = match frontier_backend_client::native_block_id::( + client.as_ref(), + backend.as_ref(), + Some(number), + )? { + Some(id) => id, + None => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; + + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); + + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; + + let base_fee = handler.base_fee(&id); + let is_eip1559 = handler.is_eip1559(&id); + + match (block, statuses) { + (Some(block), Some(statuses)) => { + let hash = + H256::from_slice(Keccak256::digest(&rlp::encode(&block.header)).as_slice()); + + Ok(Some(rich_block_build( + block, + statuses.into_iter().map(|s| Some(s)).collect(), + Some(hash), + full, + base_fee, + is_eip1559, + ))) + } + _ => Ok(None), + } + }) } fn transaction_count(&self, address: H160, number: Option) -> Result { @@ -1278,91 +1290,93 @@ where } } - fn estimate_gas(&self, request: CallRequest, _: Option) -> Result { - // Define the lower bound of estimate - const MIN_GAS_PER_TX: U256 = U256([21_000, 0, 0, 0]); - - // Get best hash (TODO missing support for estimating gas historically) - let best_hash = self.client.info().best_hash; - - // For simple transfer to simple account, return MIN_GAS_PER_TX directly - let is_simple_transfer = match &request.data { - None => true, - Some(vec) => vec.0.is_empty(), - }; - if is_simple_transfer { - if let Some(to) = request.to { - let to_code = self - .client - .runtime_api() - .account_code_at(&BlockId::Hash(best_hash), to) - .map_err(|err| internal_err(format!("runtime error: {:?}", err)))?; - if to_code.is_empty() { - return Ok(MIN_GAS_PER_TX); + fn estimate_gas( + &self, + request: CallRequest, + _: Option, + ) -> BoxFuture> { + let client = Arc::clone(&self.client); + let block_data_cache = Arc::clone(&self.block_data_cache); + + Box::pin(async move { + // Define the lower bound of estimate + const MIN_GAS_PER_TX: U256 = U256([21_000, 0, 0, 0]); + + // Get best hash (TODO missing support for estimating gas historically) + let best_hash = client.info().best_hash; + + // For simple transfer to simple account, return MIN_GAS_PER_TX directly + let is_simple_transfer = match &request.data { + None => true, + Some(vec) => vec.0.is_empty(), + }; + if is_simple_transfer { + if let Some(to) = request.to { + let to_code = client + .runtime_api() + .account_code_at(&BlockId::Hash(best_hash), to) + .map_err(|err| internal_err(format!("runtime error: {:?}", err)))?; + if to_code.is_empty() { + return Ok(MIN_GAS_PER_TX); + } } } - } - let (gas_price, max_fee_per_gas, max_priority_fee_per_gas) = { - let details = fee_details( - request.gas_price, - request.max_fee_per_gas, - request.max_priority_fee_per_gas, - )?; - ( - details.gas_price, - details.max_fee_per_gas, - details.max_priority_fee_per_gas, - ) - }; + let (gas_price, max_fee_per_gas, max_priority_fee_per_gas) = { + let details = fee_details( + request.gas_price, + request.max_fee_per_gas, + request.max_priority_fee_per_gas, + )?; + ( + details.gas_price, + details.max_fee_per_gas, + details.max_priority_fee_per_gas, + ) + }; - let get_current_block_gas_limit = || -> Result { - let substrate_hash = self.client.info().best_hash; - let id = BlockId::Hash(substrate_hash); - let schema = - frontier_backend_client::onchain_storage_schema::(&self.client, id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - let block = self.block_data_cache.current_block(handler, substrate_hash); - if let Some(block) = block { - Ok(block.header.gas_limit) - } else { - return Err(internal_err("block unavailable, cannot query gas limit")); - } - }; + let get_current_block_gas_limit = || async { + let substrate_hash = client.info().best_hash; + let id = BlockId::Hash(substrate_hash); + let schema = + frontier_backend_client::onchain_storage_schema::(&client, id); + let block = block_data_cache.current_block(schema, substrate_hash).await; + if let Some(block) = block { + Ok(block.header.gas_limit) + } else { + return Err(internal_err("block unavailable, cannot query gas limit")); + } + }; - // Determine the highest possible gas limits - let mut highest = match request.gas { - Some(gas) => gas, - None => { - // query current block's gas limit - get_current_block_gas_limit()? - } - }; + // Determine the highest possible gas limits + let mut highest = match request.gas { + Some(gas) => gas, + None => { + // query current block's gas limit + get_current_block_gas_limit().await? + } + }; - let api = self.client.runtime_api(); + let api = client.runtime_api(); - // Recap the highest gas allowance with account's balance. - if let Some(from) = request.from { - let gas_price = gas_price.unwrap_or_default(); - if gas_price > U256::zero() { - let balance = api - .account_basic(&BlockId::Hash(best_hash), from) - .map_err(|err| internal_err(format!("runtime error: {:?}", err)))? - .balance; - let mut available = balance; - if let Some(value) = request.value { - if value > available { - return Err(internal_err("insufficient funds for transfer")); + // Recap the highest gas allowance with account's balance. + if let Some(from) = request.from { + let gas_price = gas_price.unwrap_or_default(); + if gas_price > U256::zero() { + let balance = api + .account_basic(&BlockId::Hash(best_hash), from) + .map_err(|err| internal_err(format!("runtime error: {:?}", err)))? + .balance; + let mut available = balance; + if let Some(value) = request.value { + if value > available { + return Err(internal_err("insufficient funds for transfer")); + } + available -= value; } - available -= value; - } - let allowance = available / gas_price; - if highest > allowance { - log::warn!( + let allowance = available / gas_price; + if highest > allowance { + log::warn!( "Gas estimation capped by limited funds original {} balance {} sent {} feecap {} fundable {}", highest, balance, @@ -1370,20 +1384,19 @@ where gas_price, allowance ); - highest = allowance; + highest = allowance; + } } } - } - struct ExecutableResult { - data: Vec, - exit_reason: ExitReason, - used_gas: U256, - } + struct ExecutableResult { + data: Vec, + exit_reason: ExitReason, + used_gas: U256, + } - // Create a helper to check if a gas allowance results in an executable transaction - let executable = - move |request: CallRequest, gas_limit, api_version| -> Result { + // Create a helper to check if a gas allowance results in an executable transaction + let executable = move |request, gas_limit, api_version| -> Result { let CallRequest { from, to, @@ -1527,434 +1540,462 @@ where used_gas, }) }; - let api_version = if let Ok(Some(api_version)) = - self.client - .runtime_api() - .api_version::>(&BlockId::Hash(best_hash)) - { - api_version - } else { - return Err(internal_err(format!( - "failed to retrieve Runtime Api version" - ))); - }; - - // Verify that the transaction succeed with highest capacity - let cap = highest; - let ExecutableResult { - data, - exit_reason, - used_gas, - } = executable(request.clone(), highest, api_version)?; - match exit_reason { - ExitReason::Succeed(_) => (), - ExitReason::Error(ExitError::OutOfGas) => { + let api_version = if let Ok(Some(api_version)) = + client + .runtime_api() + .api_version::>(&BlockId::Hash(best_hash)) + { + api_version + } else { return Err(internal_err(format!( - "gas required exceeds allowance {}", - cap - ))) + "failed to retrieve Runtime Api version" + ))); + }; + + // Verify that the transaction succeed with highest capacity + let cap = highest; + let ExecutableResult { + data, + exit_reason, + used_gas, + } = executable(request.clone(), highest, api_version)?; + match exit_reason { + ExitReason::Succeed(_) => (), + ExitReason::Error(ExitError::OutOfGas) => { + return Err(internal_err(format!( + "gas required exceeds allowance {}", + cap + ))) + } + // If the transaction reverts, there are two possible cases, + // it can revert because the called contract feels that it does not have enough + // gas left to continue, or it can revert for another reason unrelated to gas. + ExitReason::Revert(revert) => { + if request.gas.is_some() || request.gas_price.is_some() { + // If the user has provided a gas limit or a gas price, then we have executed + // with less block gas limit, so we must reexecute with block gas limit to + // know if the revert is due to a lack of gas or not. + let ExecutableResult { + data, + exit_reason, + used_gas: _, + } = executable( + request.clone(), + get_current_block_gas_limit().await?, + api_version, + )?; + match exit_reason { + ExitReason::Succeed(_) => { + return Err(internal_err(format!( + "gas required exceeds allowance {}", + cap + ))) + } + // The execution has been done with block gas limit, so it is not a lack of gas from the user. + other => error_on_execution_failure(&other, &data)?, + } + } else { + // The execution has already been done with block gas limit, so it is not a lack of gas from the user. + error_on_execution_failure(&ExitReason::Revert(revert), &data)? + } + } + other => error_on_execution_failure(&other, &data)?, + }; + + #[cfg(not(feature = "rpc_binary_search_estimate"))] + { + Ok(used_gas) } - // If the transaction reverts, there are two possible cases, - // it can revert because the called contract feels that it does not have enough - // gas left to continue, or it can revert for another reason unrelated to gas. - ExitReason::Revert(revert) => { - if request.gas.is_some() || request.gas_price.is_some() { - // If the user has provided a gas limit or a gas price, then we have executed - // with less block gas limit, so we must reexecute with block gas limit to - // know if the revert is due to a lack of gas or not. + #[cfg(feature = "rpc_binary_search_estimate")] + { + // Define the lower bound of the binary search + let mut lowest = MIN_GAS_PER_TX; + + // Start close to the used gas for faster binary search + let mut mid = std::cmp::min(used_gas * 3, (highest + lowest) / 2); + + // Execute the binary search and hone in on an executable gas limit. + let mut previous_highest = highest; + while (highest - lowest) > U256::one() { let ExecutableResult { data, exit_reason, used_gas: _, - } = executable(request.clone(), get_current_block_gas_limit()?, api_version)?; + } = executable(request.clone(), mid, api_version)?; match exit_reason { ExitReason::Succeed(_) => { - return Err(internal_err(format!( - "gas required exceeds allowance {}", - cap - ))) + highest = mid; + // If the variation in the estimate is less than 10%, + // then the estimate is considered sufficiently accurate. + if (previous_highest - highest) * 10 / previous_highest < U256::one() { + return Ok(highest); + } + previous_highest = highest; + } + ExitReason::Revert(_) | ExitReason::Error(ExitError::OutOfGas) => { + lowest = mid; } - // The execution has been done with block gas limit, so it is not a lack of gas from the user. other => error_on_execution_failure(&other, &data)?, } - } else { - // The execution has already been done with block gas limit, so it is not a lack of gas from the user. - error_on_execution_failure(&ExitReason::Revert(revert), &data)? + mid = (highest + lowest) / 2; } + + Ok(highest) } - other => error_on_execution_failure(&other, &data)?, - }; + }) + } - #[cfg(not(feature = "rpc_binary_search_estimate"))] - { - Ok(used_gas) - } - #[cfg(feature = "rpc_binary_search_estimate")] - { - // Define the lower bound of the binary search - let mut lowest = MIN_GAS_PER_TX; + fn transaction_by_hash(&self, hash: H256) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + let graph = Arc::clone(&self.graph); + + Box::pin(async move { + let (hash, index) = match frontier_backend_client::load_transactions::( + client.as_ref(), + backend.as_ref(), + hash, + true, + ) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some((hash, index)) => (hash, index as usize), + None => { + // If the transaction is not yet mapped in the frontier db, + // check for it in the transaction pool. + let mut xts: Vec<::Extrinsic> = Vec::new(); + // Collect transactions in the ready validated pool. + xts.extend( + graph + .validated_pool() + .ready() + .map(|in_pool_tx| in_pool_tx.data().clone()) + .collect::::Extrinsic>>(), + ); - // Start close to the used gas for faster binary search - let mut mid = std::cmp::min(used_gas * 3, (highest + lowest) / 2); + // Collect transactions in the future validated pool. + xts.extend( + graph + .validated_pool() + .futures() + .iter() + .map(|(_hash, extrinsic)| extrinsic.clone()) + .collect::::Extrinsic>>(), + ); - // Execute the binary search and hone in on an executable gas limit. - let mut previous_highest = highest; - while (highest - lowest) > U256::one() { - let ExecutableResult { - data, - exit_reason, - used_gas: _, - } = executable(request.clone(), mid, api_version)?; - match exit_reason { - ExitReason::Succeed(_) => { - highest = mid; - // If the variation in the estimate is less than 10%, - // then the estimate is considered sufficiently accurate. - if (previous_highest - highest) * 10 / previous_highest < U256::one() { - return Ok(highest); + let best_block: BlockId = BlockId::Hash(client.info().best_hash); + let ethereum_transactions: Vec = client + .runtime_api() + .extrinsic_filter(&best_block, xts) + .map_err(|err| { + internal_err(format!( + "fetch runtime extrinsic filter failed: {:?}", + err + )) + })?; + + for txn in ethereum_transactions { + let inner_hash = txn.hash(); + if hash == inner_hash { + return Ok(Some(transaction_build(txn, None, None, true, None))); } - previous_highest = highest; - } - ExitReason::Revert(_) | ExitReason::Error(ExitError::OutOfGas) => { - lowest = mid; } - other => error_on_execution_failure(&other, &data)?, + // Unknown transaction. + return Ok(None); } - mid = (highest + lowest) / 2; - } + }; - Ok(highest) - } - } + let id = match frontier_backend_client::load_hash::(backend.as_ref(), hash) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some(hash) => hash, + _ => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - fn transaction_by_hash(&self, hash: H256) -> Result> { - let (hash, index) = match frontier_backend_client::load_transactions::( - self.client.as_ref(), - self.backend.as_ref(), - hash, - true, - ) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some((hash, index)) => (hash, index as usize), - None => { - // If the transaction is not yet mapped in the frontier db, - // check for it in the transaction pool. - let mut xts: Vec<::Extrinsic> = Vec::new(); - // Collect transactions in the ready validated pool. - xts.extend( - self.graph - .validated_pool() - .ready() - .map(|in_pool_tx| in_pool_tx.data().clone()) - .collect::::Extrinsic>>(), - ); + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); - // Collect transactions in the future validated pool. - xts.extend( - self.graph - .validated_pool() - .futures() - .iter() - .map(|(_hash, extrinsic)| extrinsic.clone()) - .collect::::Extrinsic>>(), - ); + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; - let best_block: BlockId = BlockId::Hash(self.client.info().best_hash); - let ethereum_transactions: Vec = self - .client - .runtime_api() - .extrinsic_filter(&best_block, xts) - .map_err(|err| { - internal_err(format!("fetch runtime extrinsic filter failed: {:?}", err)) - })?; + let base_fee = handler.base_fee(&id); + let is_eip1559 = handler.is_eip1559(&id); - for txn in ethereum_transactions { - let inner_hash = txn.hash(); - if hash == inner_hash { - return Ok(Some(transaction_build(txn, None, None, true, None))); - } - } - // Unknown transaction. - return Ok(None); + match (block, statuses) { + (Some(block), Some(statuses)) => Ok(Some(transaction_build( + block.transactions[index].clone(), + Some(block), + Some(statuses[index].clone()), + is_eip1559, + base_fee, + ))), + _ => Ok(None), } - }; - - let id = match frontier_backend_client::load_hash::(self.backend.as_ref(), hash) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some(hash) => hash, - _ => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); - - let base_fee = handler.base_fee(&id); - let is_eip1559 = handler.is_eip1559(&id); - - match (block, statuses) { - (Some(block), Some(statuses)) => Ok(Some(transaction_build( - block.transactions[index].clone(), - Some(block), - Some(statuses[index].clone()), - is_eip1559, - base_fee, - ))), - _ => Ok(None), - } + }) } fn transaction_by_block_hash_and_index( &self, hash: H256, index: Index, - ) -> Result> { - let id = match frontier_backend_client::load_hash::(self.backend.as_ref(), hash) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some(hash) => hash, - _ => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - - let index = index.value(); - - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); + ) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + + Box::pin(async move { + let id = match frontier_backend_client::load_hash::(backend.as_ref(), hash) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some(hash) => hash, + _ => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); + let index = index.value(); - let base_fee = handler.base_fee(&id); - let is_eip1559 = handler.is_eip1559(&id); + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); - match (block, statuses) { - (Some(block), Some(statuses)) => { - if let (Some(transaction), Some(status)) = - (block.transactions.get(index), statuses.get(index)) - { - return Ok(Some(transaction_build( - transaction.clone(), - Some(block), - Some(status.clone()), - is_eip1559, - base_fee, - ))); - } else { - return Err(internal_err(format!("{:?} is out of bounds", index))); + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; + + let base_fee = handler.base_fee(&id); + let is_eip1559 = handler.is_eip1559(&id); + + match (block, statuses) { + (Some(block), Some(statuses)) => { + if let (Some(transaction), Some(status)) = + (block.transactions.get(index), statuses.get(index)) + { + return Ok(Some(transaction_build( + transaction.clone(), + Some(block), + Some(status.clone()), + is_eip1559, + base_fee, + ))); + } else { + return Err(internal_err(format!("{:?} is out of bounds", index))); + } } + _ => Ok(None), } - _ => Ok(None), - } + }) } fn transaction_by_block_number_and_index( &self, number: BlockNumber, index: Index, - ) -> Result> { - let id = match frontier_backend_client::native_block_id::( - self.client.as_ref(), - self.backend.as_ref(), - Some(number), - )? { - Some(id) => id, - None => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - - let index = index.value(); - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); + ) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + + Box::pin(async move { + let id = match frontier_backend_client::native_block_id::( + client.as_ref(), + backend.as_ref(), + Some(number), + )? { + Some(id) => id, + None => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - let base_fee = handler.base_fee(&id); - let is_eip1559 = handler.is_eip1559(&id); + let index = index.value(); + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); - match (block, statuses) { - (Some(block), Some(statuses)) => { - if let (Some(transaction), Some(status)) = - (block.transactions.get(index), statuses.get(index)) - { - return Ok(Some(transaction_build( - transaction.clone(), - Some(block), - Some(status.clone()), - is_eip1559, - base_fee, - ))); - } else { - return Err(internal_err(format!("{:?} is out of bounds", index))); + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; + + let base_fee = handler.base_fee(&id); + let is_eip1559 = handler.is_eip1559(&id); + + match (block, statuses) { + (Some(block), Some(statuses)) => { + if let (Some(transaction), Some(status)) = + (block.transactions.get(index), statuses.get(index)) + { + return Ok(Some(transaction_build( + transaction.clone(), + Some(block), + Some(status.clone()), + is_eip1559, + base_fee, + ))); + } else { + return Err(internal_err(format!("{:?} is out of bounds", index))); + } } + _ => Ok(None), } - _ => Ok(None), - } + }) } - fn transaction_receipt(&self, hash: H256) -> Result> { - let (hash, index) = match frontier_backend_client::load_transactions::( - self.client.as_ref(), - self.backend.as_ref(), - hash, - true, - ) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some((hash, index)) => (hash, index as usize), - None => return Ok(None), - }; + fn transaction_receipt(&self, hash: H256) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let overrides = Arc::clone(&self.overrides); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); - let id = match frontier_backend_client::load_hash::(self.backend.as_ref(), hash) + Box::pin(async move { + let (hash, index) = match frontier_backend_client::load_transactions::( + client.as_ref(), + backend.as_ref(), + hash, + true, + ) .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some(hash) => hash, - _ => return Ok(None), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; + { + Some((hash, index)) => (hash, index as usize), + None => return Ok(None), + }; - let schema = - frontier_backend_client::onchain_storage_schema::(self.client.as_ref(), id); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); - let receipts = handler.current_receipts(&id); - - match (block, statuses, receipts) { - (Some(block), Some(statuses), Some(receipts)) => { - let block_hash = - H256::from_slice(Keccak256::digest(&rlp::encode(&block.header)).as_slice()); - let receipt = receipts[index].clone(); - - let (logs, logs_bloom, status_code, cumulative_gas_used) = match receipt { - ethereum::ReceiptV3::Legacy(d) - | ethereum::ReceiptV3::EIP2930(d) - | ethereum::ReceiptV3::EIP1559(d) => (d.logs, d.logs_bloom, d.status_code, d.used_gas), - }; + let id = match frontier_backend_client::load_hash::(backend.as_ref(), hash) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some(hash) => hash, + _ => return Ok(None), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - let status = statuses[index].clone(); - let mut cumulative_receipts = receipts.clone(); - cumulative_receipts.truncate((status.transaction_index + 1) as usize); - let gas_used = if index > 0 { - let previous_receipt = receipts[index - 1].clone(); - let previous_gas_used = match previous_receipt { + let schema = + frontier_backend_client::onchain_storage_schema::(client.as_ref(), id); + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); + + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; + let receipts = handler.current_receipts(&id); + + match (block, statuses, receipts) { + (Some(block), Some(statuses), Some(receipts)) => { + let block_hash = + H256::from_slice(Keccak256::digest(&rlp::encode(&block.header)).as_slice()); + let receipt = receipts[index].clone(); + + let (logs, logs_bloom, status_code, cumulative_gas_used) = match receipt { ethereum::ReceiptV3::Legacy(d) | ethereum::ReceiptV3::EIP2930(d) - | ethereum::ReceiptV3::EIP1559(d) => d.used_gas, + | ethereum::ReceiptV3::EIP1559(d) => (d.logs, d.logs_bloom, d.status_code, d.used_gas), }; - cumulative_gas_used.saturating_sub(previous_gas_used) - } else { - cumulative_gas_used - }; - let transaction = block.transactions[index].clone(); - let effective_gas_price = match transaction { - EthereumTransaction::Legacy(t) => t.gas_price, - EthereumTransaction::EIP2930(t) => t.gas_price, - EthereumTransaction::EIP1559(t) => handler - .base_fee(&id) - .unwrap_or_default() - .checked_add(t.max_priority_fee_per_gas) - .unwrap_or(U256::max_value()), - }; + let status = statuses[index].clone(); + let mut cumulative_receipts = receipts.clone(); + cumulative_receipts.truncate((status.transaction_index + 1) as usize); + let gas_used = if index > 0 { + let previous_receipt = receipts[index - 1].clone(); + let previous_gas_used = match previous_receipt { + ethereum::ReceiptV3::Legacy(d) + | ethereum::ReceiptV3::EIP2930(d) + | ethereum::ReceiptV3::EIP1559(d) => d.used_gas, + }; + cumulative_gas_used.saturating_sub(previous_gas_used) + } else { + cumulative_gas_used + }; - return Ok(Some(Receipt { - transaction_hash: Some(status.transaction_hash), - transaction_index: Some(status.transaction_index.into()), - block_hash: Some(block_hash), - from: Some(status.from), - to: status.to, - block_number: Some(block.header.number), - cumulative_gas_used, - gas_used: Some(gas_used), - contract_address: status.contract_address, - logs: { - let mut pre_receipts_log_index = None; - if cumulative_receipts.len() > 0 { - cumulative_receipts.truncate(cumulative_receipts.len() - 1); - pre_receipts_log_index = Some( - cumulative_receipts - .iter() - .map(|r| match r { - ethereum::ReceiptV3::Legacy(d) - | ethereum::ReceiptV3::EIP2930(d) - | ethereum::ReceiptV3::EIP1559(d) => d.logs.len() as u32, - }) - .sum::(), - ); - } - logs.iter() - .enumerate() - .map(|(i, log)| Log { - address: log.address, - topics: log.topics.clone(), - data: Bytes(log.data.clone()), - block_hash: Some(block_hash), - block_number: Some(block.header.number), - transaction_hash: Some(status.transaction_hash), - transaction_index: Some(status.transaction_index.into()), - log_index: Some(U256::from( - (pre_receipts_log_index.unwrap_or(0)) + i as u32, - )), - transaction_log_index: Some(U256::from(i)), - removed: false, - }) - .collect() - }, - status_code: Some(U64::from(status_code)), - logs_bloom: logs_bloom, - state_root: None, - effective_gas_price, - })); + let transaction = block.transactions[index].clone(); + let effective_gas_price = match transaction { + EthereumTransaction::Legacy(t) => t.gas_price, + EthereumTransaction::EIP2930(t) => t.gas_price, + EthereumTransaction::EIP1559(t) => handler + .base_fee(&id) + .unwrap_or_default() + .checked_add(t.max_priority_fee_per_gas) + .unwrap_or(U256::max_value()), + }; + + return Ok(Some(Receipt { + transaction_hash: Some(status.transaction_hash), + transaction_index: Some(status.transaction_index.into()), + block_hash: Some(block_hash), + from: Some(status.from), + to: status.to, + block_number: Some(block.header.number), + cumulative_gas_used, + gas_used: Some(gas_used), + contract_address: status.contract_address, + logs: { + let mut pre_receipts_log_index = None; + if cumulative_receipts.len() > 0 { + cumulative_receipts.truncate(cumulative_receipts.len() - 1); + pre_receipts_log_index = Some( + cumulative_receipts + .iter() + .map(|r| match r { + ethereum::ReceiptV3::Legacy(d) + | ethereum::ReceiptV3::EIP2930(d) + | ethereum::ReceiptV3::EIP1559(d) => d.logs.len() as u32, + }) + .sum::(), + ); + } + logs.iter() + .enumerate() + .map(|(i, log)| Log { + address: log.address, + topics: log.topics.clone(), + data: Bytes(log.data.clone()), + block_hash: Some(block_hash), + block_number: Some(block.header.number), + transaction_hash: Some(status.transaction_hash), + transaction_index: Some(status.transaction_index.into()), + log_index: Some(U256::from( + (pre_receipts_log_index.unwrap_or(0)) + i as u32, + )), + transaction_log_index: Some(U256::from(i)), + removed: false, + }) + .collect() + }, + status_code: Some(U64::from(status_code)), + logs_bloom: logs_bloom, + state_root: None, + effective_gas_price, + })); + } + _ => Ok(None), } - _ => Ok(None), - } + }) } fn uncle_by_block_hash_and_index(&self, _: H256, _: Index) -> Result> { @@ -1969,70 +2010,71 @@ where Ok(None) } - fn logs(&self, filter: Filter) -> Result> { - let mut ret: Vec = Vec::new(); - if let Some(hash) = filter.block_hash.clone() { - let id = match frontier_backend_client::load_hash::(self.backend.as_ref(), hash) - .map_err(|err| internal_err(format!("{:?}", err)))? - { - Some(hash) => hash, - _ => return Ok(Vec::new()), - }; - let substrate_hash = self - .client - .expect_block_hash_from_id(&id) - .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; + fn logs(&self, filter: Filter) -> BoxFuture>> { + let client = Arc::clone(&self.client); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + let max_past_logs = self.max_past_logs; - let schema = frontier_backend_client::onchain_storage_schema::( - self.client.as_ref(), - id, - ); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = self.block_data_cache.current_block(handler, substrate_hash); - let statuses = self - .block_data_cache - .current_transaction_statuses(handler, substrate_hash); - if let (Some(block), Some(statuses)) = (block, statuses) { - filter_block_logs(&mut ret, &filter, block, statuses); - } - } else { - let best_number = self.client.info().best_number; - let mut current_number = filter - .to_block - .clone() - .and_then(|v| v.to_min_block_num()) - .map(|s| s.unique_saturated_into()) - .unwrap_or(best_number); + Box::pin(async move { + let mut ret: Vec = Vec::new(); + if let Some(hash) = filter.block_hash.clone() { + let id = match frontier_backend_client::load_hash::(backend.as_ref(), hash) + .map_err(|err| internal_err(format!("{:?}", err)))? + { + Some(hash) => hash, + _ => return Ok(Vec::new()), + }; + let substrate_hash = client + .expect_block_hash_from_id(&id) + .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - if current_number > best_number { - current_number = best_number; - } + let schema = frontier_backend_client::onchain_storage_schema::( + client.as_ref(), + id, + ); - let from_number = filter - .from_block - .clone() - .and_then(|v| v.to_min_block_num()) - .map(|s| s.unique_saturated_into()) - .unwrap_or(self.client.info().best_number); + let block = block_data_cache.current_block(schema, substrate_hash).await; + let statuses = block_data_cache + .current_transaction_statuses(schema, substrate_hash) + .await; + if let (Some(block), Some(statuses)) = (block, statuses) { + filter_block_logs(&mut ret, &filter, block, statuses); + } + } else { + let best_number = client.info().best_number; + let mut current_number = filter + .to_block + .clone() + .and_then(|v| v.to_min_block_num()) + .map(|s| s.unique_saturated_into()) + .unwrap_or(best_number); + + if current_number > best_number { + current_number = best_number; + } - let _ = filter_range_logs( - self.client.as_ref(), - self.backend.as_ref(), - &self.overrides, - &self.block_data_cache, - &mut ret, - self.max_past_logs, - &filter, - from_number, - current_number, - )?; - } - Ok(ret) + let from_number = filter + .from_block + .clone() + .and_then(|v| v.to_min_block_num()) + .map(|s| s.unique_saturated_into()) + .unwrap_or(client.info().best_number); + + let _ = filter_range_logs( + client.as_ref(), + backend.as_ref(), + &block_data_cache, + &mut ret, + max_past_logs, + &filter, + from_number, + current_number, + ) + .await?; + } + Ok(ret) + }) } fn work(&self) -> Result { @@ -2281,7 +2323,6 @@ pub struct EthFilterApi { backend: Arc>, filter_pool: FilterPool, max_stored_filters: usize, - overrides: Arc>, max_past_logs: u32, block_data_cache: Arc>, _marker: PhantomData<(B, BE)>, @@ -2301,7 +2342,6 @@ where backend: Arc>, filter_pool: FilterPool, max_stored_filters: usize, - overrides: Arc>, max_past_logs: u32, block_data_cache: Arc>, ) -> Self { @@ -2310,7 +2350,6 @@ where backend, filter_pool, max_stored_filters, - overrides, max_past_logs, block_data_cache, _marker: PhantomData, @@ -2383,44 +2422,41 @@ where Err(internal_err("Method not available.")) } - fn filter_changes(&self, index: Index) -> Result { + fn filter_changes(&self, index: Index) -> BoxFuture> { + // There are multiple branches that needs to return async blocks. + // Also, each branch need to (synchronously) do stuff with the pool + // (behind a lock), and the lock should be released before entering + // an async block. + // + // To avoid issues with multiple async blocks (having different + // anonymous types) we collect all necessary data in this enum then have + // a single async block. + enum FuturePath { + Block { + last: u64, + next: u64, + }, + Log { + filter: Filter, + from_number: NumberFor, + current_number: NumberFor, + }, + Error(jsonrpc_core::Error), + } + let key = U256::from(index.value()); let block_number = UniqueSaturatedInto::::unique_saturated_into(self.client.info().best_number); let pool = self.filter_pool.clone(); // Try to lock. - let response = if let Ok(locked) = &mut pool.lock() { + let path = if let Ok(locked) = &mut pool.lock() { // Try to get key. - if let Some(pool_item) = locked.clone().get(&key) { + if let Some(pool_item) = locked.get(&key).cloned() { match &pool_item.filter_type { // For each block created since last poll, get a vector of ethereum hashes. FilterType::Block => { let last = pool_item.last_poll.to_min_block_num().unwrap(); let next = block_number + 1; - let mut ethereum_hashes: Vec = Vec::new(); - for n in last..next { - let id = BlockId::Number(n.unique_saturated_into()); - let substrate_hash = - self.client.expect_block_hash_from_id(&id).map_err(|_| { - internal_err(format!("Expect block number from id: {}", id)) - })?; - - let schema = frontier_backend_client::onchain_storage_schema::( - self.client.as_ref(), - id, - ); - let handler = self - .overrides - .schemas - .get(&schema) - .unwrap_or(&self.overrides.fallback); - - let block = - self.block_data_cache.current_block(handler, substrate_hash); - if let Some(block) = block { - ethereum_hashes.push(block.header.hash()) - } - } // Update filter `last_poll`. locked.insert( key, @@ -2430,10 +2466,21 @@ where at_block: pool_item.at_block, }, ); - Ok(FilterChanges::Hashes(ethereum_hashes)) + + FuturePath::::Block { last, next } } // For each event since last poll, get a vector of ethereum logs. FilterType::Log(filter) => { + // Update filter `last_poll`. + locked.insert( + key, + FilterPoolItem { + last_poll: BlockNumber::Num(block_number + 1), + filter_type: pool_item.clone().filter_type, + at_block: pool_item.at_block, + }, + ); + // Either the filter-specific `to` block or best block. let best_number = self.client.info().best_number; let mut current_number = filter @@ -2464,99 +2511,144 @@ where let from_number = std::cmp::max(last_poll, filter_from); // Build the response. - let mut ret: Vec = Vec::new(); - let _ = filter_range_logs( - self.client.as_ref(), - self.backend.as_ref(), - &self.overrides, - &self.block_data_cache, - &mut ret, - self.max_past_logs, - &filter, + FuturePath::Log { + filter: filter.clone(), from_number, current_number, - )?; - // Update filter `last_poll`. - locked.insert( - key, - FilterPoolItem { - last_poll: BlockNumber::Num(block_number + 1), - filter_type: pool_item.clone().filter_type, - at_block: pool_item.at_block, - }, - ); - Ok(FilterChanges::Logs(ret)) + } } // Should never reach here. - _ => Err(internal_err("Method not available.")), + _ => FuturePath::Error(internal_err("Method not available.")), } } else { - Err(internal_err(format!("Filter id {:?} does not exist.", key))) + FuturePath::Error(internal_err(format!("Filter id {:?} does not exist.", key))) } } else { - Err(internal_err("Filter pool is not available.")) + FuturePath::Error(internal_err("Filter pool is not available.")) }; - response + + let client = Arc::clone(&self.client); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + let max_past_logs = self.max_past_logs; + + Box::pin(async move { + match path { + FuturePath::Error(err) => Err(err), + FuturePath::Block { last, next } => { + let mut ethereum_hashes: Vec = Vec::new(); + for n in last..next { + let id = BlockId::Number(n.unique_saturated_into()); + let substrate_hash = + client.expect_block_hash_from_id(&id).map_err(|_| { + internal_err(format!("Expect block number from id: {}", id)) + })?; + + let schema = frontier_backend_client::onchain_storage_schema::( + client.as_ref(), + id, + ); + + let block = block_data_cache.current_block(schema, substrate_hash).await; + if let Some(block) = block { + ethereum_hashes.push(block.header.hash()) + } + } + Ok(FilterChanges::Hashes(ethereum_hashes)) + } + FuturePath::Log { + filter, + from_number, + current_number, + } => { + let mut ret: Vec = Vec::new(); + let _ = filter_range_logs( + client.as_ref(), + backend.as_ref(), + &block_data_cache, + &mut ret, + max_past_logs, + &filter, + from_number, + current_number, + ) + .await?; + + Ok(FilterChanges::Logs(ret)) + } + } + }) } - fn filter_logs(&self, index: Index) -> Result> { + fn filter_logs(&self, index: Index) -> BoxFuture>> { let key = U256::from(index.value()); let pool = self.filter_pool.clone(); - // Try to lock. - let response = if let Ok(locked) = &mut pool.lock() { - // Try to get key. - if let Some(pool_item) = locked.clone().get(&key) { - match &pool_item.filter_type { - FilterType::Log(filter) => { - let best_number = self.client.info().best_number; - let mut current_number = filter - .to_block - .clone() - .and_then(|v| v.to_min_block_num()) - .map(|s| s.unique_saturated_into()) - .unwrap_or(best_number); - if current_number > best_number { - current_number = best_number; - } + // We want to get the filter, while releasing the pool lock outside + // of the async block. + let filter_result: Result = (|| { + let pool = pool + .lock() + .map_err(|_| internal_err("Filter pool is not available."))?; + + let pool_item = pool + .get(&key) + .ok_or_else(|| internal_err(format!("Filter id {:?} does not exist.", key)))?; + + match &pool_item.filter_type { + FilterType::Log(filter) => Ok(filter.clone()), + _ => Err(internal_err(format!( + "Filter id {:?} is not a Log filter.", + key + ))), + } + })(); - if current_number > self.client.info().best_number { - current_number = self.client.info().best_number; - } + let client = Arc::clone(&self.client); + let block_data_cache = Arc::clone(&self.block_data_cache); + let backend = Arc::clone(&self.backend); + let max_past_logs = self.max_past_logs; - let from_number = filter - .from_block - .clone() - .and_then(|v| v.to_min_block_num()) - .map(|s| s.unique_saturated_into()) - .unwrap_or(self.client.info().best_number); - - let mut ret: Vec = Vec::new(); - let _ = filter_range_logs( - self.client.as_ref(), - self.backend.as_ref(), - &self.overrides, - &self.block_data_cache, - &mut ret, - self.max_past_logs, - &filter, - from_number, - current_number, - )?; - Ok(ret) - } - _ => Err(internal_err(format!( - "Filter id {:?} is not a Log filter.", - key - ))), - } - } else { - Err(internal_err(format!("Filter id {:?} does not exist.", key))) + Box::pin(async move { + let filter = filter_result?; + + let best_number = client.info().best_number; + let mut current_number = filter + .to_block + .clone() + .and_then(|v| v.to_min_block_num()) + .map(|s| s.unique_saturated_into()) + .unwrap_or(best_number); + + if current_number > best_number { + current_number = best_number; } - } else { - Err(internal_err("Filter pool is not available.")) - }; - response + + if current_number > client.info().best_number { + current_number = client.info().best_number; + } + + let from_number = filter + .from_block + .clone() + .and_then(|v| v.to_min_block_num()) + .map(|s| s.unique_saturated_into()) + .unwrap_or(client.info().best_number); + + let mut ret: Vec = Vec::new(); + let _ = filter_range_logs( + client.as_ref(), + backend.as_ref(), + &block_data_cache, + &mut ret, + max_past_logs, + &filter, + from_number, + current_number, + ) + .await?; + Ok(ret) + }) } fn uninstall_filter(&self, index: Index) -> Result { @@ -2893,69 +2985,217 @@ where } } -/// Stores an LRU cache for block data and their transaction statuses. +enum EthBlockDataCacheMessage { + RequestCurrentBlock { + block_hash: B::Hash, + schema: EthereumStorageSchema, + response_tx: oneshot::Sender>, + }, + FetchedCurrentBlock { + block_hash: B::Hash, + block: Option, + }, + + RequestCurrentTransactionStatuses { + block_hash: B::Hash, + schema: EthereumStorageSchema, + response_tx: oneshot::Sender>>, + }, + FetchedCurrentTransactionStatuses { + block_hash: B::Hash, + statuses: Option>, + }, +} + +/// Manage LRU cachse for block data and their transaction statuses. /// These are large and take a lot of time to fetch from the database. /// Storing them in an LRU cache will allow to reduce database accesses /// when many subsequent requests are related to the same blocks. -pub struct EthBlockDataCache { - blocks: parking_lot::Mutex>, - statuses: parking_lot::Mutex>>, -} +pub struct EthBlockDataCache(mpsc::Sender>); impl EthBlockDataCache { - /// Create a new cache with provided cache sizes. - pub fn new(blocks_cache_size: usize, statuses_cache_size: usize) -> Self { - Self { - blocks: parking_lot::Mutex::new(LruCache::new(blocks_cache_size)), - statuses: parking_lot::Mutex::new(LruCache::new(statuses_cache_size)), + pub fn new( + spawn_handle: SpawnTaskHandle, + overrides: Arc>, + blocks_cache_size: usize, + statuses_cache_size: usize, + ) -> Self { + let (task_tx, mut task_rx) = mpsc::channel(100); + let outer_task_tx = task_tx.clone(); + let outer_spawn_handle = spawn_handle.clone(); + + outer_spawn_handle.spawn("EthBlockDataCache", async move { + let mut blocks_cache = LruCache::::new(blocks_cache_size); + let mut statuses_cache = + LruCache::>::new(statuses_cache_size); + + let mut awaiting_blocks = + HashMap::>>>::new(); + let mut awaiting_statuses = + HashMap::>>>>::new(); + + // Handle all incoming messages. + // Exits when there are no more senders. + // Any long computation should be spawned in a separate task + // to keep this task handle messages as soon as possible. + while let Some(message) = task_rx.recv().await { + use EthBlockDataCacheMessage::*; + match message { + RequestCurrentBlock { + block_hash, + schema, + response_tx, + } => Self::request_current( + &spawn_handle, + &mut blocks_cache, + &mut awaiting_blocks, + Arc::clone(&overrides), + block_hash, + schema, + response_tx, + task_tx.clone(), + move |handler| FetchedCurrentBlock { + block_hash, + block: handler.current_block(&BlockId::Hash(block_hash)), + }, + ), + FetchedCurrentBlock { block_hash, block } => { + if let Some(wait_list) = awaiting_blocks.remove(&block_hash) { + for sender in wait_list { + let _ = sender.send(block.clone()); + } + } + + if let Some(block) = block { + blocks_cache.put(block_hash, block); + } + } + + RequestCurrentTransactionStatuses { + block_hash, + schema, + response_tx, + } => Self::request_current( + &spawn_handle, + &mut statuses_cache, + &mut awaiting_statuses, + Arc::clone(&overrides), + block_hash, + schema, + response_tx, + task_tx.clone(), + move |handler| FetchedCurrentTransactionStatuses { + block_hash, + statuses: handler + .current_transaction_statuses(&BlockId::Hash(block_hash)), + }, + ), + FetchedCurrentTransactionStatuses { + block_hash, + statuses, + } => { + if let Some(wait_list) = awaiting_statuses.remove(&block_hash) { + for sender in wait_list { + let _ = sender.send(statuses.clone()); + } + } + + if let Some(statuses) = statuses { + statuses_cache.put(block_hash, statuses); + } + } + } + } + }); + + Self(outer_task_tx) + } + + fn request_current( + spawn_handle: &SpawnTaskHandle, + cache: &mut LruCache, + wait_list: &mut HashMap>>>, + overrides: Arc>, + block_hash: B::Hash, + schema: EthereumStorageSchema, + response_tx: oneshot::Sender>, + task_tx: mpsc::Sender>, + handler_call: F, + ) where + T: Clone, + F: FnOnce(&Box + Send + Sync>) -> EthBlockDataCacheMessage, + F: Send + 'static, + { + // Data is cached, we respond immediately. + if let Some(data) = cache.get(&block_hash).cloned() { + let _ = response_tx.send(Some(data)); + return; + } + + // Another request already triggered caching but the + // response is not known yet, we add the sender to the waiting + // list. + if let Some(waiting) = wait_list.get_mut(&block_hash) { + waiting.push(response_tx); + return; } + + // Data is neither cached nor already requested, so we start fetching + // the data. + wait_list.insert(block_hash.clone(), vec![response_tx]); + + spawn_handle.spawn("EthBlockDataCache Worker", async move { + let handler = overrides + .schemas + .get(&schema) + .unwrap_or(&overrides.fallback); + + let message = handler_call(handler); + let _ = task_tx.send(message).await; + }); } /// Cache for `handler.current_block`. - pub fn current_block( + pub async fn current_block( &self, - handler: &Box + Send + Sync>, - substrate_block_hash: B::Hash, + schema: EthereumStorageSchema, + block_hash: B::Hash, ) -> Option { - { - let mut cache = self.blocks.lock(); - if let Some(block) = cache.get(&substrate_block_hash).cloned() { - return Some(block); - } - } - - if let Some(block) = handler.current_block(&BlockId::Hash(substrate_block_hash)) { - let mut cache = self.blocks.lock(); - cache.put(substrate_block_hash, block.clone()); - - return Some(block); - } + let (response_tx, response_rx) = oneshot::channel(); + + let _ = self + .0 + .send(EthBlockDataCacheMessage::RequestCurrentBlock { + block_hash, + schema, + response_tx, + }) + .await + .ok()?; - None + response_rx.await.ok()? } /// Cache for `handler.current_transaction_statuses`. - pub fn current_transaction_statuses( + pub async fn current_transaction_statuses( &self, - handler: &Box + Send + Sync>, - substrate_block_hash: B::Hash, + schema: EthereumStorageSchema, + block_hash: B::Hash, ) -> Option> { - { - let mut cache = self.statuses.lock(); - if let Some(statuses) = cache.get(&substrate_block_hash).cloned() { - return Some(statuses); - } - } - - if let Some(statuses) = - handler.current_transaction_statuses(&BlockId::Hash(substrate_block_hash)) - { - let mut cache = self.statuses.lock(); - cache.put(substrate_block_hash, statuses.clone()); - - return Some(statuses); - } + let (response_tx, response_rx) = oneshot::channel(); + + let _ = self + .0 + .send( + EthBlockDataCacheMessage::RequestCurrentTransactionStatuses { + block_hash, + schema, + response_tx, + }, + ) + .await + .ok()?; - None + response_rx.await.ok()? } } diff --git a/primitives/consensus/src/lib.rs b/primitives/consensus/src/lib.rs index eca3128383..0afaa810aa 100644 --- a/primitives/consensus/src/lib.rs +++ b/primitives/consensus/src/lib.rs @@ -18,7 +18,6 @@ #![cfg_attr(not(feature = "std"), no_std)] use codec::{Decode, Encode}; -use sha3::{Digest as Sha3Digest, Keccak256}; use sp_core::H256; use sp_runtime::{ generic::{Digest, OpaqueDigestItemId}, diff --git a/template/node/src/rpc.rs b/template/node/src/rpc.rs index 61d5e18df9..860b5a4de8 100644 --- a/template/node/src/rpc.rs +++ b/template/node/src/rpc.rs @@ -1,6 +1,6 @@ //! A collection of node-specific RPC methods. -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use fc_rpc::{ EthBlockDataCache, OverrideHandle, RuntimeApiStorageOverride, SchemaV1Override, @@ -24,9 +24,9 @@ use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_runtime::traits::BlakeTwo256; -use std::collections::BTreeMap; /// Light client extra dependencies. +#[allow(dead_code)] pub struct LightDeps { /// The client instance to use. pub client: Arc, @@ -67,6 +67,10 @@ pub struct FullDeps { /// Manual seal command sink pub command_sink: Option>>, + /// Ethereum data access overrides. + pub overrides: Arc>, + /// Cache for Ethereum block data. + pub block_data_cache: Arc>, } pub fn overrides_handle(client: Arc) -> Arc> @@ -105,7 +109,6 @@ where pub fn create_full( deps: FullDeps, subscription_task_executor: SubscriptionTaskExecutor, - overrides: Arc>, ) -> jsonrpc_core::IoHandler where BE: Backend + 'static, @@ -144,6 +147,8 @@ where fee_history_limit, fee_history_cache, enable_dev_signer, + overrides, + block_data_cache, } = deps; io.extend_with(SystemApi::to_delegate(FullSystem::new( @@ -160,8 +165,6 @@ where signers.push(Box::new(EthDevSigner::new()) as Box); } - let block_data_cache = Arc::new(EthBlockDataCache::new(50, 50)); - io.extend_with(EthApiServer::to_delegate(EthApi::new( client.clone(), pool.clone(), @@ -184,7 +187,6 @@ where backend, filter_pool.clone(), 500 as usize, // max stored filters - overrides.clone(), max_past_logs, block_data_cache.clone(), ))); @@ -225,6 +227,7 @@ where } /// Instantiate all Light RPC extensions. +#[allow(dead_code)] pub fn create_light(deps: LightDeps) -> jsonrpc_core::IoHandler where C: sp_blockchain::HeaderBackend, diff --git a/template/node/src/service.rs b/template/node/src/service.rs index e2760f99b4..cca23b7d65 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -377,6 +377,13 @@ pub fn new_full(mut config: Configuration, cli: &Cli) -> Result Result