Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Add backoff sleep between MASP indexer client failed requests.
([\#4702](https://github.com/anoma/namada/pull/4702))
24 changes: 13 additions & 11 deletions crates/apps_lib/src/client/masp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use namada_sdk::error::Error;
use namada_sdk::io::DevNullProgressBar;
use namada_sdk::io::{Client, Io, MaybeSend, MaybeSync, display, display_line};
use namada_sdk::masp::{
IndexerMaspClient, LedgerMaspClient, MaspLocalTaskEnv, ShieldedContext,
ShieldedSyncConfig, ShieldedUtils,
IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient,
MaspLocalTaskEnv, ShieldedContext, ShieldedSyncConfig, ShieldedUtils,
};

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -126,11 +126,14 @@ pub async fn syncing<
))
})?;

dispatch_client!(IndexerMaspClient::new(
client,
url,
true,
args.max_concurrent_fetches,
dispatch_client!(LinearBackoffSleepMaspClient::new(
IndexerMaspClient::new(
client,
url,
true,
args.max_concurrent_fetches,
),
Duration::from_millis(5)
))?
} else {
display_line!(
Expand All @@ -139,10 +142,9 @@ pub async fn syncing<
"==== Shielded sync started using ledger client ====".bold()
);

dispatch_client!(LedgerMaspClient::new(
client,
args.max_concurrent_fetches,
Duration::from_millis(5),
dispatch_client!(LinearBackoffSleepMaspClient::new(
LedgerMaspClient::new(client, args.max_concurrent_fetches,),
Duration::from_millis(5)
))?
};

Expand Down
4 changes: 3 additions & 1 deletion crates/sdk/src/masp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use namada_token::masp::shielded_wallet::ShieldedQueries;
pub use namada_token::masp::{utils, *};
use namada_tx::event::{MaspEvent, MaspEventKind, MaspTxRef};
use namada_tx::{IndexedTx, Tx};
pub use utilities::{IndexerMaspClient, LedgerMaspClient};
pub use utilities::{
IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient,
};

use crate::error::{Error, QueryError};
use crate::rpc::{
Expand Down
211 changes: 159 additions & 52 deletions crates/sdk/src/masp/utilities.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Helper functions and types

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, RwLock};

use borsh::BorshDeserialize;
Expand All @@ -25,11 +26,129 @@ use tokio::sync::Semaphore;
use crate::error::{Error, QueryError};
use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height};

/// Middleware MASP client implementation that introduces
/// linear backoff sleeps between failed requests.
pub struct LinearBackoffSleepMaspClient<M> {
middleware_client: M,
shared: Arc<LinearBackoffSleepMaspClientShared>,
}

struct LinearBackoffSleepMaspClientShared {
backoff: RwLock<Duration>,
sleep: Sleep<LinearBackoff>,
}

impl<M> LinearBackoffSleepMaspClient<M> {
/// Create a new [`MaspClient`] with linear backoff
/// sleep between failed requests.
#[inline(always)]
pub fn new(middleware_client: M, linear_backoff_delta: Duration) -> Self {
Self {
middleware_client,
shared: Arc::new(LinearBackoffSleepMaspClientShared {
backoff: RwLock::new(Duration::from_secs(0)),
sleep: Sleep {
strategy: LinearBackoff {
delta: linear_backoff_delta,
},
},
}),
}
}
}

impl<M: Clone> Clone for LinearBackoffSleepMaspClient<M> {
fn clone(&self) -> Self {
Self {
middleware_client: self.middleware_client.clone(),
shared: Arc::clone(&self.shared),
}
}
}

impl<M: MaspClient> MaspClient for LinearBackoffSleepMaspClient<M> {
type Error = <M as MaspClient>::Error;

async fn last_block_height(
&self,
) -> Result<Option<BlockHeight>, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.last_block_height(),
)
.await
}

async fn fetch_shielded_transfers(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.fetch_shielded_transfers(from, to),
)
.await
}

fn capabilities(&self) -> MaspClientCapabilities {
self.middleware_client.capabilities()
}

async fn fetch_commitment_tree(
&self,
height: BlockHeight,
) -> Result<CommitmentTree<Node>, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.fetch_commitment_tree(height),
)
.await
}

async fn fetch_note_index(
&self,
height: BlockHeight,
) -> Result<BTreeMap<MaspIndexedTx, usize>, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.fetch_note_index(height),
)
.await
}

async fn fetch_witness_map(
&self,
height: BlockHeight,
) -> Result<HashMap<usize, IncrementalWitness<Node>>, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.fetch_witness_map(height),
)
.await
}

async fn commitment_anchor_exists(
&self,
root: &Node,
) -> Result<bool, Self::Error> {
with_linear_backoff(
&self.shared.backoff,
&self.shared.sleep,
self.middleware_client.commitment_anchor_exists(root),
)
.await
}
}

struct LedgerMaspClientInner<C> {
client: C,
semaphore: Semaphore,
backoff: RwLock<Duration>,
sleep: Sleep<LinearBackoff>,
}

/// An inefficient MASP client which simply uses a
Expand All @@ -49,28 +168,25 @@ impl<C> Clone for LedgerMaspClient<C> {
impl<C> LedgerMaspClient<C> {
/// Create a new [`MaspClient`] given an rpc client.
#[inline(always)]
pub fn new(
client: C,
max_concurrent_fetches: usize,
linear_backoff_delta: Duration,
) -> Self {
pub fn new(client: C, max_concurrent_fetches: usize) -> Self {
Self {
inner: Arc::new(LedgerMaspClientInner {
client,
semaphore: Semaphore::new(max_concurrent_fetches),
backoff: RwLock::new(Duration::from_secs(0)),
sleep: Sleep {
strategy: LinearBackoff {
delta: linear_backoff_delta,
},
},
}),
}
}
}

impl<C: Client + Send + Sync> LedgerMaspClient<C> {
async fn fetch_shielded_transfers_inner(
impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
&self,
from: BlockHeight,
to: BlockHeight,
Expand Down Expand Up @@ -142,43 +258,6 @@ impl<C: Client + Send + Sync> LedgerMaspClient<C> {

Ok(txs)
}
}

impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Error> {
const ZERO: Duration = Duration::from_secs(0);
let current_backoff = { *self.inner.backoff.read().unwrap() };

if current_backoff > ZERO {
self.inner
.sleep
.sleep_with_current_backoff(&current_backoff)
.await;
}

let result = self.fetch_shielded_transfers_inner(from, to).await;

if result.is_err() {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.next_state(&mut *backoff);
} else if current_backoff > ZERO {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.prev_state(&mut *backoff);
}

result
}

#[inline(always)]
fn capabilities(&self) -> MaspClientCapabilities {
Expand Down Expand Up @@ -881,6 +960,34 @@ impl BlockIndex {
}
}

async fn with_linear_backoff<F, T, E>(
backoff: &RwLock<Duration>,
sleep: &Sleep<LinearBackoff>,
fut: F,
) -> Result<T, E>
where
F: Future<Output = Result<T, E>>,
{
const ZERO: Duration = Duration::from_secs(0);
let current_backoff = { *backoff.read().unwrap() };

if current_backoff > ZERO {
sleep.sleep_with_current_backoff(&current_backoff).await;
}

let result = fut.await;

if result.is_err() {
let mut backoff = backoff.write().unwrap();
sleep.strategy.next_state(&mut *backoff);
} else if current_backoff > ZERO {
let mut backoff = backoff.write().unwrap();
sleep.strategy.prev_state(&mut *backoff);
}

result
}

#[cfg(test)]
mod test_block_index {
use std::ops::ControlFlow;
Expand Down
Loading