Skip to content

Commit 2cedd4c

Browse files
authored
Merge of #3578
2 parents 767c35f + 9626233 commit 2cedd4c

38 files changed

Lines changed: 4741 additions & 3113 deletions

File tree

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- Parallelize the shielded sync implementation in Namada.
2+
([\#3578](https://github.com/anoma/namada/pull/3578))

Cargo.lock

Lines changed: 55 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ expectrl = "0.7.0"
109109
eyre = "0.6.12"
110110
fd-lock = "3.0.12"
111111
flate2 = "1.0.22"
112+
flume = "0.11.0"
112113
fs_extra = "1.2.0"
113114
futures = "0.3"
114115
git2 = { version = "0.18.1", default-features = false }
@@ -118,9 +119,11 @@ ibc-testkit = { git = "https://github.com/heliaxdev/cosmos-ibc-rs", rev = "1dd9b
118119
ics23 = "0.11.0"
119120
index-set = { git = "https://github.com/heliaxdev/index-set", tag = "v0.8.1", features = ["serialize-borsh", "serialize-serde"] }
120121
indexmap = { git = "https://github.com/heliaxdev/indexmap", tag = "2.2.4-heliax-1", features = ["borsh-schema", "serde"] }
122+
init-once = "0.6.0"
121123
itertools = "0.12.1"
122124
jubjub = "0.10"
123125
k256 = { version = "0.13.0", default-features = false, features = ["ecdsa", "pkcs8", "precomputed-tables", "serde", "std"]}
126+
kdam = "0.5.2"
124127
konst = { version = "0.3.8", default-features = false }
125128
lazy_static = "1.4.0"
126129
# TODO: upstreamed in https://github.com/ledger-community/rust-ledger/pull/9
@@ -152,7 +155,7 @@ prost = "0.12.0"
152155
prost-types = "0.12.0"
153156
rand = {version = "0.8", default-features = false}
154157
rand_core = {version = "0.6", default-features = false}
155-
rayon = "=1.5.3"
158+
rayon = "1.5.3"
156159
regex = "1.10.2"
157160
reqwest = "0.11.4"
158161
ripemd = "0.1"
@@ -194,6 +197,7 @@ tracing = "0.1.30"
194197
tracing-appender = "0.2.2"
195198
tracing-log = "0.2.0"
196199
tracing-subscriber = {version = "0.3.7", default-features = false, features = ["env-filter", "fmt"]}
200+
typed-builder = "0.19.1"
197201
wasmparser = "0.107.0"
198202
wasm-instrument = {version = "0.4.0", features = ["sign_ext"]}
199203
wasmer = "4.3.5"
@@ -204,6 +208,7 @@ wasmer-types = "4.3.5"
204208
wasmer-vm = "4.3.5"
205209
wasmtimer = "0.2.0"
206210
winapi = "0.3.9"
211+
xorf = { version = "0.11.0", features = ["serde"] }
207212
yansi = "0.5.1"
208213
zeroize = { version = "1.5.5", features = ["zeroize_derive"] }
209214

crates/apps_lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ fd-lock.workspace = true
5454
flate2.workspace = true
5555
futures.workspace = true
5656
itertools.workspace = true
57+
kdam.workspace = true
5758
lazy_static = { workspace = true, optional = true }
5859
linkme = { workspace = true, optional = true }
5960
ledger-lib = { workspace = true }

crates/apps_lib/src/cli.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3165,6 +3165,7 @@ pub mod args {
31653165
use namada_sdk::ibc::core::host::types::identifiers::{ChannelId, PortId};
31663166
use namada_sdk::keccak::KeccakHash;
31673167
use namada_sdk::key::*;
3168+
use namada_sdk::masp::utils::RetryStrategy;
31683169
use namada_sdk::masp::{MaspEpoch, PaymentAddress};
31693170
use namada_sdk::storage::{self, BlockHeight, Epoch};
31703171
use namada_sdk::time::DateTimeUtc;
@@ -3214,8 +3215,6 @@ pub mod args {
32143215
pub const BIRTHDAY: ArgOpt<BlockHeight> = arg_opt("birthday");
32153216
pub const BLOCK_HEIGHT: Arg<BlockHeight> = arg("block-height");
32163217
pub const BLOCK_HEIGHT_OPT: ArgOpt<BlockHeight> = arg_opt("height");
3217-
pub const BLOCK_HEIGHT_FROM_OPT: ArgOpt<BlockHeight> =
3218-
arg_opt("from-height");
32193218
pub const BLOCK_HEIGHT_TO_OPT: ArgOpt<BlockHeight> = arg_opt("to-height");
32203219
pub const BRIDGE_POOL_GAS_AMOUNT: ArgDefault<token::DenominatedAmount> =
32213220
arg_default(
@@ -3353,6 +3352,8 @@ pub mod args {
33533352
pub const MASP_EPOCH: ArgOpt<MaspEpoch> = arg_opt("masp-epoch");
33543353
pub const MAX_COMMISSION_RATE_CHANGE: Arg<Dec> =
33553354
arg("max-commission-rate-change");
3355+
pub const MAX_CONCURRENT_FETCHES: ArgDefault<usize> =
3356+
arg_default("max-concurrent-fetches", DefaultFn(|| 100));
33563357
pub const MAX_ETH_GAS: ArgOpt<u64> = arg_opt("max_eth-gas");
33573358
pub const MEMO_OPT: ArgOpt<String> = arg_opt("memo");
33583359
pub const MIGRATION_PATH: ArgOpt<PathBuf> = arg_opt("migration-path");
@@ -3406,7 +3407,7 @@ pub mod args {
34063407
pub const REFUND_TARGET: ArgOpt<WalletTransferTarget> =
34073408
arg_opt("refund-target");
34083409
pub const RELAYER: Arg<Address> = arg("relayer");
3409-
pub const SAFE_MODE: ArgFlag = flag("safe-mode");
3410+
pub const RETRIES: ArgOpt<u64> = arg_opt("retries");
34103411
pub const SCHEME: ArgDefault<SchemeType> =
34113412
arg_default("scheme", DefaultFn(|| SchemeType::Ed25519));
34123413
pub const SHELL: Arg<Shell> = arg("shell");
@@ -3466,6 +3467,8 @@ pub mod args {
34663467
pub const VIEWING_KEYS: ArgMulti<WalletViewingKey, GlobStar> =
34673468
arg_multi("viewing-keys");
34683469
pub const VP: ArgOpt<String> = arg_opt("vp");
3470+
pub const WAIT_FOR_LAST_QUERY_HEIGHT: ArgFlag =
3471+
flag("wait-for-last-query-height");
34693472
pub const WALLET_ALIAS_FORCE: ArgFlag = flag("wallet-alias-force");
34703473
pub const WASM_CHECKSUMS_PATH: Arg<PathBuf> = arg("wasm-checksums-path");
34713474
pub const WASM_DIR: ArgOpt<PathBuf> = arg_opt("wasm-dir");
@@ -4099,14 +4102,12 @@ pub mod args {
40994102
gas_price: self.gas_price,
41004103
eth_addr: self.eth_addr,
41014104
sync: self.sync,
4102-
safe_mode: self.safe_mode,
41034105
}
41044106
}
41054107
}
41064108

41074109
impl Args for RelayBridgePoolProof<CliTypes> {
41084110
fn parse(matches: &ArgMatches) -> Self {
4109-
let safe_mode = SAFE_MODE.parse(matches);
41104111
let ledger_address = LEDGER_ADDRESS.parse(matches);
41114112
let hashes = HASH_LIST.parse(matches);
41124113
let relayer = RELAYER.parse(matches);
@@ -4137,16 +4138,11 @@ pub mod args {
41374138
eth_rpc_endpoint,
41384139
eth_addr,
41394140
confirmations,
4140-
safe_mode,
41414141
}
41424142
}
41434143

41444144
fn def(app: App) -> App {
41454145
app.arg(LEDGER_ADDRESS.def().help(LEDGER_ADDRESS_ABOUT))
4146-
.arg(SAFE_MODE.def().help(wrap!(
4147-
"Safe mode overrides keyboard interrupt signals, to \
4148-
ensure Ethereum transfers aren't canceled midway through."
4149-
)))
41504146
.arg(HASH_LIST.def().help(wrap!(
41514147
"Whitespace separated Keccak hash list of transfers in \
41524148
the Bridge pool."
@@ -4282,15 +4278,13 @@ pub mod args {
42824278
sync: self.sync,
42834279
retry_dur: self.retry_dur,
42844280
success_dur: self.success_dur,
4285-
safe_mode: self.safe_mode,
42864281
}
42874282
}
42884283
}
42894284

42904285
impl Args for ValidatorSetUpdateRelay<CliTypes> {
42914286
fn parse(matches: &ArgMatches) -> Self {
42924287
let ledger_address = LEDGER_ADDRESS.parse(matches);
4293-
let safe_mode = SAFE_MODE.parse(matches);
42944288
let daemon = DAEMON_MODE.parse(matches);
42954289
let epoch = EPOCH.parse(matches);
42964290
let gas = ETH_GAS.parse(matches);
@@ -4315,16 +4309,11 @@ pub mod args {
43154309
eth_addr,
43164310
retry_dur,
43174311
success_dur,
4318-
safe_mode,
43194312
}
43204313
}
43214314

43224315
fn def(app: App) -> App {
43234316
app.arg(LEDGER_ADDRESS.def().help(LEDGER_ADDRESS_ABOUT))
4324-
.arg(SAFE_MODE.def().help(wrap!(
4325-
"Safe mode overrides keyboard interrupt signals, to \
4326-
ensure Ethereum transfers aren't canceled midway through."
4327-
)))
43284317
.arg(DAEMON_MODE.def().help(wrap!(
43294318
"Run in daemon mode, which will continuously perform \
43304319
validator set updates."
@@ -6601,18 +6590,26 @@ pub mod args {
66016590
impl Args for ShieldedSync<CliTypes> {
66026591
fn parse(matches: &ArgMatches) -> Self {
66036592
let ledger_address = CONFIG_RPC_LEDGER_ADDRESS.parse(matches);
6604-
let start_query_height = BLOCK_HEIGHT_FROM_OPT.parse(matches);
66056593
let last_query_height = BLOCK_HEIGHT_TO_OPT.parse(matches);
66066594
let spending_keys = DATED_SPENDING_KEYS.parse(matches);
66076595
let viewing_keys = DATED_VIEWING_KEYS.parse(matches);
66086596
let with_indexer = WITH_INDEXER.parse(matches);
6597+
let wait_for_last_query_height =
6598+
WAIT_FOR_LAST_QUERY_HEIGHT.parse(matches);
6599+
let max_concurrent_fetches = MAX_CONCURRENT_FETCHES.parse(matches);
6600+
let retry_strategy = match RETRIES.parse(matches) {
6601+
Some(times) => RetryStrategy::Times(times),
6602+
None => RetryStrategy::Forever,
6603+
};
66096604
Self {
66106605
ledger_address,
6611-
start_query_height,
66126606
last_query_height,
66136607
spending_keys,
66146608
viewing_keys,
66156609
with_indexer,
6610+
wait_for_last_query_height,
6611+
max_concurrent_fetches,
6612+
retry_strategy,
66166613
}
66176614
}
66186615

@@ -6621,11 +6618,6 @@ pub mod args {
66216618
.arg(BLOCK_HEIGHT_TO_OPT.def().help(wrap!(
66226619
"Option block height to sync up to. Default is latest."
66236620
)))
6624-
.arg(
6625-
BLOCK_HEIGHT_FROM_OPT
6626-
.def()
6627-
.help(wrap!("Option block height to sync from.")),
6628-
)
66296621
.arg(DATED_SPENDING_KEYS.def().help(wrap!(
66306622
"List of new spending keys with which to check note \
66316623
ownership. These will be added to the shielded context. \
@@ -6643,6 +6635,18 @@ pub mod args {
66436635
present, the shielded sync will be performed using data \
66446636
retrieved from the given indexer."
66456637
)))
6638+
.arg(WAIT_FOR_LAST_QUERY_HEIGHT.def().help(wrap!(
6639+
"Wait until the last height to sync is available instead \
6640+
of returning early from the shielded sync."
6641+
)))
6642+
.arg(MAX_CONCURRENT_FETCHES.def().help(wrap!(
6643+
"Maximum number of fetch jobs that will ever execute \
6644+
concurrently during the shielded sync."
6645+
)))
6646+
.arg(RETRIES.def().help(wrap!(
6647+
"Maximum number of times to retry fetching. If no \
6648+
argument is provided, defaults to retrying forever."
6649+
)))
66466650
}
66476651
}
66486652

@@ -6656,8 +6660,9 @@ pub mod args {
66566660
let chain_ctx = ctx.borrow_mut_chain_or_exit();
66576661

66586662
Ok(ShieldedSync {
6663+
max_concurrent_fetches: self.max_concurrent_fetches,
6664+
wait_for_last_query_height: self.wait_for_last_query_height,
66596665
ledger_address: chain_ctx.get(&self.ledger_address),
6660-
start_query_height: self.start_query_height,
66616666
last_query_height: self.last_query_height,
66626667
spending_keys: self
66636668
.spending_keys
@@ -6669,7 +6674,8 @@ pub mod args {
66696674
.iter()
66706675
.map(|vk| chain_ctx.get_cached(vk))
66716676
.collect(),
6672-
with_indexer: self.with_indexer.map(|_| ()),
6677+
with_indexer: self.with_indexer,
6678+
retry_strategy: self.retry_strategy,
66736679
})
66746680
}
66756681
}

crates/apps_lib/src/cli/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::facade::tendermint_rpc::{HttpClient, Url as TendermintUrl};
88

99
/// Trait for clients that can be used with the CLI.
1010
#[async_trait::async_trait(?Send)]
11-
pub trait CliClient: Client + Sync {
11+
pub trait CliClient: Client + Send + Sync + 'static {
1212
fn from_tendermint_address(address: &TendermintUrl) -> Self;
1313
async fn wait_until_node_is_synced(
1414
&self,

0 commit comments

Comments
 (0)