Skip to content

Commit 51819f5

Browse files
lexnvgithub-actions[bot]
authored andcommitted
tx/metrics: Add metrics for the RPC v2 transactionWatch_v1_submitAndWatch (#8345)
This PR adds metrics for the following RPC subscription: [transactionWatch_v1_submitAndWatch](https://paritytech.github.io/json-rpc-interface-spec/api/transactionWatch_v1_submitAndWatch.html) Metrics are exposed in two ways: - simple counters of how many events we've seen globally - a histogram vector of execution times, which is labeled by `initial event` -> `final event` - This helps us identify how long it takes the transaction pool to advance the state of the events, and further debug issues Part of: #8336 ### (outdated) PoC Dashboards ![Screenshot 2025-04-28 at 17 50 48](https://github.com/user-attachments/assets/9fd0bf30-a321-4362-a10b-dfc3de1eb474) ### Next steps - [x] initial dashboards with a live node - [x] adjust testing --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent db2e73c commit 51819f5

File tree

9 files changed

+249
-11
lines changed

9 files changed

+249
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

prdoc/pr_8345.prdoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
title: 'tx/metrics: Add metrics for the RPC v2 `transactionWatch_v1_submitAndWatch`'
2+
doc:
3+
- audience: Node Operator
4+
description: |-
5+
This PR adds metrics for the following RPC subscription: [transactionWatch_v1_submitAndWatch](https://paritytech.github.io/json-rpc-interface-spec/api/transactionWatch_v1_submitAndWatch.html)
6+
7+
Metrics are exposed in two ways:
8+
- simple counters of how many events we've seen globally
9+
- a histogram vector of execution times, which is labeled by `initial event` -> `final event`
10+
- This helps us identify how long it takes the transaction pool to advance the state of the events, and further debug issues
11+
12+
Part of: https://github.com/paritytech/polkadot-sdk/issues/8336
13+
14+
### (outdated) PoC Dashboards
15+
16+
![Screenshot 2025-04-28 at 17 50 48](https://github.com/user-attachments/assets/9fd0bf30-a321-4362-a10b-dfc3de1eb474)
17+
18+
19+
### Next steps
20+
- [x] initial dashboards with a live node
21+
- [x] adjust testing
22+
crates:
23+
- name: sc-service
24+
bump: major
25+
- name: sc-rpc-spec-v2
26+
bump: major

substrate/client/rpc-spec-v2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ hex = { workspace = true, default-features = true }
2828
itertools = { workspace = true }
2929
log = { workspace = true, default-features = true }
3030
parking_lot = { workspace = true, default-features = true }
31+
prometheus-endpoint = { workspace = true, default-features = true }
3132
rand = { workspace = true, default-features = true }
3233
sc-client-api = { workspace = true, default-features = true }
3334
sc-rpc = { workspace = true, default-features = true }

substrate/client/rpc-spec-v2/src/transaction/event.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,19 @@ pub enum TransactionEvent<Hash> {
9595
Dropped(TransactionDropped),
9696
}
9797

98+
impl<Hash> TransactionEvent<Hash> {
99+
/// Returns true if this is the last event emitted by the RPC subscription.
100+
pub fn is_final(&self) -> bool {
101+
matches!(
102+
&self,
103+
TransactionEvent::Finalized(_) |
104+
TransactionEvent::Error(_) |
105+
TransactionEvent::Invalid(_) |
106+
TransactionEvent::Dropped(_)
107+
)
108+
}
109+
}
110+
98111
/// Intermediate representation (IR) for the transaction events
99112
/// that handles block events only.
100113
///
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// This file is part of Substrate.
2+
3+
// Copyright (C) Parity Technologies (UK) Ltd.
4+
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5+
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU General Public License as published by
8+
// the Free Software Foundation, either version 3 of the License, or
9+
// (at your option) any later version.
10+
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU General Public License for more details.
15+
16+
// You should have received a copy of the GNU General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
//! Metrics for recording transaction events.
20+
21+
use std::{collections::HashSet, time::Instant};
22+
23+
use prometheus_endpoint::{
24+
exponential_buckets, linear_buckets, register, Histogram, HistogramOpts, PrometheusError,
25+
Registry,
26+
};
27+
28+
use super::TransactionEvent;
29+
30+
/// RPC layer metrics for transaction pool.
31+
#[derive(Debug, Clone)]
32+
pub struct Metrics {
33+
validated: Histogram,
34+
in_block: Histogram,
35+
finalized: Histogram,
36+
dropped: Histogram,
37+
invalid: Histogram,
38+
error: Histogram,
39+
}
40+
41+
impl Metrics {
42+
/// Creates a new [`Metrics`] instance.
43+
pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
44+
let validated = register(
45+
Histogram::with_opts(
46+
HistogramOpts::new(
47+
"rpc_transaction_validation_time",
48+
"RPC Transaction validation time in seconds",
49+
)
50+
.buckets(exponential_buckets(0.01, 2.0, 16).expect("Valid buckets; qed")),
51+
)?,
52+
registry,
53+
)?;
54+
55+
let in_block = register(
56+
Histogram::with_opts(
57+
HistogramOpts::new(
58+
"rpc_transaction_in_block_time",
59+
"RPC Transaction in block time in seconds",
60+
)
61+
.buckets(linear_buckets(0.0, 3.0, 20).expect("Valid buckets; qed")),
62+
)?,
63+
registry,
64+
)?;
65+
66+
let finalized = register(
67+
Histogram::with_opts(
68+
HistogramOpts::new(
69+
"rpc_transaction_finalized_time",
70+
"RPC Transaction finalized time in seconds",
71+
)
72+
.buckets(linear_buckets(0.01, 40.0, 20).expect("Valid buckets; qed")),
73+
)?,
74+
registry,
75+
)?;
76+
77+
let dropped = register(
78+
Histogram::with_opts(
79+
HistogramOpts::new(
80+
"rpc_transaction_dropped_time",
81+
"RPC Transaction dropped time in seconds",
82+
)
83+
.buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")),
84+
)?,
85+
registry,
86+
)?;
87+
88+
let invalid = register(
89+
Histogram::with_opts(
90+
HistogramOpts::new(
91+
"rpc_transaction_invalid_time",
92+
"RPC Transaction invalid time in seconds",
93+
)
94+
.buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")),
95+
)?,
96+
registry,
97+
)?;
98+
99+
let error = register(
100+
Histogram::with_opts(
101+
HistogramOpts::new(
102+
"rpc_transaction_error_time",
103+
"RPC Transaction error time in seconds",
104+
)
105+
.buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")),
106+
)?,
107+
registry,
108+
)?;
109+
110+
Ok(Metrics { validated, in_block, finalized, dropped, invalid, error })
111+
}
112+
}
113+
114+
/// Transaction metrics for a single transaction instance.
115+
pub struct InstanceMetrics {
116+
/// The metrics instance.
117+
metrics: Option<Metrics>,
118+
/// The time when the transaction was submitted.
119+
submitted_at: Instant,
120+
/// Ensure the states are reported once.
121+
reported_states: HashSet<&'static str>,
122+
}
123+
124+
impl InstanceMetrics {
125+
/// Creates a new [`InstanceMetrics`] instance.
126+
pub fn new(metrics: Option<Metrics>) -> Self {
127+
Self { metrics, submitted_at: Instant::now(), reported_states: HashSet::new() }
128+
}
129+
130+
/// Record the execution time of a transaction state.
131+
///
132+
/// This represents how long it took for the transaction to move to the next state.
133+
///
134+
/// The method must be called before the transaction event is provided to the user.
135+
pub fn register_event<Hash>(&mut self, event: &TransactionEvent<Hash>) {
136+
let Some(ref metrics) = self.metrics else {
137+
return;
138+
};
139+
140+
let (histogram, target_state) = match event {
141+
TransactionEvent::Validated => (&metrics.validated, "validated"),
142+
TransactionEvent::BestChainBlockIncluded(Some(_)) => (&metrics.in_block, "in_block"),
143+
TransactionEvent::BestChainBlockIncluded(None) => (&metrics.in_block, "retracted"),
144+
TransactionEvent::Finalized(..) => (&metrics.finalized, "finalized"),
145+
TransactionEvent::Error(..) => (&metrics.error, "error"),
146+
TransactionEvent::Dropped(..) => (&metrics.dropped, "dropped"),
147+
TransactionEvent::Invalid(..) => (&metrics.invalid, "invalid"),
148+
};
149+
150+
// Only record the state if it hasn't been reported before.
151+
if self.reported_states.insert(target_state) {
152+
histogram.observe(self.submitted_at.elapsed().as_secs_f64());
153+
}
154+
}
155+
}

substrate/client/rpc-spec-v2/src/transaction/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
#[cfg(test)]
2929
mod tests;
3030

31+
mod metrics;
32+
3133
pub mod api;
3234
pub mod error;
3335
pub mod event;
@@ -36,5 +38,6 @@ pub mod transaction_broadcast;
3638

3739
pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
3840
pub use event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent};
41+
pub use metrics::Metrics as TransactionMetrics;
3942
pub use transaction::Transaction;
4043
pub use transaction_broadcast::TransactionBroadcast;

substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ pub fn setup_api_tx() -> (
115115
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
116116

117117
let tx_api =
118-
RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)).into_rpc();
118+
RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor), None)
119+
.into_rpc();
119120

