diff --git a/Cargo.lock b/Cargo.lock index 242ea57ebd4..69c19ddd6d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2389,6 +2389,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", "jsonrpsee-types", + "jsonrpsee-wasm-client", "jsonrpsee-ws-client", ] @@ -2472,6 +2473,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-wasm-client" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f448d8eacd945cc17b6c0b42c361531ca36a962ee186342a97cdb8fca679cd77" +dependencies = [ + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types", +] + [[package]] name = "jsonrpsee-ws-client" version = "0.22.5" @@ -3397,6 +3409,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "wasm-bindgen-futures", ] [[package]] diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 698e85a4501..5229fce56be 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -28,7 +28,8 @@ native = [ "jsonrpsee?/async-client", "jsonrpsee?/client-ws-transport-native-tls", "subxt-lightclient?/native", - "tokio-util" + "tokio-util", + "reconnecting-jsonrpsee-ws-client?/native", ] # Enable this for web/wasm builds. @@ -39,7 +40,8 @@ web = [ "getrandom/js", "subxt-lightclient?/web", "subxt-macro/web", - "instant/wasm-bindgen" + "instant/wasm-bindgen", + "reconnecting-jsonrpsee-ws-client?/web", ] # Enable this to use the reconnecting rpc client @@ -99,7 +101,7 @@ subxt-metadata = { workspace = true, features = ["std"] } subxt-lightclient = { workspace = true, optional = true, default-features = false } # Reconnecting jsonrpc ws client -reconnecting-jsonrpsee-ws-client = { version = "0.4", optional = true } +reconnecting-jsonrpsee-ws-client = { version = "0.4", optional = true, default-features = false } # For parsing urls to disallow insecure schemes url = { workspace = true } @@ -137,8 +139,8 @@ path = "examples/light_client_local_node.rs" required-features = ["unstable-light-client", "jsonrpsee", "native"] [[example]] -name = "reconnecting_rpc_client" -path = "examples/reconnecting_rpc_client.rs" +name = "setup_reconnecting_rpc_client" +path = "examples/setup_reconnecting_rpc_client.rs" required-features = ["unstable-reconnecting-rpc-client"] [package.metadata.docs.rs] diff --git a/subxt/examples/reconnecting_rpc_client.rs b/subxt/examples/reconnecting_rpc_client.rs deleted file mode 100644 index b21be899f67..00000000000 --- a/subxt/examples/reconnecting_rpc_client.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Example to utilize the `reconnecting rpc client` in subxt -//! which hidden behind behind `--feature unstable-reconnecting-rpc-client` -//! -//! To utilize full logs from the RPC client use: -//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"` - -#![allow(missing_docs)] - -use std::time::Duration; - -use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig}; -use subxt::backend::rpc::RpcClient; -use subxt::error::{Error, RpcError}; -use subxt::{OnlineClient, PolkadotConfig}; - -// Generate an interface that we can use from the node's metadata. -#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")] -pub mod polkadot {} - -#[tokio::main] -async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); - - // Create a new client with with a reconnecting RPC client. - let rpc = Client::builder() - // Reconnect with exponential backoff - // - // This API is "iterator-like" so one could limit it to only - // reconnect x times and then quit. - .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) - // Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds - // then disconnect. - // - // This is just a way to ensure that the connection isn't idle if no message is sent that often - .enable_ws_ping( - PingConfig::new() - .ping_interval(Duration::from_secs(6)) - .inactive_limit(Duration::from_secs(30)), - ) - // There are other configurations as well that can be found here: - // - .build("ws://localhost:9944".to_string()) - .await?; - - let api: OnlineClient = - OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?; - - // Subscribe to all finalized blocks: - let mut blocks_sub = api.blocks().subscribe_finalized().await?; - - // For each block, print a bunch of information about it: - while let Some(block) = blocks_sub.next().await { - let block = match block { - Ok(b) => b, - Err(Error::Rpc(RpcError::DisconnectedWillReconnect(err))) => { - println!("{err}"); - continue; - } - Err(e) => { - return Err(e.into()); - } - }; - - let block_number = block.header().number; - let block_hash = block.hash(); - - println!("Block #{block_number} ({block_hash})"); - } - - println!("RPC client reconnected `{}` times", rpc.reconnect_count()); - - Ok(()) -} diff --git a/subxt/examples/setup_reconnecting_rpc_client.rs b/subxt/examples/setup_reconnecting_rpc_client.rs new file mode 100644 index 00000000000..c393f2cffa6 --- /dev/null +++ b/subxt/examples/setup_reconnecting_rpc_client.rs @@ -0,0 +1,102 @@ +//! Example to utilize the `reconnecting rpc client` in subxt +//! which hidden behind behind `--feature unstable-reconnecting-rpc-client` +//! +//! To utilize full logs from the RPC client use: +//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"` + +#![allow(missing_docs)] + +use std::time::Duration; + +use futures::StreamExt; +use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff}; +use subxt::{OnlineClient, PolkadotConfig}; + +// Generate an interface that we can use from the node's metadata. +#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")] +pub mod polkadot {} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + // Create a new client with with a reconnecting RPC client. + let rpc = Client::builder() + // Reconnect with exponential backoff + // + // This API is "iterator-like" and we use `take` to limit the number of retries. + .retry_policy( + ExponentialBackoff::from_millis(100) + .max_delay(Duration::from_secs(10)) + .take(3), + ) + // There are other configurations as well that can be found at [`reconnecting_rpc_client::ClientBuilder`]. + .build("ws://localhost:9944".to_string()) + .await?; + + // If you want to use the unstable backend with the reconnecting RPC client, you can do so like this: + // + // ``` + // use subxt::backend::unstable::UnstableBackend; + // use subxt::OnlineClient; + // + // let (backend, mut driver) = UnstableBackend::builder().build(RpcClient::new(rpc.clone())); + // tokio::spawn(async move { + // while let Some(val) = driver.next().await { + // if let Err(e) = val { + // eprintln!("Error driving unstable backend: {e}; terminating client"); + // } + // } + // }); + // let api: OnlineClient = OnlineClient::from_backend(Arc::new(backend)).await?; + // ``` + + let api: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await?; + + // Optionally print if the RPC client reconnects. + let rpc2 = rpc.clone(); + tokio::spawn(async move { + loop { + // The connection was lost and the client is trying to reconnect. + let reconnected = rpc2.reconnect_initiated().await; + let now = std::time::Instant::now(); + // The connection was re-established. + reconnected.await; + println!( + "RPC client reconnection took `{}s`", + now.elapsed().as_secs() + ); + } + }); + + // Run for at most 100 blocks and print a bunch of information about it. + // + // The subscription is automatically re-started when the RPC client has reconnected. + // You can test that by stopping the polkadot node and restarting it. + let mut blocks_sub = api.blocks().subscribe_finalized().await?.take(100); + + while let Some(block) = blocks_sub.next().await { + let block = match block { + Ok(b) => b, + Err(e) => { + // This can only happen on the legacy backend and the unstable backend + // will handle this internally. + if e.is_disconnected_will_reconnect() { + println!("The RPC connection was lost and we may have missed a few blocks"); + continue; + } + + return Err(e.into()); + } + }; + + let block_number = block.number(); + let block_hash = block.hash(); + + println!("Block #{block_number} ({block_hash})"); + } + + println!("RPC client reconnected `{}` times", rpc.reconnect_count()); + + Ok(()) +} diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs index 425cf0c8bfa..44fd606f996 100644 --- a/subxt/src/backend/legacy/mod.rs +++ b/subxt/src/backend/legacy/mod.rs @@ -8,10 +8,12 @@ pub mod rpc_methods; use self::rpc_methods::TransactionStatus as RpcTransactionStatus; +use crate::backend::utils::{retry, retry_stream}; use crate::backend::{ rpc::RpcClient, Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults, TransactionStatus, }; +use crate::error::RpcError; use crate::{config::Header, Config, Error}; use async_trait::async_trait; use futures::{future, future::Either, stream, Future, FutureExt, Stream, StreamExt}; @@ -62,12 +64,21 @@ impl LegacyBackendBuilder { } /// The legacy backend. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct LegacyBackend { storage_page_size: u32, methods: LegacyRpcMethods, } +impl Clone for LegacyBackend { + fn clone(&self) -> LegacyBackend { + LegacyBackend { + storage_page_size: self.storage_page_size, + methods: self.methods.clone(), + } + } +} + impl LegacyBackend { /// Configure and construct an [`LegacyBackend`]. pub fn builder() -> LegacyBackendBuilder { @@ -84,24 +95,28 @@ impl Backend for LegacyBackend { keys: Vec>, at: T::Hash, ) -> Result, Error> { - let methods = self.methods.clone(); + retry(|| async { + let keys = keys.clone(); + let methods = self.methods.clone(); - // For each key, return it + a future to get the result. - let iter = keys.into_iter().map(move |key| { - let methods = methods.clone(); - async move { - let res = methods.state_get_storage(&key, Some(at)).await?; - Ok(res.map(|value| StorageResponse { key, value })) - } - }); + // For each key, return it + a future to get the result. + let iter = keys.into_iter().map(move |key| { + let methods = methods.clone(); + async move { + let res = methods.state_get_storage(&key, Some(at)).await?; + Ok(res.map(|value| StorageResponse { key, value })) + } + }); - let s = stream::iter(iter) - // Resolve the future - .then(|fut| fut) - // Filter any Options out (ie if we didn't find a value at some key we return nothing for it). - .filter_map(|r| future::ready(r.transpose())); + let s = stream::iter(iter) + // Resolve the future + .then(|fut| fut) + // Filter any Options out (ie if we didn't find a value at some key we return nothing for it). + .filter_map(|r| future::ready(r.transpose())); - Ok(StreamOf(Box::pin(s))) + Ok(StreamOf(Box::pin(s))) + }) + .await } async fn storage_fetch_descendant_keys( @@ -158,99 +173,159 @@ impl Backend for LegacyBackend { } async fn genesis_hash(&self) -> Result { - self.methods.genesis_hash().await + retry(|| self.methods.genesis_hash()).await } async fn block_header(&self, at: T::Hash) -> Result, Error> { - self.methods.chain_get_header(Some(at)).await + retry(|| self.methods.chain_get_header(Some(at))).await } async fn block_body(&self, at: T::Hash) -> Result>>, Error> { - let Some(details) = self.methods.chain_get_block(Some(at)).await? else { - return Ok(None); - }; - Ok(Some( - details.block.extrinsics.into_iter().map(|b| b.0).collect(), - )) + retry(|| async { + let Some(details) = self.methods.chain_get_block(Some(at)).await? else { + return Ok(None); + }; + Ok(Some( + details.block.extrinsics.into_iter().map(|b| b.0).collect(), + )) + }) + .await } async fn latest_finalized_block_ref(&self) -> Result, Error> { - let hash = self.methods.chain_get_finalized_head().await?; - Ok(BlockRef::from_hash(hash)) + retry(|| async { + let hash = self.methods.chain_get_finalized_head().await?; + Ok(BlockRef::from_hash(hash)) + }) + .await } async fn current_runtime_version(&self) -> Result { - let details = self.methods.state_get_runtime_version(None).await?; - Ok(RuntimeVersion { - spec_version: details.spec_version, - transaction_version: details.transaction_version, + retry(|| async { + let details = self.methods.state_get_runtime_version(None).await?; + Ok(RuntimeVersion { + spec_version: details.spec_version, + transaction_version: details.transaction_version, + }) }) + .await } async fn stream_runtime_version(&self) -> Result, Error> { - let sub = self.methods.state_subscribe_runtime_version().await?; - let sub = sub.map(|r| { - r.map(|v| RuntimeVersion { - spec_version: v.spec_version, - transaction_version: v.transaction_version, + let methods = self.methods.clone(); + + let retry_sub = retry_stream(move || { + let methods = methods.clone(); + + Box::pin(async move { + let sub = methods.state_subscribe_runtime_version().await?; + let sub = sub.map(|r| { + r.map(|v| RuntimeVersion { + spec_version: v.spec_version, + transaction_version: v.transaction_version, + }) + }); + Ok(StreamOf(Box::pin(sub))) }) + }) + .await?; + + // For runtime version subscriptions we omit the `DisconnectedWillReconnect` error + // because the once it resubscribes it will emit the latest runtime version. + // + // Thus, it's technically possible that a runtime version can be missed if + // two runtime upgrades happen in quick succession, but this is very unlikely. + let stream = retry_sub.filter(|r| { + let forward = !matches!(r, Err(Error::Rpc(RpcError::DisconnectedWillReconnect(_)))); + async move { forward } }); - Ok(StreamOf(Box::pin(sub))) + + Ok(StreamOf(Box::pin(stream))) } async fn stream_all_block_headers( &self, ) -> Result)>, Error> { - let sub = self.methods.chain_subscribe_all_heads().await?; - let sub = sub.map(|r| { - r.map(|h| { - let hash = h.hash(); - (h, BlockRef::from_hash(hash)) + let methods = self.methods.clone(); + + let retry_sub = retry_stream(move || { + let methods = methods.clone(); + Box::pin(async move { + let sub = methods.chain_subscribe_all_heads().await?; + let sub = sub.map(|r| { + r.map(|h| { + let hash = h.hash(); + (h, BlockRef::from_hash(hash)) + }) + }); + Ok(StreamOf(Box::pin(sub))) }) - }); - Ok(StreamOf(Box::pin(sub))) + }) + .await?; + + Ok(retry_sub) } async fn stream_best_block_headers( &self, ) -> Result)>, Error> { - let sub = self.methods.chain_subscribe_new_heads().await?; - let sub = sub.map(|r| { - r.map(|h| { - let hash = h.hash(); - (h, BlockRef::from_hash(hash)) + let methods = self.methods.clone(); + + let retry_sub = retry_stream(move || { + let methods = methods.clone(); + Box::pin(async move { + let sub = methods.chain_subscribe_new_heads().await?; + let sub = sub.map(|r| { + r.map(|h| { + let hash = h.hash(); + (h, BlockRef::from_hash(hash)) + }) + }); + Ok(StreamOf(Box::pin(sub))) }) - }); - Ok(StreamOf(Box::pin(sub))) + }) + .await?; + + Ok(retry_sub) } async fn stream_finalized_block_headers( &self, ) -> Result)>, Error> { - let sub: super::rpc::RpcSubscription<::Header> = - self.methods.chain_subscribe_finalized_heads().await?; - - // Get the last finalized block immediately so that the stream will emit every finalized block after this. - let last_finalized_block_ref = self.latest_finalized_block_ref().await?; - let last_finalized_block_num = self - .block_header(last_finalized_block_ref.hash()) - .await? - .map(|h| h.number().into()); - - // Fill in any missing blocks, because the backend may not emit every finalized block; just the latest ones which - // are finalized each time. - let sub = subscribe_to_block_headers_filling_in_gaps( - self.methods.clone(), - sub, - last_finalized_block_num, - ); - let sub = sub.map(|r| { - r.map(|h| { - let hash = h.hash(); - (h, BlockRef::from_hash(hash)) + let this = self.clone(); + + let retry_sub = retry_stream(move || { + let this = this.clone(); + Box::pin(async move { + let sub = this.methods.chain_subscribe_finalized_heads().await?; + + // Get the last finalized block immediately so that the stream will emit every finalized block after this. + let last_finalized_block_ref = this.latest_finalized_block_ref().await?; + let last_finalized_block_num = this + .block_header(last_finalized_block_ref.hash()) + .await? + .map(|h| h.number().into()); + + // Fill in any missing blocks, because the backend may not emit every finalized block; just the latest ones which + // are finalized each time. + let sub = subscribe_to_block_headers_filling_in_gaps( + this.methods.clone(), + sub, + last_finalized_block_num, + ); + let sub = sub.map(|r| { + r.map(|h| { + let hash = h.hash(); + (h, BlockRef::from_hash(hash)) + }) + }); + + Ok(StreamOf(Box::pin(sub))) }) - }); - Ok(StreamOf(Box::pin(sub))) + }) + .await?; + + Ok(retry_sub) } async fn submit_transaction( @@ -261,6 +336,7 @@ impl Backend for LegacyBackend { .methods .author_submit_and_watch_extrinsic(extrinsic) .await?; + let sub = sub.filter_map(|r| { let mapped = r .map(|tx| { @@ -309,7 +385,8 @@ impl Backend for LegacyBackend { future::ready(mapped) }); - Ok(StreamOf(Box::pin(sub))) + + Ok(StreamOf::new(Box::pin(sub))) } async fn call( @@ -318,9 +395,7 @@ impl Backend for LegacyBackend { call_parameters: Option<&[u8]>, at: T::Hash, ) -> Result, Error> { - self.methods - .state_call(method, call_parameters, Some(at)) - .await + retry(|| self.methods.state_call(method, call_parameters, Some(at))).await } } @@ -431,6 +506,11 @@ impl Stream for StorageFetchDescendantKeysStream { return Poll::Ready(Some(Ok(keys))); } Err(e) => { + if e.is_disconnected_will_reconnect() { + this.keys_fut = Some(keys_fut); + continue; + } + // Error getting keys? Return it. return Poll::Ready(Some(Err(e))); } @@ -513,7 +593,9 @@ impl Stream for StorageFetchDescendantValuesStream { let at = this.keys.at; let results_fut = async move { let keys = keys.iter().map(|k| &**k); - let values = methods.state_query_storage_at(keys, Some(at)).await?; + let values = + retry(|| methods.state_query_storage_at(keys.clone(), Some(at))) + .await?; let values: VecDeque<_> = values .into_iter() .flat_map(|v| { diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index 63b2bee6246..6da2faa4ce8 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -9,6 +9,7 @@ pub mod legacy; pub mod rpc; pub mod unstable; +pub mod utils; use subxt_core::client::RuntimeVersion; diff --git a/subxt/src/backend/rpc/mod.rs b/subxt/src/backend/rpc/mod.rs index 453fcf5a7ff..bec5d9d86e9 100644 --- a/subxt/src/backend/rpc/mod.rs +++ b/subxt/src/backend/rpc/mod.rs @@ -65,8 +65,8 @@ crate::macros::cfg_unstable_light_client! { } crate::macros::cfg_reconnecting_rpc_client! { - mod reconnecting_jsonrpsee_impl; - pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client; + /// reconnecting rpc client. + pub mod reconnecting_rpc_client; } mod rpc_client; diff --git a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs b/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs deleted file mode 100644 index da37b267e04..00000000000 --- a/subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019-2023 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; -use crate::error::RpcError; -use futures::{FutureExt, StreamExt, TryStreamExt}; -use reconnecting_jsonrpsee_ws_client::SubscriptionId; -use serde_json::value::RawValue; - -impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client { - fn request_raw<'a>( - &'a self, - method: &'a str, - params: Option>, - ) -> RawRpcFuture<'a, Box> { - async { - self.request_raw(method.to_string(), params) - .await - .map_err(|e| RpcError::ClientError(Box::new(e))) - } - .boxed() - } - - fn subscribe_raw<'a>( - &'a self, - sub: &'a str, - params: Option>, - unsub: &'a str, - ) -> RawRpcFuture<'a, RawRpcSubscription> { - async { - let sub = self - .subscribe_raw(sub.to_string(), params, unsub.to_string()) - .await - .map_err(|e| RpcError::ClientError(Box::new(e)))?; - - let id = match sub.id() { - SubscriptionId::Num(n) => n.to_string(), - SubscriptionId::Str(s) => s.to_string(), - }; - let stream = sub - .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) - .boxed(); - - Ok(RawRpcSubscription { - stream, - id: Some(id), - }) - } - .boxed() - } -} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client.rs b/subxt/src/backend/rpc/reconnecting_rpc_client.rs new file mode 100644 index 00000000000..bfefa461c5f --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client.rs @@ -0,0 +1,270 @@ +// Copyright 2019-2024 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; +use crate::error::RpcError; +use futures::{Future, FutureExt, StreamExt, TryStreamExt}; +use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, Client as InnerClient, SubscriptionId}; +use serde_json::value::RawValue; +use std::time::Duration; + +pub use reconnecting_jsonrpsee_ws_client::{ + ExponentialBackoff, FibonacciBackoff, FixedInterval, IdKind, +}; + +#[cfg(feature = "native")] +use reconnecting_jsonrpsee_ws_client::{HeaderMap, PingConfig}; + +/// Builder for [`Client`]. +#[derive(Debug, Clone)] +pub struct Builder

