diff --git a/.changelog/unreleased/improvements/4702-indexer-client-backoff-sleep.md b/.changelog/unreleased/improvements/4702-indexer-client-backoff-sleep.md new file mode 100644 index 00000000000..26240b5ed86 --- /dev/null +++ b/.changelog/unreleased/improvements/4702-indexer-client-backoff-sleep.md @@ -0,0 +1,2 @@ +- Add backoff sleep between MASP indexer client failed requests. + ([\#4702](https://github.com/anoma/namada/pull/4702)) \ No newline at end of file diff --git a/crates/apps_lib/src/client/masp.rs b/crates/apps_lib/src/client/masp.rs index df2d7e04917..a344234e535 100644 --- a/crates/apps_lib/src/client/masp.rs +++ b/crates/apps_lib/src/client/masp.rs @@ -8,8 +8,8 @@ use namada_sdk::error::Error; use namada_sdk::io::DevNullProgressBar; use namada_sdk::io::{Client, Io, MaybeSend, MaybeSync, display, display_line}; use namada_sdk::masp::{ - IndexerMaspClient, LedgerMaspClient, MaspLocalTaskEnv, ShieldedContext, - ShieldedSyncConfig, ShieldedUtils, + IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient, + MaspLocalTaskEnv, ShieldedContext, ShieldedSyncConfig, ShieldedUtils, }; #[allow(clippy::too_many_arguments)] @@ -126,11 +126,14 @@ pub async fn syncing< )) })?; - dispatch_client!(IndexerMaspClient::new( - client, - url, - true, - args.max_concurrent_fetches, + dispatch_client!(LinearBackoffSleepMaspClient::new( + IndexerMaspClient::new( + client, + url, + true, + args.max_concurrent_fetches, + ), + Duration::from_millis(5) ))? } else { display_line!( @@ -139,10 +142,9 @@ pub async fn syncing< "==== Shielded sync started using ledger client ====".bold() ); - dispatch_client!(LedgerMaspClient::new( - client, - args.max_concurrent_fetches, - Duration::from_millis(5), + dispatch_client!(LinearBackoffSleepMaspClient::new( + LedgerMaspClient::new(client, args.max_concurrent_fetches,), + Duration::from_millis(5) ))? }; diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index 53f173c2bf0..7677bd9a870 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -21,7 +21,9 @@ use namada_token::masp::shielded_wallet::ShieldedQueries; pub use namada_token::masp::{utils, *}; use namada_tx::event::{MaspEvent, MaspEventKind, MaspTxRef}; use namada_tx::{IndexedTx, Tx}; -pub use utilities::{IndexerMaspClient, LedgerMaspClient}; +pub use utilities::{ + IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient, +}; use crate::error::{Error, QueryError}; use crate::rpc::{ diff --git a/crates/sdk/src/masp/utilities.rs b/crates/sdk/src/masp/utilities.rs index 0a7c31052fd..648c159b56c 100644 --- a/crates/sdk/src/masp/utilities.rs +++ b/crates/sdk/src/masp/utilities.rs @@ -1,6 +1,7 @@ //! Helper functions and types use std::collections::BTreeMap; +use std::future::Future; use std::sync::{Arc, RwLock}; use borsh::BorshDeserialize; @@ -25,11 +26,129 @@ use tokio::sync::Semaphore; use crate::error::{Error, QueryError}; use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height}; +/// Middleware MASP client implementation that introduces +/// linear backoff sleeps between failed requests. +pub struct LinearBackoffSleepMaspClient { + middleware_client: M, + shared: Arc, +} + +struct LinearBackoffSleepMaspClientShared { + backoff: RwLock, + sleep: Sleep, +} + +impl LinearBackoffSleepMaspClient { + /// Create a new [`MaspClient`] with linear backoff + /// sleep between failed requests. + #[inline(always)] + pub fn new(middleware_client: M, linear_backoff_delta: Duration) -> Self { + Self { + middleware_client, + shared: Arc::new(LinearBackoffSleepMaspClientShared { + backoff: RwLock::new(Duration::from_secs(0)), + sleep: Sleep { + strategy: LinearBackoff { + delta: linear_backoff_delta, + }, + }, + }), + } + } +} + +impl Clone for LinearBackoffSleepMaspClient { + fn clone(&self) -> Self { + Self { + middleware_client: self.middleware_client.clone(), + shared: Arc::clone(&self.shared), + } + } +} + +impl MaspClient for LinearBackoffSleepMaspClient { + type Error = ::Error; + + async fn last_block_height( + &self, + ) -> Result, Self::Error> { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.last_block_height(), + ) + .await + } + + async fn fetch_shielded_transfers( + &self, + from: BlockHeight, + to: BlockHeight, + ) -> Result, Self::Error> { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.fetch_shielded_transfers(from, to), + ) + .await + } + + fn capabilities(&self) -> MaspClientCapabilities { + self.middleware_client.capabilities() + } + + async fn fetch_commitment_tree( + &self, + height: BlockHeight, + ) -> Result, Self::Error> { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.fetch_commitment_tree(height), + ) + .await + } + + async fn fetch_note_index( + &self, + height: BlockHeight, + ) -> Result, Self::Error> { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.fetch_note_index(height), + ) + .await + } + + async fn fetch_witness_map( + &self, + height: BlockHeight, + ) -> Result>, Self::Error> { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.fetch_witness_map(height), + ) + .await + } + + async fn commitment_anchor_exists( + &self, + root: &Node, + ) -> Result { + with_linear_backoff( + &self.shared.backoff, + &self.shared.sleep, + self.middleware_client.commitment_anchor_exists(root), + ) + .await + } +} + struct LedgerMaspClientInner { client: C, semaphore: Semaphore, - backoff: RwLock, - sleep: Sleep, } /// An inefficient MASP client which simply uses a @@ -49,28 +168,25 @@ impl Clone for LedgerMaspClient { impl LedgerMaspClient { /// Create a new [`MaspClient`] given an rpc client. #[inline(always)] - pub fn new( - client: C, - max_concurrent_fetches: usize, - linear_backoff_delta: Duration, - ) -> Self { + pub fn new(client: C, max_concurrent_fetches: usize) -> Self { Self { inner: Arc::new(LedgerMaspClientInner { client, semaphore: Semaphore::new(max_concurrent_fetches), - backoff: RwLock::new(Duration::from_secs(0)), - sleep: Sleep { - strategy: LinearBackoff { - delta: linear_backoff_delta, - }, - }, }), } } } -impl LedgerMaspClient { - async fn fetch_shielded_transfers_inner( +impl MaspClient for LedgerMaspClient { + type Error = Error; + + async fn last_block_height(&self) -> Result, Error> { + let maybe_block = crate::rpc::query_block(&self.inner.client).await?; + Ok(maybe_block.map(|b| b.height)) + } + + async fn fetch_shielded_transfers( &self, from: BlockHeight, to: BlockHeight, @@ -142,43 +258,6 @@ impl LedgerMaspClient { Ok(txs) } -} - -impl MaspClient for LedgerMaspClient { - type Error = Error; - - async fn last_block_height(&self) -> Result, Error> { - let maybe_block = crate::rpc::query_block(&self.inner.client).await?; - Ok(maybe_block.map(|b| b.height)) - } - - async fn fetch_shielded_transfers( - &self, - from: BlockHeight, - to: BlockHeight, - ) -> Result, Error> { - const ZERO: Duration = Duration::from_secs(0); - let current_backoff = { *self.inner.backoff.read().unwrap() }; - - if current_backoff > ZERO { - self.inner - .sleep - .sleep_with_current_backoff(¤t_backoff) - .await; - } - - let result = self.fetch_shielded_transfers_inner(from, to).await; - - if result.is_err() { - let mut backoff = self.inner.backoff.write().unwrap(); - self.inner.sleep.strategy.next_state(&mut *backoff); - } else if current_backoff > ZERO { - let mut backoff = self.inner.backoff.write().unwrap(); - self.inner.sleep.strategy.prev_state(&mut *backoff); - } - - result - } #[inline(always)] fn capabilities(&self) -> MaspClientCapabilities { @@ -881,6 +960,34 @@ impl BlockIndex { } } +async fn with_linear_backoff( + backoff: &RwLock, + sleep: &Sleep, + fut: F, +) -> Result +where + F: Future>, +{ + const ZERO: Duration = Duration::from_secs(0); + let current_backoff = { *backoff.read().unwrap() }; + + if current_backoff > ZERO { + sleep.sleep_with_current_backoff(¤t_backoff).await; + } + + let result = fut.await; + + if result.is_err() { + let mut backoff = backoff.write().unwrap(); + sleep.strategy.next_state(&mut *backoff); + } else if current_backoff > ZERO { + let mut backoff = backoff.write().unwrap(); + sleep.strategy.prev_state(&mut *backoff); + } + + result +} + #[cfg(test)] mod test_block_index { use std::ops::ControlFlow;