Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
12c57c9
rpc: upgrade jsonrpsee v0.23
niklasad1 May 27, 2024
0aadd3d
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
15691d6
cleanup
niklasad1 Jun 7, 2024
f681f0e
make it compile
niklasad1 Jun 7, 2024
bd74645
fix test build
niklasad1 Jun 7, 2024
88a7c00
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
369db7e
jsonrpsee v0.23.1
niklasad1 Jun 10, 2024
fa8cda3
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 10, 2024
a2418a2
remove needless deps
niklasad1 Jun 12, 2024
b198759
log conn_data when subscriptions lagged
niklasad1 Jun 18, 2024
ff100b8
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 18, 2024
c059759
rpc: add prometheous metrics dropped subscriptions
niklasad1 Jun 17, 2024
c463349
cleanup
niklasad1 Jun 19, 2024
c7be842
fix nit2
niklasad1 Jun 19, 2024
c0bd2ed
bridges: add `serde_json` dependency
niklasad1 Jun 19, 2024
d0ae05c
more nits
niklasad1 Jun 19, 2024
06fd1ba
fix tests
niklasad1 Jun 20, 2024
1e1b506
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 20, 2024
877fe59
fix more tests
niklasad1 Jun 20, 2024
a74655e
Update substrate/client/rpc/src/utils.rs
niklasad1 Jun 21, 2024
8e1678f
Update substrate/client/rpc/src/utils.rs
niklasad1 Jun 21, 2024
7889147
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 26, 2024
de221dc
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 26, 2024
81d555d
cargo fmt
niklasad1 Jun 26, 2024
7208c17
Merge branch 'master' into na-jsonrpsee-v0.23-with-ip-addr
niklasad1 Jul 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions polkadot/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ where
let chain_name = chain_spec.name().to_string();
let genesis_hash = client.hash(0).ok().flatten().expect("Genesis block exists; qed");
let properties = chain_spec.properties();
// TODO: This should be a shared metrics instance.
let metrics = sc_rpc::SubscriptionMetrics::disabled();

io.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;
io.merge(StateMigration::new(client.clone(), backend.clone(), deny_unsafe).into_rpc())?;
Expand All @@ -167,6 +169,7 @@ where
shared_voter_state,
justification_stream,
finality_provider,
metrics.clone(),
)
.into_rpc(),
)?;
Expand All @@ -179,6 +182,7 @@ where
beefy.beefy_finality_proof_stream,
beefy.beefy_best_block_stream,
beefy.subscription_executor,
metrics,
)?
.into_rpc(),
)?;
Expand Down
4 changes: 4 additions & 0 deletions substrate/bin/node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ where
let chain_name = chain_spec.name().to_string();
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
let properties = chain_spec.properties();
let metrics = sc_rpc::SubscriptionMetrics::disabled();

io.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;

io.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
Expand Down Expand Up @@ -206,6 +208,7 @@ where
shared_voter_state,
justification_stream,
finality_provider,
metrics.clone(),
)
.into_rpc(),
)?;
Expand All @@ -231,6 +234,7 @@ where
beefy.beefy_finality_proof_stream,
beefy.beefy_best_block_stream,
beefy.subscription_executor,
metrics,
)?
.into_rpc(),
)?;
Expand Down
36 changes: 28 additions & 8 deletions substrate/client/consensus/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use parking_lot::RwLock;
use sp_consensus_beefy::AuthorityIdBound;
use std::sync::Arc;

use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::pipe_from_stream, SubscriptionMetrics, SubscriptionParams, SubscriptionTaskExecutor,
};
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::Block as BlockT;

Expand All @@ -33,7 +35,7 @@ use jsonrpsee::{
core::async_trait,
proc_macros::rpc,
types::{ErrorObject, ErrorObjectOwned},
PendingSubscriptionSink,
ConnectionId, Extensions, PendingSubscriptionSink,
};
use log::warn;

Expand Down Expand Up @@ -87,6 +89,7 @@ pub trait BeefyApi<Notification, Hash> {
name = "beefy_subscribeJustifications" => "beefy_justifications",
unsubscribe = "beefy_unsubscribeJustifications",
item = Notification,
with_extensions
)]
fn subscribe_justifications(&self);

Expand All @@ -104,6 +107,7 @@ pub struct Beefy<Block: BlockT, AuthorityId: AuthorityIdBound> {
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
executor: SubscriptionTaskExecutor,
metrics: SubscriptionMetrics,
}

