Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4ab10a7
parachain informant
sistemd Apr 24, 2025
f4feee1
terser log messages
sistemd Apr 27, 2025
8555c03
cumulus-relay-chain-streams crate
sistemd Apr 27, 2025
5877fdd
move the informant to start_relay_chain_tasks
sistemd Apr 27, 2025
3647ee1
metrics
sistemd Apr 27, 2025
d4fdb8b
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd Apr 27, 2025
7fe5c7e
move more streams into the cumulus-relay-chain-streams crate
sistemd Apr 29, 2025
6d8a764
calculate unincluded segment size correctly
sistemd Apr 29, 2025
24747e7
handle included and backed blocks separately
sistemd Apr 30, 2025
297064a
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd Apr 30, 2025
094e5c1
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd May 7, 2025
711327e
use candidate_events
sistemd May 7, 2025
347c81a
rename client_for_closure to client
sistemd May 7, 2025
0e0fb2b
Merge branch 'master' into sistemd/parachain-informant
sistemd May 7, 2025
a1d312b
fmt
sistemd May 7, 2025
0a42273
fix test build error
sistemd May 7, 2025
6236884
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd May 8, 2025
26afa66
fixing CI
sistemd May 8, 2025
d14e780
revert to only logging once per update
sistemd May 9, 2025
0c3c956
graceful error handling
sistemd May 9, 2025
2fa4062
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd May 9, 2025
f6d636e
Update from github-actions[bot] running command 'prdoc --audience run…
github-actions[bot] May 9, 2025
f49189a
add prdoc
sistemd May 9, 2025
89ec28c
some nits
sistemd May 12, 2025
a0a702c
Merge remote-tracking branch 'origin/master' into sistemd/parachain-i…
sistemd May 12, 2025
e709bfe
undo formatting change
sistemd May 13, 2025
9281d3f
update prdoc
sistemd May 13, 2025
c39d476
parachain informant doc comment and small changes
sistemd May 13, 2025
8f35b0d
fix prdoc again
sistemd May 13, 2025
7713c0a
ditto
sistemd May 13, 2025
4b2b6de
Merge branch 'master' into sistemd/parachain-informant
sistemd May 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ members = [
"cumulus/client/relay-chain-interface",
"cumulus/client/relay-chain-minimal-node",
"cumulus/client/relay-chain-rpc-interface",
"cumulus/client/relay-chain-streams",
"cumulus/client/service",
"cumulus/pallets/aura-ext",
"cumulus/pallets/collator-selection",
Expand Down Expand Up @@ -751,6 +752,7 @@ cumulus-relay-chain-inprocess-interface = { path = "cumulus/client/relay-chain-i
cumulus-relay-chain-interface = { path = "cumulus/client/relay-chain-interface", default-features = false }
cumulus-relay-chain-minimal-node = { path = "cumulus/client/relay-chain-minimal-node", default-features = false }
cumulus-relay-chain-rpc-interface = { path = "cumulus/client/relay-chain-rpc-interface", default-features = false }
cumulus-relay-chain-streams = { path = "cumulus/client/relay-chain-streams", default-features = false }
cumulus-test-client = { path = "cumulus/test/client" }
cumulus-test-relay-sproof-builder = { path = "cumulus/test/relay-sproof-builder", default-features = false }
cumulus-test-runtime = { path = "cumulus/test/runtime" }
Expand Down
1 change: 1 addition & 0 deletions cumulus/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ polkadot-primitives = { workspace = true, default-features = true }
cumulus-client-pov-recovery = { workspace = true, default-features = true }
cumulus-primitives-core = { workspace = true, default-features = true }
cumulus-relay-chain-interface = { workspace = true, default-features = true }
cumulus-relay-chain-streams = { workspace = true, default-features = true }
schnellru = { workspace = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions cumulus/client/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod tests;

pub use parent_search::*;

pub use cumulus_relay_chain_streams::finalized_heads;
pub use parachain_consensus::run_parachain_consensus;

use level_monitor::LevelMonitor;
Expand Down
49 changes: 5 additions & 44 deletions cumulus/client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.

use cumulus_relay_chain_streams::{finalized_heads, new_best_heads};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
Expand All @@ -25,12 +26,12 @@ use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use polkadot_primitives::Id as ParaId;

use codec::Decode;
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, StreamExt};

use std::sync::Arc;

Expand Down Expand Up @@ -120,7 +121,7 @@ where
select! {
fin = finalized_heads.next() => {
match fin {
Some(finalized_head) =>
Some((finalized_head, _)) =>
handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
None => {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
Expand Down Expand Up @@ -466,43 +467,3 @@ where
);
}
}

/// Returns a stream that will yield best heads for the given `para_id`.
async fn new_best_heads(
relay_chain: impl RelayChainInterface + Clone,
para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
let new_best_notification_stream =
relay_chain.new_best_notification_stream().await?.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
});

Ok(new_best_notification_stream)
}

/// Returns a stream that will yield finalized heads for the given `para_id`.
async fn finalized_heads(
relay_chain: impl RelayChainInterface + Clone,
para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
let finality_notification_stream =
relay_chain.finality_notification_stream().await?.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
});

