diff --git a/Cargo.lock b/Cargo.lock
index e8582c36f74..2f5ed34b297 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1578,6 +1578,7 @@ name = "cumulus-client-consensus-common"
version = "0.1.0"
dependencies = [
"async-trait",
+ "cumulus-primitives-core",
"cumulus-relay-chain-interface",
"cumulus-test-client",
"dyn-clone",
@@ -1690,6 +1691,7 @@ dependencies = [
"cumulus-client-pov-recovery",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
+ "futures",
"parking_lot 0.12.1",
"polkadot-primitives",
"sc-client-api",
diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml
index bcaa7ba73b2..816a004fbbe 100644
--- a/client/consensus/common/Cargo.toml
+++ b/client/consensus/common/Cargo.toml
@@ -25,6 +25,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus
+cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
[dev-dependencies]
diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs
index 860eb552c87..2db3b725c43 100644
--- a/client/consensus/common/src/parachain_consensus.rs
+++ b/client/consensus/common/src/parachain_consensus.rs
@@ -15,7 +15,6 @@
// along with Cumulus. If not, see .
use async_trait::async_trait;
-use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -27,15 +26,25 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
+use cumulus_primitives_core::{RecoveryDelay, RecoveryKind, RecoveryRequest};
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
+
use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
-use futures::{select, FutureExt, Stream, StreamExt};
+use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};
-use std::{pin::Pin, sync::Arc};
+use std::{pin::Pin, sync::Arc, time::Duration};
const LOG_TARGET: &str = "cumulus-consensus";
+// Delay range to trigger explicit requests.
+// The chosen value doesn't have any special meaning, a random delay within the order of
+// seconds in practice should be good enough to allow a quick recovery without DOSing
+// the relay chain.
+const RECOVERY_DELAY: RecoveryDelay =
+ RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
+
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
@@ -82,7 +91,7 @@ where
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
- tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
+ tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
@@ -90,7 +99,7 @@ where
Ok(header) => header,
Err(err) => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
@@ -105,12 +114,12 @@ where
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
@@ -136,6 +145,7 @@ pub async fn run_parachain_consensus
(
parachain: Arc
,
relay_chain: R,
announce_block: Arc>) + Send + Sync>,
+ recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: Finalizer
@@ -148,8 +158,13 @@ pub async fn run_parachain_consensus(
R: RelaychainClient,
B: Backend,
{
- let follow_new_best =
- follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
+ let follow_new_best = follow_new_best(
+ para_id,
+ parachain.clone(),
+ relay_chain.clone(),
+ announce_block,
+ recovery_chan_tx,
+ );
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
@@ -163,6 +178,7 @@ async fn follow_new_best(
parachain: Arc
,
relay_chain: R,
announce_block: Arc>) + Send + Sync>,
+ recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: Finalizer
@@ -197,10 +213,11 @@ async fn follow_new_best(
h,
&*parachain,
&mut unset_best_header,
+ recovery_chan_tx.clone(),
).await,
None => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
"Stopping following new best.",
);
return
@@ -217,7 +234,7 @@ async fn follow_new_best
(
).await,
None => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
@@ -276,7 +293,7 @@ async fn handle_new_block_imported(
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
?unset_best_header,
?notification.header,
?state,
@@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head(
head: Vec,
parachain: &P,
unset_best_header: &mut Option,
+ mut recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: UsageProvider + Send + Sync + BlockBackend,
@@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head(
Ok(header) => header,
Err(err) => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?err,
"Could not decode Parachain header while following best heads.",
);
@@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head(
if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
@@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head(
},
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
@@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head(
*unset_best_header = Some(parachain_head);
tracing::debug!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
+
+ if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
+ // Best effort channel to actively encourage block recovery.
+ // An error here is not fatal; the relay chain continuously re-announces
+ // the best block, thus we will have other opportunities to retry.
+ let req =
+ RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
+ if let Err(err) = recovery_chan_tx.try_send(req) {
+ tracing::warn!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ error = ?err,
+ "Unable to notify block recovery subsystem"
+ )
+ }
+ }
},
Err(e) => {
tracing::error!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
@@ -361,7 +395,7 @@ where
let best_number = parachain.usage_info().chain.best_number;
if *header.number() < best_number {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
%best_number,
block_number = %header.number(),
"Skipping importing block as new best block, because there already exists a \
@@ -377,7 +411,7 @@ where
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
tracing::warn!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs
index fca1888e279..fc5a1011550 100644
--- a/client/consensus/common/src/tests.rs
+++ b/client/consensus/common/src/tests.rs
@@ -18,6 +18,7 @@ use crate::*;
use async_trait::async_trait;
use codec::Encode;
+use cumulus_primitives_core::RecoveryKind;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
@@ -29,7 +30,7 @@ use polkadot_primitives::v2::Id as ParaId;
use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
-use sp_consensus::BlockOrigin;
+use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::generic::BlockId;
use std::{
sync::{Arc, Mutex},
@@ -103,22 +104,8 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
}
}
-fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block {
- build_and_import_block_ext(
- &*client.clone(),
- BlockOrigin::Own,
- import_as_best,
- &mut client,
- None,
- None,
- )
-}
-
-fn build_and_import_block_ext>(
+fn build_block(
builder: &B,
- origin: BlockOrigin,
- import_as_best: bool,
- importer: &mut I,
at: Option>,
timestamp: Option,
) -> Block {
@@ -132,27 +119,67 @@ fn build_and_import_block_ext>(
};
let mut block = builder.build().unwrap().block;
- let (header, body) = block.clone().deconstruct();
- // Simulate some form of post activity.
+ // Simulate some form of post activity (like a Seal or Other generic things).
// This is mostly used to excercise the `LevelMonitor` correct behavior.
// (in practice we want that header post-hash != pre-hash)
- let post_digest = sp_runtime::DigestItem::Other(vec![1, 2, 3]);
+ block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));
+
+ block
+}
+
+async fn import_block>(
+ importer: &mut I,
+ block: Block,
+ origin: BlockOrigin,
+ import_as_best: bool,
+) {
+ let (mut header, body) = block.deconstruct();
+
+ let post_digest =
+ header.digest.pop().expect("post digest is present in manually crafted block");
let mut block_import_params = BlockImportParams::new(origin, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
block_import_params.body = Some(body);
- block_import_params.post_digests.push(post_digest.clone());
+ block_import_params.post_digests.push(post_digest);
- block_on(importer.import_block(block_import_params, Default::default())).unwrap();
+ importer.import_block(block_import_params, Default::default()).await.unwrap();
+}
- // In order to get a header hash compatible with block import params containing some
- // form of `post_digest`, we need to manually push the post digest within the header
- // digest logs.
- block.header.digest.push(post_digest);
+fn import_block_sync>(
+ importer: &mut I,
+ block: Block,
+ origin: BlockOrigin,
+ import_as_best: bool,
+) {
+ block_on(import_block(importer, block, origin, import_as_best));
+}
+
+fn build_and_import_block_ext>(
+ builder: &B,
+ origin: BlockOrigin,
+ import_as_best: bool,
+ importer: &mut I,
+ at: Option>,
+ timestamp: Option,
+) -> Block {
+ let block = build_block(builder, at, timestamp);
+ import_block_sync(importer, block.clone(), origin, import_as_best);
block
}
+fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block {
+ build_and_import_block_ext(
+ &*client.clone(),
+ BlockOrigin::Own,
+ import_as_best,
+ &mut client,
+ None,
+ None,
+ )
+}
+
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
@@ -164,7 +191,7 @@ fn follow_new_best_works() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -187,6 +214,68 @@ fn follow_new_best_works() {
});
}
+#[test]
+fn follow_new_best_with_dummy_recovery_works() {
+ sp_tracing::try_init_simple();
+
+ let client = Arc::new(TestClientBuilder::default().build());
+
+ let relay_chain = Relaychain::new();
+ let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
+
+ let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3);
+
+ let consensus = run_parachain_consensus(
+ 100.into(),
+ client.clone(),
+ relay_chain,
+ Arc::new(|_, _| {}),
+ Some(recovery_chan_tx),
+ );
+
+ let block = build_block(&*client.clone(), None, None);
+ let block_clone = block.clone();
+ let client_clone = client.clone();
+
+ let work = async move {
+ new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
+ loop {
+ Delay::new(Duration::from_millis(100)).await;
+ match client.block_status(&BlockId::Hash(block.hash())).unwrap() {
+ BlockStatus::Unknown => {},
+ status => {
+ assert_eq!(block.hash(), client.usage_info().chain.best_hash);
+ assert_eq!(status, BlockStatus::InChainWithState);
+ break
+ },
+ }
+ }
+ };
+
+ let dummy_block_recovery = async move {
+ loop {
+ if let Some(req) = recovery_chan_rx.next().await {
+ assert_eq!(req.hash, block_clone.hash());
+ assert_eq!(req.kind, RecoveryKind::Full);
+ Delay::new(Duration::from_millis(500)).await;
+ import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true)
+ .await;
+ }
+ }
+ };
+
+ block_on(async move {
+ futures::pin_mut!(consensus);
+ futures::pin_mut!(work);
+
+ select! {
+ r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
+ _ = dummy_block_recovery.fuse() => {},
+ _ = work.fuse() => {},
+ }
+ });
+}
+
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
@@ -198,7 +287,7 @@ fn follow_finalized_works() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
finalized_sender.unbounded_send(block.header().clone()).unwrap();
@@ -239,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
for _ in 0..3usize {
@@ -289,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -366,7 +455,7 @@ fn do_not_set_best_block_to_older_block() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let client2 = client.clone();
let work = async move {
diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs
index a269a26f821..caae3615a85 100644
--- a/client/pov-recovery/src/active_candidate_recovery.rs
+++ b/client/pov-recovery/src/active_candidate_recovery.rs
@@ -42,19 +42,19 @@ impl ActiveCandidateRecovery {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
}
- /// Recover the given `pending_candidate`.
+ /// Recover the given `candidate`.
pub async fn recover_candidate(
&mut self,
block_hash: Block::Hash,
- pending_candidate: crate::PendingCandidate,
+ candidate: &crate::Candidate,
) {
let (tx, rx) = oneshot::channel();
self.overseer_handle
.send_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
- pending_candidate.receipt,
- pending_candidate.session_index,
+ candidate.receipt.clone(),
+ candidate.session_index,
None,
tx,
),
diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs
index 3327d4bb86b..5c9764f6a9c 100644
--- a/client/pov-recovery/src/lib.rs
+++ b/client/pov-recovery/src/lib.rs
@@ -55,11 +55,13 @@ use polkadot_primitives::v2::{
CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex,
};
-use cumulus_primitives_core::ParachainBlockData;
+use cumulus_primitives_core::{ParachainBlockData, RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
-use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
+use futures::{
+ channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
+};
use futures_timer::Delay;
use rand::{thread_rng, Rng};
@@ -67,7 +69,6 @@ use std::{
collections::{HashMap, VecDeque},
pin::Pin,
sync::Arc,
- time::Duration,
};
mod active_candidate_recovery;
@@ -75,38 +76,21 @@ use active_candidate_recovery::ActiveCandidateRecovery;
const LOG_TARGET: &str = "cumulus-pov-recovery";
-/// Represents a pending candidate.
-struct PendingCandidate {
+/// Represents an outstanding block candidate.
+struct Candidate {
receipt: CandidateReceipt,
session_index: SessionIndex,
block_number: NumberFor,
-}
-
-/// The delay between observing an unknown block and recovering this block.
-#[derive(Clone, Copy)]
-pub enum RecoveryDelay {
- /// Start recovering the block in maximum of the given delay.
- WithMax { max: Duration },
- /// Start recovering the block after at least `min` delay and in maximum `max` delay.
- WithMinAndMax { min: Duration, max: Duration },
-}
-
-impl RecoveryDelay {
- /// Return as [`Delay`].
- fn as_delay(self) -> Delay {
- match self {
- Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())),
- Self::WithMinAndMax { min, max } =>
- Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())),
- }
- }
+ parent_hash: Block::Hash,
+ // Lazy recovery has been submitted.
+ waiting_recovery: bool,
}
/// Encapsulates the logic of the pov recovery.
pub struct PoVRecovery {
/// All the pending candidates that we are waiting for to be imported or that need to be
/// recovered when `next_candidate_to_recover` tells us to do so.
- pending_candidates: HashMap>,
+ candidates: HashMap>,
/// A stream of futures that resolve to hashes of candidates that need to be recovered.
///
/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
@@ -122,6 +106,8 @@ pub struct PoVRecovery {
parachain_import_queue: IQ,
relay_chain_interface: RC,
para_id: ParaId,
+ /// Explicit block recovery requests channel.
+ recovery_chan_rx: Receiver>,
}
impl PoVRecovery
@@ -138,9 +124,10 @@ where
parachain_import_queue: IQ,
relay_chain_interface: RCInterface,
para_id: ParaId,
+ recovery_chan_rx: Receiver>,
) -> Self {
Self {
- pending_candidates: HashMap::new(),
+ candidates: HashMap::new(),
next_candidate_to_recover: Default::default(),
active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle),
recovery_delay,
@@ -149,6 +136,7 @@ where
parachain_import_queue,
relay_chain_interface,
para_id,
+ recovery_chan_rx,
}
}
@@ -175,69 +163,54 @@ where
}
let hash = header.hash();
- match self.parachain_client.block_status(&BlockId::Hash(hash)) {
- Ok(BlockStatus::Unknown) => (),
- // Any other state means, we should ignore it.
- Ok(_) => return,
- Err(e) => {
- tracing::debug!(
- target: LOG_TARGET,
- error = ?e,
- block_hash = ?hash,
- "Failed to get block status",
- );
- return
- },
- }
- tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate");
- if self
- .pending_candidates
- .insert(
- hash,
- PendingCandidate {
- block_number: *header.number(),
- receipt: receipt.to_plain(),
- session_index,
- },
- )
- .is_some()
- {
+ if self.candidates.contains_key(&hash) {
return
}
- // Delay the recovery by some random time to not spam the relay chain.
- let delay = self.recovery_delay.as_delay();
- self.next_candidate_to_recover.push(
- async move {
- delay.await;
- hash
- }
- .boxed(),
+ tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
+ self.candidates.insert(
+ hash,
+ Candidate {
+ block_number: *header.number(),
+ receipt: receipt.to_plain(),
+ session_index,
+ parent_hash: *header.parent_hash(),
+ waiting_recovery: false,
+ },
);
+
+ // If required, triggers a lazy recovery request that will eventually be discarded
+ // if the block is imported in the meantime.
+ self.recover(RecoveryRequest {
+ hash,
+ delay: self.recovery_delay,
+ kind: RecoveryKind::Simple,
+ });
}
/// Handle an imported block.
- fn handle_block_imported(&mut self, hash: &Block::Hash) {
- self.pending_candidates.remove(hash);
+ fn handle_block_imported(&mut self, block_hash: &Block::Hash) {
+ self.candidates.get_mut(block_hash).map(|candidate| {
+ // Prevents triggering an already enqueued recovery request
+ candidate.waiting_recovery = false;
+ });
}
/// Handle a finalized block with the given `block_number`.
fn handle_block_finalized(&mut self, block_number: NumberFor) {
- self.pending_candidates.retain(|_, pc| pc.block_number > block_number);
+ self.candidates.retain(|_, pc| pc.block_number > block_number);
}
/// Recover the candidate for the given `block_hash`.
async fn recover_candidate(&mut self, block_hash: Block::Hash) {
- let pending_candidate = match self.pending_candidates.remove(&block_hash) {
- Some(pending_candidate) => pending_candidate,
- None => return,
- };
-
- tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
- self.active_candidate_recovery
- .recover_candidate(block_hash, pending_candidate)
- .await;
+ match self.candidates.get(&block_hash) {
+ Some(candidate) if candidate.waiting_recovery => {
+ tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
+ self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
+ },
+ _ => (),
+ }
}
/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
@@ -349,7 +322,7 @@ where
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();
- tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery");
+ tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
blocks.push_back(block);
let mut incoming_blocks = Vec::new();
@@ -380,6 +353,70 @@ where
.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
}
+ /// Attempts an explicit recovery of one or more blocks.
+ pub fn recover(&mut self, req: RecoveryRequest) {
+ let RecoveryRequest { mut hash, delay, kind } = req;
+ let mut to_recover = Vec::new();
+
+ let do_recover = loop {
+ let candidate = match self.candidates.get_mut(&hash) {
+ Some(candidate) => candidate,
+ None => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ "Could not recover. Block was never announced as candidate"
+ );
+ break false
+ },
+ };
+
+ match self.parachain_client.block_status(&BlockId::Hash(hash)) {
+ Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
+ candidate.waiting_recovery = true;
+ to_recover.push(hash);
+ },
+ Ok(_) => break true,
+ Err(e) => {
+ tracing::error!(
+ target: LOG_TARGET,
+ error = ?e,
+ block_hash = ?hash,
+ "Failed to get block status",
+ );
+ break false
+ },
+ }
+
+ if kind == RecoveryKind::Simple {
+ break true
+ }
+
+ hash = candidate.parent_hash;
+ };
+
+ if do_recover {
+ for hash in to_recover.into_iter().rev() {
+ let delay =
+ delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen());
+ tracing::debug!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ "Starting {:?} block recovery in {:?} sec",
+ kind,
+ delay.as_secs(),
+ );
+ self.next_candidate_to_recover.push(
+ async move {
+ Delay::new(delay).await;
+ hash
+ }
+ .boxed(),
+ );
+ }
+ }
+ }
+
/// Run the pov-recovery.
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
@@ -401,10 +438,15 @@ where
if let Some((receipt, session_index)) = pending_candidate {
self.handle_pending_candidate(receipt, session_index);
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Pending candidates stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
+ return;
+ }
+ },
+ recovery_req = self.recovery_chan_rx.next() => {
+ if let Some(req) = recovery_req {
+ self.recover(req);
+ } else {
+ tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
return;
}
},
@@ -412,10 +454,7 @@ where
if let Some(imported) = imported {
self.handle_block_imported(&imported.hash);
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Imported blocks stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
}
},
@@ -423,10 +462,7 @@ where
if let Some(finalized) = finalized {
self.handle_block_finalized(*finalized.header.number());
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Finalized blocks stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
return;
}
},
diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml
index e705364a1c8..1151a978d34 100644
--- a/client/service/Cargo.toml
+++ b/client/service/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
parking_lot = "0.12.1"
+futures = "0.3.24"
# Substrate
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 067ca1c83f3..a7f6a16625a 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -19,9 +19,10 @@
//! Provides functions for starting a collator node or a normal full node.
use cumulus_client_consensus_common::ParachainConsensus;
-use cumulus_primitives_core::{CollectCollationInfo, ParaId};
+use cumulus_primitives_core::{CollectCollationInfo, ParaId, RecoveryDelay};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::v2::CollatorPair;
+
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -30,6 +31,7 @@ use sc_consensus::{
BlockImport,
};
use sc_service::{Configuration, TaskManager};
+
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
@@ -38,8 +40,15 @@ use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
+
+use futures::channel::mpsc;
use std::{sync::Arc, time::Duration};
+// Given the sporadic nature of the explicit recovery operation and the
+// possibility to retry infinite times this value is more than enough.
+// In practice here we expect no more than one queued messages.
+const RECOVERY_CHAN_SIZE: usize = 8;
+
/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner, IQ> {
pub block_status: Arc,
@@ -94,11 +103,14 @@ where
Backend: BackendT + 'static,
IQ: ImportQueue + 'static,
{
+ let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
client.clone(),
relay_chain_interface.clone(),
announce_block.clone(),
+ Some(recovery_chan_tx),
);
task_manager
@@ -113,11 +125,12 @@ where
overseer_handle.clone(),
// We want that collators wait at maximum the relay chain slot duration before starting
// to recover blocks.
- cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
+ RecoveryDelay { min: core::time::Duration::ZERO, max: relay_chain_slot_duration },
client.clone(),
import_queue,
relay_chain_interface.clone(),
para_id,
+ recovery_chan_rx,
);
task_manager
@@ -178,11 +191,14 @@ where
RCInterface: RelayChainInterface + Clone + 'static,
IQ: ImportQueue + 'static,
{
+ let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
para_id,
client.clone(),
relay_chain_interface.clone(),
announce_block,
+ Some(recovery_chan_tx),
);
task_manager
@@ -200,14 +216,12 @@ where
// the recovery way before full nodes try to recover a certain block and then share the
// block with the network using "the normal way". Full nodes are just the "last resort"
// for block recovery.
- cumulus_client_pov_recovery::RecoveryDelay::WithMinAndMax {
- min: relay_chain_slot_duration * 25,
- max: relay_chain_slot_duration * 50,
- },
+ RecoveryDelay { min: relay_chain_slot_duration * 25, max: relay_chain_slot_duration * 50 },
client,
import_queue,
relay_chain_interface,
para_id,
+ recovery_chan_rx,
);
task_manager
diff --git a/primitives/core/src/lib.rs b/primitives/core/src/lib.rs
index 516ff817dcf..2f0105248f5 100644
--- a/primitives/core/src/lib.rs
+++ b/primitives/core/src/lib.rs
@@ -19,6 +19,7 @@
#![cfg_attr(not(feature = "std"), no_std)]
use codec::{Decode, Encode};
+use core::time::Duration;
use polkadot_parachain::primitives::HeadData;
use sp_runtime::{traits::Block as BlockT, RuntimeDebug};
use sp_std::prelude::*;
@@ -194,6 +195,37 @@ impl ParachainBlockData {
}
}
+/// Type of recovery to trigger.
+#[derive(Debug, PartialEq)]
+pub enum RecoveryKind {
+ /// Single block recovery.
+ Simple,
+ /// Full ancestry recovery.
+ Full,
+}
+
+/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
+pub struct RecoveryRequest {
+ /// Hash of the last block to recover.
+ pub hash: Block::Hash,
+ /// Recovery delay range. Randomizing the start of the recovery within this interval
+ /// can be used to prevent self-DOSing if the recovery request is part of a
+ /// distributed protocol and there is the possibility that multiple actors are
+ /// required to perform the recovery action at approximately the same time.
+ pub delay: RecoveryDelay,
+ /// Recovery type.
+ pub kind: RecoveryKind,
+}
+
+/// The delay between observing an unknown block and triggering the recovery of a block.
+#[derive(Clone, Copy)]
+pub struct RecoveryDelay {
+ /// Start recovering after `min` delay.
+ pub min: Duration,
+ /// Start recovering before `max` delay.
+ pub max: Duration,
+}
+
/// Information about a collation.
///
/// This was used in version 1 of the [`CollectCollationInfo`] runtime api.