Skip to content
Merged
23 changes: 18 additions & 5 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_service::{
build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager,
WarpSyncConfig,
};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -425,7 +428,7 @@ pub struct BuildNetworkParams<
pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
BuildNetworkParams {
parachain_config,
net_config,
mut net_config,
client,
transaction_pool,
para_id,
Expand Down Expand Up @@ -462,7 +465,7 @@ where
IQ: ImportQueue<Block> + 'static,
Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
let warp_sync_params = match parachain_config.network.sync_mode {
let warp_sync_config = match parachain_config.network.sync_mode {
SyncMode::Warp => {
log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");

Expand Down Expand Up @@ -493,9 +496,19 @@ where
},
};
let metrics = Network::register_notification_metrics(
parachain_config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
parachain_config.protocol_id(),
parachain_config.chain_spec.fork_id(),
&mut net_config,
warp_sync_config,
client.clone(),
&spawn_handle,
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
net_config,
Expand All @@ -504,7 +517,7 @@ where
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
warp_sync_config: warp_sync_params,
syncing_strategy,
block_relay: None,
metrics,
})
Expand Down
14 changes: 12 additions & 2 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use sc_service::KeystoreContainer;
use sc_service::{RpcHandlers, SpawnTaskHandle};
use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle};
use sc_telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
Expand Down Expand Up @@ -1028,6 +1028,16 @@ pub fn new_full<
})
};

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -1037,7 +1047,7 @@ pub fn new_full<
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
19 changes: 19 additions & 0 deletions prdoc/pr_5666.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: Make syncing strategy an argument of the syncing engine

doc:
- audience: Node Dev
description: |
Syncing strategy is no longer implicitly created when building network, but needs to be instantiated explicitly.
Previously default implementation can be created with new function `build_polkadot_syncing_strategy` or custom
syncing strategy could be implemented and used instead if desired, providing greater flexibility for chain
developers.

crates:
- name: cumulus-client-service
bump: patch
- name: polkadot-service
bump: patch
- name: sc-service
bump: major
- name: sc-network-sync
bump: major
13 changes: 12 additions & 1 deletion substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use kitchensink_runtime::RuntimeApi;
use node_primitives::Block;
use polkadot_sdk::sc_service::build_polkadot_syncing_strategy;
use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
Expand Down Expand Up @@ -506,6 +507,16 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -515,7 +526,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
77 changes: 19 additions & 58 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
self,
syncing_service::{SyncingService, ToServiceCommand},
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncConfig},
PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
warp::{EncodedProof, WarpProofRequest},
StrategyKey, SyncingAction, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand Down Expand Up @@ -189,7 +188,7 @@ pub struct Peer<B: BlockT> {

pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: PolkadotSyncingStrategy<B, Client>,
strategy: Box<dyn SyncingStrategy<B>>,

/// Blockchain client.
client: Arc<Client>,
Expand Down Expand Up @@ -271,12 +270,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,

/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,

/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
}
Expand All @@ -301,35 +294,15 @@ where
protocol_id: ProtocolId,
fork_id: &Option<String>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
warp_sync_config: Option<WarpSyncConfig<B>>,
syncing_strategy: Box<dyn SyncingStrategy<B>>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_downloader: Arc<dyn BlockDownloader<B>>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
where
N: NetworkBackend<B, <B as BlockT>::Hash>,
{
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request =
if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
);
MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
};
let syncing_config = SyncingConfig {
mode,
max_parallel_downloads,
max_blocks_per_request,
metrics_registry: metrics_registry.cloned(),
};
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
Expand Down Expand Up @@ -388,10 +361,6 @@ where
Arc::clone(&peer_store_handle),
);

// Initialize syncing strategy.
let strategy =
PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
let num_connected = Arc::new(AtomicUsize::new(0));
Expand All @@ -413,7 +382,7 @@ where
Self {
roles,
client,
strategy,
strategy: syncing_strategy,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -450,8 +419,6 @@ where
},
pending_responses: PendingResponses::new(),
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
},
SyncingService::new(tx, num_connected, is_major_syncing),
Expand Down Expand Up @@ -652,16 +619,16 @@ where
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);
SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => {
self.send_state_request(peer_id, key, protocol_name, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => {
self.send_warp_proof_request(peer_id, key, protocol_name, request.clone());

trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1054,6 +1021,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1070,7 +1038,7 @@ where
Ok(data) => {
self.network_service.start_request(
peer_id,
self.state_request_protocol_name.clone(),
protocol_name,
data,
tx,
IfDisconnected::ImmediateError,
Expand All @@ -1089,6 +1057,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1101,21 +1070,13 @@ where

self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
peer_id,
name.clone(),
request.encode(),
tx,
IfDisconnected::ImmediateError,
),
None => {
log::warn!(
target: LOG_TARGET,
"Trying to send warp sync request when no protocol is configured {request:?}",
);
},
}
self.network_service.start_request(
peer_id,
protocol_name,
request.encode(),
tx,
IfDisconnected::ImmediateError,
);
}

fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
Expand Down
Loading