impl<Block, AuthorityId> Beefy<Block, AuthorityId>
Expand All @@ -116,6 +120,7 @@ where
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
best_block_stream: BeefyBestBlockStream<Block>,
executor: SubscriptionTaskExecutor,
metrics: SubscriptionMetrics,
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));

Expand All @@ -127,7 +132,7 @@ where
});

executor.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(Self { finality_proof_stream, beefy_best_block, executor })
Ok(Self { finality_proof_stream, beefy_best_block, executor, metrics })
}
}

Expand All @@ -139,13 +144,23 @@ where
AuthorityId: AuthorityIdBound,
<AuthorityId as RuntimeAppPublic>::Signature: Send + Sync,
{
fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
fn subscribe_justifications(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
let params = SubscriptionParams {
conn_id: *ext.get::<ConnectionId>().expect("ConnectionId is set"),
ip_addr: *ext.get::<std::net::IpAddr>().expect("IpAddr is set"),
method: "beefy_subscribeJustifications",
metrics: self.metrics.clone(),
};

let stream = self
.finality_proof_stream
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
pipe_from_stream(pending, stream, params),
);
}

async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
Expand All @@ -163,6 +178,7 @@ mod tests {
communication::notification::BeefyVersionedFinalityProofSender,
justification::BeefyVersionedFinalityProof,
};
use sc_rpc::SubscriptionMetrics;
use sp_consensus_beefy::{ecdsa_crypto, known_payloads, Payload, SignedCommitment};
use sp_runtime::traits::{BlakeTwo256, Hash};
use substrate_test_runtime_client::runtime::Block;
Expand All @@ -184,9 +200,13 @@ mod tests {
let (finality_proof_sender, finality_proof_stream) =
BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();

let handler =
Beefy::new(finality_proof_stream, best_block_stream, sc_rpc::testing::test_executor())
.expect("Setting up the BEEFY RPC handler works");
let handler = Beefy::new(
finality_proof_stream,
best_block_stream,
sc_rpc::testing::test_executor(),
SubscriptionMetrics::disabled(),
)
.expect("Setting up the BEEFY RPC handler works");

(handler.into_rpc(), finality_proof_sender)
}
Expand Down
34 changes: 29 additions & 5 deletions substrate/client/consensus/grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use jsonrpsee::{
core::{async_trait, server::PendingSubscriptionSink},
proc_macros::rpc,
ConnectionId, Extensions,
};

mod error;
Expand All @@ -38,7 +39,9 @@ use finality::{EncodedFinalityProof, RpcFinalityProofProvider};
use notification::JustificationNotification;
use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates};
use sc_consensus_grandpa::GrandpaJustificationStream;
use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::pipe_from_stream, SubscriptionMetrics, SubscriptionParams, SubscriptionTaskExecutor,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

/// Provides RPC methods for interacting with GRANDPA.
Expand All @@ -54,7 +57,8 @@ pub trait GrandpaApi<Notification, Hash, Number> {
#[subscription(
name = "grandpa_subscribeJustifications" => "grandpa_justifications",
unsubscribe = "grandpa_unsubscribeJustifications",
item = Notification
item = Notification,
with_extensions
)]
fn subscribe_justifications(&self);

Expand All @@ -71,6 +75,7 @@ pub struct Grandpa<AuthoritySet, VoterState, Block: BlockT, ProofProvider> {
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
finality_proof_provider: Arc<ProofProvider>,
metrics: SubscriptionMetrics,
}
impl<AuthoritySet, VoterState, Block: BlockT, ProofProvider>
Grandpa<AuthoritySet, VoterState, Block, ProofProvider>
Expand All @@ -82,8 +87,16 @@ impl<AuthoritySet, VoterState, Block: BlockT, ProofProvider>
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
finality_proof_provider: Arc<ProofProvider>,
metrics: SubscriptionMetrics,
) -> Self {
Self { executor, authority_set, voter_state, justification_stream, finality_proof_provider }
Self {
executor,
authority_set,
voter_state,
justification_stream,
finality_proof_provider,
metrics,
}
}
}

Expand All @@ -101,14 +114,24 @@ where
ReportedRoundStates::from(&self.authority_set, &self.voter_state)
}

fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
fn subscribe_justifications(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
let params = SubscriptionParams {
conn_id: *ext.get::<ConnectionId>().expect("ConnectionId is set"),
ip_addr: *ext.get::<std::net::IpAddr>().expect("IpAddr is set"),
method: "grandpa_subscribeJustifications",
metrics: self.metrics.clone(),
};

let stream = self.justification_stream.subscribe(100_000).map(
|x: sc_consensus_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
},
);

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
pipe_from_stream(pending, stream, params),
);
}

async fn prove_finality(
Expand Down Expand Up @@ -260,6 +283,7 @@ mod tests {
voter_state,
justification_stream,
finality_proof_provider,
SubscriptionMetrics::disabled(),
)
.into_rpc();

Expand Down
1 change: 1 addition & 0 deletions substrate/client/rpc-api/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub trait AuthorApi<Hash, BlockHash> {
name = "author_submitAndWatchExtrinsic" => "author_extrinsicUpdate",
unsubscribe = "author_unwatchExtrinsic",
item = TransactionStatus<Hash, BlockHash>,
with_extensions
)]
fn watch_extrinsic(&self, bytes: Bytes);
}
9 changes: 6 additions & 3 deletions substrate/client/rpc-api/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
#[subscription(
name = "chain_subscribeAllHeads" => "chain_allHead",
unsubscribe = "chain_unsubscribeAllHeads",
item = Header
item = Header,
with_extensions,
)]
fn subscribe_all_heads(&self);

Expand All @@ -61,7 +62,8 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
aliases = ["subscribe_newHead", "chain_subscribeNewHead"],
unsubscribe = "chain_unsubscribeNewHeads",
unsubscribe_aliases = ["unsubscribe_newHead", "chain_unsubscribeNewHead"],
item = Header
item = Header,
with_extensions
)]
fn subscribe_new_heads(&self);

Expand All @@ -71,7 +73,8 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
aliases = ["chain_subscribeFinalisedHeads"],
unsubscribe = "chain_unsubscribeFinalizedHeads",
unsubscribe_aliases = ["chain_unsubscribeFinalisedHeads"],
item = Header
item = Header,
with_extensions
)]
fn subscribe_finalized_heads(&self);
}
2 changes: 2 additions & 0 deletions substrate/client/rpc-api/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub trait StateApi<Hash> {
aliases = ["chain_subscribeRuntimeVersion"],
unsubscribe_aliases = ["chain_unsubscribeRuntimeVersion"],
item = RuntimeVersion,
with_extensions,
)]
fn subscribe_runtime_version(&self);

Expand All @@ -136,6 +137,7 @@ pub trait StateApi<Hash> {
name = "state_subscribeStorage" => "state_storage",
unsubscribe = "state_unsubscribeStorage",
item = StorageChangeSet<Hash>,
with_extensions
)]
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>);

Expand Down
4 changes: 3 additions & 1 deletion substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ where

let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let svc = tower::service_fn(move |req: http::Request<hyper::body::Incoming>| {
let svc = tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
let PerConnection {
methods,
service_builder,
Expand All @@ -194,6 +194,8 @@ where
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

req.extensions_mut().insert(proxy_ip.unwrap_or(ip));

let rate_limit_cfg = if rate_limit_whitelisted_ips
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
Expand Down
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/src/transaction/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub trait TransactionApi<Hash: Clone> {
name = "transactionWatch_v1_submitAndWatch" => "transactionWatch_v1_watchEvent",
unsubscribe = "transactionWatch_v1_unwatch",
item = TransactionEvent<Hash>,
with_extensions,
)]
fn submit_and_watch(&self, bytes: Bytes);
}
Expand Down
10 changes: 8 additions & 2 deletions substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
};
use futures::Future;
use jsonrpsee::RpcModule;
use sc_rpc::SubscriptionMetrics;
use sc_transaction_pool::*;
use std::{pin::Pin, sync::Arc};
use substrate_test_runtime_client::{prelude::*, Client};
Expand Down Expand Up @@ -114,8 +115,13 @@ pub fn setup_api_tx() -> (
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();

let tx_api =
RpcTransaction::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)).into_rpc();
let tx_api = RpcTransaction::new(
client_mock.clone(),
pool.clone(),
Arc::new(task_executor),
SubscriptionMetrics::disabled(),
)
.into_rpc();

(api, pool, client_mock, tx_api, executor_recv, pool_state)
}
Expand Down
Loading