diff --git a/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md b/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md new file mode 100644 index 00000000000..cfcd0923e95 --- /dev/null +++ b/.changelog/unreleased/improvements/4016-ss-ledger-client-improvements.md @@ -0,0 +1,2 @@ +- Improve the shielded sync's ledger client performance and user experience. + ([\#4016](https://github.com/anoma/namada/pull/4016)) \ No newline at end of file diff --git a/crates/apps_lib/src/cli.rs b/crates/apps_lib/src/cli.rs index 9739caaed53..8fb3e293b30 100644 --- a/crates/apps_lib/src/cli.rs +++ b/crates/apps_lib/src/cli.rs @@ -3386,6 +3386,8 @@ pub mod args { }), ); pub const BIRTHDAY: ArgOpt = arg_opt("birthday"); + pub const BLOCK_BATCH: ArgDefault = + arg_default("block-batch", DefaultFn(|| 10)); pub const BLOCK_HEIGHT: Arg = arg("block-height"); pub const BLOCK_HEIGHT_OPT: ArgOpt = arg_opt("height"); pub const BLOCK_HEIGHT_TO_OPT: ArgOpt = arg_opt("to-height"); @@ -6803,6 +6805,7 @@ pub mod args { Some(times) => RetryStrategy::Times(times), None => RetryStrategy::Forever, }; + let block_batch_size = BLOCK_BATCH.parse(matches); Self { ledger_address, last_query_height, @@ -6812,6 +6815,7 @@ pub mod args { wait_for_last_query_height, max_concurrent_fetches, retry_strategy, + block_batch_size, } } @@ -6849,6 +6853,10 @@ pub mod args { "Maximum number of times to retry fetching. If no \ argument is provided, defaults to retrying forever." ))) + .arg(BLOCK_BATCH.def().help(wrap!( + "Number of blocks fetched per concurrent fetch job. The \ + default is 10." + ))) } } @@ -6862,6 +6870,7 @@ pub mod args { let chain_ctx = ctx.borrow_mut_chain_or_exit(); Ok(ShieldedSync { + block_batch_size: self.block_batch_size, max_concurrent_fetches: self.max_concurrent_fetches, wait_for_last_query_height: self.wait_for_last_query_height, ledger_address: chain_ctx.get(&self.ledger_address), diff --git a/crates/apps_lib/src/client/masp.rs b/crates/apps_lib/src/client/masp.rs index 7e81ac07fde..1973564acd4 100644 --- a/crates/apps_lib/src/client/masp.rs +++ b/crates/apps_lib/src/client/masp.rs @@ -84,6 +84,7 @@ pub async fn syncing< .shutdown_signal(install_shutdown_signal(false)) .wait_for_last_query_height(args.wait_for_last_query_height) .retry_strategy(args.retry_strategy) + .block_batch_size(args.block_batch_size) .build(); let env = MaspLocalTaskEnv::new(500) @@ -141,6 +142,7 @@ pub async fn syncing< dispatch_client!(LedgerMaspClient::new( client, args.max_concurrent_fetches, + Duration::from_millis(5), ))? }; diff --git a/crates/core/src/control_flow/time.rs b/crates/core/src/control_flow/time.rs index 91c7dbfbd98..46d85ad79d7 100644 --- a/crates/core/src/control_flow/time.rs +++ b/crates/core/src/control_flow/time.rs @@ -29,9 +29,12 @@ pub trait SleepStrategy { /// Calculate a duration from a sleep strategy state. fn backoff(&self, state: &Self::State) -> Duration; - /// Update the state of the sleep strategy. + /// Move to the next state of the sleep strategy. fn next_state(&self, state: &mut Self::State); + /// Move to the previous state of the sleep strategy. + fn prev_state(&self, state: &mut Self::State); + /// Map a function to the duration returned from a /// sleep strategy. fn map(self, map: M) -> Map @@ -73,6 +76,11 @@ where fn next_state(&self, state: &mut S::State) { self.strategy.next_state(state) } + + #[inline] + fn prev_state(&self, state: &mut S::State) { + self.strategy.prev_state(state) + } } /// Constant sleep strategy. @@ -93,6 +101,10 @@ impl SleepStrategy for Constant { fn next_state(&self, _: &mut ()) { // NOOP } + + fn prev_state(&self, _: &mut ()) { + // NOOP + } } /// Linear backoff sleep strategy. @@ -114,7 +126,11 @@ impl SleepStrategy for LinearBackoff { } fn next_state(&self, state: &mut Duration) { - *state += self.delta; + *state = state.saturating_add(self.delta); + } + + fn prev_state(&self, state: &mut Duration) { + *state = state.saturating_sub(self.delta); } } @@ -144,6 +160,10 @@ where fn next_state(&self, state: &mut Self::State) { *state = state.saturating_add(1); } + + fn prev_state(&self, state: &mut Self::State) { + *state = state.saturating_sub(1); + } } /// A [`SleepStrategy`] adaptor, to run async tasks with custom @@ -236,7 +256,17 @@ impl Sleep { /// Update the sleep strategy state, and sleep for the given backoff. async fn sleep_update(&self, state: &mut S::State) { self.strategy.next_state(state); - sleep(self.strategy.backoff(state)).await; + self.sleep_with_current_backoff(state).await; + } + + /// Sleep for a [`Duration`] equivalent to the value of + /// the current backoff. + pub fn sleep_with_current_backoff( + &self, + state: &S::State, + ) -> impl Future + 'static { + let backoff_duration = self.strategy.backoff(state); + sleep(backoff_duration) } /// Run a future as many times as `iter_times` diff --git a/crates/node/src/bench_utils.rs b/crates/node/src/bench_utils.rs index e8224f4a247..9bbaeb53f3b 100644 --- a/crates/node/src/bench_utils.rs +++ b/crates/node/src/bench_utils.rs @@ -1192,6 +1192,7 @@ impl BenchShieldedCtx { wait_for_last_query_height: false, max_concurrent_fetches: 100, retry_strategy: RetryStrategy::Forever, + block_batch_size: 10, }, &StdIo, )) diff --git a/crates/sdk/src/args.rs b/crates/sdk/src/args.rs index 5933a663f30..9874d3aaf0f 100644 --- a/crates/sdk/src/args.rs +++ b/crates/sdk/src/args.rs @@ -2158,6 +2158,8 @@ pub struct ShieldedSync { /// Maximum number of fetch jobs that will ever /// execute concurrently during the shielded sync. pub max_concurrent_fetches: usize, + /// Number of blocks fetched per concurrent fetch job. + pub block_batch_size: usize, /// Maximum number of times to retry fetching. If `None` /// is provided, defaults to "forever". pub retry_strategy: RetryStrategy, diff --git a/crates/sdk/src/masp/utilities.rs b/crates/sdk/src/masp/utilities.rs index 8f892bb2951..47be9bfbae0 100644 --- a/crates/sdk/src/masp/utilities.rs +++ b/crates/sdk/src/masp/utilities.rs @@ -1,7 +1,7 @@ //! Helper functions and types use std::collections::BTreeMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use borsh::BorshDeserialize; use masp_primitives::merkle_tree::{CommitmentTree, IncrementalWitness}; @@ -9,6 +9,9 @@ use masp_primitives::sapling::Node; use masp_primitives::transaction::Transaction as MaspTx; use namada_core::chain::BlockHeight; use namada_core::collections::HashMap; +use namada_core::control_flow::time::{ + Duration, LinearBackoff, Sleep, SleepStrategy, +}; use namada_core::storage::TxIndex; use namada_events::extend::IndexedMaspData; use namada_io::Client; @@ -24,6 +27,8 @@ use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height}; struct LedgerMaspClientInner { client: C, semaphore: Semaphore, + backoff: RwLock, + sleep: Sleep, } /// An inefficient MASP client which simply uses a @@ -43,36 +48,39 @@ impl Clone for LedgerMaspClient { impl LedgerMaspClient { /// Create a new [`MaspClient`] given an rpc client. #[inline(always)] - pub fn new(client: C, max_concurrent_fetches: usize) -> Self { + pub fn new( + client: C, + max_concurrent_fetches: usize, + linear_backoff_delta: Duration, + ) -> Self { Self { inner: Arc::new(LedgerMaspClientInner { client, semaphore: Semaphore::new(max_concurrent_fetches), + backoff: RwLock::new(Duration::from_secs(0)), + sleep: Sleep { + strategy: LinearBackoff { + delta: linear_backoff_delta, + }, + }, }), } } } -impl MaspClient for LedgerMaspClient { - type Error = Error; - - async fn last_block_height(&self) -> Result, Error> { - let maybe_block = crate::rpc::query_block(&self.inner.client).await?; - Ok(maybe_block.map(|b| b.height)) - } - - async fn fetch_shielded_transfers( +impl LedgerMaspClient { + async fn fetch_shielded_transfers_inner( &self, from: BlockHeight, to: BlockHeight, ) -> Result, Error> { + let _permit = self.inner.semaphore.acquire().await.unwrap(); + // Fetch all the transactions we do not have yet let mut txs = vec![]; for height in from.0..=to.0 { let maybe_txs_results = async { - let _permit = self.inner.semaphore.acquire().await.unwrap(); - get_indexed_masp_events_at_height( &self.inner.client, height.into(), @@ -86,8 +94,6 @@ impl MaspClient for LedgerMaspClient { }; let block = { - let _permit = self.inner.semaphore.acquire().await.unwrap(); - // Query the actual block to get the txs bytes. If we only need // one tx it might be slightly better to query // the /tx endpoint to reduce the amount of data @@ -127,6 +133,43 @@ impl MaspClient for LedgerMaspClient { Ok(txs) } +} + +impl MaspClient for LedgerMaspClient { + type Error = Error; + + async fn last_block_height(&self) -> Result, Error> { + let maybe_block = crate::rpc::query_block(&self.inner.client).await?; + Ok(maybe_block.map(|b| b.height)) + } + + async fn fetch_shielded_transfers( + &self, + from: BlockHeight, + to: BlockHeight, + ) -> Result, Error> { + const ZERO: Duration = Duration::from_secs(0); + let current_backoff = { *self.inner.backoff.read().unwrap() }; + + if current_backoff > ZERO { + self.inner + .sleep + .sleep_with_current_backoff(¤t_backoff) + .await; + } + + let result = self.fetch_shielded_transfers_inner(from, to).await; + + if result.is_err() { + let mut backoff = self.inner.backoff.write().unwrap(); + self.inner.sleep.strategy.next_state(&mut *backoff); + } else if current_backoff > ZERO { + let mut backoff = self.inner.backoff.write().unwrap(); + self.inner.sleep.strategy.prev_state(&mut *backoff); + } + + result + } #[inline(always)] fn capabilities(&self) -> MaspClientCapabilities { diff --git a/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs b/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs index 68fe08a8a78..6a53a6270e8 100644 --- a/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs +++ b/crates/shielded_token/src/masp/shielded_sync/dispatcher.rs @@ -702,7 +702,6 @@ where } if self.config.retry_strategy.may_retry() { - self.config.fetched_tracker.message(format!("{error}")); true } else { // NB: store last encountered error