Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 1ace0a3

Browse files
committed
wait for relay chain to sync then get parachain header
1 parent dd25357 commit 1ace0a3

File tree

7 files changed

+135
-10
lines changed

7 files changed

+135
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
1818
use sc_consensus::BlockImport;
1919
use sp_runtime::traits::Block as BlockT;
2020

21-
mod parachain_consensus;
21+
pub mod parachain_consensus;
2222
#[cfg(test)]
2323
mod tests;
2424
pub use parachain_consensus::run_parachain_consensus;

client/network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
2828

2929
# Cumulus
3030
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
31+
cumulus-client-consensus-common = { path = "../consensus/common" }
3132

3233
[dev-dependencies]
3334
portpicker = "0.1.1"

client/network/src/lib.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ use polkadot_primitives::v2::{
3535
};
3636

3737
use codec::{Decode, DecodeAll, Encode};
38-
use futures::{channel::oneshot, future::FutureExt, Future};
38+
use futures::{channel::oneshot, future::FutureExt, Future, StreamExt};
3939

40+
use cumulus_client_consensus_common::parachain_consensus::RelaychainClient;
4041
use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
41-
4242
#[cfg(test)]
4343
mod tests;
4444

@@ -454,3 +454,72 @@ async fn wait_to_announce<Block: BlockT>(
454454
);
455455
}
456456
}
457+
458+
#[derive(Clone)]
459+
pub struct WaitForParachainTargetBlock<Block> {
460+
phantom: PhantomData<Block>,
461+
}
462+
463+
impl<Block: BlockT> WaitForParachainTargetBlock<Block> {
464+
/// Get warp sync target block
465+
pub async fn warp_sync_get(
466+
para_id: ParaId,
467+
relay_chain_interface: Arc<dyn RelayChainInterface>,
468+
) -> Result<oneshot::Receiver<Block::Header>, BoxedError>
469+
where
470+
Block: BlockT + 'static,
471+
{
472+
let (sender, receiver) = oneshot::channel::<Block::Header>();
473+
Self::wait_for_target_block(sender, para_id, relay_chain_interface).await;
474+
return Ok(receiver)
475+
}
476+
477+
async fn wait_for_target_block(
478+
sender: oneshot::Sender<Block::Header>,
479+
para_id: ParaId,
480+
relay_chain_interface: Arc<dyn RelayChainInterface>,
481+
) {
482+
let is_syncing = relay_chain_interface
483+
.is_major_syncing()
484+
.await
485+
.map_err(|e| {
486+
tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
487+
})
488+
.unwrap_or(false);
489+
490+
loop {
491+
if !is_syncing {
492+
let mut finalized_heads = match relay_chain_interface.finalized_heads(para_id).await
493+
{
494+
Ok(finalized_heads_stream) => finalized_heads_stream,
495+
Err(err) => {
496+
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
497+
return
498+
},
499+
};
500+
501+
let finalized_head = if let Some(h) = finalized_heads.next().await {
502+
h
503+
} else {
504+
tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
505+
return
506+
};
507+
508+
let target_header = match Block::Header::decode(&mut &finalized_head[..]) {
509+
Ok(header) => header,
510+
Err(err) => {
511+
tracing::debug!(
512+
target: "cumulus-network",
513+
error = ?err,
514+
"Could not decode parachain header while following finalized heads.",
515+
);
516+
continue
517+
},
518+
};
519+
520+
let _ = sender.send(target_header);
521+
break
522+
}
523+
}
524+
}
525+
}

parachain-template/node/src/service.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use sp_keystore::SyncCryptoStorePtr;
3131
use substrate_prometheus_endpoint::Registry;
3232

3333
use polkadot_service::CollatorPair;
34+
use sc_network_common::sync::warp::WarpSyncParams;
3435

