Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 76 additions & 67 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
SyncingAction, SyncingConfig, SyncingStrategy,
StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand All @@ -48,7 +48,7 @@ use futures::{
FutureExt, StreamExt,
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, trace};
use log::{debug, error, trace, warn};
use prometheus_endpoint::{
register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
Expand Down Expand Up @@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,

/// Syncing configuration for startegies.
syncing_config: SyncingConfig,

/// Blockchain client.
client: Arc<Client>,

Expand Down Expand Up @@ -441,8 +438,7 @@ where
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -471,7 +467,6 @@ where
roles,
client,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -661,8 +656,15 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
Expand All @@ -675,48 +677,61 @@ where

// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
error!(
target: LOG_TARGET,
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
);
return
}
}
}

fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
for action in self.strategy.actions()? {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
SyncingAction::SendBlockRequest { peer_id, key, request } => {
// Sending block request implies dropping obsolete pending response as we are
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
let removed = self.pending_responses.remove(peer_id, key);
self.send_block_request(peer_id, key, request.clone());

if removed {
warn!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
Stale response removed!",
peer_id,
key,
request,
)
} else {
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
peer_id,
key,
request,
)
}
},
SyncingAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
SyncingAction::CancelRequest { peer_id, key } => {
let removed = self.pending_responses.remove(peer_id, key);

trace!(
target: LOG_TARGET,
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());

trace!(
target: LOG_TARGET,
Expand All @@ -726,7 +741,7 @@ where
);
},
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
Expand All @@ -753,20 +768,8 @@ where
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
// Nothing to do, this is handled internally by `SyncingStrategy`.
SyncingAction::Finished => {},
}
}

Expand Down Expand Up @@ -948,23 +951,18 @@ where
}
}

fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}
Expand Down Expand Up @@ -1002,7 +1000,7 @@ where
}

self.strategy.remove_peer(&peer_id);
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
}
Expand Down Expand Up @@ -1167,7 +1165,7 @@ where
Ok(())
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1178,12 +1176,18 @@ where

self.pending_responses.insert(
peer_id,
key,
PeerRequest::Block(request.clone()),
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
);
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
fn send_state_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1192,7 +1196,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());

match Self::encode_state_request(&request) {
Ok(data) => {
Expand All @@ -1213,7 +1217,12 @@ where
}
}

fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
fn send_warp_proof_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1222,7 +1231,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
Expand Down Expand Up @@ -1259,14 +1268,14 @@ where
}

fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
let ResponseEvent { peer_id, request, response } = response_event;
let ResponseEvent { peer_id, key, request, response } = response_event;

match response {
Ok(Ok((resp, _))) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
self.strategy.on_block_response(peer_id, req, blocks);
self.strategy.on_block_response(peer_id, key, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
Expand Down Expand Up @@ -1311,10 +1320,10 @@ where
},
};

self.strategy.on_state_response(peer_id, response);
self.strategy.on_state_response(peer_id, key, response);
},
PeerRequest::WarpProof => {
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
},
},
Ok(Err(e)) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications
//! from peers taking into account forks and their finalization (dropping pending requests
//! that don't make sense after one of the forks is finalized).

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress};
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod justification_requests;
mod pending_responses;
mod request_metrics;
mod schema;
Expand Down
Loading