diff --git a/tests/kubernetes/para-single.json b/tests/kubernetes/para-single.json index 6901c0e..8e6cf4f 100644 --- a/tests/kubernetes/para-single.json +++ b/tests/kubernetes/para-single.json @@ -6,6 +6,17 @@ "default_image": "docker.io/parity/polkadot:latest", "chain": "rococo-local", "default_command": "polkadot", + "genesis": { + "runtime": { + "runtime_genesis_config": { + "configuration": { + "config": { + "max_pov_size": 5242880 + } + } + } + } + }, "nodes": [ { "name": "alice", diff --git a/tests/native/single-para-native.json b/tests/native/single-para-native.json index f7cd42b..0cda8d5 100644 --- a/tests/native/single-para-native.json +++ b/tests/native/single-para-native.json @@ -6,6 +6,17 @@ "default_image": "docker.io/parity/polkadot:latest", "chain": "rococo-local", "default_command": "polkadot", + "genesis": { + "runtime": { + "runtime_genesis_config": { + "configuration": { + "config": { + "max_pov_size": 5242880 + } + } + } + } + }, "nodes": [ { "name": "alice", diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 279ae91..6fb3e7d 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -14,10 +14,11 @@ tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "time"] } parity-scale-codec = { version = "3.5.0", default-features = false, features = ["derive", "full", "bit-vec"] } subxt = "0.29.0" log = "0.4.18" +jsonrpsee = { version = "0.16", features = ["jsonrpsee-ws-client"] } [features] rococo = [] polkadot-parachain = [] tick = [] versi-tick = [] -versi-relay = [] \ No newline at end of file +versi-relay = [] diff --git a/utils/sender/src/main.rs b/utils/sender/src/main.rs index ca262af..d7a6f7a 100644 --- a/utils/sender/src/main.rs +++ b/utils/sender/src/main.rs @@ -5,30 +5,35 @@ use log::*; use sp_core::{sr25519::Pair as SrPair, Pair}; use subxt::{ config::extrinsic_params::{BaseExtrinsicParamsBuilder as Params, Era}, - tx::PairSigner, + tx::{PairSigner, SubmittableExtrinsic}, PolkadotConfig, + OnlineClient, }; use utils::{connect, runtime, Api, Error, DERIVATION}; mod pre; -use pre::pre_conditions; +use pre::{parallel_pre_conditions, pre_conditions}; /// Util program to send transactions #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// Node URL + /// Node URL. Can be either a collator, or relaychain node based on whether you want to measure parachain TPS, or relaychain TPS. #[arg(long)] node_url: String, - /// Sender index. + /// Set to the number of desired threads (default: 1). If set > 1 the program will spawn multiple threads to send transactions in parallel. + #[arg(long, default_value_t = 1)] + threads: usize, + + /// The sender index. Useful if you set threads to =< 1 and run multiple sender instances (as in the zombienet tests). #[arg(long)] - sender_index: usize, + sender_index: Option, /// Total number of senders #[arg(long)] - total_senders: usize, + total_senders: Option, /// Chunk size for sending the extrinsics. #[arg(long, default_value_t = 50)] @@ -46,7 +51,6 @@ async fn send_funds( n_tx_sender: usize, ) -> Result<(), Error> { let receivers = generate_receivers(n_tx_sender, sender_index); // one receiver per tx - let ext_deposit_addr = runtime::constants().balances().existential_deposit(); let ext_deposit = api.constants().at(&ext_deposit_addr)?; @@ -106,13 +110,14 @@ async fn send_funds( Ok(()) } +/// Generates a signer from a given index. pub fn generate_signer(i: usize) -> PairSigner { let pair: SrPair = Pair::from_string(format!("{}{}", DERIVATION, i).as_str(), None).unwrap(); let signer: PairSigner = PairSigner::new(pair); signer } -/// Generates a vector of account IDs. +/// Generates a vector of account IDs from a given index. fn generate_receivers(n: usize, sender_index: usize) -> Vec { let shift = sender_index * n; let mut receivers = Vec::new(); @@ -126,6 +131,83 @@ fn generate_receivers(n: usize, sender_index: usize) -> Vec>>> +) -> Result<(), Error> { + let ext_deposit_addr = runtime::constants().balances().existential_deposit(); + let genesis_hash = api.genesis_hash(); + let ext_deposit = api.constants().at(&ext_deposit_addr)?; + + for i in 0..*threads { + let api = api.clone(); + let producer = producer.clone(); + tokio::task::spawn_blocking(move || { + debug!("Thread {}: preparing {} transactions", i, n_tx_sender); + let ext_deposit = ext_deposit.clone(); + let genesis_hash = genesis_hash.clone(); + let receivers = generate_receivers(n_tx_sender, i); + let mut txs = Vec::new(); + for j in 0..n_tx_sender { + debug!("Thread {}: preparing transaction {}", i, j); + let shift = i * n_tx_sender; + let signer = generate_signer(shift + j); + debug!("Thread {}: generated signer {}{}", i, DERIVATION, shift + j); + let tx_params = Params::new().era(Era::Immortal, genesis_hash); + let tx_payload = runtime::tx() + .balances() + .transfer_keep_alive(receivers[j as usize].clone().into(), ext_deposit); + let signed_tx = + match api.tx().create_signed_with_nonce(&tx_payload, &signer, 0, tx_params) { + Ok(signed) => signed, + Err(e) => panic!("Thread {}: failed to sign transaction due to: {}", i, e), + }; + txs.push(signed_tx); + } + match producer.send(txs) { + Ok(_) => (), + Err(e) => error!("Thread {}: failed to send transactions to consumer: {}", i, e), + } + info!("Thread {}: prepared and signed {} transactions", i, n_tx_sender); + }); + } + Ok(()) +} + +/// Here the signed extrinsics are submitted. +async fn submit_txs( + consumer: &mut tokio::sync::mpsc::UnboundedReceiver>>>, + chunk_size: usize, + threads: usize, +) -> Result<(), Error> { + let mut submittable_vecs = Vec::new(); + while let Some(signed_txs) = consumer.recv().await { + debug!("Consumer: received {} submittable transactions", signed_txs.len()); + submittable_vecs.push(signed_txs); + if threads == submittable_vecs.len() { + debug!("Consumer: received all submittable transactions, now starting submission"); + for vec in &submittable_vecs { + for chunk in vec.chunks(chunk_size) { + let mut hashes = Vec::new(); + for signed_tx in chunk { + let hash = signed_tx.submit(); + hashes.push(hash); + } + try_join_all(hashes).await?; + debug!("Sender submitted chunk with size: {}", chunk_size); + } + } + info!("Sender submitted all transactions"); + } + } + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), Error> { env_logger::init_from_env( @@ -133,13 +215,55 @@ async fn main() -> Result<(), Error> { ); let args = Args::parse(); - let api = connect(&args.node_url).await?; - let n_tx_sender = args.num / args.total_senders; + let node_url = args.node_url; + let threads = args.threads; + let chunk_size = args.chunk_size; - pre_conditions(&api, args.sender_index, n_tx_sender).await?; + // This index is optional and only set when single-threaded mode is used. + // If it is not set, we default to 0. + let sender_index = match args.sender_index { + Some(i) => i, + None => 0, + }; - send_funds(&api, args.sender_index, args.chunk_size, n_tx_sender).await?; + // In case the optional total_senders argument is not passed for single-threaded mode, + // we must make sure that we split the work evenly between threads for multi-threaded mode. + let n_tx_sender = match args.total_senders { + Some(tot_s) => args.num / tot_s, + None => args.num / threads, + }; + // Create the client here, so that we can use it in the various functions. + let api = connect(&node_url).await?; + + match args.threads { + n if n > 1 => { + info!("Starting sender in parallel mode"); + let (producer, mut consumer) = tokio::sync::mpsc::unbounded_channel(); + // I/O Bound + parallel_pre_conditions(&api, &threads, &n_tx_sender).await?; + // CPU Bound + match parallel_signing(&api, &threads, n_tx_sender, producer) { + Ok(_) => (), + Err(e) => panic!("Error: {:?}", e), + } + // I/O Bound + submit_txs(&mut consumer, chunk_size, threads).await?; + }, + // Single-threaded mode + n if n == 1 => { + debug!("Starting sender in single-threaded mode"); + match args.sender_index { + Some(i) => { + pre_conditions(&api, &i, &n_tx_sender).await?; + send_funds(&api, sender_index, chunk_size, n_tx_sender).await?; + }, + None => panic!("Must set sender index when running in single-threaded mode"), + } + }, + // All other non-sensical cases + _ => panic!("Number of threads must be 1, or greater!"), + } Ok(()) } diff --git a/utils/sender/src/pre/mod.rs b/utils/sender/src/pre/mod.rs index 965c061..d846628 100644 --- a/utils/sender/src/pre/mod.rs +++ b/utils/sender/src/pre/mod.rs @@ -5,7 +5,7 @@ use subxt::{tx::PairSigner, utils::AccountId32, PolkadotConfig}; use utils::{runtime, Api, Error, DERIVATION}; /// Check pre-conditions of accounts attributed to this sender -pub async fn pre_conditions(api: &Api, i: usize, n: usize) -> Result<(), Error> { +pub async fn pre_conditions(api: &Api, i: &usize, n: &usize) -> Result<(), Error> { info!( "Sender {}: checking pre-conditions of accounts {}{} through {}{}", i, @@ -14,16 +14,44 @@ pub async fn pre_conditions(api: &Api, i: usize, n: usize) -> Result<(), Error> DERIVATION, (i + 1) * n - 1 ); - for j in i * n..(i + 1) * n { let pair: SrPair = Pair::from_string(format!("{}{}", DERIVATION, j).as_str(), None).unwrap(); let signer: PairSigner = PairSigner::new(pair); let account = signer.account_id(); - info!("Checking account: {}", account); + debug!("Sender {}: checking account {}", i, account); check_account(&api, account).await?; } + debug!("Sender {}: all pre-conditions checked and succeeded!", i); + Ok(()) +} +/// Use JoinSet to run prechecks in a multi-threaded way. +/// The pre_condition call is async because it fetches the chain state and hence is I/O bound. +pub async fn parallel_pre_conditions( + api: &Api, + threads: &usize, + n_tx_sender: &usize, +) -> Result<(), Error> { + let mut precheck_set = tokio::task::JoinSet::new(); + for i in 0..*threads { + let api = api.clone(); + let n_tx_sender = n_tx_sender.clone(); + precheck_set.spawn(async move { + match pre_conditions(&api, &i, &n_tx_sender).await { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + }); + } + while let Some(result) = precheck_set.join_next().await { + match result { + Ok(_) => (), + Err(e) => { + error!("Error: {:?}", e); + }, + } + } Ok(()) } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 4404c65..907cb13 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1,6 +1,8 @@ -use log::{error, info, warn}; +use log::{error, debug, warn}; use std::time::Duration; use subxt::{OnlineClient, PolkadotConfig}; +use jsonrpsee::ws_client::WsClientBuilder; +use std::sync::Arc; #[cfg(feature = "polkadot-parachain")] #[subxt::subxt(runtime_metadata_path = "metadata/polkadot-parachain.scale")] @@ -37,12 +39,16 @@ pub const DERIVATION: &str = "//Sender/"; /// Tries [`MAX_ATTEMPTS`] times to connect to the given node. pub async fn connect(url: &str) -> Result { for i in 1..=MAX_ATTEMPTS { - info!("Attempt #{}: Connecting to {}", i, url); - let promise = OnlineClient::::from_url(url); - - match promise.await { + debug!("Attempt #{}: Connecting to {}", i, url); + let rpc = WsClientBuilder::default() + .max_request_body_size(u32::MAX) + .max_concurrent_requests(u32::MAX as usize) + .build(url) + .await?; + let api = Api::from_rpc_client(Arc::new(rpc)); + match api.await { Ok(client) => { - info!("Connection established to: {}", url); + debug!("Connection established to: {}", url); return Ok(client); }, Err(err) => {