120121
(api, pool, client_mock, tx_api, executor_recv, pool_state)
121122
}

substrate/client/rpc-spec-v2/src/transaction/transaction.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ use crate::{
3030
use codec::Decode;
3131
use futures::{StreamExt, TryFutureExt};
3232
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33+
34+
use super::metrics::{InstanceMetrics, Metrics};
35+
3336
use sc_rpc::utils::{RingBuffer, Subscription};
3437
use sc_transaction_pool_api::{
3538
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
@@ -50,12 +53,19 @@ pub struct Transaction<Pool, Client> {
5053
pool: Arc<Pool>,
5154
/// Executor to spawn subscriptions.
5255
executor: SubscriptionTaskExecutor,
56+
/// Metrics for transactions.
57+
metrics: Option<Metrics>,
5358
}
5459

5560
impl<Pool, Client> Transaction<Pool, Client> {
5661
/// Creates a new [`Transaction`].
57-
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
58-
Transaction { client, pool, executor }
62+
pub fn new(
63+
client: Arc<Client>,
64+
pool: Arc<Pool>,
65+
executor: SubscriptionTaskExecutor,
66+
metrics: Option<Metrics>,
67+
) -> Self {
68+
Transaction { client, pool, executor, metrics }
5969
}
6070
}
6171

@@ -78,6 +88,9 @@ where
7888
let client = self.client.clone();
7989
let pool = self.pool.clone();
8090

91+
// Get a new transaction metrics instance and increment the counter.
92+
let mut metrics = InstanceMetrics::new(self.metrics.clone());
93+
8194
let fut = async move {
8295
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
8396
Ok(decoded_extrinsic) => decoded_extrinsic,
@@ -86,12 +99,14 @@ where
8699

87100
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };
88101

102+
let event = TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
103+
error: "Extrinsic bytes cannot be decoded".into(),
104+
});
105+
106+
metrics.register_event(&event);
107+
89108
// The transaction is invalid.
90-
let _ = sink
91-
.send(&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
92-
error: "Extrinsic bytes cannot be decoded".into(),
93-
}))
94-
.await;
109+
let _ = sink.send(&event).await;
95110
return
96111
},
97112
};
@@ -112,8 +127,17 @@ where
112127

