Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
841c96c
cumulus/client: Add docs for candidate events
lexnv Jun 16, 2025
593a50a
rpc-v2/tx: Add transaction monitor hook into tx lifecycle
lexnv Jun 16, 2025
0f4edd1
rpc-v2/tx: Return the tx monitor while building the RPC
lexnv Jun 16, 2025
7be02d4
rpc-v2/tx: Send monitorig events to the higher levels
lexnv Jun 17, 2025
d19f828
rpc-v2/tx: Expose tx structs and events
lexnv Jun 17, 2025
614927e
client/service: Extract data out of spawning RPC servers
lexnv Jun 17, 2025
5d809da
rpc-v2/tx: Implement stream for the monitor handle
lexnv Jun 18, 2025
17f76eb
client/service: Add lru dependency
lexnv Jun 18, 2025
dac91df
client/service: Report parachain rpc metrics in backed blocks
lexnv Jun 18, 2025
675e443
client/service: Do not spawn on empty monitoring handles
lexnv Jun 18, 2025
da186b7
cumulus: Initialize relay chain tasks with rpc handlers
lexnv Jun 18, 2025
6b84f36
rpc-v2/tx: Change log group and add trace logs
lexnv Jun 18, 2025
364b65f
client/service: Add trace logs
lexnv Jun 18, 2025
9103fdf
client/service: Adjust bucket metrics times
lexnv Jun 18, 2025
6c0c302
cumulus/service: Move parainformant to dedicated module
lexnv Jun 19, 2025
3dba6bb
cumulus/service: Move fn to struct based impl
lexnv Jun 19, 2025
43d0f5f
cumulus/service: Merge rpc metrics v2 into parainformant
lexnv Jun 19, 2025
3fb0778
cumulus/service: Break into smaller fns
lexnv Jun 19, 2025
6b8779c
cumulus/service: Add dedicated log target and docs
lexnv Jun 19, 2025
8b424fd
rpc-v2/tx: Add tests for the monitoring handle
lexnv Jun 19, 2025
6bed637
Adjust to new SpawnResult
lexnv Jun 19, 2025
954cf29
frame/revive: Use proper server handle to keep alive
lexnv Jun 19, 2025
26453ce
rpc-v2/tx: Apply cargo clippy
lexnv Jun 19, 2025
855617f
Update from github-actions[bot] running command 'prdoc --audience nod…
github-actions[bot] Jun 20, 2025
750b547
rpc-v2/tx: Make rpc initialization more generic to use only hasher
lexnv Jun 20, 2025
780443b
Merge remote-tracking branch 'origin/master' into lexnv/cumulus-to-rp…
lexnv Oct 6, 2025
6b5b130
cumulus/informant: Add constant and types for tx stream
lexnv Oct 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub trait RelayChainInterface: Send + Sync {
/// Fetch the scheduling lookahead value.
async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32>;

/// Fetch the candidate events for the given relay chain block.
async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>>;
}

Expand Down
1 change: 1 addition & 0 deletions cumulus/client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
async-channel = { workspace = true }
futures = { workspace = true }
prometheus = { workspace = true }
schnellru = { workspace = true }

# Substrate
sc-client-api = { workspace = true, default-features = true }
Expand Down
148 changes: 145 additions & 3 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandl
use cumulus_primitives_core::{CollectCollationInfo, ParaId};
pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::{
build_minimal_relay_chain_node_light_client, build_minimal_relay_chain_node_with_rpc,
};
use futures::{channel::mpsc, StreamExt};
use futures::{channel::mpsc, FutureExt, StreamExt};
use polkadot_primitives::{vstaging::CandidateEvent, CollatorPair, OccupiedCoreAssumption};
use prometheus::{Histogram, HistogramOpts, Registry};
use prometheus::{linear_buckets, Histogram, HistogramOpts, Registry};
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
};
Expand All @@ -49,6 +49,7 @@ use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use schnellru::{ByLength, LruMap};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_core::{traits::SpawnNamed, Decode};
Expand Down Expand Up @@ -118,6 +119,7 @@ pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
pub recovery_handle: Box<dyn RecoveryHandle>,
pub sync_service: Arc<SyncingService<Block>>,
pub prometheus_registry: Option<&'a Registry>,
pub rpc_transaction_v2_handles: Vec<sc_service::TransactionMonitorHandle<Block::Hash>>,
}

