diff --git a/Cargo.lock b/Cargo.lock index fa27c09f1c12b..0cea3091e4d3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4276,6 +4276,7 @@ dependencies = [ "cumulus-client-pov-recovery", "cumulus-primitives-core", "cumulus-relay-chain-interface", + "cumulus-relay-chain-streams", "cumulus-test-client", "cumulus-test-relay-sproof-builder", "dyn-clone", @@ -4399,6 +4400,7 @@ dependencies = [ "async-trait", "cumulus-primitives-core", "cumulus-relay-chain-interface", + "cumulus-relay-chain-streams", "cumulus-test-client", "futures", "futures-timer", @@ -4439,8 +4441,10 @@ dependencies = [ "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", "cumulus-relay-chain-minimal-node", + "cumulus-relay-chain-streams", "futures", "polkadot-primitives", + "prometheus", "sc-client-api", "sc-consensus", "sc-network", @@ -4890,6 +4894,19 @@ dependencies = [ "url", ] +[[package]] +name = "cumulus-relay-chain-streams" +version = "0.7.0" +dependencies = [ + "cumulus-relay-chain-interface", + "futures", + "polkadot-node-subsystem", + "polkadot-primitives", + "sp-api 26.0.0", + "sp-consensus", + "tracing", +] + [[package]] name = "cumulus-test-client" version = "0.1.0" @@ -15816,6 +15833,7 @@ dependencies = [ "cumulus-relay-chain-interface", "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-rpc-interface", + "cumulus-relay-chain-streams", "cumulus-test-relay-sproof-builder", "emulated-integration-tests-common", "fork-tree", diff --git a/Cargo.toml b/Cargo.toml index 4f0e13418f3d2..bdb74565eb07c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ members = [ "cumulus/client/relay-chain-interface", "cumulus/client/relay-chain-minimal-node", "cumulus/client/relay-chain-rpc-interface", + "cumulus/client/relay-chain-streams", "cumulus/client/service", "cumulus/pallets/aura-ext", "cumulus/pallets/collator-selection", @@ -753,6 +754,7 @@ cumulus-relay-chain-inprocess-interface = { path = "cumulus/client/relay-chain-i cumulus-relay-chain-interface = { path = "cumulus/client/relay-chain-interface", default-features = false } cumulus-relay-chain-minimal-node = { path = "cumulus/client/relay-chain-minimal-node", default-features = false } cumulus-relay-chain-rpc-interface = { path = "cumulus/client/relay-chain-rpc-interface", default-features = false } +cumulus-relay-chain-streams = { path = "cumulus/client/relay-chain-streams", default-features = false } cumulus-test-client = { path = "cumulus/test/client" } cumulus-test-relay-sproof-builder = { path = "cumulus/test/relay-sproof-builder", default-features = false } cumulus-test-runtime = { path = "cumulus/test/runtime" } diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml index ae2c23c2e7295..1615215abfa73 100644 --- a/cumulus/client/consensus/common/Cargo.toml +++ b/cumulus/client/consensus/common/Cargo.toml @@ -41,6 +41,7 @@ polkadot-primitives = { workspace = true, default-features = true } cumulus-client-pov-recovery = { workspace = true, default-features = true } cumulus-primitives-core = { workspace = true, default-features = true } cumulus-relay-chain-interface = { workspace = true, default-features = true } +cumulus-relay-chain-streams = { workspace = true, default-features = true } schnellru = { workspace = true } [dev-dependencies] diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 86d5803ad5416..20e38989c256e 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -40,6 +40,7 @@ mod tests; pub use parent_search::*; +pub use cumulus_relay_chain_streams::finalized_heads; pub use parachain_consensus::run_parachain_consensus; use level_monitor::LevelMonitor; diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs index 8af8e1ef2bc7a..3d959aa9948d2 100644 --- a/cumulus/client/consensus/common/src/parachain_consensus.rs +++ b/cumulus/client/consensus/common/src/parachain_consensus.rs @@ -15,6 +15,7 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . +use cumulus_relay_chain_streams::{finalized_heads, new_best_heads}; use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; @@ -25,12 +26,12 @@ use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest}; -use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_interface::RelayChainInterface; -use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; +use polkadot_primitives::Id as ParaId; use codec::Decode; -use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt}; +use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, StreamExt}; use std::sync::Arc; @@ -120,7 +121,7 @@ where select! { fin = finalized_heads.next() => { match fin { - Some(finalized_head) => + Some((finalized_head, _)) => handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes), None => { tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); @@ -466,43 +467,3 @@ where ); } } - -/// Returns a stream that will yield best heads for the given `para_id`. -async fn new_best_heads( - relay_chain: impl RelayChainInterface + Clone, - para_id: ParaId, -) -> RelayChainResult>> { - let new_best_notification_stream = - relay_chain.new_best_notification_stream().await?.filter_map(move |n| { - let relay_chain = relay_chain.clone(); - async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } - }); - - Ok(new_best_notification_stream) -} - -/// Returns a stream that will yield finalized heads for the given `para_id`. -async fn finalized_heads( - relay_chain: impl RelayChainInterface + Clone, - para_id: ParaId, -) -> RelayChainResult>> { - let finality_notification_stream = - relay_chain.finality_notification_stream().await?.filter_map(move |n| { - let relay_chain = relay_chain.clone(); - async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } - }); - - Ok(finality_notification_stream) -} - -/// Returns head of the parachain at the given relay chain block. -async fn parachain_head_at( - relay_chain: &impl RelayChainInterface, - at: PHash, - para_id: ParaId, -) -> RelayChainResult>> { - relay_chain - .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) - .await - .map(|s| s.map(|s| s.parent_head.0)) -} diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index 56e7e927d535e..47336ea703150 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -35,7 +35,7 @@ use cumulus_test_client::{ use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; use futures_timer::Delay; -use polkadot_primitives::HeadData; +use polkadot_primitives::{vstaging::CandidateEvent, HeadData}; use sc_client_api::{Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; use sp_blockchain::Backend as BlockchainBackend; @@ -289,6 +289,10 @@ impl RelayChainInterface for Relaychain { async fn scheduling_lookahead(&self, _: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } + + async fn candidate_events(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } } fn sproof_with_best_parent(client: &Client) -> RelayStateSproofBuilder { diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index d3407d46a5cd0..77aaa8c034610 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -27,7 +27,7 @@ use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt use parking_lot::Mutex; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::{ - vstaging::{CommittedCandidateReceiptV2, CoreState}, + vstaging::{CandidateEvent, CommittedCandidateReceiptV2, CoreState}, BlockNumber, CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, Hash as PHash, HeadData, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, SigningContext, @@ -352,6 +352,10 @@ impl RelayChainInterface for DummyRelayChainInterface { async fn scheduling_lookahead(&self, _: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } + + async fn candidate_events(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } } fn make_validator_and_api() -> ( diff --git a/cumulus/client/pov-recovery/Cargo.toml b/cumulus/client/pov-recovery/Cargo.toml index 6c5095d438057..d0534fc7a6add 100644 --- a/cumulus/client/pov-recovery/Cargo.toml +++ b/cumulus/client/pov-recovery/Cargo.toml @@ -38,6 +38,7 @@ polkadot-primitives = { workspace = true, default-features = true } async-trait = { workspace = true } cumulus-primitives-core = { workspace = true, default-features = true } cumulus-relay-chain-interface = { workspace = true, default-features = true } +cumulus-relay-chain-streams = { workspace = true, default-features = true } [dev-dependencies] assert_matches = { workspace = true } diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs index 089ad08367a75..3d093ee05db07 100644 --- a/cumulus/client/pov-recovery/src/lib.rs +++ b/cumulus/client/pov-recovery/src/lib.rs @@ -49,12 +49,11 @@ use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider}; use sc_consensus::import_queue::{ImportQueueService, IncomingBlock}; -use sp_api::RuntimeApiInfo; use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT}; -use polkadot_node_subsystem::messages::{AvailabilityRecoveryMessage, RuntimeApiRequest}; +use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ vstaging::{ @@ -65,11 +64,12 @@ use polkadot_primitives::{ }; use cumulus_primitives_core::ParachainBlockData; -use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_interface::RelayChainInterface; +use cumulus_relay_chain_streams::pending_candidates; use codec::{Decode, DecodeAll}; use futures::{ - channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, + channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; use futures_timer::Delay; use rand::{distributions::Uniform, prelude::Distribution, thread_rng}; @@ -605,7 +605,7 @@ where loop { select! { next_pending_candidates = pending_candidates.next() => { - if let Some((candidates, session_index)) = next_pending_candidates { + if let Some((candidates, session_index, _)) = next_pending_candidates { for candidate in candidates { self.handle_pending_candidate(candidate, session_index); } @@ -661,92 +661,3 @@ where } } } - -/// Returns a stream over pending candidates for the parachain corresponding to `para_id`. -async fn pending_candidates( - relay_chain_client: impl RelayChainInterface + Clone, - para_id: ParaId, - sync_service: Arc, -) -> RelayChainResult, SessionIndex)>> { - let import_notification_stream = relay_chain_client.import_notification_stream().await?; - - let filtered_stream = import_notification_stream.filter_map(move |n| { - let client_for_closure = relay_chain_client.clone(); - let sync_oracle = sync_service.clone(); - async move { - let hash = n.hash(); - if sync_oracle.is_major_syncing() { - tracing::debug!( - target: LOG_TARGET, - relay_hash = ?hash, - "Skipping candidate due to sync.", - ); - return None - } - - let runtime_api_version = client_for_closure - .version(hash) - .await - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to fetch relay chain runtime version.", - ) - }) - .ok()?; - let parachain_host_runtime_api_version = runtime_api_version - .api_version( - &>::ID, - ) - .unwrap_or_default(); - - // If the relay chain runtime does not support the new runtime API, fallback to the - // deprecated one. - let pending_availability_result = if parachain_host_runtime_api_version < - RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT - { - #[allow(deprecated)] - client_for_closure - .candidate_pending_availability(hash, para_id) - .await - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to fetch pending candidates.", - ) - }) - .map(|candidate| candidate.into_iter().collect::>()) - } else { - client_for_closure.candidates_pending_availability(hash, para_id).await.map_err( - |e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to fetch pending candidates.", - ) - }, - ) - }; - - let session_index_result = - client_for_closure.session_index_for_child(hash).await.map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to fetch session index.", - ) - }); - - if let Ok(candidates) = pending_availability_result { - session_index_result.map(|session_index| (candidates, session_index)).ok() - } else { - None - } - } - }); - Ok(filtered_stream) -} diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs index e6f2e2e2e34e6..c3be39d7f0fe4 100644 --- a/cumulus/client/pov-recovery/src/tests.rs +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -23,15 +23,19 @@ use cumulus_primitives_core::relay_chain::{ }; use cumulus_relay_chain_interface::{ InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader, - PersistedValidationData, StorageValue, ValidationCodeHash, ValidatorId, + PersistedValidationData, RelayChainResult, StorageValue, ValidationCodeHash, ValidatorId, }; use cumulus_test_client::{ runtime::{Block, Header}, Sr25519Keyring, }; -use futures::{channel::mpsc, SinkExt}; +use futures::{channel::mpsc, SinkExt, Stream}; use polkadot_node_primitives::AvailableData; -use polkadot_node_subsystem::{messages::AvailabilityRecoveryMessage, RecoveryError, TimeoutExt}; +use polkadot_node_subsystem::{ + messages::{AvailabilityRecoveryMessage, RuntimeApiRequest}, + RecoveryError, TimeoutExt, +}; +use polkadot_primitives::vstaging::CandidateEvent; use rstest::rstest; use sc_client_api::{ BlockImportNotification, ClientInfo, CompactProof, FinalityNotification, FinalityNotifications, @@ -39,6 +43,7 @@ use sc_client_api::{ }; use sc_consensus::import_queue::RuntimeOrigin; use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_api::RuntimeApiInfo; use sp_blockchain::Info; use sp_runtime::{generic::SignedBlock, Justifications}; use sp_version::RuntimeVersion; @@ -508,6 +513,10 @@ impl RelayChainInterface for Relaychain { async fn scheduling_lookahead(&self, _: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } + + async fn candidate_events(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test"); + } } fn make_candidate_chain(candidate_number_range: Range) -> Vec { diff --git a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml index 9dcef93753e09..b82a90a8840e0 100644 --- a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml +++ b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml @@ -32,6 +32,7 @@ sp-state-machine = { workspace = true, default-features = true } # Polkadot polkadot-cli = { features = ["cli"], workspace = true } +polkadot-primitives = { workspace = true, default-features = true } polkadot-service = { workspace = true, default-features = true } # Cumulus diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 61563e1029ee0..b5fd038c0f60a 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -35,6 +35,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; use futures::{FutureExt, Stream, StreamExt}; +use polkadot_primitives::vstaging::CandidateEvent; use polkadot_service::{ builder::PolkadotServiceBuilder, CollatorOverseerGen, CollatorPair, Configuration, FullBackend, FullClient, Handle, NewFull, NewFullParams, TaskManager, @@ -328,6 +329,10 @@ impl RelayChainInterface for RelayChainInProcessInterface { async fn scheduling_lookahead(&self, hash: PHash) -> RelayChainResult { Ok(self.full_client.runtime_api().scheduling_lookahead(hash)?) } + + async fn candidate_events(&self, hash: PHash) -> RelayChainResult> { + Ok(self.full_client.runtime_api().candidate_events(hash)?) + } } pub enum BlockCheckStatus { diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index 3623672c6ee61..72ce38f1a18ef 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -31,7 +31,7 @@ use codec::{Decode, Encode, Error as CodecError}; use jsonrpsee_core::ClientError as JsonRpcError; use sp_api::ApiError; -use cumulus_primitives_core::relay_chain::{BlockId, Hash as RelayHash}; +use cumulus_primitives_core::relay_chain::{vstaging::CandidateEvent, BlockId, Hash as RelayHash}; pub use cumulus_primitives_core::{ relay_chain::{ vstaging::{CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState}, @@ -248,6 +248,8 @@ pub trait RelayChainInterface: Send + Sync { /// Fetch the scheduling lookahead value. async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult; + + async fn candidate_events(&self, at: RelayHash) -> RelayChainResult>; } #[async_trait] @@ -406,6 +408,10 @@ where async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult { (**self).scheduling_lookahead(relay_parent).await } + + async fn candidate_events(&self, at: RelayHash) -> RelayChainResult> { + (**self).candidate_events(at).await + } } /// Helper function to call an arbitrary runtime API using a `RelayChainInterface` client. diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index 15fc8024e0b52..2597cd393bc79 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -19,9 +19,9 @@ use async_trait::async_trait; use core::time::Duration; use cumulus_primitives_core::{ relay_chain::{ - vstaging::CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Hash as RelayHash, - Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, - ValidationCodeHash, ValidatorId, + vstaging::{CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt}, + Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption, + SessionIndex, ValidationCodeHash, ValidatorId, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -287,4 +287,11 @@ impl RelayChainInterface for RelayChainRpcInterface { async fn scheduling_lookahead(&self, relay_parent: RelayHash) -> RelayChainResult { self.rpc_client.parachain_host_scheduling_lookahead(relay_parent).await } + + async fn candidate_events( + &self, + relay_parent: RelayHash, + ) -> RelayChainResult> { + self.rpc_client.parachain_host_candidate_events(relay_parent).await + } } diff --git a/cumulus/client/relay-chain-streams/Cargo.toml b/cumulus/client/relay-chain-streams/Cargo.toml new file mode 100644 index 0000000000000..b6a441da4091e --- /dev/null +++ b/cumulus/client/relay-chain-streams/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "cumulus-relay-chain-streams" +version = "0.7.0" +authors.workspace = true +description = "Cumulus client common relay chain streams." +edition.workspace = true +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +futures = { workspace = true } +tracing = { workspace = true, default-features = true } + +# Substrate +sp-api = { workspace = true, default-features = true } +sp-consensus = { workspace = true, default-features = true } + +# Polkadot +polkadot-node-subsystem = { workspace = true, default-features = true } +polkadot-primitives = { workspace = true, default-features = true } + +# Cumulus +cumulus-relay-chain-interface = { workspace = true, default-features = true } diff --git a/cumulus/client/relay-chain-streams/src/lib.rs b/cumulus/client/relay-chain-streams/src/lib.rs new file mode 100644 index 0000000000000..19ee3ff931e24 --- /dev/null +++ b/cumulus/client/relay-chain-streams/src/lib.rs @@ -0,0 +1,167 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Common utilities for transforming relay chain streams. + +use std::sync::Arc; + +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use futures::{Stream, StreamExt}; +use polkadot_node_subsystem::messages::RuntimeApiRequest; +use polkadot_primitives::{ + vstaging::CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Hash as PHash, + Id as ParaId, OccupiedCoreAssumption, SessionIndex, +}; +use sp_api::RuntimeApiInfo; +use sp_consensus::SyncOracle; + +const LOG_TARGET: &str = "cumulus-relay-chain-streams"; + +pub type RelayHeader = polkadot_primitives::Header; + +/// Returns a stream over pending candidates for the parachain corresponding to `para_id`. +pub async fn pending_candidates( + relay_chain_client: impl RelayChainInterface + Clone, + para_id: ParaId, + sync_service: Arc, +) -> RelayChainResult, SessionIndex, RelayHeader)>> +{ + let import_notification_stream = relay_chain_client.import_notification_stream().await?; + + let filtered_stream = import_notification_stream.filter_map(move |n| { + let client = relay_chain_client.clone(); + let sync_oracle = sync_service.clone(); + async move { + let hash = n.hash(); + if sync_oracle.is_major_syncing() { + tracing::debug!( + target: LOG_TARGET, + relay_hash = ?hash, + "Skipping candidate due to sync.", + ); + return None + } + + let runtime_api_version = client + .version(hash) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch relay chain runtime version.", + ) + }) + .ok()?; + let parachain_host_runtime_api_version = runtime_api_version + .api_version( + &>::ID, + ) + .unwrap_or_default(); + + // If the relay chain runtime does not support the new runtime API, fallback to the + // deprecated one. + let pending_availability_result = if parachain_host_runtime_api_version < + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + #[allow(deprecated)] + client + .candidate_pending_availability(hash, para_id) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch pending candidates.", + ) + }) + .map(|candidate| candidate.into_iter().collect::>()) + } else { + client.candidates_pending_availability(hash, para_id).await.map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch pending candidates.", + ) + }) + }; + + let session_index_result = client.session_index_for_child(hash).await.map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch session index.", + ) + }); + + if let Ok(candidates) = pending_availability_result { + session_index_result.map(|session_index| (candidates, session_index, n)).ok() + } else { + None + } + } + }); + Ok(filtered_stream) +} + +/// Returns a stream that will yield best heads for the given `para_id`. +pub async fn new_best_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult>> { + let new_best_notification_stream = + relay_chain.new_best_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() } + }); + + Ok(new_best_notification_stream) +} + +/// Returns a stream that will yield finalized heads for the given `para_id`. +pub async fn finalized_heads( + relay_chain: impl RelayChainInterface + Clone, + para_id: ParaId, +) -> RelayChainResult, RelayHeader)>> { + let finality_notification_stream = + relay_chain.finality_notification_stream().await?.filter_map(move |n| { + let relay_chain = relay_chain.clone(); + async move { + parachain_head_at(&relay_chain, n.hash(), para_id) + .await + .ok() + .flatten() + .map(|h| (h, n)) + } + }); + + Ok(finality_notification_stream) +} + +/// Returns head of the parachain at the given relay chain block. +async fn parachain_head_at( + relay_chain: &impl RelayChainInterface, + at: PHash, + para_id: ParaId, +) -> RelayChainResult>> { + relay_chain + .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) + .await + .map(|s| s.map(|s| s.parent_head.0)) +} diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index ef1fa88484501..3ea36d70b42b6 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] async-channel = { workspace = true } futures = { workspace = true } +prometheus = { workspace = true } # Substrate sc-client-api = { workspace = true, default-features = true } @@ -49,3 +50,4 @@ cumulus-primitives-proof-size-hostfunction = { workspace = true, default-feature cumulus-relay-chain-inprocess-interface = { workspace = true, default-features = true } cumulus-relay-chain-interface = { workspace = true, default-features = true } cumulus-relay-chain-minimal-node = { workspace = true, default-features = true } +cumulus-relay-chain-streams = { workspace = true, default-features = true } diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 52573d61d4f70..431b8e9401d79 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -24,13 +24,15 @@ use cumulus_client_consensus_common::ParachainConsensus; use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce}; use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; +pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::{ build_minimal_relay_chain_node_light_client, build_minimal_relay_chain_node_with_rpc, }; use futures::{channel::mpsc, StreamExt}; -use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption}; +use polkadot_primitives::{vstaging::CandidateEvent, CollatorPair, OccupiedCoreAssumption}; +use prometheus::{Histogram, HistogramOpts, Registry}; use sc_client_api::{ Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider, }; @@ -50,10 +52,14 @@ use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_core::{traits::SpawnNamed, Decode}; -use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header}; -use std::{sync::Arc, time::Duration}; - -pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size; +use sp_runtime::{ + traits::{Block as BlockT, BlockIdTo, Header}, + SaturatedConversion, Saturating, +}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; /// Host functions that should be used in parachain nodes. /// @@ -96,6 +102,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub relay_chain_slot_duration: Duration, pub recovery_handle: Box, pub sync_service: Arc>, + pub prometheus_registry: Option<&'a Registry>, } /// Parameters given to [`start_relay_chain_tasks`]. @@ -110,6 +117,7 @@ pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> { pub relay_chain_slot_duration: Duration, pub recovery_handle: Box, pub sync_service: Arc>, + pub prometheus_registry: Option<&'a Registry>, } /// Parameters given to [`start_full_node`]. @@ -123,6 +131,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { pub import_queue: Box>, pub recovery_handle: Box, pub sync_service: Arc>, + pub prometheus_registry: Option<&'a Registry>, } /// Start a collator node for a parachain. @@ -146,6 +155,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner relay_chain_slot_duration, recovery_handle, sync_service, + prometheus_registry, }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>, ) -> sc_service::error::Result<()> where @@ -181,6 +191,7 @@ where relay_chain_slot_duration, recovery_handle, sync_service, + prometheus_registry, })?; #[allow(deprecated)] @@ -206,6 +217,8 @@ where /// arrive via the normal p2p layer (i.e. when authors withhold their blocks deliberately). /// /// This function spawns work for those side tasks. +/// +/// It also spawns a parachain informant task that will log the relay chain state and some metrics. pub fn start_relay_chain_tasks( StartRelayChainTasksParams { client, @@ -218,12 +231,14 @@ pub fn start_relay_chain_tasks( relay_chain_slot_duration, recovery_handle, sync_service, + prometheus_registry, }: StartRelayChainTasksParams, ) -> sc_service::error::Result<()> where Block: BlockT, Client: Finalizer + UsageProvider + + HeaderBackend + Send + Sync + BlockBackend @@ -279,13 +294,22 @@ where relay_chain_interface.clone(), para_id, recovery_chan_rx, - sync_service, + sync_service.clone(), ); task_manager .spawn_essential_handle() .spawn("cumulus-pov-recovery", None, pov_recovery.run()); + let parachain_informant = parachain_informant::( + relay_chain_interface.clone(), + client.clone(), + prometheus_registry.map(ParachainInformantMetrics::new).transpose()?, + ); + task_manager + .spawn_handle() + .spawn("parachain-informant", None, parachain_informant); + Ok(()) } @@ -305,12 +329,14 @@ pub fn start_full_node( import_queue, recovery_handle, sync_service, + prometheus_registry, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where Block: BlockT, Client: Finalizer + UsageProvider + + HeaderBackend + Send + Sync + BlockBackend @@ -331,6 +357,7 @@ where recovery_handle, sync_service, da_recovery_profile: DARecoveryProfile::FullNode, + prometheus_registry, }) } @@ -580,3 +607,148 @@ where Err("Stopping following imported blocks. Could not determine parachain target block".into()) } + +/// Task for logging candidate events and some related metrics. +async fn parachain_informant( + relay_chain_interface: impl RelayChainInterface + Clone, + client: Arc, + metrics: Option, +) where + Client: HeaderBackend + Send + Sync + 'static, +{ + let mut import_notifications = match relay_chain_interface.import_notification_stream().await { + Ok(import_notifications) => import_notifications, + Err(e) => { + log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!"); + return + }, + }; + let mut last_backed_block_time: Option = None; + while let Some(n) = import_notifications.next().await { + let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await { + Ok(candidate_events) => candidate_events, + Err(e) => { + log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash()); + continue + }, + }; + let mut backed_candidates = Vec::new(); + let mut included_candidates = Vec::new(); + let mut timed_out_candidates = Vec::new(); + for event in candidate_events { + match event { + CandidateEvent::CandidateBacked(_, head, _, _) => { + let backed_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + "Failed to decode parachain header from backed block: {e:?}" + ); + continue + }, + }; + let backed_block_time = Instant::now(); + if let Some(last_backed_block_time) = &last_backed_block_time { + let duration = backed_block_time.duration_since(*last_backed_block_time); + if let Some(metrics) = &metrics { + metrics.parachain_block_backed_duration.observe(duration.as_secs_f64()); + } + } + last_backed_block_time = Some(backed_block_time); + backed_candidates.push(backed_block); + }, + CandidateEvent::CandidateIncluded(_, head, _, _) => { + let included_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + "Failed to decode parachain header from included block: {e:?}" + ); + continue + }, + }; + let unincluded_segment_size = + client.info().best_number.saturating_sub(*included_block.number()); + let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into(); + if let Some(metrics) = &metrics { + metrics.unincluded_segment_size.observe(unincluded_segment_size.into()); + } + included_candidates.push(included_block); + }, + CandidateEvent::CandidateTimedOut(_, head, _) => { + let timed_out_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + "Failed to decode parachain header from timed out block: {e:?}" + ); + continue + }, + }; + timed_out_candidates.push(timed_out_block); + }, + } + } + let mut log_parts = Vec::new(); + if !backed_candidates.is_empty() { + let backed_candidates = backed_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("backed: {}", backed_candidates)); + }; + if !included_candidates.is_empty() { + let included_candidates = included_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("included: {}", included_candidates)); + }; + if !timed_out_candidates.is_empty() { + let timed_out_candidates = timed_out_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("timed out: {}", timed_out_candidates)); + }; + if !log_parts.is_empty() { + log::info!( + "Update at relay chain block #{} ({}) - {}", + n.number(), + n.hash(), + log_parts.join(", ") + ); + } + } +} + +struct ParachainInformantMetrics { + /// Time between parachain blocks getting backed by the relaychain. + parachain_block_backed_duration: Histogram, + /// Number of blocks between best block and last included block. + unincluded_segment_size: Histogram, +} + +impl ParachainInformantMetrics { + fn new(prometheus_registry: &Registry) -> prometheus::Result { + let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new( + "parachain_block_backed_duration", + "Time between parachain blocks getting backed by the relaychain", + ))?; + prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?; + + let unincluded_segment_size = Histogram::with_opts(HistogramOpts::new( + "parachain_unincluded_segment_size", + "Number of blocks between best block and last included block", + ))?; + prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?; + + Ok(Self { + parachain_block_backed_duration: parachain_block_authorship_duration, + unincluded_segment_size, + }) + } +} diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index 915098a94762f..66aebe6d2a13a 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -404,6 +404,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle.clone()), sync_service, + prometheus_registry: prometheus_registry.as_ref(), })?; start_bootnode_tasks(StartBootnodeTasksParams { diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 83866b779f236..7d69f3fc8dabe 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -447,6 +447,7 @@ where relay_chain_slot_duration, recovery_handle, sync_service: sync_service.clone(), + prometheus_registry: None, })?; if let Some(collator_key) = collator_key { diff --git a/prdoc/pr_8332.prdoc b/prdoc/pr_8332.prdoc new file mode 100644 index 0000000000000..38837474b7d95 --- /dev/null +++ b/prdoc/pr_8332.prdoc @@ -0,0 +1,27 @@ +title: parachain informant +doc: +- audience: Node Operator + description: Closes https://github.com/paritytech/polkadot-sdk/issues/8216. Adds parachain_block_authorship_duration and parachain_unincluded_segment_size parachain metrics. +- audience: Node Dev + description: Adds a new `cumulus-relay-chain-streams` crate to make it easier to reuse relay chain streams. +crates: +- name: cumulus-client-consensus-common + bump: minor +- name: cumulus-client-pov-recovery + bump: minor +- name: polkadot-omni-node-lib + bump: patch +- name: cumulus-relay-chain-streams + bump: none +- name: cumulus-client-service + bump: major +- name: cumulus-relay-chain-inprocess-interface + bump: minor +- name: cumulus-relay-chain-interface + bump: major +- name: cumulus-relay-chain-rpc-interface + bump: minor +- name: cumulus-client-network + bump: none +- name: polkadot-sdk + bump: minor diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index 90b9491b25b9d..ef8ba1a2dde34 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -393,6 +393,7 @@ pub async fn start_parachain_node( relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle.clone()), sync_service: sync_service.clone(), + prometheus_registry: prometheus_registry.as_ref(), })?; start_bootnode_tasks(StartBootnodeTasksParams { diff --git a/umbrella/Cargo.toml b/umbrella/Cargo.toml index 3967d80a363f2..80a63a6888cc8 100644 --- a/umbrella/Cargo.toml +++ b/umbrella/Cargo.toml @@ -839,6 +839,7 @@ node = [ "cumulus-relay-chain-interface", "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-rpc-interface", + "cumulus-relay-chain-streams", "cumulus-test-relay-sproof-builder", "emulated-integration-tests-common", "fork-tree", @@ -2193,6 +2194,11 @@ default-features = false optional = true path = "../cumulus/client/relay-chain-rpc-interface" +[dependencies.cumulus-relay-chain-streams] +default-features = false +optional = true +path = "../cumulus/client/relay-chain-streams" + [dependencies.cumulus-test-relay-sproof-builder] default-features = false optional = true diff --git a/umbrella/src/lib.rs b/umbrella/src/lib.rs index 05a8a4f138eb2..831a521fc68bf 100644 --- a/umbrella/src/lib.rs +++ b/umbrella/src/lib.rs @@ -202,6 +202,10 @@ pub use cumulus_relay_chain_minimal_node; #[cfg(feature = "cumulus-relay-chain-rpc-interface")] pub use cumulus_relay_chain_rpc_interface; +/// Cumulus client common relay chain streams. +#[cfg(feature = "cumulus-relay-chain-streams")] +pub use cumulus_relay_chain_streams; + /// Mocked relay state proof builder for testing Cumulus. #[cfg(feature = "cumulus-test-relay-sproof-builder")] pub use cumulus_test_relay_sproof_builder;