Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
Result, SignParam, TransactionTracker, UnsignedTransaction,
};

use async_std::sync::{Arc, Mutex};
use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -111,46 +111,54 @@ pub enum ChainRuntimeVersion {

/// Substrate client type.
///
/// Cloning `Client` is a cheap operation.
/// Cloning `Client` is a cheap operation that only clones internal references. Different
/// clones of the same client are guaranteed to use the same references.
pub struct Client<C: Chain> {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
// Lock order: `submit_signed_extrinsic_lock`, `data`
/// Client connection params.
params: Arc<ConnectionParams>,
/// Substrate RPC client.
client: Arc<RpcClient>,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// Saved chain runtime version.
chain_runtime_version: ChainRuntimeVersion,
/// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>,
/// Saved chain runtime version
chain_runtime_version: ChainRuntimeVersion,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// Shared dynamic data.
data: Arc<RwLock<ClientData>>,
}

/// Client data, shared by all `Client` clones.
struct ClientData {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Substrate RPC client.
client: Arc<RpcClient>,
}

#[async_trait]
impl<C: Chain> relay_utils::relay_loop::Client for Client<C> {
type Error = Error;

async fn reconnect(&mut self) -> Result<()> {
let mut data = self.data.write().await;
let (tokio, client) = Self::build_client(&self.params).await?;
self.tokio = tokio;
self.client = client;
data.tokio = tokio;
data.client = client;
Ok(())
}
}

impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
tokio: self.tokio.clone(),
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
chain_runtime_version: self.chain_runtime_version.clone(),
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
genesis_hash: self.genesis_hash,
data: self.data.clone(),
}
}
}
Expand Down Expand Up @@ -199,12 +207,11 @@ impl<C: Chain> Client<C> {

let chain_runtime_version = params.chain_runtime_version.clone();
Ok(Self {
tokio,
params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
chain_runtime_version,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
genesis_hash,
data: Arc::new(RwLock::new(ClientData { tokio, client })),
})
}

Expand Down Expand Up @@ -572,7 +579,7 @@ impl<C: Chain> Client<C> {
Ok((tracker, subscription))
})
.await?;
self.tokio.spawn(Subscription::background_worker(
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"extrinsic".into(),
subscription,
Expand Down Expand Up @@ -719,7 +726,7 @@ impl<C: Chain> Client<C> {
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker(
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
Expand All @@ -735,8 +742,9 @@ impl<C: Chain> Client<C> {
F: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let client = self.client.clone();
self.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await?
let data = self.data.read().await;
let client = data.client.clone();
data.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await?
}

/// Returns `true` if version guard can be started.
Expand Down