diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 7486c091ebf13..c1a7009eeb017 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ }, strategy::{ warp::{EncodedProof, WarpProofRequest, WarpSyncParams}, - SyncingAction, SyncingConfig, SyncingStrategy, + StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -48,7 +48,7 @@ use futures::{ FutureExt, StreamExt, }; use libp2p::{request_response::OutboundFailure, PeerId}; -use log::{debug, error, trace}; +use log::{debug, error, trace, warn}; use prometheus_endpoint::{ register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64, }; @@ -214,9 +214,6 @@ pub struct SyncingEngine { /// Syncing strategy. strategy: SyncingStrategy, - /// Syncing configuration for startegies. - syncing_config: SyncingConfig, - /// Blockchain client. client: Arc, @@ -441,8 +438,7 @@ where .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); // Initialize syncing strategy. - let strategy = - SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?; + let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -471,7 +467,6 @@ where roles, client, strategy, - syncing_config, network_service, peers: HashMap::new(), block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), @@ -661,8 +656,15 @@ where Some(event) => self.process_notification_event(event), None => return, }, - warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => - self.pass_warp_sync_target_block_header(warp_target_block_header), + warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => { + if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) { + error!( + target: LOG_TARGET, + "Failed to set warp sync target block header, terminating `SyncingEngine`.", + ); + return + } + }, response_event = self.pending_responses.select_next_some() => self.process_response_event(response_event), validation_result = self.block_announce_validator.select_next_some() => @@ -675,48 +677,61 @@ where // Process actions requested by a syncing strategy. if let Err(e) = self.process_strategy_actions() { - error!("Terminating `SyncingEngine` due to fatal error: {e:?}"); + error!( + target: LOG_TARGET, + "Terminating `SyncingEngine` due to fatal error: {e:?}.", + ); return } } } fn process_strategy_actions(&mut self) -> Result<(), ClientError> { - for action in self.strategy.actions() { + for action in self.strategy.actions()? { match action { - SyncingAction::SendBlockRequest { peer_id, request } => { + SyncingAction::SendBlockRequest { peer_id, key, request } => { // Sending block request implies dropping obsolete pending response as we are // not interested in it anymore (see [`SyncingAction::SendBlockRequest`]). - // Furthermore, only one request at a time is allowed to any peer. - let removed = self.pending_responses.remove(&peer_id); - self.send_block_request(peer_id, request.clone()); - - trace!( - target: LOG_TARGET, - "Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.", - peer_id, - request, - removed, - ) + let removed = self.pending_responses.remove(peer_id, key); + self.send_block_request(peer_id, key, request.clone()); + + if removed { + warn!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \ + Stale response removed!", + peer_id, + key, + request, + ) + } else { + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.", + peer_id, + key, + request, + ) + } }, - SyncingAction::CancelBlockRequest { peer_id } => { - let removed = self.pending_responses.remove(&peer_id); + SyncingAction::CancelRequest { peer_id, key } => { + let removed = self.pending_responses.remove(peer_id, key); trace!( target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.", ); }, - SyncingAction::SendStateRequest { peer_id, request } => { - self.send_state_request(peer_id, request); + SyncingAction::SendStateRequest { peer_id, key, request } => { + self.send_state_request(peer_id, key, request); trace!( target: LOG_TARGET, - "Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.", + "Processed `ChainSyncAction::SendStateRequest` to {peer_id}.", ); }, - SyncingAction::SendWarpProofRequest { peer_id, request } => { - self.send_warp_proof_request(peer_id, request.clone()); + SyncingAction::SendWarpProofRequest { peer_id, key, request } => { + self.send_warp_proof_request(peer_id, key, request.clone()); trace!( target: LOG_TARGET, @@ -726,7 +741,7 @@ where ); }, SyncingAction::DropPeer(BadPeer(peer_id, rep)) => { - self.pending_responses.remove(&peer_id); + self.pending_responses.remove_all(&peer_id); self.network_service .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(peer_id, rep); @@ -753,20 +768,8 @@ where number, ) }, - SyncingAction::Finished => { - let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| { - peer.info.roles.is_full().then_some(( - *peer_id, - peer.info.best_hash, - peer.info.best_number, - )) - }); - self.strategy.switch_to_next( - self.syncing_config.clone(), - self.client.clone(), - connected_peers, - )?; - }, + // Nothing to do, this is handled internally by `SyncingStrategy`. + SyncingAction::Finished => {}, } } @@ -948,23 +951,18 @@ where } } - fn pass_warp_sync_target_block_header(&mut self, header: Result) { + fn pass_warp_sync_target_block_header( + &mut self, + header: Result, + ) -> Result<(), ()> { match header { - Ok(header) => - if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy { - warp_sync.set_target_block(header); - } else { - error!( - target: LOG_TARGET, - "Cannot set warp sync target block: no warp sync strategy is active." - ); - debug_assert!(false); - }, + Ok(header) => self.strategy.set_warp_sync_target_block_header(header), Err(err) => { error!( target: LOG_TARGET, "Failed to get target block for warp sync. Error: {err:?}", ); + Err(()) }, } } @@ -1002,7 +1000,7 @@ where } self.strategy.remove_peer(&peer_id); - self.pending_responses.remove(&peer_id); + self.pending_responses.remove_all(&peer_id); self.event_streams .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()); } @@ -1167,7 +1165,7 @@ where Ok(()) } - fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest) { + fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest) { if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); debug_assert!(false); @@ -1178,12 +1176,18 @@ where self.pending_responses.insert( peer_id, + key, PeerRequest::Block(request.clone()), async move { downloader.download_blocks(peer_id, request).await }.boxed(), ); } - fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { + fn send_state_request( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: OpaqueStateRequest, + ) { if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}"); debug_assert!(false); @@ -1192,7 +1196,7 @@ where let (tx, rx) = oneshot::channel(); - self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed()); + self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed()); match Self::encode_state_request(&request) { Ok(data) => { @@ -1213,7 +1217,12 @@ where } } - fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { + fn send_warp_proof_request( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: WarpProofRequest, + ) { if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); debug_assert!(false); @@ -1222,7 +1231,7 @@ where let (tx, rx) = oneshot::channel(); - self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed()); + self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed()); match &self.warp_sync_protocol_name { Some(name) => self.network_service.start_request( @@ -1259,14 +1268,14 @@ where } fn process_response_event(&mut self, response_event: ResponseEvent) { - let ResponseEvent { peer_id, request, response } = response_event; + let ResponseEvent { peer_id, key, request, response } = response_event; match response { Ok(Ok((resp, _))) => match request { PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { - self.strategy.on_block_response(peer_id, req, blocks); + self.strategy.on_block_response(peer_id, key, req, blocks); }, Err(BlockResponseError::DecodeFailed(e)) => { debug!( @@ -1311,10 +1320,10 @@ where }, }; - self.strategy.on_state_response(peer_id, response); + self.strategy.on_state_response(peer_id, key, response); }, PeerRequest::WarpProof => { - self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp)); + self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp)); }, }, Ok(Err(e)) => { diff --git a/substrate/client/network/sync/src/extra_requests.rs b/substrate/client/network/sync/src/justification_requests.rs similarity index 98% rename from substrate/client/network/sync/src/extra_requests.rs rename to substrate/client/network/sync/src/justification_requests.rs index cd3008d270b1f..799b6df5831a5 100644 --- a/substrate/client/network/sync/src/extra_requests.rs +++ b/substrate/client/network/sync/src/justification_requests.rs @@ -16,6 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications +//! from peers taking into account forks and their finalization (dropping pending requests +//! that don't make sense after one of the forks is finalized). + use crate::{ request_metrics::Metrics, strategy::chain_sync::{PeerSync, PeerSyncState}, diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 494e3b87aa955..e23a23e735d3e 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -23,8 +23,8 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress}; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; -mod extra_requests; mod futures_stream; +mod justification_requests; mod pending_responses; mod request_metrics; mod schema; diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 21e409eb847fe..602c69df7ff96 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -19,7 +19,7 @@ //! [`PendingResponses`] is responsible for keeping track of pending responses and //! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates. -use crate::{types::PeerRequest, LOG_TARGET}; +use crate::{strategy::StrategyKey, types::PeerRequest, LOG_TARGET}; use futures::{ channel::oneshot, future::BoxFuture, @@ -42,6 +42,7 @@ type ResponseFuture = BoxFuture<'static, ResponseResult>; /// An event we receive once a pending response future resolves. pub(crate) struct ResponseEvent { pub peer_id: PeerId, + pub key: StrategyKey, pub request: PeerRequest, pub response: ResponseResult, } @@ -49,7 +50,8 @@ pub(crate) struct ResponseEvent { /// Stream taking care of polling pending responses. pub(crate) struct PendingResponses { /// Pending responses - pending_responses: StreamMap, ResponseResult)>>, + pending_responses: + StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest, ResponseResult)>>, /// Waker to implement never terminating stream waker: Option, } @@ -62,6 +64,7 @@ impl PendingResponses { pub fn insert( &mut self, peer_id: PeerId, + key: StrategyKey, request: PeerRequest, response_future: ResponseFuture, ) { @@ -70,7 +73,7 @@ impl PendingResponses { if self .pending_responses .insert( - peer_id, + (peer_id, key), Box::pin(async move { (request, response_future.await) }.into_stream()), ) .is_some() @@ -87,8 +90,20 @@ impl PendingResponses { } } - pub fn remove(&mut self, peer_id: &PeerId) -> bool { - self.pending_responses.remove(peer_id).is_some() + pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool { + self.pending_responses.remove(&(peer_id, key)).is_some() + } + + pub fn remove_all(&mut self, peer_id: &PeerId) { + let to_remove = self + .pending_responses + .keys() + .filter(|(peer, _key)| peer == peer_id) + .cloned() + .collect::>(); + to_remove.iter().for_each(|k| { + self.pending_responses.remove(k); + }); } pub fn len(&self) -> usize { @@ -104,13 +119,13 @@ impl Stream for PendingResponses { cx: &mut Context<'_>, ) -> Poll> { match self.pending_responses.poll_next_unpin(cx) { - Poll::Ready(Some((peer_id, (request, response)))) => { + Poll::Ready(Some(((peer_id, key), (request, response)))) => { // We need to manually remove the stream, because `StreamMap` doesn't know yet that // it's going to yield `None`, so may not remove it before the next request is made // to the same peer. - self.pending_responses.remove(&peer_id); + self.pending_responses.remove(&(peer_id, key)); - Poll::Ready(Some(ResponseEvent { peer_id, request, response })) + Poll::Ready(Some(ResponseEvent { peer_id, key, request, response })) }, Poll::Ready(None) | Poll::Pending => { self.waker = Some(cx.waker().clone()); diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index dbfb4188ec3f3..7d6e6a8d3b8b7 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -30,7 +30,7 @@ use crate::{ }; use chain_sync::{ChainSync, ChainSyncAction, ChainSyncMode}; use libp2p::PeerId; -use log::{error, info}; +use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; @@ -41,11 +41,11 @@ use sc_network_common::sync::{ use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::BlockOrigin; use sp_runtime::{ - traits::{Block as BlockT, NumberFor}, + traits::{Block as BlockT, Header, NumberFor}, Justifications, }; use state::{StateStrategy, StateStrategyAction}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use warp::{EncodedProof, WarpProofRequest, WarpSync, WarpSyncAction, WarpSyncConfig}; /// Corresponding `ChainSync` mode. @@ -71,16 +71,27 @@ pub struct SyncingConfig { pub metrics_registry: Option, } +/// The key identifying a specific strategy for responses routing. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum StrategyKey { + /// Warp sync initiated this request. + Warp, + /// State sync initiated this request. + State, + /// `ChainSync` initiated this request. + ChainSync, +} + #[derive(Debug)] pub enum SyncingAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Drop stale block request. - CancelBlockRequest { peer_id: PeerId }, + SendBlockRequest { peer_id: PeerId, key: StrategyKey, request: BlockRequest }, /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + SendStateRequest { peer_id: PeerId, key: StrategyKey, request: OpaqueStateRequest }, /// Send warp proof request to peer. - SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, + SendWarpProofRequest { peer_id: PeerId, key: StrategyKey, request: WarpProofRequest }, + /// Drop stale request. + CancelRequest { peer_id: PeerId, key: StrategyKey }, /// Peer misbehaved. Disconnect, report it and cancel any requests to it. DropPeer(BadPeer), /// Import blocks. @@ -92,15 +103,75 @@ pub enum SyncingAction { number: NumberFor, justifications: Justifications, }, - /// Syncing strategy has finished. + /// Strategy finished. Nothing to do, this is handled by `SyncingStrategy`. Finished, } +impl SyncingAction { + fn is_finished(&self) -> bool { + matches!(self, SyncingAction::Finished) + } +} + +impl From> for SyncingAction { + fn from(action: WarpSyncAction) -> Self { + match action { + WarpSyncAction::SendWarpProofRequest { peer_id, request } => + SyncingAction::SendWarpProofRequest { peer_id, key: StrategyKey::Warp, request }, + WarpSyncAction::SendBlockRequest { peer_id, request } => + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::Warp, request }, + WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + WarpSyncAction::Finished => SyncingAction::Finished, + } + } +} + +impl From> for SyncingAction { + fn from(action: StateStrategyAction) -> Self { + match action { + StateStrategyAction::SendStateRequest { peer_id, request } => + SyncingAction::SendStateRequest { peer_id, key: StrategyKey::State, request }, + StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + StateStrategyAction::ImportBlocks { origin, blocks } => + SyncingAction::ImportBlocks { origin, blocks }, + StateStrategyAction::Finished => SyncingAction::Finished, + } + } +} + +impl From> for SyncingAction { + fn from(action: ChainSyncAction) -> Self { + match action { + ChainSyncAction::SendBlockRequest { peer_id, request } => + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }, + ChainSyncAction::SendStateRequest { peer_id, request } => + SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request }, + ChainSyncAction::CancelRequest { peer_id } => + SyncingAction::CancelRequest { peer_id, key: StrategyKey::ChainSync }, + ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + ChainSyncAction::ImportBlocks { origin, blocks } => + SyncingAction::ImportBlocks { origin, blocks }, + ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => + SyncingAction::ImportJustifications { peer_id, hash, number, justifications }, + } + } +} + /// Proxy to specific syncing strategies. -pub enum SyncingStrategy { - WarpSyncStrategy(WarpSync), - StateSyncStrategy(StateStrategy), - ChainSyncStrategy(ChainSync), +pub struct SyncingStrategy { + /// Syncing configuration. + config: SyncingConfig, + /// Client used by syncing strategies. + client: Arc, + /// Warp strategy. + warp: Option>, + /// State strategy. + state: Option>, + /// `ChainSync` strategy.` + chain_sync: Option>, + /// Connected peers and their best blocks used to seed a new strategy when switching to it in + /// [`SyncingStrategy::proceed_to_next`]. + peer_best_blocks: HashMap)>, } impl SyncingStrategy @@ -123,37 +194,51 @@ where if let SyncMode::Warp = config.mode { let warp_sync_config = warp_sync_config .expect("Warp sync configuration must be supplied in warp sync mode."); - Ok(Self::WarpSyncStrategy(WarpSync::new(client.clone(), warp_sync_config))) + let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + Ok(Self { + config, + client, + warp: Some(warp_sync), + state: None, + chain_sync: None, + peer_best_blocks: Default::default(), + }) } else { - Ok(Self::ChainSyncStrategy(ChainSync::new( + let chain_sync = ChainSync::new( chain_sync_mode(config.mode), client.clone(), config.max_parallel_downloads, config.max_blocks_per_request, - config.metrics_registry, - )?)) + config.metrics_registry.clone(), + std::iter::empty(), + )?; + Ok(Self { + config, + client, + warp: None, + state: None, + chain_sync: Some(chain_sync), + peer_best_blocks: Default::default(), + }) } } /// Notify that a new peer has connected. pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - } + self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); + + self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); + self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); + self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); } /// Notify that a peer has disconnected. pub fn remove_peer(&mut self, peer_id: &PeerId) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.remove_peer(peer_id), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.remove_peer(peer_id), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.remove_peer(peer_id), - } + self.warp.as_mut().map(|s| s.remove_peer(peer_id)); + self.state.as_mut().map(|s| s.remove_peer(peer_id)); + self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id)); + + self.peer_best_blocks.remove(peer_id); } /// Submit a validated block announcement. @@ -165,14 +250,31 @@ where peer_id: PeerId, announce: &BlockAnnounce, ) -> Option<(B::Hash, NumberFor)> { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), + let new_best = if let Some(ref mut warp) = self.warp { + warp.on_validated_block_announce(is_best, peer_id, announce) + } else if let Some(ref mut state) = self.state { + state.on_validated_block_announce(is_best, peer_id, announce) + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_validated_block_announce(is_best, peer_id, announce) + } else { + error!(target: LOG_TARGET, "No syncing strategy is active."); + debug_assert!(false); + Some((announce.header.hash(), *announce.header.number())) + }; + + if let Some(new_best) = new_best { + if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) { + *best = new_best; + } else { + debug!( + target: LOG_TARGET, + "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \ + (already disconnected?)", + ); + } } + + new_best } /// Configure an explicit fork sync request in case external code has detected that there is a @@ -183,40 +285,33 @@ where hash: &B::Hash, number: NumberFor, ) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.set_sync_fork_request(peers, hash, number), + // Fork requests are only handled by `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.set_sync_fork_request(peers.clone(), hash, number); } } /// Request extra justification. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.request_justification(hash, number), + // Justifications can only be requested via `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.request_justification(hash, number); } } /// Clear extra justification requests. pub fn clear_justification_requests(&mut self) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.clear_justification_requests(), + // Justification requests can only be cleared by `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.clear_justification_requests(); } } /// Report a justification import (successful or not). pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_justification_import(hash, number, success), + // Only `ChainSync` is interested in justification import. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_justification_import(hash, number, success); } } @@ -224,36 +319,65 @@ where pub fn on_block_response( &mut self, peer_id: PeerId, + key: StrategyKey, request: BlockRequest, blocks: Vec>, ) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_block_response(peer_id, request, blocks), - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_block_response(peer_id, request, blocks), + if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) { + warp.on_block_response(peer_id, request, blocks); + } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = + (key, &mut self.chain_sync) + { + chain_sync.on_block_response(peer_id, request, blocks); + } else { + error!( + target: LOG_TARGET, + "`on_block_response()` called with unexpected key {key:?} \ + or corresponding strategy is not active.", + ); + debug_assert!(false); } } /// Process state response. - pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_state_response(peer_id, response), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_state_response(peer_id, response), + pub fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ) { + if let (StrategyKey::State, Some(ref mut state)) = (key, &mut self.state) { + state.on_state_response(peer_id, response); + } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = + (key, &mut self.chain_sync) + { + chain_sync.on_state_response(peer_id, response); + } else { + error!( + target: LOG_TARGET, + "`on_state_response()` called with unexpected key {key:?} \ + or corresponding strategy is not active.", + ); + debug_assert!(false); } } /// Process warp proof response. - pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_warp_proof_response(peer_id, response), - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(_) => {}, + pub fn on_warp_proof_response( + &mut self, + peer_id: &PeerId, + key: StrategyKey, + response: EncodedProof, + ) { + if let (StrategyKey::Warp, Some(ref mut warp)) = (key, &mut self.warp) { + warp.on_warp_proof_response(peer_id, response); + } else { + error!( + target: LOG_TARGET, + "`on_warp_proof_response()` called with unexpected key {key:?} \ + or warp strategy is not active", + ); + debug_assert!(false); } } @@ -264,226 +388,202 @@ where count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_blocks_processed(imported, count, results), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_blocks_processed(imported, count, results), + // Only `StateStrategy` and `ChainSync` are interested in block processing notifications. + if let Some(ref mut state) = self.state { + state.on_blocks_processed(imported, count, results); + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_blocks_processed(imported, count, results); } } /// Notify a syncing strategy that a block has been finalized. pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_block_finalized(hash, number), + // Only `ChainSync` is interested in block finalization notifications. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_block_finalized(hash, number); } } /// Inform sync about a new best imported block. pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.update_chain_info(best_hash, best_number), + // This is relevant to `ChainSync` only. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.update_chain_info(best_hash, best_number); } } // Are we in major sync mode? pub fn is_major_syncing(&self) -> bool { - match self { - SyncingStrategy::WarpSyncStrategy(_) => true, - SyncingStrategy::StateSyncStrategy(_) => true, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.status().state.is_major_syncing(), - } + self.warp.is_some() || + self.state.is_some() || + match self.chain_sync { + Some(ref s) => s.status().state.is_major_syncing(), + None => unreachable!("At least one syncing startegy is active; qed"), + } } /// Get the number of peers known to the syncing strategy. pub fn num_peers(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.num_peers(), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.num_peers(), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_peers(), - } + self.peer_best_blocks.len() } /// Returns the current sync status. pub fn status(&self) -> SyncStatus { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.status(), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.status(), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.status(), + // This function presumes that startegies are executed serially and must be refactored + // once we have parallel strategies. + if let Some(ref warp) = self.warp { + warp.status() + } else if let Some(ref state) = self.state { + state.status() + } else if let Some(ref chain_sync) = self.chain_sync { + chain_sync.status() + } else { + unreachable!("At least one syncing startegy is always active; qed") } } /// Get the total number of downloaded blocks. pub fn num_downloaded_blocks(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(_) => 0, - SyncingStrategy::StateSyncStrategy(_) => 0, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_downloaded_blocks(), - } + self.chain_sync + .as_ref() + .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks()) } /// Get an estimate of the number of parallel sync requests. pub fn num_sync_requests(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(_) => 0, - SyncingStrategy::StateSyncStrategy(_) => 0, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_sync_requests(), - } + self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests()) } /// Report Prometheus metrics pub fn report_metrics(&self) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.report_metrics(), + if let Some(ref chain_sync) = self.chain_sync { + chain_sync.report_metrics(); + } + } + + /// Let `WarpSync` know about target block header + pub fn set_warp_sync_target_block_header( + &mut self, + target_header: B::Header, + ) -> Result<(), ()> { + match self.warp { + Some(ref mut warp) => { + warp.set_target_block(target_header); + Ok(()) + }, + None => { + error!( + target: LOG_TARGET, + "Cannot set warp sync target block: no warp sync strategy is active." + ); + debug_assert!(false); + Err(()) + }, } } /// Get actions that should be performed by the owner on the strategy's behalf #[must_use] - pub fn actions(&mut self) -> Box>> { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - WarpSyncAction::SendWarpProofRequest { peer_id, request } => - SyncingAction::SendWarpProofRequest { peer_id, request }, - WarpSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, request }, - WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - WarpSyncAction::Finished => SyncingAction::Finished, - })), - SyncingStrategy::StateSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - StateStrategyAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, request }, - StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - StateStrategyAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - StateStrategyAction::Finished => SyncingAction::Finished, - })), - SyncingStrategy::ChainSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, request }, - ChainSyncAction::CancelBlockRequest { peer_id } => - SyncingAction::CancelBlockRequest { peer_id }, - ChainSyncAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, request }, - ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - ChainSyncAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - } => SyncingAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }, - })), + pub fn actions(&mut self) -> Result>, ClientError> { + // This function presumes that strategies are executed serially and must be refactored once + // we have parallel strategies. + let actions: Vec<_> = if let Some(ref mut warp) = self.warp { + warp.actions().map(Into::into).collect() + } else if let Some(ref mut state) = self.state { + state.actions().map(Into::into).collect() + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.actions().map(Into::into).collect() + } else { + unreachable!("At least one syncing strategy is always active; qed") + }; + + if actions.iter().any(SyncingAction::is_finished) { + self.proceed_to_next()?; } + + Ok(actions) } - /// Switch to next strategy if the active one finished. - pub fn switch_to_next( - &mut self, - config: SyncingConfig, - client: Arc, - connected_peers: impl Iterator)>, - ) -> Result<(), ClientError> { - match self { - Self::WarpSyncStrategy(warp_sync) => { - match warp_sync.take_result() { - Some(res) => { - info!( - target: LOG_TARGET, - "Warp sync is complete, continuing with state sync." - ); - let state_sync = StateStrategy::new( - client, - res.target_header, - res.target_body, - res.target_justifications, - // skip proofs, only set to `true` in `FastUnsafe` sync mode - false, - connected_peers - .map(|(peer_id, _best_hash, best_number)| (peer_id, best_number)), - ); - - *self = Self::StateSyncStrategy(state_sync); - }, - None => { - error!( - target: LOG_TARGET, - "Warp sync failed. Falling back to full sync." - ); - let mut chain_sync = match ChainSync::new( - chain_sync_mode(config.mode), - client, - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry, - ) { - Ok(chain_sync) => chain_sync, - Err(e) => { - error!(target: LOG_TARGET, "Failed to start `ChainSync`."); - return Err(e) - }, - }; - // Let `ChainSync` know about connected peers. - connected_peers.into_iter().for_each( - |(peer_id, best_hash, best_number)| { - chain_sync.add_peer(peer_id, best_hash, best_number) - }, - ); - - *self = Self::ChainSyncStrategy(chain_sync); - }, - } - }, - Self::StateSyncStrategy(state_sync) => { - if state_sync.is_succeded() { - info!(target: LOG_TARGET, "State sync is complete, continuing with block sync."); - } else { - error!(target: LOG_TARGET, "State sync failed. Falling back to full sync."); - } - let mut chain_sync = match ChainSync::new( - chain_sync_mode(config.mode), - client, - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry, - ) { - Ok(chain_sync) => chain_sync, - Err(e) => { - error!(target: LOG_TARGET, "Failed to start `ChainSync`."); - return Err(e) - }, - }; - // Let `ChainSync` know about connected peers. - connected_peers.into_iter().for_each(|(peer_id, best_hash, best_number)| { - chain_sync.add_peer(peer_id, best_hash, best_number) - }); - - *self = Self::ChainSyncStrategy(chain_sync); - }, - Self::ChainSyncStrategy(_) => { - error!(target: LOG_TARGET, "`ChainSyncStrategy` is final startegy, cannot switch to next."); - debug_assert!(false); - }, + /// Proceed with the next strategy if the active one finished. + pub fn proceed_to_next(&mut self) -> Result<(), ClientError> { + // The strategies are switched as `WarpSync` -> `StateStartegy` -> `ChainSync`. + if let Some(ref mut warp) = self.warp { + match warp.take_result() { + Some(res) => { + info!( + target: LOG_TARGET, + "Warp sync is complete, continuing with state sync." + ); + let state_sync = StateStrategy::new( + self.client.clone(), + res.target_header, + res.target_body, + res.target_justifications, + false, + self.peer_best_blocks + .iter() + .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)), + ); + + self.warp = None; + self.state = Some(state_sync); + Ok(()) + }, + None => { + error!( + target: LOG_TARGET, + "Warp sync failed. Continuing with full sync." + ); + let chain_sync = match ChainSync::new( + chain_sync_mode(self.config.mode), + self.client.clone(), + self.config.max_parallel_downloads, + self.config.max_blocks_per_request, + self.config.metrics_registry.clone(), + self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { + (*peer_id, *best_hash, *best_number) + }), + ) { + Ok(chain_sync) => chain_sync, + Err(e) => { + error!(target: LOG_TARGET, "Failed to start `ChainSync`."); + return Err(e) + }, + }; + + self.warp = None; + self.chain_sync = Some(chain_sync); + Ok(()) + }, + } + } else if let Some(state) = &self.state { + if state.is_succeded() { + info!(target: LOG_TARGET, "State sync is complete, continuing with block sync."); + } else { + error!(target: LOG_TARGET, "State sync failed. Falling back to full sync."); + } + let chain_sync = match ChainSync::new( + chain_sync_mode(self.config.mode), + self.client.clone(), + self.config.max_parallel_downloads, + self.config.max_blocks_per_request, + self.config.metrics_registry.clone(), + self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { + (*peer_id, *best_hash, *best_number) + }), + ) { + Ok(chain_sync) => chain_sync, + Err(e) => { + error!(target: LOG_TARGET, "Failed to start `ChainSync`."); + return Err(e); + }, + }; + + self.state = None; + self.chain_sync = Some(chain_sync); + Ok(()) + } else { + unreachable!("Only warp & state strategies can finish; qed") } - Ok(()) } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 62c260d582b5a..ad0c75363e78a 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -30,7 +30,7 @@ use crate::{ blocks::BlockCollection, - extra_requests::ExtraRequests, + justification_requests::ExtraRequests, schema::v1::StateResponse, strategy::{ state_sync::{ImportResult, StateSync, StateSyncProvider}, @@ -212,10 +212,10 @@ struct GapSync { pub enum ChainSyncAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Drop stale block request. - CancelBlockRequest { peer_id: PeerId }, /// Send state request to peer. SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + /// Drop stale request. + CancelRequest { peer_id: PeerId }, /// Peer misbehaved. Disconnect, report it and cancel the block request to it. DropPeer(BadPeer), /// Import blocks. @@ -373,6 +373,7 @@ where max_parallel_downloads: u32, max_blocks_per_request: u32, metrics_registry: Option, + initial_peers: impl Iterator)>, ) -> Result { let mut sync = Self { client, @@ -405,6 +406,10 @@ where }; sync.reset_sync_start_point()?; + initial_peers.for_each(|(peer_id, best_hash, best_number)| { + sync.add_peer(peer_id, best_hash, best_number); + }); + Ok(sync) } @@ -1312,34 +1317,35 @@ where ); let old_peers = std::mem::take(&mut self.peers); - old_peers.into_iter().for_each(|(peer_id, mut p)| { - // peers that were downloading justifications - // should be kept in that state. - if let PeerSyncState::DownloadingJustification(_) = p.state { - // We make sure our commmon number is at least something we have. - trace!( - target: LOG_TARGET, - "Keeping peer {} after restart, updating common number from={} => to={} (our best).", - peer_id, - p.common_number, - self.best_queued_number, - ); - p.common_number = self.best_queued_number; - self.peers.insert(peer_id, p); - return + old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| { + match peer_sync.state { + PeerSyncState::Available => { + self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); + }, + PeerSyncState::AncestorSearch { .. } | + PeerSyncState::DownloadingNew(_) | + PeerSyncState::DownloadingStale(_) | + PeerSyncState::DownloadingGap(_) | + PeerSyncState::DownloadingState => { + // Cancel a request first, as `add_peer` may generate a new request. + self.actions.push(ChainSyncAction::CancelRequest { peer_id }); + self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); + }, + PeerSyncState::DownloadingJustification(_) => { + // Peers that were downloading justifications + // should be kept in that state. + // We make sure our commmon number is at least something we have. + trace!( + target: LOG_TARGET, + "Keeping peer {} after restart, updating common number from={} => to={} (our best).", + peer_id, + peer_sync.common_number, + self.best_queued_number, + ); + peer_sync.common_number = self.best_queued_number; + self.peers.insert(peer_id, peer_sync); + }, } - - // handle peers that were in other states. - let action = match self.add_peer_inner(peer_id, p.best_hash, p.best_number) { - // since the request is not a justification, remove it from pending responses - Ok(None) => ChainSyncAction::CancelBlockRequest { peer_id }, - // update the request if the new one is available - Ok(Some(request)) => ChainSyncAction::SendBlockRequest { peer_id, request }, - // this implies that we need to drop pending response from the peer - Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer), - }; - - self.actions.push(action); }); } diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index c89096bc6c904..127b6862f0e0d 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -38,7 +38,9 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) + .unwrap(); let (a1_hash, a1_number) = { let a1 = BlockBuilderBuilder::new(&*client) @@ -91,7 +93,11 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + // we request max 8 blocks to always initiate block requests to both peers for the test to be + // deterministic + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 8, None, std::iter::empty()) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -122,10 +128,13 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we wil send block requests to these peers // for these blocks we don't know about - assert!(sync - .block_requests() - .into_iter() - .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + let actions = sync.actions().collect::>(); + assert_eq!(actions.len(), 2); + assert!(actions.iter().all(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, .. } => + peer_id == &peer_id1 || peer_id == &peer_id2, + _ => false, + })); // add a new peer at a known block sync.add_peer(peer_id3, b1_hash, b1_number); @@ -146,22 +155,29 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { PeerSyncState::DownloadingJustification(b1_hash), ); - // clear old actions + // drop old actions let _ = sync.take_actions(); // we restart the sync state sync.restart(); - let actions = sync.take_actions().collect::>(); - // which should make us send out block requests to the first two peers - assert_eq!(actions.len(), 2); + // which should make us cancel and send out again block requests to the first two peers + let actions = sync.actions().collect::>(); + assert_eq!(actions.len(), 4); + let mut cancelled_first = HashSet::new(); assert!(actions.iter().all(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => - peer_id == &peer_id1 || peer_id == &peer_id2, + ChainSyncAction::CancelRequest { peer_id, .. } => { + cancelled_first.insert(peer_id); + peer_id == &peer_id1 || peer_id == &peer_id2 + }, + ChainSyncAction::SendBlockRequest { peer_id, .. } => { + assert!(cancelled_first.remove(peer_id)); + peer_id == &peer_id1 || peer_id == &peer_id2 + }, _ => false, })); - // peer 3 should be unaffected it was downloading finality data + // peer 3 should be unaffected as it was downloading finality data assert_eq!( sync.peers.get(&peer_id3).unwrap().state, PeerSyncState::DownloadingJustification(b1_hash), @@ -275,7 +291,9 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let mut client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -421,7 +439,9 @@ fn can_sync_huge_fork() { let info = client.info(); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -554,7 +574,9 @@ fn syncs_fork_without_duplicate_requests() { let info = client.info(); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -689,7 +711,9 @@ fn removes_target_fork_on_disconnect() { let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) + .unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); @@ -714,7 +738,9 @@ fn can_import_response_with_missing_blocks() { let empty_client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None, std::iter::empty()) + .unwrap(); let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); @@ -745,7 +771,9 @@ fn ancestor_search_repeat() { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let mut client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) + .unwrap(); let peers = vec![PeerId::random(), PeerId::random()]; @@ -813,7 +841,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { let actions = sync.take_actions().collect::>(); for action in actions.iter() { match action { - ChainSyncAction::CancelBlockRequest { peer_id } => { + ChainSyncAction::CancelRequest { peer_id } => { pending_responses.remove(&peer_id); }, ChainSyncAction::SendBlockRequest { peer_id, .. } => { @@ -887,7 +915,9 @@ fn request_across_forks() { fork_blocks }; - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let mut sync = + ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) + .unwrap(); // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index ae3f7b6005594..12d36ff9e01a9 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -330,11 +330,6 @@ impl StateStrategy { } } - /// Get the number of peers known to syncing. - pub fn num_peers(&self) -> usize { - self.peers.len() - } - /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> {