diff --git a/Cargo.lock b/Cargo.lock index 6a4d8f4b83047..928bc1b6eb4ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4644,6 +4644,7 @@ dependencies = [ "sc-telemetry", "sc-transaction-pool", "sc-utils", + "schnellru", "sp-api", "sp-blockchain", "sp-consensus", diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index dd03738ed0029..9fa61d4b9d60e 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -249,6 +249,7 @@ pub trait RelayChainInterface: Send + Sync { /// Fetch the scheduling lookahead value. async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult; + /// Fetch the candidate events for the given relay chain block. async fn candidate_events(&self, at: RelayHash) -> RelayChainResult>; } diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index 3ea36d70b42b6..ae48505fa0c7e 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -15,6 +15,7 @@ workspace = true async-channel = { workspace = true } futures = { workspace = true } prometheus = { workspace = true } +schnellru = { workspace = true } # Substrate sc-client-api = { workspace = true, default-features = true } diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 501212eb971a3..309a733e9a6b4 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -28,8 +28,8 @@ use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc; use futures::{channel::mpsc, StreamExt}; -use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption}; -use prometheus::{Histogram, HistogramOpts, Registry}; +use polkadot_primitives::{CollatorPair, OccupiedCoreAssumption}; +use 
prometheus::Registry; use sc_client_api::{ Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider, }; @@ -49,14 +49,11 @@ use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_core::Decode; -use sp_runtime::{ - traits::{Block as BlockT, BlockIdTo, Header}, - SaturatedConversion, Saturating, -}; -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use sp_runtime::traits::{Block as BlockT, BlockIdTo, Header}; +use std::{sync::Arc, time::Duration}; + +mod para_informant; +use crate::para_informant::ParachainInformant; /// Host functions that should be used in parachain nodes. /// @@ -98,6 +95,7 @@ pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> { pub recovery_handle: Box, pub sync_service: Arc>, pub prometheus_registry: Option<&'a Registry>, + pub rpc_transaction_v2_handles: Vec>, } /// Start necessary consensus tasks related to the relay chain. @@ -122,6 +120,7 @@ pub fn start_relay_chain_tasks( recovery_handle, sync_service, prometheus_registry, + rpc_transaction_v2_handles, }: StartRelayChainTasksParams, ) -> sc_service::error::Result<()> where @@ -191,15 +190,16 @@ where .spawn_essential_handle() .spawn("cumulus-pov-recovery", None, pov_recovery.run()); - let parachain_informant = parachain_informant::( - para_id, - relay_chain_interface.clone(), + let parachain_informant = ParachainInformant::::new( + Arc::new(relay_chain_interface.clone()), client.clone(), - prometheus_registry.map(ParachainInformantMetrics::new).transpose()?, - ); - task_manager - .spawn_handle() - .spawn("parachain-informant", None, parachain_informant); + prometheus_registry, + para_id, + rpc_transaction_v2_handles, + )?; + task_manager.spawn_handle().spawn("parachain-informant", None, async move { + let _ = parachain_informant.run().await; + }); Ok(()) } @@ -439,161 +439,3 @@ where Err("Stopping following imported blocks. 
Could not determine parachain target block".into()) } - -/// Task for logging candidate events and some related metrics. -async fn parachain_informant( - para_id: ParaId, - relay_chain_interface: impl RelayChainInterface + Clone, - client: Arc, - metrics: Option, -) where - Client: HeaderBackend + Send + Sync + 'static, -{ - let mut import_notifications = match relay_chain_interface.import_notification_stream().await { - Ok(import_notifications) => import_notifications, - Err(e) => { - log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!"); - return - }, - }; - let mut last_backed_block_time: Option = None; - while let Some(n) = import_notifications.next().await { - let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await { - Ok(candidate_events) => candidate_events, - Err(e) => { - log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash()); - continue - }, - }; - let mut backed_candidates = Vec::new(); - let mut included_candidates = Vec::new(); - let mut timed_out_candidates = Vec::new(); - for event in candidate_events { - match event { - CandidateEvent::CandidateBacked(receipt, head, _, _) => { - if receipt.descriptor.para_id() != para_id { - continue; - } - let backed_block = match Block::Header::decode(&mut &head.0[..]) { - Ok(header) => header, - Err(e) => { - log::warn!( - "Failed to decode parachain header from backed block: {e:?}" - ); - continue - }, - }; - let backed_block_time = Instant::now(); - if let Some(last_backed_block_time) = &last_backed_block_time { - let duration = backed_block_time.duration_since(*last_backed_block_time); - if let Some(metrics) = &metrics { - metrics.parachain_block_backed_duration.observe(duration.as_secs_f64()); - } - } - last_backed_block_time = Some(backed_block_time); - backed_candidates.push(backed_block); - }, - CandidateEvent::CandidateIncluded(receipt, head, _, _) => { - if receipt.descriptor.para_id() != para_id { - continue; - } 
- let included_block = match Block::Header::decode(&mut &head.0[..]) { - Ok(header) => header, - Err(e) => { - log::warn!( - "Failed to decode parachain header from included block: {e:?}" - ); - continue - }, - }; - let unincluded_segment_size = - client.info().best_number.saturating_sub(*included_block.number()); - let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into(); - if let Some(metrics) = &metrics { - metrics.unincluded_segment_size.observe(unincluded_segment_size.into()); - } - included_candidates.push(included_block); - }, - CandidateEvent::CandidateTimedOut(receipt, head, _) => { - if receipt.descriptor.para_id() != para_id { - continue; - } - let timed_out_block = match Block::Header::decode(&mut &head.0[..]) { - Ok(header) => header, - Err(e) => { - log::warn!( - "Failed to decode parachain header from timed out block: {e:?}" - ); - continue - }, - }; - timed_out_candidates.push(timed_out_block); - }, - } - } - let mut log_parts = Vec::new(); - if !backed_candidates.is_empty() { - let backed_candidates = backed_candidates - .into_iter() - .map(|c| format!("#{} ({})", c.number(), c.hash())) - .collect::>() - .join(", "); - log_parts.push(format!("backed: {}", backed_candidates)); - }; - if !included_candidates.is_empty() { - let included_candidates = included_candidates - .into_iter() - .map(|c| format!("#{} ({})", c.number(), c.hash())) - .collect::>() - .join(", "); - log_parts.push(format!("included: {}", included_candidates)); - }; - if !timed_out_candidates.is_empty() { - let timed_out_candidates = timed_out_candidates - .into_iter() - .map(|c| format!("#{} ({})", c.number(), c.hash())) - .collect::>() - .join(", "); - log_parts.push(format!("timed out: {}", timed_out_candidates)); - }; - if !log_parts.is_empty() { - log::info!( - "Update at relay chain block #{} ({}) - {}", - n.number(), - n.hash(), - log_parts.join(", ") - ); - } - } -} - -struct ParachainInformantMetrics { - /// Time between parachain blocks getting backed by 
the relaychain. - parachain_block_backed_duration: Histogram, - /// Number of blocks between best block and last included block. - unincluded_segment_size: Histogram, -} - -impl ParachainInformantMetrics { - fn new(prometheus_registry: &Registry) -> prometheus::Result { - let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new( - "parachain_block_backed_duration", - "Time between parachain blocks getting backed by the relaychain", - ))?; - prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?; - - let unincluded_segment_size = Histogram::with_opts( - HistogramOpts::new( - "parachain_unincluded_segment_size", - "Number of blocks between best block and last included block", - ) - .buckets((0..=24).into_iter().map(|i| i as f64).collect()), - )?; - prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?; - - Ok(Self { - parachain_block_backed_duration: parachain_block_authorship_duration, - unincluded_segment_size, - }) - } -} diff --git a/cumulus/client/service/src/para_informant.rs b/cumulus/client/service/src/para_informant.rs new file mode 100644 index 0000000000000..3e508e502d7ea --- /dev/null +++ b/cumulus/client/service/src/para_informant.rs @@ -0,0 +1,358 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Parachain informant service. +//! +//! Provides a service that logs information about parachain candidate events +//! and related metrics. + +use std::{pin::Pin, sync::Arc, time::Instant}; + +use cumulus_primitives_core::{relay_chain::Header as RelayHeader, ParaId}; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; +use futures::{FutureExt, StreamExt}; +use polkadot_primitives::CandidateEvent; +use prometheus::{linear_buckets, Histogram, HistogramOpts, Registry}; +use sc_service::TransactionMonitorEvent; +use sc_telemetry::log; +use schnellru::{ByLength, LruMap}; +use sp_blockchain::HeaderBackend; +use sp_core::Decode; +use sp_runtime::{ + traits::{Block as BlockT, Header}, + SaturatedConversion, Saturating, +}; + +const LOG_TARGET: &str = "parachain_informant"; + +/// The maximum number of entries to keep in the LRU caches. +const LRU_LENGTH: u32 = 64; + +/// Type alias for the transaction monitor event stream. +type TransactionMonitorEventStream = + Pin> + Unpin + Send>>; + +/// The parachain informant service. +pub struct ParachainInformant { + /// Relay chain interface to interact with the relay chain. + relay_chain_interface: Arc, + + /// Client to access the blockchain headers. + client: Arc>, + + /// Optional metrics for the parachain informant. + metrics: Option, + + /// Parachain ID of the parachain this informant is running for. + para_id: ParaId, + + /// Last time a block was backed. + last_backed_block_time: Option, + + /// Cache for storing the last backed blocks. + backed_blocks: LruMap, + + /// Cache for storing transactions not yet backed. + unresolved_tx: LruMap>, + + /// Stream of transaction events from RPC transaction v2 handles. 
+ transaction_v2_handle: TransactionMonitorEventStream, +} + +impl ParachainInformant { + pub fn new( + relay_chain_interface: Arc, + client: Arc>, + registry: Option<&Registry>, + para_id: ParaId, + rpc_transaction_v2_handles: Vec>, + ) -> sc_service::error::Result { + let metrics = registry.map(|r| ParachainInformantMetrics::new(r)).transpose()?; + + let transaction_v2_handle: Pin< + Box> + Unpin + Send>, + > = if rpc_transaction_v2_handles.is_empty() { + Box::pin(futures::stream::pending()) + } else { + Box::pin(futures::stream::select_all(rpc_transaction_v2_handles)) + }; + + Ok(Self { + relay_chain_interface, + client, + metrics, + para_id, + last_backed_block_time: None, + backed_blocks: LruMap::new(ByLength::new(LRU_LENGTH)), + unresolved_tx: LruMap::new(ByLength::new(LRU_LENGTH)), + transaction_v2_handle, + }) + } + + /// Run the parachain informant service. + pub async fn run(mut self) -> RelayChainResult<()> { + let mut import_notifications = + self.relay_chain_interface.import_notification_stream().await.inspect_err(|e| { + log::error!( + target: LOG_TARGET, + "Failed to get import notification stream: {e:?}. Parachain informant will not run!" + ); + })?; + + loop { + futures::select! { + notification = import_notifications.next().fuse() => { + let Some(notification) = notification else { return Ok(()) }; + + self.handle_import_notification(notification).await; + }, + + tx_event = self.transaction_v2_handle.next().fuse() => { + let Some(tx_event) = tx_event else { continue }; + + log::debug!(target: LOG_TARGET, "Received transaction event: {:?}", tx_event); + + self.handle_rpc_monitor_event(tx_event); + }, + } + } + } + + /// Handle an import notification from the relay chain. + /// + /// Ensures the RPC backed blocks reflect into the metrics and + /// performs the parachain logging. 
+ async fn handle_import_notification(&mut self, n: RelayHeader) { + let candidate_events = match self.relay_chain_interface.candidate_events(n.hash()).await { + Ok(candidate_events) => candidate_events, + Err(e) => { + log::warn!(target: LOG_TARGET, "Failed to get candidate events for block {}: {e:?}", n.hash()); + return; + }, + }; + + self.handle_rpc_backed_blocks(candidate_events.iter()); + + self.handle_logging(candidate_events, &n); + } + + /// Handle a transaction event from the RPC transaction monitor. + /// + /// If the transaction is included in a backed block a metric is recorded. + /// Otherwise, the transaction is stored in an unresolved transactions cache + /// until the block is backed. + fn handle_rpc_monitor_event(&mut self, event: TransactionMonitorEvent) { + let (block_hash, submitted_at) = match event { + sc_service::TransactionMonitorEvent::InBlock { block_hash, submitted_at } => + (sp_core::H256::from_slice(block_hash.as_ref()), submitted_at), + }; + + if self.backed_blocks.peek(&block_hash).is_some() { + if let Some(metrics) = &self.metrics { + metrics + .transaction_backed_duration + .observe(submitted_at.elapsed().as_secs_f64()); + } + } else { + // Received the transaction before the block is backed. + self.unresolved_tx + .get_or_insert(block_hash, || Vec::new()) + .map(|pending| pending.push(submitted_at)); + } + } + + /// Handle the RPC metrics for backed blocks. + fn handle_rpc_backed_blocks<'a>(&mut self, events: impl Iterator) { + let blocks = events.filter_map(|event| match event { + CandidateEvent::CandidateBacked(receipt, ..) 
+ if receipt.descriptor.para_id() == self.para_id => + Some(receipt.descriptor.para_head()), + _ => None, + }); + + for block in blocks { + if self.backed_blocks.insert(block, ()) { + log::trace!(target: LOG_TARGET, "New backed block: {:?}", block); + } + + if let Some(tx_times) = self.unresolved_tx.remove(&block) { + for submitted_at in tx_times { + if let Some(metrics) = &self.metrics { + metrics + .transaction_backed_duration + .observe(submitted_at.elapsed().as_secs_f64()); + } + } + } + } + } + + /// Handle candidate events and log the results. + fn handle_logging(&mut self, candidate_events: Vec, n: &RelayHeader) { + let mut backed_candidates = Vec::new(); + let mut included_candidates = Vec::new(); + let mut timed_out_candidates = Vec::new(); + + for event in candidate_events { + match event { + CandidateEvent::CandidateBacked(receipt, head, _, _) => { + if receipt.descriptor.para_id() != self.para_id { + continue; + } + + let backed_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + target: LOG_TARGET, + "Failed to decode parachain header from backed block: {e:?}" + ); + continue + }, + }; + let backed_block_time = Instant::now(); + if let Some(last_backed_block_time) = &self.last_backed_block_time { + let duration = backed_block_time.duration_since(*last_backed_block_time); + if let Some(metrics) = &self.metrics { + metrics.parachain_block_backed_duration.observe(duration.as_secs_f64()); + } + } + self.last_backed_block_time = Some(backed_block_time); + backed_candidates.push(backed_block); + }, + CandidateEvent::CandidateIncluded(receipt, head, _, _) => { + if receipt.descriptor.para_id() != self.para_id { + continue; + } + + let included_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + target: LOG_TARGET, + "Failed to decode parachain header from included block: {e:?}" + ); + continue + }, + }; + let unincluded_segment_size = + 
self.client.info().best_number.saturating_sub(*included_block.number()); + let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into(); + if let Some(metrics) = &self.metrics { + metrics.unincluded_segment_size.observe(unincluded_segment_size.into()); + } + included_candidates.push(included_block); + }, + CandidateEvent::CandidateTimedOut(receipt, head, _) => { + if receipt.descriptor.para_id() != self.para_id { + continue; + } + + let timed_out_block = match Block::Header::decode(&mut &head.0[..]) { + Ok(header) => header, + Err(e) => { + log::warn!( + target: LOG_TARGET, + "Failed to decode parachain header from timed out block: {e:?}" + ); + continue + }, + }; + timed_out_candidates.push(timed_out_block); + }, + } + } + let mut log_parts = Vec::new(); + if !backed_candidates.is_empty() { + let backed_candidates = backed_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("backed: {}", backed_candidates)); + }; + if !included_candidates.is_empty() { + let included_candidates = included_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("included: {}", included_candidates)); + }; + if !timed_out_candidates.is_empty() { + let timed_out_candidates = timed_out_candidates + .into_iter() + .map(|c| format!("#{} ({})", c.number(), c.hash())) + .collect::>() + .join(", "); + log_parts.push(format!("timed out: {}", timed_out_candidates)); + }; + if !log_parts.is_empty() { + log::info!( + target: LOG_TARGET, + "Update at relay chain block #{} ({}) - {}", + n.number(), + n.hash(), + log_parts.join(", ") + ); + } + } +} + +/// Metrics for the parachain informant service. +pub struct ParachainInformantMetrics { + /// Time between parachain blocks getting backed by the relaychain. + parachain_block_backed_duration: Histogram, + /// Number of blocks between best block and last included block. 
+ unincluded_segment_size: Histogram, + /// Time between the submission of a transaction and its inclusion in a backed block. + transaction_backed_duration: Histogram, +} + +impl ParachainInformantMetrics { + pub fn new(prometheus_registry: &Registry) -> prometheus::Result { + let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new( + "parachain_block_backed_duration", + "Time between parachain blocks getting backed by the relaychain", + ))?; + prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?; + + let unincluded_segment_size = Histogram::with_opts( + HistogramOpts::new( + "parachain_unincluded_segment_size", + "Number of blocks between best block and last included block", + ) + .buckets((0..=24).into_iter().map(|i| i as f64).collect()), + )?; + prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?; + + let transaction_backed_duration = Histogram::with_opts( + HistogramOpts::new( + "parachain_transaction_backed_duration", + "Time between the submission of a transaction and its inclusion in a backed block", + ) + .buckets(linear_buckets(0.01, 40.0, 20).expect("Valid buckets; qed")), + )?; + prometheus_registry.register(Box::new(transaction_backed_duration.clone()))?; + + Ok(Self { + parachain_block_backed_duration: parachain_block_authorship_duration, + unincluded_segment_size, + transaction_backed_duration, + }) + } +} diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index f0d4cc0e0a88d..488c2540be919 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -435,7 +435,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec { }) }; - sc_service::spawn_tasks(sc_service::SpawnTasksParams { + let spawn_result = sc_service::spawn_tasks(sc_service::SpawnTasksParams { rpc_builder, client: client.clone(), transaction_pool: transaction_pool.clone(), @@ -493,6 +493,7 @@ 
pub(crate) trait NodeSpec: BaseNodeSpec { recovery_handle: Box::new(overseer_handle.clone()), sync_service, prometheus_registry: prometheus_registry.as_ref(), + rpc_transaction_v2_handles: spawn_result.transaction_v2_handles, })?; start_bootnode_tasks(StartBootnodeTasksParams { diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index d0d2325c21b6c..931bcc60a8dff 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -389,7 +389,8 @@ where system_rpc_tx, tx_handler_controller, telemetry: None, - })?; + })? + .rpc_handlers; let announce_block = { let sync_service = sync_service.clone(); @@ -427,6 +428,7 @@ where recovery_handle, sync_service: sync_service.clone(), prometheus_registry: None, + rpc_transaction_v2_handles: Default::default(), })?; if let Some(collator_key) = collator_key { diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index c4e9a6668ed3e..618d5a2402b2c 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -506,7 +506,8 @@ where system_rpc_tx, tx_handler_controller, telemetry: telemetry.as_mut(), - })?; + })? + .rpc_handlers; if let Some(hwbench) = hwbench { sc_sysinfo::print_hwbench(&hwbench); diff --git a/prdoc/pr_8902.prdoc b/prdoc/pr_8902.prdoc new file mode 100644 index 0000000000000..382544831466d --- /dev/null +++ b/prdoc/pr_8902.prdoc @@ -0,0 +1,71 @@ +title: 'cumulus/metrics: Measure the time of including a tx in a backed block' +doc: +- audience: Node Dev + description: |- + This PR introduces a metric in Cumulus to measure the time it took a transaction received on the RPC layer to be included in a backed block. 
+ + - A new `TransactionMonitorHandle` is exposed from the transaction RPC V2 API to listen on monitoring events emitted by substrate + - The monitoring event contains the block of the transaction with the timestamp associated + - A cumulus task is added to group the backed events from the relay chain with the monitoring events exposed by the RPC layer + - The spawning of RPC `spawn_tasks` now returns the additional `TransactionMonitorHandle` + - The metric is exposed under the following histogram: `parachain_transaction_backed_duration` + + ## Testing Done + - Started a small network with 2 validators and 2 collators (charlie and dave -- using `small_network.toml` as base) + - Used subxt with a minimal working code to send a transaction to charlie + - Used the RPC-V2 API (ie ChainHeadBackend) + + ```rust + use std::sync::Arc; + use subxt::{ + OnlineClient, PolkadotConfig, + backend::{chain_head::ChainHeadBackend, rpc::RpcClient}, + }; + use subxt_signer::sr25519::dev; + + #[subxt::subxt(runtime_metadata_insecure_url = "ws://127.0.0.1:44919")] + pub mod polkadot {} + + #[tokio::main] + async fn main() -> Result<(), Box> { + let rpc = RpcClient::from_url("ws://127.0.0.1:44919".to_string()).await?; + let backend = ChainHeadBackend::builder().build_with_background_driver(rpc.clone()); + let api: OnlineClient = OnlineClient::from_backend(Arc::new(backend)).await?; + + let from = dev::charlie(); + let dest = dev::dave().public_key().into(); + let tx = Box::new(polkadot::tx().balances().transfer_allow_death(dest, 10_000)); + api.tx() + .sign_and_submit_then_watch_default(&tx, &from) + .await? 
+ .wait_for_finalized_success() + .await?; + + Ok(()) + } + ``` + - After the transaction gets finalized: + + ``` + parachain_transaction_backed_duration_sum{chain="asset-hub-rococo-local"} 13.097583666 + parachain_transaction_backed_duration_count{chain="asset-hub-rococo-local"} 1 + ``` + + Closes https://github.com/paritytech/polkadot-sdk/issues/8383 + + cc @paritytech/sdk-node +crates: +- name: cumulus-relay-chain-interface + bump: patch +- name: sc-rpc-spec-v2 + bump: patch +- name: cumulus-client-service + bump: patch +- name: polkadot-omni-node-lib + bump: patch +- name: polkadot-service + bump: patch +- name: sc-service + bump: patch +- name: pallet-revive-eth-rpc + bump: patch diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 8db5cf24ea22a..2b7977c49e3d6 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -568,7 +568,8 @@ pub fn new_full_base::Hash>>( tx_handler_controller, sync_service: sync_service.clone(), telemetry: telemetry.as_mut(), - })?; + })? + .rpc_handlers; if let Some(hwbench) = hwbench { sc_sysinfo::print_hwbench(&hwbench); diff --git a/substrate/client/rpc-spec-v2/src/transaction/metrics.rs b/substrate/client/rpc-spec-v2/src/transaction/metrics.rs index 25745ba9116df..142c0549db29f 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/metrics.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/metrics.rs @@ -152,4 +152,9 @@ impl InstanceMetrics { histogram.observe(self.submitted_at.elapsed().as_secs_f64()); } } + + /// Returns the time when the transaction was submitted. 
+ pub fn submitted_at(&self) -> Instant { + self.submitted_at + } } diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs index 4cb7ceb06ab0d..fcfd49d00c293 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs @@ -39,5 +39,5 @@ pub mod transaction_broadcast; pub use api::{TransactionApiServer, TransactionBroadcastApiServer}; pub use event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent}; pub use metrics::Metrics as TransactionMetrics; -pub use transaction::Transaction; +pub use transaction::{Transaction, TransactionMonitorEvent, TransactionMonitorHandle}; pub use transaction_broadcast::TransactionBroadcast; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index 2618da953bc26..f65e232f9c4be 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -22,6 +22,7 @@ use crate::{ api::{TransactionApiServer, TransactionBroadcastApiServer}, tests::executor::{TaskExecutorBroadcast, TaskExecutorState}, Transaction as RpcTransaction, TransactionBroadcast as RpcTransactionBroadcast, + TransactionMonitorHandle, }, }; use futures::Future; @@ -104,6 +105,20 @@ pub fn setup_api_tx() -> ( RpcModule>>>, TaskExecutorState, MiddlewarePoolRecv, +) { + let (api, pool, client_mock, tx_api, executor_recv, pool_state, _tx_monitor) = + setup_api_tx_with_monitoring(); + (api, pool, client_mock, tx_api, executor_recv, pool_state) +} + +pub fn setup_api_tx_with_monitoring() -> ( + Arc, + Arc, + Arc>>, + RpcModule>>>, + TaskExecutorState, + MiddlewarePoolRecv, + TransactionMonitorHandle, ) { let (pool, api, _) = maintained_pool(Default::default()); let (pool, pool_state) = MiddlewarePool::new(Arc::new(pool).clone()); @@ -114,11 +129,11 @@ pub fn setup_api_tx() -> ( let 
client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); - let tx_api = - RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor), None) - .into_rpc(); + let (tx_api, tx_monitor) = + RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor), None); + let tx_api = tx_api.into_rpc(); - (api, pool, client_mock, tx_api, executor_recv, pool_state) + (api, pool, client_mock, tx_api, executor_recv, pool_state, tx_monitor) } /// Get the next event from the provided middleware in at most 5 seconds. diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_tests.rs index 879d51eaf5f39..9c6f05dff82d5 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_tests.rs @@ -18,10 +18,11 @@ use crate::{ hex_string, - transaction::{TransactionBlock, TransactionEvent}, + transaction::{tests::setup::setup_api_tx_with_monitoring, TransactionBlock, TransactionEvent}, }; use assert_matches::assert_matches; use codec::Encode; +use futures::StreamExt; use jsonrpsee::rpc_params; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool}; use sp_core::H256; @@ -235,3 +236,48 @@ async fn tx_slow_client_replace_old_messages() { assert_eq!(res, exp); } + +#[tokio::test] +async fn tx_with_monitoring() { + let (api, pool, client, tx_api, _exec_middleware, _pool_middleware, mut tx_monitor) = + setup_api_tx_with_monitoring(); + let block_1_header = api.push_block(1, vec![], true); + client.set_best_block(block_1_header.hash(), 1); + + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + + let mut sub = tx_api + .subscribe_unbounded("transactionWatch_v1_submitAndWatch", rpc_params![&xt]) + .await + .unwrap(); + + let event: TransactionEvent = 
get_next_event_sub!(&mut sub); + assert_eq!(event, TransactionEvent::Validated); + + // Import block 2 with the transaction included. + let block_2_header = api.push_block(2, vec![uxt.clone()], true); + let block_2 = block_2_header.hash(); + let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; + pool.inner_pool.maintain(event).await; + + let event: TransactionEvent = get_next_event_sub!(&mut sub); + assert_eq!( + event, + TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { + hash: block_2, + index: 0 + })) + ); + + // Check the monitor produces the same event + let result = tokio::time::timeout(std::time::Duration::from_secs(5), tx_monitor.next()) + .await + .unwrap() + .unwrap(); + let (block_hash, _submitted_at) = match result { + crate::transaction::TransactionMonitorEvent::InBlock { block_hash, submitted_at } => + (block_hash, submitted_at), + }; + assert_eq!(block_hash, block_2); +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index a497c1a71f8b0..5066a039e5bf6 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -28,7 +28,7 @@ use crate::{ }; use codec::Decode; -use futures::{StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryFutureExt}; use jsonrpsee::{core::async_trait, PendingSubscriptionSink}; use super::metrics::{InstanceMetrics, Metrics}; @@ -43,29 +43,54 @@ use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; use std::sync::Arc; -pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; +pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::tx"; /// An API for transaction RPC calls. -pub struct Transaction { +pub struct Transaction +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Hash: Unpin, + ::Hash: Unpin, + Client: HeaderBackend + Send + Sync + 'static, +{ /// Substrate client. client: Arc, /// Transactions pool. 
pool: Arc, /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, + /// Channel to announce transactions. + tx_announce: tokio::sync::mpsc::Sender::Hash>>, /// Metrics for transactions. metrics: Option, } -impl Transaction { +impl Transaction +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Hash: Unpin, + ::Hash: Unpin, + Client: HeaderBackend + Send + Sync + 'static, +{ /// Creates a new [`Transaction`]. pub fn new( client: Arc, pool: Arc, executor: SubscriptionTaskExecutor, metrics: Option, - ) -> Self { - Transaction { client, pool, executor, metrics } + ) -> (Self, TransactionMonitorHandle<::Hash>) { + let (tx_announce, rx_announce) = tokio::sync::mpsc::channel(1024); + + ( + Transaction { + client: client.clone(), + pool: pool.clone(), + executor, + tx_announce, + metrics, + }, + TransactionMonitorHandle(Some(rx_announce)), + ) } } @@ -90,6 +115,7 @@ where // Get a new transaction metrics instance and increment the counter. let mut metrics = InstanceMetrics::new(self.metrics.clone()); + let tx_announce = self.tx_announce.clone(); let fut = async move { let decoded_extrinsic = match TransactionFor::::decode(&mut &xt[..]) { @@ -125,6 +151,7 @@ where return; }; + let submitted_at = metrics.submitted_at(); match submit.await { Ok(stream) => { let stream = stream @@ -135,7 +162,25 @@ where metrics.register_event(event); }); - async move { event } + let tx_announce = tx_announce.clone(); + async move { + if let Some(TransactionEvent::BestChainBlockIncluded(Some( + TransactionBlock { hash, .. }, + ))) = &event + { + // Notify the monitor about the transaction being included in a + // block. 
+ let _ = tx_announce + .send(TransactionMonitorEvent::InBlock { + block_hash: *hash, + submitted_at, + }) + .await; + log::trace!(target: LOG_TARGET, "Transaction included in block and notified: {:?}", hash); + } + + event + } }) .boxed(); @@ -189,3 +234,38 @@ fn handle_event( TransactionStatus::Broadcast(_) => None, } } + +/// Handle for the transaction monitor. +#[derive(Default)] +pub struct TransactionMonitorHandle( + Option>>, +); + +/// An event emitted by the transaction monitor. +/// +/// This is used to notify about the state of transactions. +#[derive(Debug, Clone)] +pub enum TransactionMonitorEvent { + /// The transaction has been included in a block. + InBlock { + /// The block hash where the transaction was included. + block_hash: Hash, + + /// The time when the transaction was submitted. + submitted_at: std::time::Instant, + }, +} + +impl Stream for TransactionMonitorHandle { + type Item = TransactionMonitorEvent; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if let Some(rx) = self.get_mut().0.as_mut() { + return rx.poll_recv(cx); + } + std::task::Poll::Pending + } +} diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 74d94c30cd69b..1158db8b4f4cf 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -463,10 +463,19 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub telemetry: Option<&'a mut Telemetry>, } +pub struct SpawnTaskHandlers { + /// RPC handlers for the in-memory RPC server. + pub rpc_handlers: RpcHandlers, + + /// Specific handler for the RPC transactions. + pub transaction_v2_handles: + Vec>, +} + /// Spawn the tasks that are required to run a node. 
pub fn spawn_tasks( params: SpawnTasksParams, -) -> Result +) -> Result, Error> where TCl: ProvideRuntimeApi + HeaderMetadata @@ -628,7 +637,11 @@ where rpc_id_provider, )?; + let mut transaction_v2_handles = Vec::with_capacity(2); + transaction_v2_handles.push(rpc_server_handle.transaction_v2_handle); + let listen_addrs = rpc_server_handle + .server .listen_addrs() .into_iter() .map(|socket_addr| { @@ -639,7 +652,11 @@ where .collect(); let in_memory_rpc = { - let mut module = gen_rpc_module()?; + let result = gen_rpc_module()?; + // Take in memory RPC into account. + transaction_v2_handles.push(result.transaction_v2_handle); + + let mut module = result.module; module.extensions_mut().insert(DenyUnsafe::No); module }; @@ -653,9 +670,9 @@ where sc_informant::build(client.clone(), network, sync_service.clone()), ); - task_manager.keep_alive((config.base_path, rpc_server_handle)); + task_manager.keep_alive((config.base_path, rpc_server_handle.server)); - Ok(in_memory_rpc_handle) + Ok(SpawnTaskHandlers { rpc_handlers: in_memory_rpc_handle, transaction_v2_handles }) } /// Returns a future that forwards imported transactions to the transaction networking protocol. @@ -750,6 +767,15 @@ where Ok(telemetry.handle()) } +/// RPC module data structure. +pub struct RpcModuleData { + /// The RPC module used to instantiate the RPC server. + pub module: RpcModule<()>, + + /// Specific handler for the RPC transactions. 
+ pub transaction_v2_handle: sc_rpc_spec_v2::transaction::TransactionMonitorHandle, +} + /// Generate RPC module using provided configuration pub fn gen_rpc_module( spawn_handle: SpawnTaskHandle, @@ -765,7 +791,7 @@ pub fn gen_rpc_module( backend: Arc, rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result, Error>), metrics: Option, -) -> Result, Error> +) -> Result, Error> where TBl: BlockT, TCl: ProvideRuntimeApi @@ -816,13 +842,13 @@ where ) .into_rpc(); - let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new( + let (transaction_v2, transaction_v2_handle) = sc_rpc_spec_v2::transaction::Transaction::new( client.clone(), transaction_pool.clone(), task_executor.clone(), metrics, - ) - .into_rpc(); + ); + let transaction_v2 = transaction_v2.into_rpc(); let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), @@ -893,7 +919,7 @@ where let extra_rpcs = rpc_builder(task_executor.clone())?; rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?; - Ok(rpc_api) + Ok(RpcModuleData { module: rpc_api, transaction_v2_handle }) } /// Parameters to pass into [`build_network`]. 
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index a7f1b07111828..026deda54b49f 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -61,8 +61,8 @@ pub use self::{ new_client, new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import, new_full_parts_with_genesis_builder, new_wasm_executor, propagate_transaction_notifications, spawn_tasks, BuildNetworkAdvancedParams, - BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, SpawnTasksParams, - TFullBackend, TFullCallExecutor, TFullClient, + BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, RpcModuleData, + SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient, }, client::{ClientConfig, LocalCallExecutor}, error::Error, @@ -84,6 +84,8 @@ pub use sc_chain_spec::{ Properties, }; +pub use sc_rpc_spec_v2::transaction::{TransactionMonitorEvent, TransactionMonitorHandle}; + use crate::config::RpcConfiguration; use prometheus_endpoint::Registry; pub use sc_consensus::ImportQueue; @@ -379,16 +381,25 @@ pub async fn build_system_rpc_future< debug!("`NetworkWorker` has terminated, shutting down the system RPC future."); } +/// The data returned by starting the RPC server. +pub struct RpcServerData { + /// The RPC server instance. + pub server: Server, + /// Specific handler for the RPC transactions. + pub transaction_v2_handle: sc_rpc_spec_v2::transaction::TransactionMonitorHandle, +} + /// Starts RPC servers. 
-pub fn start_rpc_servers( +pub fn start_rpc_servers( rpc_configuration: &RpcConfiguration, registry: Option<&Registry>, tokio_handle: &Handle, gen_rpc_module: R, rpc_id_provider: Option>, -) -> Result +) -> Result, error::Error> where - R: Fn() -> Result, Error>, + Hash: Clone, + R: Fn() -> Result, Error>, { let endpoints: Vec = if let Some(endpoints) = rpc_configuration.addr.as_ref() @@ -436,11 +447,11 @@ where }; let metrics = sc_rpc_server::RpcMetrics::new(registry)?; - let rpc_api = gen_rpc_module()?; + let RpcModuleData { module, transaction_v2_handle } = gen_rpc_module()?; let server_config = sc_rpc_server::Config { endpoints, - rpc_api, + rpc_api: module, metrics, id_provider: rpc_id_provider, tokio_handle: tokio_handle.clone(), @@ -453,7 +464,7 @@ where match tokio::task::block_in_place(|| { tokio_handle.block_on(sc_rpc_server::start_server(server_config)) }) { - Ok(server) => Ok(server), + Ok(server) => Ok(RpcServerData { server, transaction_v2_handle }), Err(e) => Err(Error::Application(e)), } } diff --git a/substrate/frame/revive/rpc/src/cli.rs b/substrate/frame/revive/rpc/src/cli.rs index c603947e33a95..56be27cf854b1 100644 --- a/substrate/frame/revive/rpc/src/cli.rs +++ b/substrate/frame/revive/rpc/src/cli.rs @@ -243,14 +243,17 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { } }); - task_manager.keep_alive(rpc_server_handle); + task_manager.keep_alive(rpc_server_handle.server); let signals = tokio_runtime.block_on(async { Signals::capture() })?; tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?; Ok(()) } /// Create the JSON-RPC module. 
-fn rpc_module(is_dev: bool, client: Client) -> Result, sc_service::Error> { +fn rpc_module( + is_dev: bool, + client: Client, +) -> Result, sc_service::Error> { let eth_api = EthRpcServerImpl::new(client.clone()) .with_accounts(if is_dev { vec![crate::Account::default()] } else { vec![] }) .into_rpc(); @@ -262,5 +265,6 @@ fn rpc_module(is_dev: bool, client: Client) -> Result, sc_service: module.merge(eth_api).map_err(|e| sc_service::Error::Application(e.into()))?; module.merge(health_api).map_err(|e| sc_service::Error::Application(e.into()))?; module.merge(debug_api).map_err(|e| sc_service::Error::Application(e.into()))?; - Ok(module) + + Ok(sc_service::RpcModuleData { module, transaction_v2_handle: Default::default() }) } diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index 4ce1bba376e8d..b2847e8e207f4 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -334,7 +334,7 @@ pub async fn start_parachain_node( }) }; - sc_service::spawn_tasks(sc_service::SpawnTasksParams { + let spawn_result = sc_service::spawn_tasks(sc_service::SpawnTasksParams { rpc_builder, client: client.clone(), transaction_pool: transaction_pool.clone(), @@ -401,6 +401,7 @@ pub async fn start_parachain_node( recovery_handle: Box::new(overseer_handle.clone()), sync_service: sync_service.clone(), prometheus_registry: prometheus_registry.as_ref(), + rpc_transaction_v2_handles: spawn_result.transaction_v2_handles, })?; start_bootnode_tasks(StartBootnodeTasksParams {