Skip to content

Commit cc5ec95

Browse files
authored
Merge pull request #4702 from namada-net/tiago/indexer-client-backoff-sleep
Indexer client backoff sleep
2 parents 40be1d6 + 76788fd commit cc5ec95

File tree

4 files changed

+177
-64
lines changed

4 files changed

+177
-64
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- Add backoff sleep between MASP indexer client failed requests.
2+
([\#4702](https://github.com/anoma/namada/pull/4702))

crates/apps_lib/src/client/masp.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use namada_sdk::error::Error;
88
use namada_sdk::io::DevNullProgressBar;
99
use namada_sdk::io::{Client, Io, MaybeSend, MaybeSync, display, display_line};
1010
use namada_sdk::masp::{
11-
IndexerMaspClient, LedgerMaspClient, MaspLocalTaskEnv, ShieldedContext,
12-
ShieldedSyncConfig, ShieldedUtils,
11+
IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient,
12+
MaspLocalTaskEnv, ShieldedContext, ShieldedSyncConfig, ShieldedUtils,
1313
};
1414

1515
#[allow(clippy::too_many_arguments)]
@@ -126,11 +126,14 @@ pub async fn syncing<
126126
))
127127
})?;
128128

129-
dispatch_client!(IndexerMaspClient::new(
130-
client,
131-
url,
132-
true,
133-
args.max_concurrent_fetches,
129+
dispatch_client!(LinearBackoffSleepMaspClient::new(
130+
IndexerMaspClient::new(
131+
client,
132+
url,
133+
true,
134+
args.max_concurrent_fetches,
135+
),
136+
Duration::from_millis(5)
134137
))?
135138
} else {
136139
display_line!(
@@ -139,10 +142,9 @@ pub async fn syncing<
139142
"==== Shielded sync started using ledger client ====".bold()
140143
);
141144

142-
dispatch_client!(LedgerMaspClient::new(
143-
client,
144-
args.max_concurrent_fetches,
145-
Duration::from_millis(5),
145+
dispatch_client!(LinearBackoffSleepMaspClient::new(
146+
LedgerMaspClient::new(client, args.max_concurrent_fetches,),
147+
Duration::from_millis(5)
146148
))?
147149
};
148150

crates/sdk/src/masp.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use namada_token::masp::shielded_wallet::ShieldedQueries;
2121
pub use namada_token::masp::{utils, *};
2222
use namada_tx::event::{MaspEvent, MaspEventKind, MaspTxRef};
2323
use namada_tx::{IndexedTx, Tx};
24-
pub use utilities::{IndexerMaspClient, LedgerMaspClient};
24+
pub use utilities::{
25+
IndexerMaspClient, LedgerMaspClient, LinearBackoffSleepMaspClient,
26+
};
2527

2628
use crate::error::{Error, QueryError};
2729
use crate::rpc::{

crates/sdk/src/masp/utilities.rs

Lines changed: 159 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Helper functions and types
22
33
use std::collections::BTreeMap;
4+
use std::future::Future;
45
use std::sync::{Arc, RwLock};
56

67
use borsh::BorshDeserialize;
@@ -25,11 +26,129 @@ use tokio::sync::Semaphore;
2526
use crate::error::{Error, QueryError};
2627
use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height};
2728

29+
/// Middleware MASP client implementation that introduces
30+
/// linear backoff sleeps between failed requests.
31+
pub struct LinearBackoffSleepMaspClient<M> {
32+
middleware_client: M,
33+
shared: Arc<LinearBackoffSleepMaspClientShared>,
34+
}
35+
36+
struct LinearBackoffSleepMaspClientShared {
37+
backoff: RwLock<Duration>,
38+
sleep: Sleep<LinearBackoff>,
39+
}
40+
41+
impl<M> LinearBackoffSleepMaspClient<M> {
42+
/// Create a new [`MaspClient`] with linear backoff
43+
/// sleep between failed requests.
44+
#[inline(always)]
45+
pub fn new(middleware_client: M, linear_backoff_delta: Duration) -> Self {
46+
Self {
47+
middleware_client,
48+
shared: Arc::new(LinearBackoffSleepMaspClientShared {
49+
backoff: RwLock::new(Duration::from_secs(0)),
50+
sleep: Sleep {
51+
strategy: LinearBackoff {
52+
delta: linear_backoff_delta,
53+
},
54+
},
55+
}),
56+
}
57+
}
58+
}
59+
60+
impl<M: Clone> Clone for LinearBackoffSleepMaspClient<M> {
61+
fn clone(&self) -> Self {
62+
Self {
63+
middleware_client: self.middleware_client.clone(),
64+
shared: Arc::clone(&self.shared),
65+
}
66+
}
67+
}
68+
69+
impl<M: MaspClient> MaspClient for LinearBackoffSleepMaspClient<M> {
70+
type Error = <M as MaspClient>::Error;
71+
72+
async fn last_block_height(
73+
&self,
74+
) -> Result<Option<BlockHeight>, Self::Error> {
75+
with_linear_backoff(
76+
&self.shared.backoff,
77+
&self.shared.sleep,
78+
self.middleware_client.last_block_height(),
79+
)
80+
.await
81+
}
82+
83+
async fn fetch_shielded_transfers(
84+
&self,
85+
from: BlockHeight,
86+
to: BlockHeight,
87+
) -> Result<Vec<IndexedNoteEntry>, Self::Error> {
88+
with_linear_backoff(
89+
&self.shared.backoff,
90+
&self.shared.sleep,
91+
self.middleware_client.fetch_shielded_transfers(from, to),
92+
)
93+
.await
94+
}
95+
96+
fn capabilities(&self) -> MaspClientCapabilities {
97+
self.middleware_client.capabilities()
98+
}
99+
100+
async fn fetch_commitment_tree(
101+
&self,
102+
height: BlockHeight,
103+
) -> Result<CommitmentTree<Node>, Self::Error> {
104+
with_linear_backoff(
105+
&self.shared.backoff,
106+
&self.shared.sleep,
107+
self.middleware_client.fetch_commitment_tree(height),
108+
)
109+
.await
110+
}
111+
112+
async fn fetch_note_index(
113+
&self,
114+
height: BlockHeight,
115+
) -> Result<BTreeMap<MaspIndexedTx, usize>, Self::Error> {
116+
with_linear_backoff(
117+
&self.shared.backoff,
118+
&self.shared.sleep,
119+
self.middleware_client.fetch_note_index(height),
120+
)
121+
.await
122+
}
123+
124+
async fn fetch_witness_map(
125+
&self,
126+
height: BlockHeight,
127+
) -> Result<HashMap<usize, IncrementalWitness<Node>>, Self::Error> {
128+
with_linear_backoff(
129+
&self.shared.backoff,
130+
&self.shared.sleep,
131+
self.middleware_client.fetch_witness_map(height),
132+
)
133+
.await
134+
}
135+
136+
async fn commitment_anchor_exists(
137+
&self,
138+
root: &Node,
139+
) -> Result<bool, Self::Error> {
140+
with_linear_backoff(
141+
&self.shared.backoff,
142+
&self.shared.sleep,
143+
self.middleware_client.commitment_anchor_exists(root),
144+
)
145+
.await
146+
}
147+
}
148+
28149
struct LedgerMaspClientInner<C> {
29150
client: C,
30151
semaphore: Semaphore,
31-
backoff: RwLock<Duration>,
32-
sleep: Sleep<LinearBackoff>,
33152
}
34153

35154
/// An inefficient MASP client which simply uses a
@@ -49,28 +168,25 @@ impl<C> Clone for LedgerMaspClient<C> {
49168
impl<C> LedgerMaspClient<C> {
50169
/// Create a new [`MaspClient`] given an rpc client.
51170
#[inline(always)]
52-
pub fn new(
53-
client: C,
54-
max_concurrent_fetches: usize,
55-
linear_backoff_delta: Duration,
56-
) -> Self {
171+
pub fn new(client: C, max_concurrent_fetches: usize) -> Self {
57172
Self {
58173
inner: Arc::new(LedgerMaspClientInner {
59174
client,
60175
semaphore: Semaphore::new(max_concurrent_fetches),
61-
backoff: RwLock::new(Duration::from_secs(0)),
62-
sleep: Sleep {
63-
strategy: LinearBackoff {
64-
delta: linear_backoff_delta,
65-
},
66-
},
67176
}),
68177
}
69178
}
70179
}
71180

72-
impl<C: Client + Send + Sync> LedgerMaspClient<C> {
73-
async fn fetch_shielded_transfers_inner(
181+
impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
182+
type Error = Error;
183+
184+
async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
185+
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
186+
Ok(maybe_block.map(|b| b.height))
187+
}
188+
189+
async fn fetch_shielded_transfers(
74190
&self,
75191
from: BlockHeight,
76192
to: BlockHeight,
@@ -142,43 +258,6 @@ impl<C: Client + Send + Sync> LedgerMaspClient<C> {
142258

143259
Ok(txs)
144260
}
145-
}
146-
147-
impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
148-
type Error = Error;
149-
150-
async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
151-
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
152-
Ok(maybe_block.map(|b| b.height))
153-
}
154-
155-
async fn fetch_shielded_transfers(
156-
&self,
157-
from: BlockHeight,
158-
to: BlockHeight,
159-
) -> Result<Vec<IndexedNoteEntry>, Error> {
160-
const ZERO: Duration = Duration::from_secs(0);
161-
let current_backoff = { *self.inner.backoff.read().unwrap() };
162-
163-
if current_backoff > ZERO {
164-
self.inner
165-
.sleep
166-
.sleep_with_current_backoff(&current_backoff)
167-
.await;
168-
}
169-
170-
let result = self.fetch_shielded_transfers_inner(from, to).await;
171-
172-
if result.is_err() {
173-
let mut backoff = self.inner.backoff.write().unwrap();
174-
self.inner.sleep.strategy.next_state(&mut *backoff);
175-
} else if current_backoff > ZERO {
176-
let mut backoff = self.inner.backoff.write().unwrap();
177-
self.inner.sleep.strategy.prev_state(&mut *backoff);
178-
}
179-
180-
result
181-
}
182261

183262
#[inline(always)]
184263
fn capabilities(&self) -> MaspClientCapabilities {
@@ -881,6 +960,34 @@ impl BlockIndex {
881960
}
882961
}
883962

963+
async fn with_linear_backoff<F, T, E>(
964+
backoff: &RwLock<Duration>,
965+
sleep: &Sleep<LinearBackoff>,
966+
fut: F,
967+
) -> Result<T, E>
968+
where
969+
F: Future<Output = Result<T, E>>,
970+
{
971+
const ZERO: Duration = Duration::from_secs(0);
972+
let current_backoff = { *backoff.read().unwrap() };
973+
974+
if current_backoff > ZERO {
975+
sleep.sleep_with_current_backoff(&current_backoff).await;
976+
}
977+
978+
let result = fut.await;
979+
980+
if result.is_err() {
981+
let mut backoff = backoff.write().unwrap();
982+
sleep.strategy.next_state(&mut *backoff);
983+
} else if current_backoff > ZERO {
984+
let mut backoff = backoff.write().unwrap();
985+
sleep.strategy.prev_state(&mut *backoff);
986+
}
987+
988+
result
989+
}
990+
884991
#[cfg(test)]
885992
mod test_block_index {
886993
use std::ops::ControlFlow;

0 commit comments

Comments
 (0)