Ok(finality_notification_stream)
}

/// Returns head of the parachain at the given relay chain block.
async fn parachain_head_at(
relay_chain: &impl RelayChainInterface,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
relay_chain
.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
}
2 changes: 1 addition & 1 deletion cumulus/client/pov-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ tracing = { workspace = true, default-features = true }
# Substrate
sc-client-api = { workspace = true, default-features = true }
sc-consensus = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
sp-maybe-compressed-blob = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
Expand All @@ -37,6 +36,7 @@ polkadot-primitives = { workspace = true, default-features = true }
async-trait = { workspace = true }
cumulus-primitives-core = { workspace = true, default-features = true }
cumulus-relay-chain-interface = { workspace = true, default-features = true }
cumulus-relay-chain-streams = { workspace = true, default-features = true }

[dev-dependencies]
assert_matches = { workspace = true }
Expand Down
99 changes: 5 additions & 94 deletions cumulus/client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@

use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_api::RuntimeApiInfo;
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::{AvailabilityRecoveryMessage, RuntimeApiRequest};
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
vstaging::{
Expand All @@ -65,11 +64,12 @@ use polkadot_primitives::{
};

use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_interface::RelayChainInterface;
use cumulus_relay_chain_streams::pending_candidates;

use codec::{Decode, DecodeAll};
use futures::{
channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
use futures_timer::Delay;
use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
Expand Down Expand Up @@ -605,7 +605,7 @@ where
loop {
select! {
next_pending_candidates = pending_candidates.next() => {
if let Some((candidates, session_index)) = next_pending_candidates {
if let Some((candidates, session_index, _)) = next_pending_candidates {
for candidate in candidates {
self.handle_pending_candidate(candidate, session_index);
}
Expand Down Expand Up @@ -661,92 +661,3 @@ where
}
}
}

/// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> RelayChainResult<impl Stream<Item = (Vec<CommittedCandidateReceipt>, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;

let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
let sync_oracle = sync_service.clone();
async move {
let hash = n.hash();
if sync_oracle.is_major_syncing() {
tracing::debug!(
target: LOG_TARGET,
relay_hash = ?hash,
"Skipping candidate due to sync.",
);
return None
}

let runtime_api_version = client_for_closure
.version(hash)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch relay chain runtime version.",
)
})
.ok()?;
let parachain_host_runtime_api_version = runtime_api_version
.api_version(
&<dyn polkadot_primitives::runtime_api::ParachainHost<
polkadot_primitives::Block,
>>::ID,
)
.unwrap_or_default();

// If the relay chain runtime does not support the new runtime API, fallback to the
// deprecated one.
let pending_availability_result = if parachain_host_runtime_api_version <
RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
{
#[allow(deprecated)]
client_for_closure
.candidate_pending_availability(hash, para_id)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch pending candidates.",
)
})
.map(|candidate| candidate.into_iter().collect::<Vec<_>>())
} else {
client_for_closure.candidates_pending_availability(hash, para_id).await.map_err(
|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch pending candidates.",
)
},
)
};

let session_index_result =
client_for_closure.session_index_for_child(hash).await.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch session index.",
)
});

if let Ok(candidates) = pending_availability_result {
session_index_result.map(|session_index| (candidates, session_index)).ok()
} else {
None
}
}
});
Ok(filtered_stream)
}
27 changes: 27 additions & 0 deletions cumulus/client/relay-chain-streams/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "cumulus-relay-chain-streams"
version = "0.7.0"
authors.workspace = true
description = "Cumulus client common relay chain streams."
edition.workspace = true
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
homepage.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
futures = { workspace = true }
tracing = { workspace = true, default-features = true }

# Substrate
sp-api = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }

# Polkadot
polkadot-node-subsystem = { workspace = true, default-features = true }
polkadot-primitives = { workspace = true, default-features = true }

# Cumulus
cumulus-relay-chain-interface = { workspace = true, default-features = true }
Loading
Loading