113128
match submit.await {
114129
Ok(stream) => {
115-
let stream =
116-
stream.filter_map(move |event| async move { handle_event(event) }).boxed();
130+
let stream = stream
131+
.filter_map(|event| {
132+
let event = handle_event(event);
133+
134+
event.as_ref().inspect(|event| {
135+
metrics.register_event(event);
136+
});
137+
138+
async move { event }
139+
})
140+
.boxed();
117141

118142
// If the subscription is too slow older events will be overwritten.
119143
sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
@@ -122,6 +146,9 @@ where
122146
// We have not created an `Watcher` for the tx. Make sure the
123147
// error is still propagated as an event.
124148
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
149+
150+
metrics.register_event(&event);
151+
125152
_ = sink.send(&event).await;
126153
},
127154
};
@@ -134,7 +161,7 @@ where
134161
/// Handle events generated by the transaction-pool and convert them
135162
/// to the new API expected state.
136163
#[inline]
137-
pub fn handle_event<Hash: Clone, BlockHash: Clone>(
164+
fn handle_event<Hash: Clone, BlockHash: Clone>(
138165
event: TransactionStatus<Hash, BlockHash>,
139166
) -> Option<TransactionEvent<BlockHash>> {
140167
match event {

substrate/client/service/src/builder.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,14 @@ where
515515
let rpc_id_provider = config.rpc.id_provider.take();
516516

517517
// jsonrpsee RPC
518+
// RPC-V2 specific metrics need to be registered before the RPC server is started,
519+
// since we might have two instances running (one for the in-memory RPC and one for the network
520+
// RPC).
521+
let rpc_v2_metrics = config
522+
.prometheus_registry()
523+
.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
524+
.transpose()?;
525+
518526
let gen_rpc_module = || {
519527
gen_rpc_module(
520528
task_manager.spawn_handle(),
@@ -529,6 +537,7 @@ where
529537
config.blocks_pruning,
530538
backend.clone(),
531539
&*rpc_builder,
540+
rpc_v2_metrics.clone(),
532541
)
533542
};
534543

@@ -676,6 +685,7 @@ pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
676685
blocks_pruning: BlocksPruning,
677686
backend: Arc<TBackend>,
678687
rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
688+
metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
679689
) -> Result<RpcModule<()>, Error>
680690
where
681691
TBl: BlockT,
@@ -731,6 +741,7 @@ where
731741
client.clone(),
732742
transaction_pool.clone(),
733743
task_executor.clone(),
744+
metrics,
734745
)
735746
.into_rpc();
736747

0 commit comments

Comments
 (0)