/// Parameters given to [`start_full_node`].
Expand Down Expand Up @@ -192,6 +194,7 @@ where
recovery_handle,
sync_service,
prometheus_registry,
rpc_transaction_v2_handles: vec![],
})?;

#[allow(deprecated)]
Expand Down Expand Up @@ -232,6 +235,7 @@ pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
recovery_handle,
sync_service,
prometheus_registry,
rpc_transaction_v2_handles,
}: StartRelayChainTasksParams<Block, Client, RCInterface>,
) -> sc_service::error::Result<()>
where
Expand Down Expand Up @@ -310,6 +314,21 @@ where
.spawn_handle()
.spawn("parachain-informant", None, parachain_informant);

if !rpc_transaction_v2_handles.is_empty() {
let parachain_rpc_metrics = parachain_rpc_metrics::<Block>(
relay_chain_interface.clone(),
rpc_transaction_v2_handles,
para_id,
prometheus_registry.map(ParachainRpcMetrics::new).transpose()?,
);
task_manager.spawn_handle().spawn("parachain-rpc-metrics", None, async move {
let result = parachain_rpc_metrics.await;
if result.is_err() {
log::error!(target: LOG_TARGET_SYNC, "Parachain RPC metrics stopped: {:?}", result);
}
});
}

Ok(())
}

Expand Down Expand Up @@ -358,6 +377,7 @@ where
sync_service,
da_recovery_profile: DARecoveryProfile::FullNode,
prometheus_registry,
rpc_transaction_v2_handles: vec![],
})
}

Expand Down Expand Up @@ -755,3 +775,125 @@ impl ParachainInformantMetrics {
})
}
}

/// Metrics for the parachain RPC.
struct ParachainRpcMetrics {
/// Time between the submission of a transaction and its inclusion in a backed block.
transaction_backed_duration: Histogram,
}

impl ParachainRpcMetrics {
fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
let transaction_backed_duration = Histogram::with_opts(
HistogramOpts::new(
"parachain_transaction_backed_duration",
"Time between the submission of a transaction and its inclusion in a backed block",
)
.buckets(linear_buckets(0.01, 40.0, 20).expect("Valid buckets; qed")),
)?;
prometheus_registry.register(Box::new(transaction_backed_duration.clone()))?;

Ok(Self { transaction_backed_duration })
}
}