{ + max_request_size: u32, + max_response_size: u32, + retry_policy: P, + max_redirections: u32, + id_kind: IdKind, + max_log_len: u32, + max_concurrent_requests: u32, + request_timeout: Duration, + connection_timeout: Duration, + #[cfg(feature = "native")] + ping_config: Option, + #[cfg(feature = "native")] + headers: HeaderMap, +} + +impl Default for Builder { + fn default() -> Self { + Self { + max_request_size: 10 * 1024 * 1024, + max_response_size: 10 * 1024 * 1024, + retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)), + max_redirections: 5, + id_kind: IdKind::Number, + max_log_len: 1024, + max_concurrent_requests: 1024, + request_timeout: Duration::from_secs(60), + connection_timeout: Duration::from_secs(10), + #[cfg(feature = "native")] + ping_config: Some(PingConfig::new()), + #[cfg(feature = "native")] + headers: HeaderMap::new(), + } + } +} + +impl Builder { + /// Create a new builder. + pub fn new() -> Self { + Self::default() + } +} + +impl

Builder

+where + P: Iterator + Send + Sync + 'static + Clone, +{ + /// Configure the min response size a for websocket message. + /// + /// Default: 10MB + pub fn max_request_size(mut self, max: u32) -> Self { + self.max_request_size = max; + self + } + + /// Configure the max response size a for websocket message. + /// + /// Default: 10MB + pub fn max_response_size(mut self, max: u32) -> Self { + self.max_response_size = max; + self + } + + /// Set the max number of redirections to perform until a connection is regarded as failed. + /// + /// Default: 5 + pub fn max_redirections(mut self, redirect: u32) -> Self { + self.max_redirections = redirect; + self + } + + /// Configure how many concurrent method calls are allowed. + /// + /// Default: 1024 + pub fn max_concurrent_requests(mut self, max: u32) -> Self { + self.max_concurrent_requests = max; + self + } + + /// Configure how long until a method call is regarded as failed. + /// + /// Default: 1 minute + pub fn request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// Set connection timeout for the WebSocket handshake + /// + /// Default: 10 seconds + pub fn connection_timeout(mut self, timeout: Duration) -> Self { + self.connection_timeout = timeout; + self + } + + /// Configure the data type of the request object ID + /// + /// Default: number + pub fn id_format(mut self, kind: IdKind) -> Self { + self.id_kind = kind; + self + } + + /// Set maximum length for logging calls and responses. + /// Logs bigger than this limit will be truncated. + /// + /// Default: 1024 + pub fn set_max_logging_length(mut self, max: u32) -> Self { + self.max_log_len = max; + self + } + + /// Configure which retry policy to use. + /// + /// Default: Exponential backoff 10ms + pub fn retry_policy + Send + Sync + 'static + Clone>( + self, + retry_policy: T, + ) -> Builder { + Builder { + max_request_size: self.max_request_size, + max_response_size: self.max_response_size, + retry_policy, + max_redirections: self.max_redirections, + max_log_len: self.max_log_len, + id_kind: self.id_kind, + max_concurrent_requests: self.max_concurrent_requests, + request_timeout: self.request_timeout, + connection_timeout: self.connection_timeout, + #[cfg(feature = "native")] + ping_config: self.ping_config, + #[cfg(feature = "native")] + headers: self.headers, + } + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(feature = "native")))] + /// Configure the WebSocket ping/pong interval. + /// + /// Default: 30 seconds. + pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self { + self.ping_config = Some(ping_config); + self + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(feature = "native")))] + /// Disable WebSocket ping/pongs. + /// + /// Default: 30 seconds. + pub fn disable_ws_ping(mut self) -> Self { + self.ping_config = None; + self + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(native)))] + /// Configure custom headers to use in the WebSocket handshake. + pub fn set_headers(mut self, headers: HeaderMap) -> Self { + self.headers = headers; + self + } + + /// Build and connect to the target. + pub async fn build(self, url: String) -> Result { + let client = InnerClient::builder() + .retry_policy(self.retry_policy) + .build(url) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + Ok(Client(client)) + } +} + +/// Reconnecting rpc client. +#[derive(Debug, Clone)] +pub struct Client(InnerClient); + +impl Client { + /// Create a builder. + pub fn builder() -> Builder { + Builder::new() + } + + /// A future that resolves when the client has initiated a reconnection. + /// This method returns another future that resolves when the client has reconnected. + /// + /// This may be called multiple times. + pub async fn reconnect_initiated(&self) -> impl Future + '_ { + self.0.reconnect_started().await; + self.0.reconnected() + } + + /// Get how many times the client has reconnected successfully. + pub fn reconnect_count(&self) -> usize { + self.0.reconnect_count() + } +} + +impl RpcClientT for Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + async { + self.0 + .request_raw_with_policy(method.to_string(), params, CallRetryPolicy::Drop) + .await + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + async { + let sub = self + .0 + .subscribe_raw_with_policy( + sub.to_string(), + params, + unsub.to_string(), + CallRetryPolicy::Drop, + ) + .await + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub + .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) + .boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +} diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs index 3afa4a9ea50..1a5316f9629 100644 --- a/subxt/src/backend/unstable/follow_stream.rs +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -148,6 +148,12 @@ impl Stream for FollowStream { continue; } Poll::Ready(Err(e)) => { + // Re-start if a reconnecting backend was enabled. + if e.is_disconnected_will_reconnect() { + this.stream = InnerStreamState::Stopped; + continue; + } + // Finish forever if there's an error, passing it on. this.stream = InnerStreamState::Finished; return Poll::Ready(Some(Err(e))); @@ -182,6 +188,12 @@ impl Stream for FollowStream { return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); } Poll::Ready(Some(Err(e))) => { + // Re-start if a reconnecting backend was enabled. + if e.is_disconnected_will_reconnect() { + this.stream = InnerStreamState::Stopped; + continue; + } + // Finish forever if there's an error, passing it on. this.stream = InnerStreamState::Finished; return Poll::Ready(Some(Err(e))); diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index 61aca96dc95..a9bd508990f 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -5,7 +5,7 @@ use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin}; use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent}; use crate::config::BlockHash; -use crate::error::Error; +use crate::error::{Error, RpcError}; use futures::stream::{Stream, StreamExt}; use std::collections::{HashMap, HashSet, VecDeque}; use std::ops::DerefMut; @@ -380,6 +380,103 @@ struct SubscriberDetails { waker: Option, } +/// A stream that subscribes to finalized blocks +/// and indicates whether a block was missed if was restarted. +#[derive(Debug)] +pub struct FollowStreamFinalizedHeads { + stream: FollowStreamDriverSubscription, + sub_id: Option, + last_seen_block: Option>, + f: F, + is_done: bool, +} + +impl Unpin for FollowStreamFinalizedHeads {} + +impl FollowStreamFinalizedHeads +where + Hash: BlockHash, + F: Fn(FollowEvent>) -> Vec>, +{ + pub fn new(stream: FollowStreamDriverSubscription, f: F) -> Self { + Self { + stream, + sub_id: None, + last_seen_block: None, + f, + is_done: false, + } + } +} + +impl Stream for FollowStreamFinalizedHeads +where + Hash: BlockHash, + F: Fn(FollowEvent>) -> Vec>, +{ + type Item = Result<(String, Vec>), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.is_done { + return Poll::Ready(None); + } + + loop { + let Some(ev) = futures::ready!(self.stream.poll_next_unpin(cx)) else { + self.is_done = true; + return Poll::Ready(None); + }; + + let block_refs = match ev { + FollowStreamMsg::Ready(sub_id) => { + self.sub_id = Some(sub_id); + continue; + } + FollowStreamMsg::Event(FollowEvent::Finalized(finalized)) => { + self.last_seen_block = finalized.finalized_block_hashes.last().cloned(); + + (self.f)(FollowEvent::Finalized(finalized)) + } + FollowStreamMsg::Event(FollowEvent::Initialized(mut init)) => { + let prev = self.last_seen_block.take(); + self.last_seen_block = init.finalized_block_hashes.last().cloned(); + + if let Some(p) = prev { + let Some(pos) = init + .finalized_block_hashes + .iter() + .position(|b| b.hash() == p.hash()) + else { + return Poll::Ready(Some(Err(RpcError::DisconnectedWillReconnect( + "Missed at least one block when the connection was lost".to_owned(), + ) + .into()))); + }; + + // If we got older blocks than `prev`, we need to remove them + // because they should already have been sent at this point. + init.finalized_block_hashes.drain(0..=pos); + } + + (self.f)(FollowEvent::Initialized(init)) + } + FollowStreamMsg::Event(ev) => (self.f)(ev), + }; + + if block_refs.is_empty() { + continue; + } + + let sub_id = self + .sub_id + .clone() + .expect("Ready is always emitted before any other event"); + + return Poll::Ready(Some(Ok((sub_id, block_refs)))); + } + } +} + #[cfg(test)] mod test_utils { use super::super::follow_stream_unpin::test_utils::test_unpin_stream_getter; @@ -402,6 +499,9 @@ mod test_utils { #[cfg(test)] mod test { + use futures::TryStreamExt; + use sp_core::H256; + use super::super::follow_stream::test_utils::{ ev_best_block, ev_finalized, ev_initialized, ev_new_block, }; @@ -545,4 +645,101 @@ mod test { ]; assert_eq!(evs, expected); } + + #[tokio::test] + async fn subscribe_finalized_blocks_restart_works() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_best_block(1)), + Ok(ev_finalized([1], [])), + Ok(FollowEvent::Stop), + Ok(ev_initialized(1)), + Ok(ev_finalized([2], [])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let handle = driver.handle(); + + tokio::spawn(async move { while driver.next().await.is_some() {} }); + + let f = |ev| match ev { + FollowEvent::Finalized(ev) => ev.finalized_block_hashes, + FollowEvent::Initialized(ev) => ev.finalized_block_hashes, + _ => vec![], + }; + + let stream = FollowStreamFinalizedHeads::new(handle.subscribe(), f); + let evs: Vec<_> = stream.try_collect().await.unwrap(); + + let expected = vec![ + ( + "sub_id_0".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(0))], + ), + ( + "sub_id_0".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(1))], + ), + ( + "sub_id_5".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(2))], + ), + ]; + assert_eq!(evs, expected); + } + + #[tokio::test] + async fn subscribe_finalized_blocks_restart_with_missed_blocks() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(FollowEvent::Stop), + // Emulate that we missed some blocks. + Ok(ev_initialized(13)), + Ok(ev_finalized([14], [])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let handle = driver.handle(); + + tokio::spawn(async move { while driver.next().await.is_some() {} }); + + let f = |ev| match ev { + FollowEvent::Finalized(ev) => ev.finalized_block_hashes, + FollowEvent::Initialized(ev) => ev.finalized_block_hashes, + _ => vec![], + }; + + let evs: Vec<_> = FollowStreamFinalizedHeads::new(handle.subscribe(), f) + .collect() + .await; + + assert_eq!( + evs[0].as_ref().unwrap(), + &( + "sub_id_0".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(0))] + ) + ); + assert!( + matches!(&evs[1], Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) if e.contains("Missed at least one block when the connection was lost")) + ); + assert_eq!( + evs[2].as_ref().unwrap(), + &( + "sub_id_2".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(14))] + ) + ); + } } diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index cb12c93b281..ecff8b0d58a 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -474,7 +474,7 @@ pub(super) mod test_utils { pub type UnpinRx = std::sync::mpsc::Receiver<(Hash, Arc)>; - /// Get a `FolowStreamUnpin` from an iterator over events. + /// Get a [`FollowStreamUnpin`] from an iterator over events. pub fn test_unpin_stream_getter( events: F, max_life: usize, diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 5802fb6710c..ead0af1d9e0 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -18,21 +18,22 @@ mod storage_items; pub mod rpc_methods; +use self::follow_stream_driver::FollowStreamFinalizedHeads; use self::rpc_methods::{ FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType, }; use crate::backend::{ - rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf, - StreamOfResults, TransactionStatus, + rpc::RpcClient, utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, + StreamOf, StreamOfResults, TransactionStatus, }; use crate::config::BlockHash; use crate::error::{Error, RpcError}; use crate::Config; use async_trait::async_trait; use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; +use futures::future::Either; use futures::{Stream, StreamExt}; use std::collections::HashMap; -use std::sync::Arc; use std::task::Poll; use storage_items::StorageItems; @@ -136,43 +137,50 @@ impl UnstableBackend { } /// Stream block headers based on the provided filter fn - async fn stream_headers( + async fn stream_headers( &self, f: F, ) -> Result)>, Error> where - F: Fn(FollowEvent>) -> I + Copy + Send + 'static, - I: IntoIterator> + Send + 'static, - ::IntoIter: Send, + F: Fn( + FollowEvent>, + ) -> Vec> + + Send + + Sync + + 'static, { - let sub_id = get_subscription_id(&self.follow_handle).await?; - let sub_id = Arc::new(sub_id); let methods = self.methods.clone(); - let headers = self.follow_handle.subscribe().events().flat_map(move |ev| { - let sub_id = sub_id.clone(); - let methods = methods.clone(); - let block_refs = f(ev).into_iter(); - - futures::stream::iter(block_refs).filter_map(move |block_ref| { - let sub_id = sub_id.clone(); + let headers = + FollowStreamFinalizedHeads::new(self.follow_handle.subscribe(), f).flat_map(move |r| { let methods = methods.clone(); - async move { - let res = methods - .chainhead_v1_header(&sub_id, block_ref.hash()) - .await - .transpose()?; + let (sub_id, block_refs) = match r { + Ok(ev) => ev, + Err(e) => return Either::Left(futures::stream::once(async { Err(e) })), + }; - let header = match res { - Ok(header) => header, - Err(e) => return Some(Err(e)), - }; + Either::Right( + futures::stream::iter(block_refs).filter_map(move |block_ref| { + let methods = methods.clone(); + let sub_id = sub_id.clone(); - Some(Ok((header, block_ref.into()))) - } - }) - }); + async move { + let res = methods + .chainhead_v1_header(&sub_id, block_ref.hash()) + .await + .transpose()?; + + let header = match res { + Ok(header) => header, + Err(e) => return Some(Err(e)), + }; + + Some(Ok((header, block_ref.into()))) + } + }), + ) + }); Ok(StreamOf(Box::pin(headers))) } @@ -194,31 +202,34 @@ impl Backend for UnstableBackend { keys: Vec>, at: T::Hash, ) -> Result, Error> { - let queries = keys.iter().map(|key| StorageQuery { - key: &**key, - query_type: StorageQueryType::Value, - }); + retry(|| async { + let queries = keys.iter().map(|key| StorageQuery { + key: &**key, + query_type: StorageQueryType::Value, + }); - let storage_items = - StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone()) - .await?; + let storage_items = + StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone()) + .await?; - let storage_result_stream = storage_items.filter_map(|val| async move { - let val = match val { - Ok(val) => val, - Err(e) => return Some(Err(e)), - }; + let stream = storage_items.filter_map(|val| async move { + let val = match val { + Ok(val) => val, + Err(e) => return Some(Err(e)), + }; - let StorageResultType::Value(result) = val.result else { - return None; - }; - Some(Ok(StorageResponse { - key: val.key.0, - value: result.0, - })) - }); + let StorageResultType::Value(result) = val.result else { + return None; + }; + Some(Ok(StorageResponse { + key: val.key.0, + value: result.0, + })) + }); - Ok(StreamOf(Box::pin(storage_result_stream))) + Ok(StreamOf(Box::pin(stream))) + }) + .await } async fn storage_fetch_descendant_keys( @@ -226,22 +237,25 @@ impl Backend for UnstableBackend { key: Vec, at: T::Hash, ) -> Result>, Error> { - // Ask for hashes, and then just ignore them and return the keys that come back. - let query = StorageQuery { - key: &*key, - query_type: StorageQueryType::DescendantsHashes, - }; + retry(|| async { + // Ask for hashes, and then just ignore them and return the keys that come back. + let query = StorageQuery { + key: &*key, + query_type: StorageQueryType::DescendantsHashes, + }; - let storage_items = StorageItems::from_methods( - std::iter::once(query), - at, - &self.follow_handle, - self.methods.clone(), - ) - .await?; + let storage_items = StorageItems::from_methods( + std::iter::once(query), + at, + &self.follow_handle, + self.methods.clone(), + ) + .await?; - let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0)); - Ok(StreamOf(Box::pin(storage_result_stream))) + let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0)); + Ok(StreamOf(Box::pin(storage_result_stream))) + }) + .await } async fn storage_fetch_descendant_values( @@ -249,72 +263,81 @@ impl Backend for UnstableBackend { key: Vec, at: T::Hash, ) -> Result, Error> { - let query = StorageQuery { - key: &*key, - query_type: StorageQueryType::DescendantsValues, - }; - - let storage_items = StorageItems::from_methods( - std::iter::once(query), - at, - &self.follow_handle, - self.methods.clone(), - ) - .await?; - - let storage_result_stream = storage_items.filter_map(|val| async move { - let val = match val { - Ok(val) => val, - Err(e) => return Some(Err(e)), + retry(|| async { + let query = StorageQuery { + key: &*key, + query_type: StorageQueryType::DescendantsValues, }; - let StorageResultType::Value(result) = val.result else { - return None; - }; - Some(Ok(StorageResponse { - key: val.key.0, - value: result.0, - })) - }); + let storage_items = StorageItems::from_methods( + std::iter::once(query), + at, + &self.follow_handle, + self.methods.clone(), + ) + .await?; + + let storage_result_stream = storage_items.filter_map(|val| async move { + let val = match val { + Ok(val) => val, + Err(e) => return Some(Err(e)), + }; + + let StorageResultType::Value(result) = val.result else { + return None; + }; + Some(Ok(StorageResponse { + key: val.key.0, + value: result.0, + })) + }); - Ok(StreamOf(Box::pin(storage_result_stream))) + Ok(StreamOf(Box::pin(storage_result_stream))) + }) + .await } async fn genesis_hash(&self) -> Result { - self.methods.chainspec_v1_genesis_hash().await + retry(|| self.methods.chainspec_v1_genesis_hash()).await } async fn block_header(&self, at: T::Hash) -> Result, Error> { - let sub_id = get_subscription_id(&self.follow_handle).await?; - self.methods.chainhead_v1_header(&sub_id, at).await + retry(|| async { + let sub_id = get_subscription_id(&self.follow_handle).await?; + self.methods.chainhead_v1_header(&sub_id, at).await + }) + .await } async fn block_body(&self, at: T::Hash) -> Result>>, Error> { - let sub_id = get_subscription_id(&self.follow_handle).await?; - - // Subscribe to the body response and get our operationId back. - let follow_events = self.follow_handle.subscribe().events(); - let status = self.methods.chainhead_v1_body(&sub_id, at).await?; - let operation_id = match status { - MethodResponse::LimitReached => { - return Err(RpcError::request_rejected("limit reached").into()) - } - MethodResponse::Started(s) => s.operation_id, - }; - - // Wait for the response to come back with the correct operationId. - let mut exts_stream = follow_events.filter_map(|ev| { - let FollowEvent::OperationBodyDone(body) = ev else { - return std::future::ready(None); + retry(|| async { + let sub_id = get_subscription_id(&self.follow_handle).await?; + + // Subscribe to the body response and get our operationId back. + let follow_events = self.follow_handle.subscribe().events(); + let status = self.methods.chainhead_v1_body(&sub_id, at).await?; + let operation_id = match status { + MethodResponse::LimitReached => { + return Err(RpcError::request_rejected("limit reached").into()) + } + MethodResponse::Started(s) => s.operation_id, }; - if body.operation_id != operation_id { - return std::future::ready(None); - } - let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect(); - std::future::ready(Some(exts)) - }); - Ok(exts_stream.next().await) + // Wait for the response to come back with the correct operationId. + let mut exts_stream = follow_events.filter_map(|ev| { + let FollowEvent::OperationBodyDone(body) = ev else { + return std::future::ready(None); + }; + if body.operation_id != operation_id { + return std::future::ready(None); + } + let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect(); + std::future::ready(Some(exts)) + }); + + Ok(exts_stream.next().await) + }) + .await } async fn latest_finalized_block_ref(&self) -> Result, Error> { @@ -423,12 +446,16 @@ impl Backend for UnstableBackend { std::future::ready(Some(Ok(runtime_version))) }); - Ok(StreamOf(Box::pin(runtime_stream))) + Ok(StreamOf::new(Box::pin(runtime_stream))) } async fn stream_all_block_headers( &self, ) -> Result)>, Error> { + // TODO: https://github.com/paritytech/subxt/issues/1568 + // + // It's possible that blocks may be silently missed if + // a reconnection occurs because it's restarted by the unstable backend. self.stream_headers(|ev| match ev { FollowEvent::Initialized(init) => init.finalized_block_hashes, FollowEvent::NewBlock(ev) => { @@ -442,6 +469,10 @@ impl Backend for UnstableBackend { async fn stream_best_block_headers( &self, ) -> Result)>, Error> { + // TODO: https://github.com/paritytech/subxt/issues/1568 + // + // It's possible that blocks may be silently missed if + // a reconnection occurs because it's restarted by the unstable backend. self.stream_headers(|ev| match ev { FollowEvent::Initialized(init) => init.finalized_block_hashes, FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash], @@ -638,37 +669,40 @@ impl Backend for UnstableBackend { call_parameters: Option<&[u8]>, at: T::Hash, ) -> Result, Error> { - let sub_id = get_subscription_id(&self.follow_handle).await?; - - // Subscribe to the body response and get our operationId back. - let follow_events = self.follow_handle.subscribe().events(); - let call_parameters = call_parameters.unwrap_or(&[]); - let status = self - .methods - .chainhead_v1_call(&sub_id, at, method, call_parameters) - .await?; - let operation_id = match status { - MethodResponse::LimitReached => { - return Err(RpcError::request_rejected("limit reached").into()) - } - MethodResponse::Started(s) => s.operation_id, - }; - - // Wait for the response to come back with the correct operationId. - let mut call_data_stream = follow_events.filter_map(|ev| { - let FollowEvent::OperationCallDone(body) = ev else { - return std::future::ready(None); + retry(|| async { + let sub_id = get_subscription_id(&self.follow_handle).await?; + + // Subscribe to the body response and get our operationId back. + let follow_events = self.follow_handle.subscribe().events(); + let call_parameters = call_parameters.unwrap_or(&[]); + let status = self + .methods + .chainhead_v1_call(&sub_id, at, method, call_parameters) + .await?; + let operation_id = match status { + MethodResponse::LimitReached => { + return Err(RpcError::request_rejected("limit reached").into()) + } + MethodResponse::Started(s) => s.operation_id, }; - if body.operation_id != operation_id { - return std::future::ready(None); - } - std::future::ready(Some(body.output.0)) - }); - call_data_stream - .next() - .await - .ok_or_else(|| RpcError::SubscriptionDropped.into()) + // Wait for the response to come back with the correct operationId. + let mut call_data_stream = follow_events.filter_map(|ev| { + let FollowEvent::OperationCallDone(body) = ev else { + return std::future::ready(None); + }; + if body.operation_id != operation_id { + return std::future::ready(None); + } + std::future::ready(Some(body.output.0)) + }); + + call_data_stream + .next() + .await + .ok_or_else(|| RpcError::SubscriptionDropped.into()) + }) + .await } } diff --git a/subxt/src/backend/unstable/storage_items.rs b/subxt/src/backend/unstable/storage_items.rs index 52d85608353..0e7aa46772d 100644 --- a/subxt/src/backend/unstable/storage_items.rs +++ b/subxt/src/backend/unstable/storage_items.rs @@ -111,6 +111,11 @@ impl Stream for StorageItems { return Poll::Pending; } Poll::Ready(Err(e)) => { + if e.is_disconnected_will_reconnect() { + self.continue_fut = Some((self.continue_call)()); + continue; + } + self.done = true; return Poll::Ready(Some(Err(e))); } diff --git a/subxt/src/backend/utils.rs b/subxt/src/backend/utils.rs new file mode 100644 index 00000000000..e8587734ba0 --- /dev/null +++ b/subxt/src/backend/utils.rs @@ -0,0 +1,271 @@ +//! RPC utils. + +use super::{StreamOf, StreamOfResults}; +use crate::error::Error; +use futures::future::BoxFuture; +use futures::{FutureExt, Stream, StreamExt}; +use std::{future::Future, pin::Pin, task::Poll}; + +/// Resubscribe callback. +type ResubscribeGetter = Box ResubscribeFuture + Send>; + +/// Future that resolves to a subscription stream. +type ResubscribeFuture = Pin, Error>> + Send>>; + +pub(crate) enum PendingOrStream { + Pending(BoxFuture<'static, Result, Error>>), + Stream(StreamOfResults), +} + +impl std::fmt::Debug for PendingOrStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PendingOrStream::Pending(_) => write!(f, "Pending"), + PendingOrStream::Stream(_) => write!(f, "Stream"), + } + } +} + +/// Retry subscription. +struct RetrySubscription { + resubscribe: ResubscribeGetter, + state: Option>, +} + +impl std::marker::Unpin for RetrySubscription {} + +impl Stream for RetrySubscription { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + let Some(mut this) = self.state.take() else { + return Poll::Ready(None); + }; + + match this { + PendingOrStream::Stream(ref mut s) => match s.poll_next_unpin(cx) { + Poll::Ready(Some(Err(err))) => { + if err.is_disconnected_will_reconnect() { + self.state = Some(PendingOrStream::Pending((self.resubscribe)())); + } + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(Ok(val))) => { + self.state = Some(this); + return Poll::Ready(Some(Ok(val))); + } + Poll::Pending => { + self.state = Some(this); + return Poll::Pending; + } + }, + PendingOrStream::Pending(mut fut) => match fut.poll_unpin(cx) { + Poll::Ready(Ok(stream)) => { + self.state = Some(PendingOrStream::Stream(stream)); + continue; + } + Poll::Ready(Err(err)) => { + if err.is_disconnected_will_reconnect() { + self.state = Some(PendingOrStream::Pending((self.resubscribe)())); + } + return Poll::Ready(Some(Err(err))); + } + Poll::Pending => { + self.state = Some(PendingOrStream::Pending(fut)); + return Poll::Pending; + } + }, + }; + } + } +} + +/// Retry a future until it doesn't return a disconnected error. +/// +/// # Example +/// +/// ```no_run +/// use subxt::backend::utils::retry; +/// +/// async fn some_future() -> Result<(), subxt::error::Error> { +/// Ok(()) +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let result = retry(|| some_future()).await; +/// } +/// ``` +pub async fn retry(mut retry_future: F) -> Result +where + F: FnMut() -> T, + T: Future>, +{ + const REJECTED_MAX_RETRIES: usize = 10; + let mut rejected_retries = 0; + + loop { + match retry_future().await { + Ok(v) => return Ok(v), + Err(e) => { + if e.is_disconnected_will_reconnect() { + continue; + } + + // TODO: https://github.com/paritytech/subxt/issues/1567 + // This is a hack because if a reconnection occurs + // the order of pending calls is not guaranteed. + // + // Such that it's possible the a pending future completes + // before `chainHead_follow` is established with fresh + // subscription id. + // + if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES { + rejected_retries += 1; + continue; + } + + return Err(e); + } + } + } +} + +/// Create a retry stream that will resubscribe on disconnect. +/// +/// It's important to note that this function is intended to work only for stateless subscriptions. +/// If the subscription takes input or modifies state, this function should not be used. +/// +/// # Example +/// +/// ```no_run +/// use subxt::backend::{utils::retry_stream, StreamOf}; +/// use futures::future::FutureExt; +/// +/// #[tokio::main] +/// async fn main() { +/// retry_stream(|| { +/// // This needs to return a stream of results but if you are using +/// // the subxt backend already it will return StreamOf so you can just +/// // return it directly in the async block below. +/// async move { Ok(StreamOf::new(Box::pin(futures::stream::iter([Ok(2)])))) }.boxed() +/// }).await; +/// } +/// ``` +pub async fn retry_stream(sub_stream: F) -> Result, Error> +where + F: FnMut() -> ResubscribeFuture + Send + 'static + Clone, + R: Send + 'static, +{ + let stream = retry(sub_stream.clone()).await?; + + let resubscribe = Box::new(move || { + let sub_stream = sub_stream.clone(); + async move { retry(sub_stream).await }.boxed() + }); + + // The extra Box is to encapsulate the retry subscription type + Ok(StreamOf::new(Box::pin(RetrySubscription { + state: Some(PendingOrStream::Stream(stream)), + resubscribe, + }))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::StreamOf; + + fn disconnect_err() -> Error { + Error::Rpc(crate::error::RpcError::DisconnectedWillReconnect( + String::new(), + )) + } + + fn custom_err() -> Error { + Error::Other(String::new()) + } + + #[tokio::test] + async fn retry_stream_works() { + let retry_stream = retry_stream(|| { + async { + Ok(StreamOf::new(Box::pin(futures::stream::iter([ + Ok(1), + Ok(2), + Ok(3), + Err(disconnect_err()), + ])))) + } + .boxed() + }) + .await + .unwrap(); + + let result = retry_stream + .take(5) + .collect::>>() + .await; + + assert!(matches!(result[0], Ok(r) if r == 1)); + assert!(matches!(result[1], Ok(r) if r == 2)); + assert!(matches!(result[2], Ok(r) if r == 3)); + assert!(matches!(result[3], Err(ref e) if e.is_disconnected_will_reconnect())); + assert!(matches!(result[4], Ok(r) if r == 1)); + } + + #[tokio::test] + async fn retry_sub_works() { + let stream = futures::stream::iter([Ok(1), Err(disconnect_err())]); + + let resubscribe = Box::new(move || { + async move { Ok(StreamOf::new(Box::pin(futures::stream::iter([Ok(2)])))) }.boxed() + }); + + let retry_stream = RetrySubscription { + state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(stream)))), + resubscribe, + }; + + let result: Vec<_> = retry_stream.collect().await; + + assert!(matches!(result[0], Ok(r) if r == 1)); + assert!(matches!(result[1], Err(ref e) if e.is_disconnected_will_reconnect())); + assert!(matches!(result[2], Ok(r) if r == 2)); + } + + #[tokio::test] + async fn retry_sub_err_terminates_stream() { + let stream = futures::stream::iter([Ok(1)]); + let resubscribe = Box::new(move || async move { Err(custom_err()) }.boxed()); + + let retry_stream = RetrySubscription { + state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(stream)))), + resubscribe, + }; + + assert_eq!(retry_stream.count().await, 1); + } + + #[tokio::test] + async fn retry_sub_resubscribe_err() { + let stream = futures::stream::iter([Ok(1), Err(disconnect_err())]); + let resubscribe = Box::new(move || async move { Err(custom_err()) }.boxed()); + + let retry_stream = RetrySubscription { + state: Some(PendingOrStream::Stream(StreamOf::new(Box::pin(stream)))), + resubscribe, + }; + + let result: Vec<_> = retry_stream.collect().await; + + assert!(matches!(result[0], Ok(r) if r == 1)); + assert!(matches!(result[1], Err(ref e) if e.is_disconnected_will_reconnect())); + assert!(matches!(result[2], Err(ref e) if matches!(e, Error::Other(_)))); + } +} diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs index 0d8e589a94a..c0cb92f9c9c 100644 --- a/subxt/src/blocks/blocks_client.rs +++ b/subxt/src/blocks/blocks_client.rs @@ -95,8 +95,8 @@ where { let client = self.client.clone(); header_sub_fut_to_block_sub(self.clone(), async move { - let sub = client.backend().stream_all_block_headers().await?; - BlockStreamRes::Ok(sub) + let stream = client.backend().stream_all_block_headers().await?; + BlockStreamRes::Ok(stream) }) } @@ -112,8 +112,8 @@ where { let client = self.client.clone(); header_sub_fut_to_block_sub(self.clone(), async move { - let sub = client.backend().stream_best_block_headers().await?; - BlockStreamRes::Ok(sub) + let stream = client.backend().stream_best_block_headers().await?; + BlockStreamRes::Ok(stream) }) } @@ -126,8 +126,8 @@ where { let client = self.client.clone(); header_sub_fut_to_block_sub(self.clone(), async move { - let sub = client.backend().stream_finalized_block_headers().await?; - BlockStreamRes::Ok(sub) + let stream = client.backend().stream_finalized_block_headers().await?; + BlockStreamRes::Ok(stream) }) } } diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index eaf03fd671e..fccd4874234 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -432,9 +432,8 @@ impl ClientRuntimeUpdater { /// Instead that's up to the user of this API to decide when to update and /// to perform the actual updating. pub async fn runtime_updates(&self) -> Result, Error> { - let stream = self.0.backend().stream_runtime_version().await?; Ok(RuntimeUpdaterStream { - stream, + stream: self.0.backend().stream_runtime_version().await?, client: self.0.clone(), }) } diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 0f942a1c40c..53803450f14 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -125,6 +125,11 @@ impl Error { pub fn is_disconnected_will_reconnect(&self) -> bool { matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_))) } + + /// Checks whether the error was caused by a RPC request being rejected. + pub fn is_rejected(&self) -> bool { + matches!(self, Error::Rpc(RpcError::RequestRejected(_))) + } } /// An RPC error. Since we are generic over the RPC client that is used, diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 3c5d9b7de57..f4d2ed9ab9d 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -11,11 +11,11 @@ use crate::{ client::OnlineClientT, error::{DispatchError, Error, RpcError, TransactionError}, events::EventsClient, + utils::strip_compact_prefix, Config, }; use derive_where::derive_where; use futures::{Stream, StreamExt}; -use subxt_core::utils::strip_compact_prefix; /// This struct represents a subscription to the progress of some transaction. pub struct TxProgress {