3536
/// Native executor type.
3637
pub struct ParachainNativeExecutor;
@@ -197,6 +198,17 @@ async fn start_node_impl(
197198
let prometheus_registry = parachain_config.prometheus_registry().cloned();
198199
let transaction_pool = params.transaction_pool.clone();
199200
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
201+
let warp_sync_params =
202+
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
203+
id,
204+
relay_chain_interface.clone(),
205+
)
206+
.await
207+
{
208+
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
209+
_ => None,
210+
};
211+
200212
let (network, system_rpc_tx, tx_handler_controller, start_network) =
201213
sc_service::build_network(sc_service::BuildNetworkParams {
202214
config: &parachain_config,
@@ -207,7 +219,7 @@ async fn start_node_impl(
207219
block_announce_validator_builder: Some(Box::new(|_| {
208220
Box::new(block_announce_validator)
209221
})),
210-
warp_sync_params: None,
222+
warp_sync_params,
211223
})?;
212224

213225
if parachain_config.offchain_worker.enabled {

polkadot-parachain/src/service.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use sc_consensus::{
4747
};
4848
use sc_executor::WasmExecutor;
4949
use sc_network::NetworkService;
50-
use sc_network_common::service::NetworkBlock;
50+
use sc_network_common::{service::NetworkBlock, sync::warp::WarpSyncParams};
5151
use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
5252
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
5353
use sp_api::{ApiExt, ConstructRuntimeApi};
@@ -375,6 +375,16 @@ where
375375
let prometheus_registry = parachain_config.prometheus_registry().cloned();
376376
let transaction_pool = params.transaction_pool.clone();
377377
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
378+
let warp_sync_params =
379+
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
380+
para_id,
381+
relay_chain_interface.clone(),
382+
)
383+
.await
384+
{
385+
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
386+
_ => None,
387+
};
378388
let (network, system_rpc_tx, tx_handler_controller, start_network) =
379389
sc_service::build_network(sc_service::BuildNetworkParams {
380390
config: &parachain_config,
@@ -385,7 +395,7 @@ where
385395
block_announce_validator_builder: Some(Box::new(|_| {
386396
Box::new(block_announce_validator)
387397
})),
388-
warp_sync_params: None,
398+
warp_sync_params,
389399
})?;
390400

391401
let rpc_client = client.clone();
@@ -558,6 +568,16 @@ where
558568
let prometheus_registry = parachain_config.prometheus_registry().cloned();
559569
let transaction_pool = params.transaction_pool.clone();
560570
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
571+
let warp_sync_params =
572+
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
573+
para_id,
574+
relay_chain_interface.clone(),
575+
)
576+
.await
577+
{
578+
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
579+
_ => None,
580+
};
561581
let (network, system_rpc_tx, tx_handler_controller, start_network) =
562582
sc_service::build_network(sc_service::BuildNetworkParams {
563583
config: &parachain_config,
@@ -568,7 +588,7 @@ where
568588
block_announce_validator_builder: Some(Box::new(|_| {
569589
Box::new(block_announce_validator)
570590
})),
571-
warp_sync_params: None,
591+
warp_sync_params,
572592
})?;
573593

574594
let rpc_builder = {
@@ -1327,6 +1347,16 @@ where
13271347
let prometheus_registry = parachain_config.prometheus_registry().cloned();
13281348
let transaction_pool = params.transaction_pool.clone();
13291349
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
1350+
let warp_sync_params =
1351+
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
1352+
para_id,
1353+
relay_chain_interface.clone(),
1354+
)
1355+
.await
1356+
{
1357+
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
1358+
_ => None,
1359+
};
13301360
let (network, system_rpc_tx, tx_handler_controller, start_network) =
13311361
sc_service::build_network(sc_service::BuildNetworkParams {
13321362
config: &parachain_config,
@@ -1337,7 +1367,7 @@ where
13371367
block_announce_validator_builder: Some(Box::new(|_| {
13381368
Box::new(block_announce_validator)
13391369
})),
1340-
warp_sync_params: None,
1370+
warp_sync_params,
13411371
})?;
13421372

13431373
let rpc_builder = {

test/service/src/lib.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ use polkadot_primitives::v2::{CollatorPair, Hash as PHash, PersistedValidationDa
4949
use polkadot_service::ProvideRuntimeApi;
5050
use sc_client_api::execution_extensions::ExecutionStrategies;
5151
use sc_network::{multiaddr, NetworkBlock, NetworkService};
52-
use sc_network_common::{config::TransportConfig, service::NetworkStateInfo};
52+
use sc_network_common::{
53+
config::TransportConfig, service::NetworkStateInfo, sync::warp::WarpSyncParams,
54+
};
5355
use sc_service::{
5456
config::{
5557
BlocksPruning, DatabaseSource, KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration,
@@ -268,6 +270,16 @@ where
268270
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);
269271
let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>;
270272

273+
let warp_sync_params =
274+
match cumulus_client_network::WaitForParachainTargetBlock::<Block>::warp_sync_get(
275+
para_id,
276+
relay_chain_interface.clone(),
277+
)
278+
.await
279+
{
280+
Ok(target_block) => Some(WarpSyncParams::WaitForTarget(target_block)),
281+
_ => None,
282+
};
271283
let prometheus_registry = parachain_config.prometheus_registry().cloned();
272284
let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue);
273285
let (network, system_rpc_tx, tx_handler_controller, start_network) =
@@ -278,7 +290,7 @@ where
278290
spawn_handle: task_manager.spawn_handle(),
279291
import_queue: import_queue.clone(),
280292
block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)),
281-
warp_sync_params: None,
293+
warp_sync_params,
282294
})?;
283295

284296
let rpc_builder = {

0 commit comments

Comments
 (0)