diff --git a/Cargo.lock b/Cargo.lock index cb4e085218e52..3de3363d19f6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3311,6 +3311,7 @@ dependencies = [ "cumulus-relay-chain-interface", "futures", "parity-scale-codec", + "parking_lot 0.12.1", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-overseer", @@ -14689,6 +14690,7 @@ dependencies = [ name = "sc-consensus" version = "0.10.0-dev" dependencies = [ + "async-lock", "async-trait", "futures", "futures-timer", @@ -14708,6 +14710,7 @@ dependencies = [ "sp-test-primitives", "substrate-prometheus-endpoint", "thiserror", + "tokio", ] [[package]] diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 8239a498746e3..55482206c0d7f 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true async-trait = "0.1.73" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] } futures = "0.3.28" +parking_lot = "0.12.1" tracing = "0.1.37" schnellru = "0.2.1" diff --git a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs index 5cd65ed5546ba..38ceec30b57d8 100644 --- a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs +++ b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs @@ -21,11 +21,12 @@ /// should be thrown out and which ones should be kept. use codec::Codec; use cumulus_client_consensus_common::ParachainBlockImportMarker; +use parking_lot::Mutex; use schnellru::{ByLength, LruMap}; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImport, BlockImportParams, ForkChoiceStrategy, + BlockImport, BlockImportParams, ForkChoiceStrategy, SharedBlockImport, }; use sc_consensus_aura::standalone as aura_internal; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE}; @@ -71,7 +72,7 @@ struct Verifier { client: Arc, create_inherent_data_providers: CIDP, slot_duration: SlotDuration, - defender: NaiveEquivocationDefender, + defender: Mutex, telemetry: Option, _phantom: std::marker::PhantomData (Block, P)>, } @@ -89,7 +90,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so, or when importing only state. @@ -132,7 +133,7 @@ where block_params.post_hash = Some(post_hash); // Check for and reject egregious amounts of equivocations. - if self.defender.insert_and_check(slot) { + if self.defender.lock().insert_and_check(slot) { return Err(format!( "Rejecting block {:?} due to excessive equivocations at slot", post_hash, @@ -239,11 +240,11 @@ where let verifier = Verifier:: { client, create_inherent_data_providers, - defender: NaiveEquivocationDefender::default(), + defender: Mutex::new(NaiveEquivocationDefender::default()), slot_duration, telemetry, _phantom: std::marker::PhantomData, }; - BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry) + BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry) } diff --git a/cumulus/client/consensus/common/src/import_queue.rs b/cumulus/client/consensus/common/src/import_queue.rs index 311a2b7ad8cfd..e815353b77bb0 100644 --- a/cumulus/client/consensus/common/src/import_queue.rs +++ b/cumulus/client/consensus/common/src/import_queue.rs @@ -38,6 +38,7 @@ use sp_runtime::traits::Block as BlockT; use sc_consensus::{ block_import::{BlockImport, BlockImportParams}, import_queue::{BasicQueue, Verifier}, + SharedBlockImport, }; use crate::ParachainBlockImportMarker; @@ -50,7 +51,7 @@ pub struct VerifyNothing; #[async_trait::async_trait] impl Verifier for VerifyNothing { async fn verify( - &mut self, + &self, params: BlockImportParams, ) -> Result, String> { Ok(params) @@ -72,5 +73,5 @@ where + Sync + 'static, { - BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry) + BasicQueue::new(VerifyNothing, SharedBlockImport::new(block_import), None, spawner, registry) } diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 08bceabb2bd4a..ea788fb219e0b 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -155,13 +155,13 @@ impl Clone for ParachainBlockImport { impl BlockImport for ParachainBlockImport where Block: BlockT, - BI: BlockImport + Send, + BI: BlockImport + Send + Sync, BE: Backend, { type Error = BI::Error; async fn check_block( - &mut self, + &self, block: sc_consensus::BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index 9ee03b95904c6..16013ff2a1d2e 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -20,7 +20,7 @@ use cumulus_client_consensus_common::ParachainBlockImportMarker; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImport, BlockImportParams, + BlockImport, BlockImportParams, SharedBlockImport, }; use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder as BlockBuilderApi; @@ -52,7 +52,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so, or when importing only state. @@ -125,5 +125,5 @@ where { let verifier = Verifier::new(client, create_inherent_data_providers); - Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)) + Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry)) } diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs index b050bc66799c7..130828a148dd3 100644 --- a/cumulus/client/pov-recovery/src/lib.rs +++ b/cumulus/client/pov-recovery/src/lib.rs @@ -453,7 +453,7 @@ where /// Import the given `block`. /// - /// This will also recursivley drain `waiting_for_parent` and import them as well. + /// This will also recursively drain `waiting_for_parent` and import them as well. async fn import_block(&mut self, block: Block) { let mut blocks = VecDeque::new(); diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index f7b053b4b6a9d..cef6f6f32041a 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -46,7 +46,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; use futures::lock::Mutex; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImportParams, ImportQueue, + BlockImportParams, ImportQueue, SharedBlockImport, }; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::{config::FullNetworkConfiguration, NetworkBlock}; @@ -1022,7 +1022,7 @@ where struct Verifier { client: Arc, - aura_verifier: BuildOnAccess>>, + aura_verifier: Mutex>>>, relay_chain_verifier: Box>, _phantom: PhantomData, } @@ -1035,7 +1035,7 @@ where AuraId: Send + Sync + Codec, { async fn verify( - &mut self, + &self, block_import: BlockImportParams, ) -> Result, String> { if self @@ -1044,7 +1044,7 @@ where .has_api::>(*block_import.header.parent_hash()) .unwrap_or(false) { - self.aura_verifier.get_mut().verify(block_import).await + self.aura_verifier.lock().await.get_mut().verify(block_import).await } else { self.relay_chain_verifier.verify(block_import).await } @@ -1104,14 +1104,14 @@ where let verifier = Verifier { client, relay_chain_verifier, - aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))), + aura_verifier: Mutex::new(BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier)))), _phantom: PhantomData, }; let registry = config.prometheus_registry(); let spawner = task_manager.spawn_essential_handle(); - Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry)) + Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, &spawner, registry)) } /// Start an aura powered parachain node. Asset Hub and Collectives use this. diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs index a8777ef8788cc..1239621015f96 100644 --- a/substrate/client/consensus/aura/src/import_queue.rs +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -29,6 +29,7 @@ use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider}; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier}, + SharedBlockImport, }; use sc_consensus_slots::{check_equivocation, CheckedHeader, InherentDataProviderExt}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE}; @@ -174,7 +175,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so or when importing only state. @@ -376,7 +377,13 @@ where compatibility_mode, }); - Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry)) + Ok(BasicQueue::new( + verifier, + SharedBlockImport::new(block_import), + justification_import, + spawner, + registry, + )) } /// Parameters of [`build_verifier`]. diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 90b7523ec18bb..b80fe72a592c7 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -97,6 +97,7 @@ use sc_consensus::{ StateAction, }, import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier}, + SharedBlockImport, }; use sc_consensus_epochs::{ descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor, @@ -1130,7 +1131,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { trace!( @@ -1683,7 +1684,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await.map_err(Into::into) @@ -1856,7 +1857,13 @@ where spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed()); Ok(( - BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry), + BasicQueue::new( + verifier, + SharedBlockImport::new(block_import), + justification_import, + spawner, + registry, + ), BabeWorkerHandle(worker_tx), )) } diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index b3843f8acfa0a..0ce7209cf754e 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -22,7 +22,7 @@ use super::*; use authorship::claim_slot; use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sc_client_api::{BlockchainEvents, Finalizer}; -use sc_consensus::{BoxBlockImport, BoxJustificationImport}; +use sc_consensus::{BoxJustificationImport, SharedBlockImport}; use sc_consensus_epochs::{EpochIdentifier, EpochIdentifierPosition}; use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging; use sc_network_test::{Block as TestBlock, *}; @@ -138,11 +138,11 @@ thread_local! { pub struct PanickingBlockImport(B); #[async_trait::async_trait] -impl> BlockImport for PanickingBlockImport +impl BlockImport for PanickingBlockImport where - B: Send, + BI: BlockImport + Send + Sync, { - type Error = B::Error; + type Error = BI::Error; async fn import_block( &mut self, @@ -152,7 +152,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { Ok(self.0.check_block(block).await.expect("checking block failed")) @@ -193,7 +193,7 @@ impl Verifier for TestVerifier { /// new set of validators to import. If not, err with an Error-Message /// presented to the User in the logs. async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // apply post-sealing mutations (i.e. stripping seal, if desired). @@ -204,7 +204,7 @@ impl Verifier for TestVerifier { pub struct PeerData { link: BabeLink, - block_import: Mutex>>, + block_import: SharedBlockImport, } impl TestNetFactory for BabeTestNet { @@ -228,8 +228,7 @@ impl TestNetFactory for BabeTestNet { let block_import = PanickingBlockImport(block_import); - let data_block_import = - Mutex::new(Some(Box::new(block_import.clone()) as BoxBlockImport<_>)); + let data_block_import = SharedBlockImport::new(block_import.clone()); ( BlockImportAdapter::new(block_import), None, @@ -370,7 +369,7 @@ async fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + ' let client_clone = client.clone(); babe_futures.push( start_babe(BabeParams { - block_import: data.block_import.lock().take().expect("import set up during init"), + block_import: data.block_import.clone(), select_chain, client, env: environ, @@ -614,7 +613,7 @@ async fn propose_and_import_block( parent: &TestHeader, slot: Option, proposer_factory: &mut DummyFactory, - block_import: &mut BoxBlockImport, + block_import: &mut SharedBlockImport, ) -> Hash { let mut proposer = proposer_factory.init(parent).await.unwrap(); @@ -684,7 +683,7 @@ async fn propose_and_import_block( async fn propose_and_import_blocks( client: &PeersFullClient, proposer_factory: &mut DummyFactory, - block_import: &mut BoxBlockImport, + block_import: &mut SharedBlockImport, parent_hash: Hash, n: usize, ) -> Vec { @@ -715,7 +714,7 @@ async fn importing_block_one_sets_genesis_epoch() { mutator: Arc::new(|_, _| ()), }; - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let genesis_header = client.header(client.chain_info().genesis_hash).unwrap().unwrap(); @@ -749,7 +748,7 @@ async fn revert_prunes_epoch_changes_and_removes_weights() { let client = peer.client().as_client(); let backend = peer.client().as_backend(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let epoch_changes = data.link.epoch_changes.clone(); let mut proposer_factory = DummyFactory { @@ -837,7 +836,7 @@ async fn revert_not_allowed_for_finalized() { let client = peer.client().as_client(); let backend = peer.client().as_backend(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -876,7 +875,7 @@ async fn importing_epoch_change_block_prunes_tree() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let epoch_changes = data.link.epoch_changes.clone(); let mut proposer_factory = DummyFactory { @@ -975,7 +974,7 @@ async fn verify_slots_are_strictly_increasing() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -1022,7 +1021,7 @@ async fn obsolete_blocks_aux_data_cleanup() { mutator: Arc::new(|_, _| ()), }; - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let aux_data_check = |hashes: &[Hash], expected: bool| { hashes.iter().all(|hash| { @@ -1099,7 +1098,7 @@ async fn allows_skipping_epochs() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -1228,7 +1227,7 @@ async fn allows_skipping_epochs_on_some_forks() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index 5b2abb20acede..dd658c7d40f0b 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -175,7 +175,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/common/Cargo.toml b/substrate/client/consensus/common/Cargo.toml index f269e3752d435..c43da3299d63b 100644 --- a/substrate/client/consensus/common/Cargo.toml +++ b/substrate/client/consensus/common/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-lock = "2.8.0" async-trait = "0.1.57" futures = { version = "0.3.21", features = ["thread-pool"] } futures-timer = "3.0.1" @@ -31,6 +32,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" } sp-core = { path = "../../../primitives/core" } sp-runtime = { path = "../../../primitives/runtime" } sp-state-machine = { path = "../../../primitives/state-machine" } +tokio = "1.32.0" [dev-dependencies] sp-test-primitives = { path = "../../../primitives/test-primitives" } diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs index a451692ad478e..617d5b07e3d64 100644 --- a/substrate/client/consensus/common/src/block_import.rs +++ b/substrate/client/consensus/common/src/block_import.rs @@ -307,10 +307,7 @@ pub trait BlockImport { type Error: std::error::Error + Send + 'static; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result; + async fn check_block(&self, block: BlockCheckParams) -> Result; /// Import a block. async fn import_block( @@ -320,15 +317,12 @@ pub trait BlockImport { } #[async_trait::async_trait] -impl BlockImport for crate::import_queue::BoxBlockImport { +impl BlockImport for crate::import_queue::SharedBlockImport { type Error = sp_consensus::error::Error; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { - (**self).check_block(block).await + async fn check_block(&self, block: BlockCheckParams) -> Result { + self.read().await.check_block(block).await } /// Import a block. @@ -336,7 +330,7 @@ impl BlockImport for crate::import_queue::BoxBlockImport { &mut self, block: BlockImportParams, ) -> Result { - (**self).import_block(block).await + self.write().await.import_block(block).await } } @@ -348,10 +342,7 @@ where { type Error = E; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { (&**self).check_block(block).await } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 39d5bf8ed35d1..079bb1cd0c0c3 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -27,7 +27,16 @@ //! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial //! queues to be instantiated simply. +use async_lock::RwLock; use log::{debug, trace}; +use std::{ + fmt, + future::Future, + ops::Deref, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; use sp_consensus::{error::Error as ConsensusError, BlockOrigin}; use sp_runtime::{ @@ -57,7 +66,33 @@ pub mod buffered_link; pub mod mock; /// Shared block import struct used by the queue. -pub type BoxBlockImport = Box + Send + Sync>; +pub struct SharedBlockImport( + Arc + Send + Sync>>, +); + +impl Clone for SharedBlockImport { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl Deref for SharedBlockImport { + type Target = RwLock + Send + Sync>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SharedBlockImport { + /// New instance + pub fn new(block_import: BI) -> Self + where + BI: BlockImport + Send + Sync + 'static, + { + Self(Arc::new(RwLock::new(block_import))) + } +} /// Shared justification import struct used by the queue. pub type BoxJustificationImport = @@ -93,18 +128,50 @@ pub struct IncomingBlock { /// Verify a justification of a block #[async_trait::async_trait] -pub trait Verifier: Send { +pub trait Verifier: Send + Sync { + /// Whether verifier supports stateless verification. + /// + /// Stateless verification means that verification on blocks can be done in arbitrary order, + /// doesn't expect parent block to be imported first, etc. + /// + /// Verifiers that support stateless verification can verify multiple blocks concurrently, + /// significantly improving sync speed. + fn supports_stateless_verification(&self) -> bool { + // Unless re-defined by verifier is assumed to not support stateless verification. + false + } + /// Verify the given block data and return the `BlockImportParams` to /// continue the block import process. - async fn verify(&mut self, block: BlockImportParams) - -> Result, String>; + async fn verify(&self, block: BlockImportParams) -> Result, String>; +} + +impl Verifier for Arc> +where + Block: BlockT, +{ + fn supports_stateless_verification(&self) -> bool { + (**self).supports_stateless_verification() + } + + fn verify<'life0, 'async_trait>( + &'life0 self, + block: BlockImportParams, + ) -> Pin, String>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + (**self).verify(block) + } } /// Blocks import queue API. /// /// The `import_*` methods can be called in order to send elements for the import queue to verify. pub trait ImportQueueService: Send { - /// Import bunch of blocks. + /// Import bunch of blocks, every next block must be an ancestor of the previous block in the + /// list. fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>); /// Import block justifications. @@ -165,16 +232,16 @@ pub trait Link: Send { /// Block import successful result. #[derive(Debug, PartialEq)] -pub enum BlockImportStatus { +pub enum BlockImportStatus { /// Imported known block. - ImportedKnown(N, Option), + ImportedKnown(BlockNumber, Option), /// Imported unknown block. - ImportedUnknown(N, ImportedAux, Option), + ImportedUnknown(BlockNumber, ImportedAux, Option), } -impl BlockImportStatus { +impl BlockImportStatus { /// Returns the imported block number. - pub fn number(&self) -> &N { + pub fn number(&self) -> &BlockNumber { match self { BlockImportStatus::ImportedKnown(n, _) | BlockImportStatus::ImportedUnknown(n, _, _) => n, @@ -217,50 +284,42 @@ pub enum BlockImportError { type BlockImportResult = Result>, BlockImportError>; /// Single block import function. -pub async fn import_single_block>( - import_handle: &mut impl BlockImport, +pub async fn import_single_block( + import_handle: &mut BI, block_origin: BlockOrigin, - block: IncomingBlock, - verifier: &mut V, -) -> BlockImportResult { - import_single_block_metered(import_handle, block_origin, block, verifier, None).await + block: IncomingBlock, + verifier: &dyn Verifier, +) -> BlockImportResult +where + Block: BlockT, + BI: BlockImport, +{ + match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None) + .await? + { + SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), + SingleBlockVerificationOutcome::Verified(import_parameters) => + import_single_block_metered(import_handle, import_parameters, None).await, + } } -/// Single block import function with metering. -pub(crate) async fn import_single_block_metered>( - import_handle: &mut impl BlockImport, - block_origin: BlockOrigin, - block: IncomingBlock, - verifier: &mut V, - metrics: Option, -) -> BlockImportResult { - let peer = block.origin; - - let (header, justifications) = match (block.header, block.justifications) { - (Some(header), justifications) => (header, justifications), - (None, _) => { - if let Some(ref peer) = peer { - debug!(target: LOG_TARGET, "Header {} was not provided by {} ", block.hash, peer); - } else { - debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); - } - return Err(BlockImportError::IncompleteHeader(peer)) - }, - }; - - trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); - - let number = *header.number(); - let hash = block.hash; - let parent_hash = *header.parent_hash(); - - let import_handler = |import| match import { +fn import_handler( + number: NumberFor, + hash: Block::Hash, + parent_hash: Block::Hash, + block_origin: Option, + import: Result, +) -> Result>, BlockImportError> +where + Block: BlockT, +{ + match import { Ok(ImportResult::AlreadyInChain) => { trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportStatus::ImportedKnown(number, peer)) + Ok(BlockImportStatus::ImportedKnown(number, block_origin)) }, Ok(ImportResult::Imported(aux)) => - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)), + Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin)), Ok(ImportResult::MissingState) => { debug!( target: LOG_TARGET, @@ -277,15 +336,67 @@ pub(crate) async fn import_single_block_metered>( }, Ok(ImportResult::KnownBad) => { debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash); - Err(BlockImportError::BadBlock(peer)) + Err(BlockImportError::BadBlock(block_origin)) }, Err(e) => { debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e); Err(BlockImportError::Other(e)) }, + } +} + +pub(crate) enum SingleBlockVerificationOutcome { + /// Block is already imported. + Imported(BlockImportStatus>), + /// Block is verified, but needs to be imported. + Verified(SingleBlockImportParameters), +} + +pub(crate) struct SingleBlockImportParameters { + import_block: BlockImportParams, + hash: Block::Hash, + block_origin: Option, + verification_time: Duration, +} + +/// Single block import function with metering. +pub(crate) async fn verify_single_block_metered( + import_handle: &BI, + block_origin: BlockOrigin, + block: IncomingBlock, + verifier: &dyn Verifier, + allow_missing_parent: bool, + metrics: Option<&Metrics>, +) -> Result, BlockImportError> +where + Block: BlockT, + BI: BlockImport, +{ + let peer = block.origin; + + let (header, justifications) = match (block.header, block.justifications) { + (Some(header), justifications) => (header, justifications), + (None, _) => { + if let Some(ref peer) = peer { + debug!(target: LOG_TARGET, "Header {} was not provided by {} ", block.hash, peer); + } else { + debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); + } + return Err(BlockImportError::IncompleteHeader(peer)) + }, }; - match import_handler( + trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); + + let number = *header.number(); + let hash = block.hash; + let parent_hash = *header.parent_hash(); + + match import_handler::( + number, + hash, + parent_hash, + peer, import_handle .check_block(BlockCheckParams { hash, @@ -293,15 +404,18 @@ pub(crate) async fn import_single_block_metered>( parent_hash, allow_missing_state: block.allow_missing_state, import_existing: block.import_existing, - allow_missing_parent: block.state.is_some(), + allow_missing_parent: allow_missing_parent || block.state.is_some(), }) .await, )? { BlockImportStatus::ImportedUnknown { .. } => (), - r => return Ok(r), // Any other successful result means that the block is already imported. + r => { + // Any other successful result means that the block is already imported. + return Ok(SingleBlockVerificationOutcome::Imported(r)) + }, } - let started = std::time::Instant::now(); + let started = Instant::now(); let mut import_block = BlockImportParams::new(block_origin, header); import_block.body = block.body; @@ -332,19 +446,46 @@ pub(crate) async fn import_single_block_metered>( } else { trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg); } - if let Some(metrics) = metrics.as_ref() { + if let Some(metrics) = metrics { metrics.report_verification(false, started.elapsed()); } BlockImportError::VerificationFailed(peer, msg) })?; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(true, started.elapsed()); + let verification_time = started.elapsed(); + if let Some(metrics) = metrics { + metrics.report_verification(true, verification_time); } + Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { + import_block, + hash, + block_origin: peer, + verification_time, + })) +} + +pub(crate) async fn import_single_block_metered( + import_handle: &mut BI, + import_parameters: SingleBlockImportParameters, + metrics: Option<&Metrics>, +) -> BlockImportResult +where + Block: BlockT, + BI: BlockImport, +{ + let started = Instant::now(); + + let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } = + import_parameters; + + let number = *import_block.header.number(); + let parent_hash = *import_block.header.parent_hash(); + let imported = import_handle.import_block(import_block).await; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification_and_import(started.elapsed()); + if let Some(metrics) = metrics { + metrics.report_verification_and_import(started.elapsed() + verification_time); } - import_handler(imported) + + import_handler::(number, hash, parent_hash, block_origin, imported) } diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 1cc7ec26fd193..a60ce7bc6bfc9 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -17,9 +17,9 @@ // along with this program. If not, see . use futures::{ prelude::*, + stream::FuturesOrdered, task::{Context, Poll}, }; -use futures_timer::Delay; use log::{debug, trace}; use prometheus_endpoint::Registry; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -28,14 +28,22 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, }; -use std::{pin::Pin, time::Duration}; +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use tokio::{runtime::Handle, task}; use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, - import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link, - RuntimeOrigin, Verifier, LOG_TARGET, + import_single_block_metered, verify_single_block_metered, BlockImportError, + BlockImportStatus, BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, + Link, RuntimeOrigin, SharedBlockImport, SingleBlockVerificationOutcome, Verifier, + LOG_TARGET, }, metrics::Metrics, }; @@ -61,13 +69,16 @@ impl BasicQueue { /// Instantiate a new basic queue, with given verifier. /// /// This creates a background task, and calls `on_start` on the justification importer. - pub fn new>( + pub fn new( verifier: V, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, - ) -> Self { + ) -> Self + where + V: Verifier + 'static, + { let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { @@ -198,7 +209,7 @@ impl ImportQueue for BasicQueue { } } -/// Messages destinated to the background worker. +/// Messages designated to the background worker. mod worker_messages { use super::*; @@ -219,13 +230,13 @@ mod worker_messages { /// /// Returns when `block_import` ended. async fn block_import_process( - mut block_import: BoxBlockImport, - mut verifier: impl Verifier, + mut block_import: SharedBlockImport, + verifier: impl Verifier + 'static, mut result_sender: BufferedLinkSender, mut block_import_receiver: TracingUnboundedReceiver>, metrics: Option, - delay_between_blocks: Duration, ) { + let verifier: Arc> = Arc::new(verifier); loop { let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await { @@ -239,15 +250,18 @@ async fn block_import_process( }, }; - let res = import_many_blocks( - &mut block_import, - origin, - blocks, - &mut verifier, - delay_between_blocks, - metrics.clone(), - ) - .await; + let res = if verifier.supports_stateless_verification() { + import_many_blocks_with_stateless_verification( + &mut block_import, + origin, + blocks, + &verifier, + metrics.clone(), + ) + .await + } else { + import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await + }; result_sender.blocks_processed(res.imported, res.block_count, res.results); } @@ -260,29 +274,30 @@ struct BlockImportWorker { } impl BlockImportWorker { - fn new>( + fn new( result_sender: BufferedLinkSender, verifier: V, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, metrics: Option, ) -> ( impl Future + Send, TracingUnboundedSender>, TracingUnboundedSender>, - ) { + ) + where + V: Verifier + 'static, + { use worker_messages::*; let (justification_sender, mut justification_port) = tracing_unbounded("mpsc_import_queue_worker_justification", 100_000); - let (block_import_sender, block_import_port) = + let (block_import_sender, block_import_receiver) = tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000); let mut worker = BlockImportWorker { result_sender, justification_import, metrics }; - let delay_between_blocks = Duration::default(); - let future = async move { // Let's initialize `justification_import` if let Some(justification_import) = worker.justification_import.as_mut() { @@ -295,9 +310,8 @@ impl BlockImportWorker { block_import, verifier, worker.result_sender.clone(), - block_import_port, + block_import_receiver, worker.metrics.clone(), - delay_between_blocks, ); futures::pin_mut!(block_import_process); @@ -389,12 +403,14 @@ struct ImportManyBlocksResult { /// /// This will yield after each imported block once, to ensure that other futures can /// be called as well. +/// +/// For verifiers that support stateless verification use +/// [`import_many_blocks_with_stateless_verification`] for better performance. async fn import_many_blocks>( - import_handle: &mut BoxBlockImport, + import_handle: &mut SharedBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, - verifier: &mut V, - delay_between_blocks: Duration, + verifier: &V, metrics: Option, ) -> ImportManyBlocksResult { let count = blocks.len(); @@ -431,15 +447,23 @@ async fn import_many_blocks>( let import_result = if has_error { Err(BlockImportError::Cancelled) } else { - // The actual import. - import_single_block_metered( + let verification_fut = verify_single_block_metered( import_handle, blocks_origin, block, verifier, - metrics.clone(), - ) - .await + false, + metrics.as_ref(), + ); + match verification_fut.await { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { + // The actual import. + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await + }, + Err(e) => Err(e), + } }; if let Some(metrics) = metrics.as_ref() { @@ -460,12 +484,116 @@ async fn import_many_blocks>( results.push((import_result, block_hash)); - if delay_between_blocks != Duration::default() && !has_error { - Delay::new(delay_between_blocks).await; + Yield::new().await + } +} + +/// The same as [`import_many_blocks()`]`, but for verifiers that support stateless verification of +/// blocks (use [`Verifier::supports_stateless_verification()`]). +async fn import_many_blocks_with_stateless_verification( + import_handle: &mut SharedBlockImport, + blocks_origin: BlockOrigin, + blocks: Vec>, + verifier: &Arc>, + metrics: Option, +) -> ImportManyBlocksResult { + let count = blocks.len(); + + let blocks_range = match ( + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + + trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range); + + let has_error = Arc::new(AtomicBool::new(false)); + + // Blocks in the response/drain should be in ascending order. + let mut verified_blocks = blocks + .into_iter() + .enumerate() + .map(|(index, block)| { + let import_handle = import_handle.clone(); + let verifier = Arc::clone(verifier); + let metrics = metrics.clone(); + let has_error = Arc::clone(&has_error); + + async move { + let block_number = block.header.as_ref().map(|h| *h.number()); + let block_hash = block.hash; + + let result = if has_error.load(Ordering::Acquire) { + Err(BlockImportError::Cancelled) + } else { + task::spawn_blocking(move || { + Handle::current().block_on(verify_single_block_metered( + &import_handle, + blocks_origin, + block, + &verifier, + // Check parent for the first block, but skip for others since blocks + // are verified concurrently before being imported. + index != 0, + metrics.as_ref(), + )) + }) + .await + .unwrap_or_else(|error| { + Err(BlockImportError::Other(sp_consensus::Error::Other( + format!("Failed to join on block verification: {error}").into(), + ))) + }) + }; + + (block_number, block_hash, result) + } + }) + .collect::>(); + + let mut imported = 0; + let mut results = vec![]; + + while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await { + let import_result = if has_error.load(Ordering::Acquire) { + Err(BlockImportError::Cancelled) } else { - Yield::new().await + // The actual import. + match verification_result { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await, + Err(e) => Err(e), + } + }; + + if let Some(metrics) = metrics.as_ref() { + metrics.report_import::(&import_result); } + + if import_result.is_ok() { + trace!( + target: LOG_TARGET, + "Block imported successfully {:?} ({})", + block_number, + block_hash, + ); + imported += 1; + } else { + has_error.store(true, Ordering::Release); + } + + results.push((import_result, block_hash)); + + Yield::new().await } + + // No block left to import, success! + ImportManyBlocksResult { block_count: count, imported, results } } /// A future that will always `yield` on the first call of `poll` but schedules the @@ -510,7 +638,7 @@ mod tests { #[async_trait::async_trait] impl Verifier for () { async fn verify( - &mut self, + &self, block: BlockImportParams, ) -> Result, String> { Ok(BlockImportParams::new(block.origin, block.header)) @@ -522,7 +650,7 @@ mod tests { type Error = sp_consensus::Error; async fn check_block( - &mut self, + &self, _block: BlockCheckParams, ) -> Result { Ok(ImportResult::imported(false)) @@ -592,8 +720,13 @@ mod tests { fn prioritizes_finality_work_over_block_import() { let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); - let (worker, finality_sender, block_import_sender) = - BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); + let (worker, finality_sender, block_import_sender) = BlockImportWorker::new( + result_sender, + (), + SharedBlockImport::new(()), + Some(Box::new(())), + None, + ); futures::pin_mut!(worker); let import_block = |n| { diff --git a/substrate/client/consensus/common/src/lib.rs b/substrate/client/consensus/common/src/lib.rs index 6bf1ed0b48b4d..5e1c8f67112a9 100644 --- a/substrate/client/consensus/common/src/lib.rs +++ b/substrate/client/consensus/common/src/lib.rs @@ -28,8 +28,8 @@ pub use block_import::{ StorageChanges, }; pub use import_queue::{ - import_single_block, BasicQueue, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, DefaultImportQueue, ImportQueue, IncomingBlock, Link, Verifier, + import_single_block, BasicQueue, BlockImportError, BlockImportStatus, BoxJustificationImport, + DefaultImportQueue, ImportQueue, IncomingBlock, Link, SharedBlockImport, Verifier, }; mod longest_chain; diff --git a/substrate/client/consensus/grandpa/src/import.rs b/substrate/client/consensus/grandpa/src/import.rs index ca5b7c400bfb2..a88681b1c664e 100644 --- a/substrate/client/consensus/grandpa/src/import.rs +++ b/substrate/client/consensus/grandpa/src/import.rs @@ -515,7 +515,7 @@ where Client: ClientForGrandpa, Client::Api: GrandpaApi, for<'a> &'a Client: BlockImport, - SC: Send, + SC: Send + Sync, { type Error = ConsensusError; @@ -694,7 +694,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/manual-seal/src/consensus/babe.rs b/substrate/client/consensus/manual-seal/src/consensus/babe.rs index 26fa81459808c..647e2f1903d84 100644 --- a/substrate/client/consensus/manual-seal/src/consensus/babe.rs +++ b/substrate/client/consensus/manual-seal/src/consensus/babe.rs @@ -96,7 +96,7 @@ where C: HeaderBackend + HeaderMetadata, { async fn verify( - &mut self, + &self, mut import_params: BlockImportParams, ) -> Result, String> { import_params.finalized = false; diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 1e5db966e66db..0b75f6c6cf0d1 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -28,7 +28,7 @@ use sc_client_api::{ }; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, - import_queue::{BasicQueue, BoxBlockImport, Verifier}, + import_queue::{BasicQueue, SharedBlockImport, Verifier}, }; use sp_blockchain::HeaderBackend; use sp_consensus::{Environment, Proposer, SelectChain}; @@ -65,7 +65,7 @@ struct ManualSealVerifier; #[async_trait::async_trait] impl Verifier for ManualSealVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { block.finalized = false; @@ -76,7 +76,7 @@ impl Verifier for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue( - block_import: BoxBlockImport, + block_import: SharedBlockImport, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> BasicQueue diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index ee5c1dfc6f11a..c816498902d50 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -50,8 +50,8 @@ use log::*; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; use sc_consensus::{ - BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport, - BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier, + BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, + ForkChoiceStrategy, ImportResult, SharedBlockImport, Verifier, }; use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder as BlockBuilderApi; @@ -312,10 +312,7 @@ where { type Error = ConsensusError; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { self.inner.check_block(block).await.map_err(Into::into) } @@ -442,7 +439,7 @@ where Algorithm::Difficulty: 'static + Send, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { let hash = block.header.hash(); @@ -463,7 +460,7 @@ pub type PowImportQueue = BasicQueue; /// Import queue for PoW engine. pub fn import_queue( - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, algorithm: Algorithm, spawner: &impl sp_core::traits::SpawnEssentialNamed, @@ -489,7 +486,7 @@ where /// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted /// for blocks being built. This can encode authorship information, or just be a graffiti. pub fn start_mining_worker( - block_import: BoxBlockImport, + block_import: SharedBlockImport, client: Arc, select_chain: S, algorithm: Algorithm, diff --git a/substrate/client/consensus/pow/src/worker.rs b/substrate/client/consensus/pow/src/worker.rs index 9e9c4fc137d86..ad3a37a5a9f63 100644 --- a/substrate/client/consensus/pow/src/worker.rs +++ b/substrate/client/consensus/pow/src/worker.rs @@ -24,7 +24,9 @@ use futures_timer::Delay; use log::*; use parking_lot::Mutex; use sc_client_api::ImportNotifications; -use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges}; +use sc_consensus::{ + BlockImport, BlockImportParams, SharedBlockImport, StateAction, StorageChanges, +}; use sp_consensus::{BlockOrigin, Proposal}; use sp_runtime::{ generic::BlockId, @@ -78,7 +80,7 @@ pub struct MiningHandle< algorithm: Arc, justification_sync_link: Arc, build: Arc>>>, - block_import: Arc>>, + block_import: SharedBlockImport, } impl MiningHandle @@ -94,7 +96,7 @@ where pub(crate) fn new( algorithm: Algorithm, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_sync_link: L, ) -> Self { Self { @@ -102,7 +104,7 @@ where algorithm: Arc::new(algorithm), justification_sync_link: Arc::new(justification_sync_link), build: Arc::new(Mutex::new(None)), - block_import: Arc::new(Mutex::new(block_import)), + block_import, } } @@ -192,9 +194,8 @@ where import_block.insert_intermediate(INTERMEDIATE_KEY, intermediate); let header = import_block.post_header(); - let mut block_import = self.block_import.lock(); - match block_import.import_block(import_block).await { + match self.block_import.clone().import_block(import_block).await { Ok(res) => { res.handle_justification( &header.hash(), diff --git a/substrate/client/network/test/src/block_import.rs b/substrate/client/network/test/src/block_import.rs index 25e3b9bee87f1..08287620ba429 100644 --- a/substrate/client/network/test/src/block_import.rs +++ b/substrate/client/network/test/src/block_import.rs @@ -23,7 +23,7 @@ use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_consensus::{ import_single_block, BasicQueue, BlockImportError, BlockImportStatus, ImportedAux, - IncomingBlock, + IncomingBlock, SharedBlockImport, }; use sp_consensus::BlockOrigin; use substrate_test_runtime_client::{ @@ -118,7 +118,7 @@ fn async_import_queue_drops() { let queue = BasicQueue::new( verifier, - Box::new(substrate_test_runtime_client::new()), + SharedBlockImport::new(substrate_test_runtime_client::new()), None, &executor, None, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 463624690a5f6..ae39d424ae398 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -48,7 +48,7 @@ use sc_client_api::{ use sc_consensus::{ BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink, - LongestChain, Verifier, + LongestChain, SharedBlockImport, Verifier, }; use sc_network::{ config::{ @@ -115,7 +115,7 @@ impl PassThroughVerifier { #[async_trait::async_trait] impl Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { if block.fork_choice.is_none() { @@ -211,7 +211,7 @@ impl BlockImport for PeersClient { type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.client.check_block(block).await @@ -587,7 +587,7 @@ where type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await @@ -609,10 +609,7 @@ struct VerifierAdapter { #[async_trait::async_trait] impl Verifier for VerifierAdapter { - async fn verify( - &mut self, - block: BlockImportParams, - ) -> Result, String> { + async fn verify(&self, block: BlockImportParams) -> Result, String> { let hash = block.header.hash(); self.verifier.lock().await.verify(block).await.map_err(|e| { self.failed_verifications.lock().insert(hash, e.clone()); @@ -770,7 +767,7 @@ pub trait TestNetFactory: Default + Sized + Send { let import_queue = Box::new(BasicQueue::new( verifier.clone(), - Box::new(block_import.clone()), + SharedBlockImport::new(block_import.clone()), justification_import, &sp_core::testing::TaskExecutor::new(), None, diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index baa562c46dfa3..7aeaf2a8918e1 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -19,7 +19,7 @@ use futures::prelude::*; use libp2p::{Multiaddr, PeerId}; -use sc_consensus::{ImportQueue, Link}; +use sc_consensus::{ImportQueue, Link, SharedBlockImport}; use sc_network::{ config::{self, FullNetworkConfiguration, MultiaddrWithPeerId, ProtocolId, TransportConfig}, event::Event, @@ -134,7 +134,7 @@ impl TestNetworkBuilder { #[async_trait::async_trait] impl sc_consensus::Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: sc_consensus::BlockImportParams, ) -> Result, String> { block.finalized = self.0; @@ -146,7 +146,7 @@ impl TestNetworkBuilder { let mut import_queue = self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new( PassThroughVerifier(false), - Box::new(client.clone()), + SharedBlockImport::new(client.clone()), None, &sp_core::testing::TaskExecutor::new(), None, diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 09c1673884aac..46d3766ba98c3 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -1781,7 +1781,7 @@ where /// Check block preconditions. async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { let BlockCheckParams { @@ -1863,10 +1863,10 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { - (&*self).check_block(block).await + (&self).check_block(block).await } }