-
Notifications
You must be signed in to change notification settings - Fork 1.2k
tx/metrics: Add metrics for the RPC v2 transactionWatch_v1_submitAndWatch
#8345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
46eaa28
89fe6ae
c56660a
830b95d
bf88c82
bf4a235
a009f37
5b558a0
6ab0c8c
2cc6fd1
a07ca93
260e601
b0510d3
fe971b7
04d24c2
9b0febd
ca78079
4a8b0c0
96b0cbd
5540857
8c2c9a4
7c44dc1
1b21650
516d2b3
d45d520
8ad08b8
c975366
6ac204d
793825f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| title: 'tx/metrics: Add metrics for the RPC v2 `transactionWatch_v1_submitAndWatch`' | ||
| doc: | ||
| - audience: Node Operator | ||
| description: |- | ||
| This PR adds metrics for the following RPC subscription: [transactionWatch_v1_submitAndWatch](https://paritytech.github.io/json-rpc-interface-spec/api/transactionWatch_v1_submitAndWatch.html) | ||
|
|
||
| Metrics are exposed in two ways: | ||
| - simple counters of how many events we've seen globally | ||
| - a histogram vector of execution times, which is labeled by `initial event` -> `final event` | ||
| - This helps us identify how long it takes the transaction pool to advance the state of the events, and further debug issues | ||
|
|
||
| Part of: https://github.com/paritytech/polkadot-sdk/issues/8336 | ||
|
|
||
| ### (outdated) PoC Dashboards | ||
|
|
||
|  | ||
|
|
||
|
|
||
| ### Next steps | ||
| - [x] initial dashboards with a live node | ||
| - [x] adjust testing | ||
| crates: | ||
| - name: sc-service | ||
| bump: major | ||
| - name: sc-rpc-spec-v2 | ||
| bump: major |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,155 @@ | ||||||||||||||||
| // This file is part of Substrate. | ||||||||||||||||
|
|
||||||||||||||||
| // Copyright (C) Parity Technologies (UK) Ltd. | ||||||||||||||||
| // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 | ||||||||||||||||
|
|
||||||||||||||||
| // This program is free software: you can redistribute it and/or modify | ||||||||||||||||
| // it under the terms of the GNU General Public License as published by | ||||||||||||||||
| // the Free Software Foundation, either version 3 of the License, or | ||||||||||||||||
| // (at your option) any later version. | ||||||||||||||||
|
|
||||||||||||||||
| // This program is distributed in the hope that it will be useful, | ||||||||||||||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||||||||||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||||||||||||||
| // GNU General Public License for more details. | ||||||||||||||||
|
|
||||||||||||||||
| // You should have received a copy of the GNU General Public License | ||||||||||||||||
| // along with this program. If not, see <https://www.gnu.org/licenses/>. | ||||||||||||||||
|
|
||||||||||||||||
| //! Metrics for recording transaction events. | ||||||||||||||||
|
|
||||||||||||||||
| use std::{collections::HashSet, time::Instant}; | ||||||||||||||||
|
|
||||||||||||||||
| use prometheus_endpoint::{ | ||||||||||||||||
| exponential_buckets, linear_buckets, register, Histogram, HistogramOpts, PrometheusError, | ||||||||||||||||
| Registry, | ||||||||||||||||
| }; | ||||||||||||||||
|
|
||||||||||||||||
| use super::TransactionEvent; | ||||||||||||||||
|
|
||||||||||||||||
| /// RPC layer metrics for transaction pool. | ||||||||||||||||
| #[derive(Debug, Clone)] | ||||||||||||||||
| pub struct Metrics { | ||||||||||||||||
| validated: Histogram, | ||||||||||||||||
| in_block: Histogram, | ||||||||||||||||
| finalized: Histogram, | ||||||||||||||||
| dropped: Histogram, | ||||||||||||||||
| invalid: Histogram, | ||||||||||||||||
| error: Histogram, | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| impl Metrics { | ||||||||||||||||
| /// Creates a new [`Metrics`] instance. | ||||||||||||||||
| pub fn new(registry: &Registry) -> Result<Self, PrometheusError> { | ||||||||||||||||
| let validated = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_validation_time", | ||||||||||||||||
| "RPC Transaction validation time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(exponential_buckets(0.01, 2.0, 16).expect("Valid buckets; qed")), | ||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| let in_block = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_in_block_time", | ||||||||||||||||
| "RPC Transaction in block time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(linear_buckets(0.0, 3.0, 20).expect("Valid buckets; qed")), | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @olliecorbisiero here are new buckets for in-block. Buckets for internal pool events (they you are using now in dashboard) were exactly the same: polkadot-sdk/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs Lines 141 to 147 in 23c41b2
Let us know what would work for you? |
||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| let finalized = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_finalized_time", | ||||||||||||||||
| "RPC Transaction finalized time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(linear_buckets(0.01, 40.0, 20).expect("Valid buckets; qed")), | ||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| let dropped = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_dropped_time", | ||||||||||||||||
| "RPC Transaction dropped time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")), | ||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| let invalid = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_invalid_time", | ||||||||||||||||
| "RPC Transaction invalid time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")), | ||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| let error = register( | ||||||||||||||||
| Histogram::with_opts( | ||||||||||||||||
| HistogramOpts::new( | ||||||||||||||||
| "rpc_transaction_error_time", | ||||||||||||||||
| "RPC Transaction error time in seconds", | ||||||||||||||||
| ) | ||||||||||||||||
| .buckets(linear_buckets(0.01, 3.0, 20).expect("Valid buckets; qed")), | ||||||||||||||||
| )?, | ||||||||||||||||
| registry, | ||||||||||||||||
| )?; | ||||||||||||||||
|
|
||||||||||||||||
| Ok(Metrics { validated, in_block, finalized, dropped, invalid, error }) | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /// Transaction metrics for a single transaction instance. | ||||||||||||||||
| pub struct InstanceMetrics { | ||||||||||||||||
| /// The metrics instance. | ||||||||||||||||
| metrics: Option<Metrics>, | ||||||||||||||||
| /// The time when the transaction was submitted. | ||||||||||||||||
| submitted_at: Instant, | ||||||||||||||||
| /// Ensure the states are reported once. | ||||||||||||||||
| reported_states: HashSet<&'static str>, | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| impl InstanceMetrics { | ||||||||||||||||
| /// Creates a new [`InstanceMetrics`] instance. | ||||||||||||||||
| pub fn new(metrics: Option<Metrics>) -> Self { | ||||||||||||||||
| Self { metrics, submitted_at: Instant::now(), reported_states: HashSet::new() } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /// Record the execution time of a transaction state. | ||||||||||||||||
| /// | ||||||||||||||||
| /// This represents how long it took for the transaction to move to the next state. | ||||||||||||||||
| /// | ||||||||||||||||
| /// The method must be called before the transaction event is provided to the user. | ||||||||||||||||
| pub fn register_event<Hash>(&mut self, event: &TransactionEvent<Hash>) { | ||||||||||||||||
| let Some(ref metrics) = self.metrics else { | ||||||||||||||||
| return; | ||||||||||||||||
| }; | ||||||||||||||||
|
|
||||||||||||||||
| let (histogram, target_state) = match event { | ||||||||||||||||
| TransactionEvent::Validated => (&metrics.validated, "validated"), | ||||||||||||||||
| TransactionEvent::BestChainBlockIncluded(Some(_)) => (&metrics.in_block, "in_block"), | ||||||||||||||||
| TransactionEvent::BestChainBlockIncluded(None) => (&metrics.in_block, "retracted"), | ||||||||||||||||
| TransactionEvent::Finalized(..) => (&metrics.finalized, "finalized"), | ||||||||||||||||
| TransactionEvent::Error(..) => (&metrics.error, "error"), | ||||||||||||||||
| TransactionEvent::Dropped(..) => (&metrics.dropped, "dropped"), | ||||||||||||||||
| TransactionEvent::Invalid(..) => (&metrics.invalid, "invalid"), | ||||||||||||||||
| }; | ||||||||||||||||
|
|
||||||||||||||||
| // Only record the state if it hasn't been reported before. | ||||||||||||||||
| if self.reported_states.insert(target_state) { | ||||||||||||||||
| histogram.observe(self.submitted_at.elapsed().as_secs_f64()); | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,9 @@ use crate::{ | |
| use codec::Decode; | ||
| use futures::{StreamExt, TryFutureExt}; | ||
| use jsonrpsee::{core::async_trait, PendingSubscriptionSink}; | ||
|
|
||
| use super::metrics::{InstanceMetrics, Metrics}; | ||
|
|
||
| use sc_rpc::utils::{RingBuffer, Subscription}; | ||
| use sc_transaction_pool_api::{ | ||
| error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, | ||
|
|
@@ -50,12 +53,19 @@ pub struct Transaction<Pool, Client> { | |
| pool: Arc<Pool>, | ||
| /// Executor to spawn subscriptions. | ||
| executor: SubscriptionTaskExecutor, | ||
| /// Metrics for transactions. | ||
| metrics: Option<Metrics>, | ||
| } | ||
|
|
||
| impl<Pool, Client> Transaction<Pool, Client> { | ||
| /// Creates a new [`Transaction`]. | ||
| pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self { | ||
| Transaction { client, pool, executor } | ||
| pub fn new( | ||
| client: Arc<Client>, | ||
| pool: Arc<Pool>, | ||
| executor: SubscriptionTaskExecutor, | ||
| metrics: Option<Metrics>, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That you need to take here the |
||
| ) -> Self { | ||
| Transaction { client, pool, executor, metrics } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -78,6 +88,9 @@ where | |
| let client = self.client.clone(); | ||
| let pool = self.pool.clone(); | ||
|
|
||
| // Get a new transaction metrics instance and increment the counter. | ||
| let mut metrics = InstanceMetrics::new(self.metrics.clone()); | ||
|
|
||
| let fut = async move { | ||
| let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) { | ||
| Ok(decoded_extrinsic) => decoded_extrinsic, | ||
|
|
@@ -86,12 +99,14 @@ where | |
|
|
||
| let Ok(sink) = pending.accept().await.map(Subscription::from) else { return }; | ||
|
|
||
| let event = TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError { | ||
| error: "Extrinsic bytes cannot be decoded".into(), | ||
| }); | ||
|
|
||
| metrics.register_event(&event); | ||
|
|
||
| // The transaction is invalid. | ||
| let _ = sink | ||
| .send(&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError { | ||
| error: "Extrinsic bytes cannot be decoded".into(), | ||
| })) | ||
| .await; | ||
| let _ = sink.send(&event).await; | ||
| return | ||
| }, | ||
| }; | ||
|
|
@@ -112,8 +127,17 @@ where | |
|
|
||
| match submit.await { | ||
| Ok(stream) => { | ||
| let stream = | ||
| stream.filter_map(move |event| async move { handle_event(event) }).boxed(); | ||
| let stream = stream | ||
| .filter_map(|event| { | ||
| let event = handle_event(event); | ||
|
|
||
| event.as_ref().inspect(|event| { | ||
| metrics.register_event(event); | ||
| }); | ||
|
|
||
| async move { event } | ||
| }) | ||
| .boxed(); | ||
|
|
||
| // If the subscription is too slow older events will be overwritten. | ||
| sink.pipe_from_stream(stream, RingBuffer::new(3)).await; | ||
|
|
@@ -122,6 +146,9 @@ where | |
| // We have not created an `Watcher` for the tx. Make sure the | ||
| // error is still propagated as an event. | ||
| let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into(); | ||
|
|
||
| metrics.register_event(&event); | ||
|
|
||
| _ = sink.send(&event).await; | ||
| }, | ||
| }; | ||
|
|
@@ -134,7 +161,7 @@ where | |
| /// Handle events generated by the transaction-pool and convert them | ||
| /// to the new API expected state. | ||
| #[inline] | ||
| pub fn handle_event<Hash: Clone, BlockHash: Clone>( | ||
| fn handle_event<Hash: Clone, BlockHash: Clone>( | ||
| event: TransactionStatus<Hash, BlockHash>, | ||
| ) -> Option<TransactionEvent<BlockHash>> { | ||
| match event { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.