/// Task for monitoring the parachain RPC metrics.
async fn parachain_rpc_metrics<Block: BlockT>(
relay_chain_interface: impl RelayChainInterface + Clone,
rpc_transaction_v2_handles: Vec<sc_service::TransactionMonitorHandle<Block::Hash>>,
para_id: ParaId,
metrics: Option<ParachainRpcMetrics>,
) -> Result<(), RelayChainError> {
const LOG_TARGET: &str = "parachain_rpc_metrics";

let mut backed_blocks: LruMap<sp_core::H256, ()> = LruMap::new(ByLength::new(64));
let mut unresolved_tx: LruMap<sp_core::H256, Vec<Instant>> = LruMap::new(ByLength::new(64));

/// Convert the substrate block hash to a H256 hash without panics.
fn convert_to_h256<Block: BlockT>(
hash: &Block::Hash,
) -> Result<sp_core::H256, RelayChainError> {
if hash.as_ref().len() != 32 {
return Err(RelayChainError::GenericError(format!(
"Expected hash to be 32 bytes, got {} bytes",
hash.as_ref().len()
)));
}

Ok(sp_core::H256::from_slice(hash.as_ref()))
}

let mut import_notification_stream =
relay_chain_interface.import_notification_stream().await.inspect_err(|err| {
log::error!(
target: LOG_TARGET,
"Failed to get backed block stream: {err:?}"
);
})?;

let mut transaction_v2_handle =
Box::pin(futures::stream::select_all(rpc_transaction_v2_handles));

loop {
futures::select! {
notification = import_notification_stream.next().fuse() => {
let Some(notification) = notification else { return Ok(()) };

log::debug!(target: LOG_TARGET, "Received import notification for block {:?}", notification.hash());

let Ok(events) = relay_chain_interface.candidate_events(notification.hash()).await else {
log::warn!(target: LOG_TARGET, "Failed to get candidate events for block {}", notification.hash());
continue
};

let blocks = events.into_iter().filter_map(|event| match event {
CandidateEvent::CandidateBacked(receipt, ..)
if receipt.descriptor.para_id() == para_id => {
Some(receipt.descriptor.para_head())
}
_ => None,
});

log::trace!(target: LOG_TARGET, "Relay block {:?} with backed blocks: {:?}", notification.hash(), blocks);

for block in blocks {
if backed_blocks.insert(block, ()) {
log::debug!(target: LOG_TARGET, "New backed block: {:?}", block);
}

if let Some(tx_times) = unresolved_tx.remove(&block) {
for submitted_at in tx_times {
if let Some(metrics) = &metrics {
metrics.transaction_backed_duration.observe(submitted_at.elapsed().as_secs_f64());
}
}
}
}
},

tx_event = transaction_v2_handle.next().fuse() => {
let Some(tx_event) = tx_event else { return Ok(()) };

log::debug!(target: LOG_TARGET, "Received transaction event: {:?}", tx_event);

match tx_event {
sc_service::TransactionMonitorEvent::InBlock { block_hash, submitted_at } => {
let block_hash = convert_to_h256::<Block>(&block_hash)?;

if backed_blocks.peek(&block_hash).is_some() {
if let Some(metrics) = &metrics {
metrics.transaction_backed_duration.observe(submitted_at.elapsed().as_secs_f64());
}
} else {
// Received the transaction before the block is backed.
unresolved_tx.get_or_insert(
block_hash,
|| Vec::new(),
).map(|pending| pending.push(submitted_at));
}
}
}
}
}
}
}
3 changes: 2 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
})
};

sc_service::spawn_tasks(sc_service::SpawnTasksParams {
let spawn_result = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
rpc_builder,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
Expand Down Expand Up @@ -404,6 +404,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
recovery_handle: Box::new(overseer_handle.clone()),
sync_service,
prometheus_registry: prometheus_registry.as_ref(),
rpc_transaction_v2_handles: spawn_result.transaction_v2_handles,
})?;

start_bootnode_tasks(StartBootnodeTasksParams {
Expand Down
3 changes: 2 additions & 1 deletion polkadot/node/service/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ where
system_rpc_tx,
tx_handler_controller,
telemetry: telemetry.as_mut(),
})?;
})?
.rpc_handlers;

if let Some(hwbench) = hwbench {
sc_sysinfo::print_hwbench(&hwbench);
Expand Down
5 changes: 5 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,9 @@ impl InstanceMetrics {
histogram.observe(self.submitted_at.elapsed().as_secs_f64());
}
}

/// Returns the time when the transaction was submitted.
pub fn submitted_at(&self) -> Instant {
self.submitted_at
}
}
2 changes: 1 addition & 1 deletion substrate/client/rpc-spec-v2/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ pub mod transaction_broadcast;
pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
pub use event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent};
pub use metrics::Metrics as TransactionMetrics;
pub use transaction::Transaction;
pub use transaction::{Transaction, TransactionMonitorEvent, TransactionMonitorHandle};
pub use transaction_broadcast::TransactionBroadcast;
Loading
Loading