From 8fd5e1f9ff5781c3b88258a7c3668e44baa2ee3b Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Fri, 24 Oct 2025 07:48:02 +0200 Subject: [PATCH 1/3] Move validation to `main.rs` --- src/candid_rpc/mod.rs | 41 ++++++++++++++++++++++------------------- src/main.rs | 5 ++++- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/candid_rpc/mod.rs b/src/candid_rpc/mod.rs index cf7b7898..7b1edcbe 100644 --- a/src/candid_rpc/mod.rs +++ b/src/candid_rpc/mod.rs @@ -13,7 +13,9 @@ use candid::Nat; use canhttp::http::json::JsonRpcRequest; use canhttp::multi::{ReductionError, Timestamp}; use ethers_core::{types::Transaction, utils::rlp}; -use evm_rpc_types::{Hex, Hex32, MultiRpcResult, Nat256, RpcError, RpcResult, ValidationError}; +use evm_rpc_types::{ + BlockTag, GetLogsArgs, Hex, Hex32, MultiRpcResult, Nat256, RpcError, RpcResult, ValidationError, +}; fn process_result( method: impl Into + Clone, @@ -71,23 +73,6 @@ impl CandidRpcClient { max_block_range: u32, ) -> MultiRpcResult> { use crate::candid_rpc::cketh_conversion::{from_log_entries, into_get_logs_param}; - - if let ( - Some(evm_rpc_types::BlockTag::Number(from)), - Some(evm_rpc_types::BlockTag::Number(to)), - ) = (&args.from_block, &args.to_block) - { - let from = Nat::from(from.clone()); - let to = Nat::from(to.clone()); - let block_count = if to > from { to - from } else { from - to }; - if block_count > max_block_range { - return MultiRpcResult::Consistent(Err(ValidationError::Custom(format!( - "Requested {} blocks; limited to {} when specifying a start and end block", - block_count, max_block_range - )) - .into())); - } - } process_result( RpcMethod::EthGetLogs, self.client.eth_get_logs(into_get_logs_param(args)).await, @@ -97,7 +82,7 @@ impl CandidRpcClient { pub async fn eth_get_block_by_number( &self, - block: evm_rpc_types::BlockTag, + block: BlockTag, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_block, into_block_spec}; process_result( @@ -208,3 +193,21 @@ fn get_transaction_hash(raw_signed_transaction_hex: &Hex) -> Option { let transaction: Transaction = rlp::decode(raw_signed_transaction_hex.as_ref()).ok()?; Some(Hex32::from(transaction.hash.0)) } + +pub fn validate_get_logs_block_range(args: &GetLogsArgs, max_block_range: u32) -> RpcResult<()> { + if let (Some(BlockTag::Number(from)), Some(BlockTag::Number(to))) = + (&args.from_block, &args.to_block) + { + let from = Nat::from(from.clone()); + let to = Nat::from(to.clone()); + let block_count = if to > from { to - from } else { from - to }; + if block_count > max_block_range { + return Err(ValidationError::Custom(format!( + "Requested {} blocks; limited to {} when specifying a start and end block", + block_count, max_block_range + )) + .into()); + } + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 7d01a920..be9d8988 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use canhttp::{cycles::CyclesChargingPolicy, multi::Timestamp}; use canlog::{log, Log, Sort}; use evm_rpc::{ - candid_rpc::CandidRpcClient, + candid_rpc::{validate_get_logs_block_range, CandidRpcClient}, http::{ charging_policy_with_collateral, json_rpc_request, json_rpc_request_arg, service_request_builder, transform_http_request, @@ -43,6 +43,9 @@ pub async fn eth_get_logs( ) -> MultiRpcResult> { let config = config.unwrap_or_default(); let max_block_range = config.max_block_range_or_default(); + if let Err(err) = validate_get_logs_block_range(&args, max_block_range) { + return MultiRpcResult::Consistent(Err(err)); + } match CandidRpcClient::new(source, Some(RpcConfig::from(config)), now()) { Ok(source) => source.eth_get_logs(args, max_block_range).await, Err(err) => Err(err).into(), From 24b69e2b3776b1c9eed9a14637ecd33b0dd41223 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Fri, 24 Oct 2025 10:57:41 +0200 Subject: [PATCH 2/3] Introduce `MultiRpcRequest` --- src/candid_rpc/mod.rs | 149 ++++++------------ src/candid_rpc/tests.rs | 70 --------- src/main.rs | 2 +- src/rpc_client/mod.rs | 334 ++++++++++++++++++++++++++-------------- src/rpc_client/tests.rs | 70 +++++++++ 5 files changed, 333 insertions(+), 292 deletions(-) delete mode 100644 src/candid_rpc/tests.rs diff --git a/src/candid_rpc/mod.rs b/src/candid_rpc/mod.rs index 7b1edcbe..c6cb891a 100644 --- a/src/candid_rpc/mod.rs +++ b/src/candid_rpc/mod.rs @@ -1,56 +1,15 @@ mod cketh_conversion; -#[cfg(test)] -mod tests; -use crate::rpc_client::{EthRpcClient, ReducedResult}; -use crate::types::MetricRpcMethod; -use crate::{ - add_metric_entry, - providers::resolve_rpc_service, - types::{MetricRpcHost, ResolvedRpcService, RpcMethod}, -}; +use crate::rpc_client::EthRpcClient; +use crate::types::RpcMethod; use candid::Nat; use canhttp::http::json::JsonRpcRequest; -use canhttp::multi::{ReductionError, Timestamp}; +use canhttp::multi::Timestamp; use ethers_core::{types::Transaction, utils::rlp}; use evm_rpc_types::{ BlockTag, GetLogsArgs, Hex, Hex32, MultiRpcResult, Nat256, RpcError, RpcResult, ValidationError, }; -fn process_result( - method: impl Into + Clone, - result: ReducedResult, -) -> MultiRpcResult { - match result { - Ok(value) => MultiRpcResult::Consistent(Ok(value)), - Err(err) => match err { - ReductionError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)), - ReductionError::InconsistentResults(multi_call_results) => { - let results: Vec<_> = multi_call_results.into_iter().collect(); - results.iter().for_each(|(service, _service_result)| { - if let Ok(ResolvedRpcService::Provider(provider)) = - resolve_rpc_service(service.clone()) - { - add_metric_entry!( - inconsistent_responses, - ( - method.clone().into(), - MetricRpcHost( - provider - .hostname() - .unwrap_or_else(|| "(unknown)".to_string()) - ) - ), - 1 - ) - } - }); - MultiRpcResult::Inconsistent(results) - } - }, - } -} - /// Adapt the `EthRpcClient` to the `Candid` interface used by the EVM-RPC canister. pub struct CandidRpcClient { client: EthRpcClient, @@ -70,14 +29,13 @@ impl CandidRpcClient { pub async fn eth_get_logs( &self, args: evm_rpc_types::GetLogsArgs, - max_block_range: u32, ) -> MultiRpcResult> { use crate::candid_rpc::cketh_conversion::{from_log_entries, into_get_logs_param}; - process_result( - RpcMethod::EthGetLogs, - self.client.eth_get_logs(into_get_logs_param(args)).await, - ) - .map(from_log_entries) + self.client + .eth_get_logs(into_get_logs_param(args)) + .send_and_reduce() + .await + .map(from_log_entries) } pub async fn eth_get_block_by_number( @@ -85,13 +43,11 @@ impl CandidRpcClient { block: BlockTag, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_block, into_block_spec}; - process_result( - RpcMethod::EthGetBlockByNumber, - self.client - .eth_get_block_by_number(into_block_spec(block)) - .await, - ) - .map(from_block) + self.client + .eth_get_block_by_number(into_block_spec(block)) + .send_and_reduce() + .await + .map(from_block) } pub async fn eth_get_transaction_receipt( @@ -99,13 +55,11 @@ impl CandidRpcClient { hash: Hex32, ) -> MultiRpcResult> { use crate::candid_rpc::cketh_conversion::{from_transaction_receipt, into_hash}; - process_result( - RpcMethod::EthGetTransactionReceipt, - self.client - .eth_get_transaction_receipt(into_hash(hash)) - .await, - ) - .map(|option| option.map(from_transaction_receipt)) + self.client + .eth_get_transaction_receipt(into_hash(hash)) + .send_and_reduce() + .await + .map(|option| option.map(from_transaction_receipt)) } pub async fn eth_get_transaction_count( @@ -113,13 +67,11 @@ impl CandidRpcClient { args: evm_rpc_types::GetTransactionCountArgs, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::into_get_transaction_count_params; - process_result( - RpcMethod::EthGetTransactionCount, - self.client - .eth_get_transaction_count(into_get_transaction_count_params(args)) - .await, - ) - .map(Nat256::from) + self.client + .eth_get_transaction_count(into_get_transaction_count_params(args)) + .send_and_reduce() + .await + .map(Nat256::from) } pub async fn eth_fee_history( @@ -127,13 +79,11 @@ impl CandidRpcClient { args: evm_rpc_types::FeeHistoryArgs, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_fee_history, into_fee_history_params}; - process_result( - RpcMethod::EthFeeHistory, - self.client - .eth_fee_history(into_fee_history_params(args)) - .await, - ) - .map(from_fee_history) + self.client + .eth_fee_history(into_fee_history_params(args)) + .send_and_reduce() + .await + .map(from_fee_history) } pub async fn eth_send_raw_transaction( @@ -142,13 +92,11 @@ impl CandidRpcClient { ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::from_send_raw_transaction_result; let transaction_hash = get_transaction_hash(&raw_signed_transaction_hex); - process_result( - RpcMethod::EthSendRawTransaction, - self.client - .eth_send_raw_transaction(raw_signed_transaction_hex.to_string()) - .await, - ) - .map(|result| from_send_raw_transaction_result(transaction_hash.clone(), result)) + self.client + .eth_send_raw_transaction(raw_signed_transaction_hex.to_string()) + .send_and_reduce() + .await + .map(|result| from_send_raw_transaction_result(transaction_hash.clone(), result)) } pub async fn eth_call( @@ -156,11 +104,11 @@ impl CandidRpcClient { args: evm_rpc_types::CallArgs, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_data, into_eth_call_params}; - process_result( - RpcMethod::EthCall, - self.client.eth_call(into_eth_call_params(args)).await, - ) - .map(from_data) + self.client + .eth_call(into_eth_call_params(args)) + .send_and_reduce() + .await + .map(from_data) } pub async fn multi_request(&self, json_rpc_payload: String) -> MultiRpcResult { @@ -173,19 +121,14 @@ impl CandidRpcClient { ))) } }; - process_result( - MetricRpcMethod { - method: request.method().to_string(), - is_manual_request: true, - }, - self.client - .multi_request( - RpcMethod::Custom(request.method().to_string()), - request.params(), - ) - .await, - ) - .map(String::from) + self.client + .multi_request( + RpcMethod::Custom(request.method().to_string()), + request.params(), + ) + .send_and_reduce() + .await + .map(String::from) } } diff --git a/src/candid_rpc/tests.rs b/src/candid_rpc/tests.rs deleted file mode 100644 index 7bbc59dc..00000000 --- a/src/candid_rpc/tests.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::candid_rpc::process_result; -use crate::types::RpcMethod; -use canhttp::multi::MultiResults; -use evm_rpc_types::MultiRpcResult; -use evm_rpc_types::{ProviderError, RpcError}; - -#[test] -fn test_process_result_mapping() { - use evm_rpc_types::{EthMainnetService, RpcService}; - type ReductionError = canhttp::multi::ReductionError; - - assert_eq!( - process_result(RpcMethod::EthGetTransactionCount, Ok(5)), - MultiRpcResult::Consistent(Ok(5)) - ); - assert_eq!( - process_result( - RpcMethod::EthGetTransactionCount, - Err(ReductionError::ConsistentError(RpcError::ProviderError( - ProviderError::MissingRequiredProvider - ))) - ), - MultiRpcResult::Consistent(Err(RpcError::ProviderError( - ProviderError::MissingRequiredProvider - ))) - ); - assert_eq!( - process_result( - RpcMethod::EthGetTransactionCount, - Err(ReductionError::InconsistentResults(MultiResults::default())) - ), - MultiRpcResult::Inconsistent(vec![]) - ); - assert_eq!( - process_result( - RpcMethod::EthGetTransactionCount, - Err(ReductionError::InconsistentResults( - MultiResults::from_non_empty_iter(vec![( - RpcService::EthMainnet(EthMainnetService::Ankr), - Ok(5) - )]) - )) - ), - MultiRpcResult::Inconsistent(vec![( - RpcService::EthMainnet(EthMainnetService::Ankr), - Ok(5) - )]) - ); - assert_eq!( - process_result( - RpcMethod::EthGetTransactionCount, - Err(ReductionError::InconsistentResults( - MultiResults::from_non_empty_iter(vec![ - (RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)), - ( - RpcService::EthMainnet(EthMainnetService::Cloudflare), - Err(RpcError::ProviderError(ProviderError::NoPermission)) - ) - ]) - )) - ), - MultiRpcResult::Inconsistent(vec![ - (RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)), - ( - RpcService::EthMainnet(EthMainnetService::Cloudflare), - Err(RpcError::ProviderError(ProviderError::NoPermission)) - ) - ]) - ); -} diff --git a/src/main.rs b/src/main.rs index be9d8988..14b25536 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ pub async fn eth_get_logs( return MultiRpcResult::Consistent(Err(err)); } match CandidRpcClient::new(source, Some(RpcConfig::from(config)), now()) { - Ok(source) => source.eth_get_logs(args, max_block_range).await, + Ok(source) => source.eth_get_logs(args).await, Err(err) => Err(err).into(), } } diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index dcce4b31..40c60cb2 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -1,4 +1,6 @@ +use crate::types::{MetricRpcHost, ResolvedRpcService}; use crate::{ + add_metric_entry, http::http_client, memory::{get_override_provider, rank_providers, record_ok_result}, providers::{resolve_rpc_service, SupportedRpcService}, @@ -11,12 +13,17 @@ use crate::{ }; use canhttp::{ http::json::JsonRpcRequest, - multi::{MultiResults, Reduce, ReduceWithEquality, ReduceWithThreshold, Timestamp}, + multi::{ + MultiResults, Reduce, ReduceWithEquality, ReduceWithThreshold, ReducedResult, + ReductionError, Timestamp, + }, MaxResponseBytesRequestExtension, TransformContextRequestExtension, }; use evm_rpc_types::{ - ConsensusStrategy, JsonRpcError, ProviderError, RpcConfig, RpcError, RpcService, RpcServices, + ConsensusStrategy, JsonRpcError, MultiRpcResult, ProviderError, RpcConfig, RpcError, + RpcService, RpcServices, }; +use http::Request; use ic_management_canister_types::{TransformContext, TransformFunc}; use json::{ requests::{ @@ -263,182 +270,239 @@ impl EthRpcClient { ) } - /// Query all providers in parallel and return all results. - /// It's up to the caller to decide how to handle the results, which could be inconsistent - /// (e.g., if different providers gave different responses). - /// This method is useful for querying data that is critical for the system to ensure that there is no single point of failure, - /// e.g., ethereum logs upon which ckETH will be minted. - async fn parallel_call( + pub fn eth_get_logs( &self, - method: RpcMethod, - params: I, - response_size_estimate: ResponseSizeEstimate, - ) -> MultiCallResults - where - I: Serialize + Clone + Debug, - O: Debug + DeserializeOwned + HttpResponsePayload, - { - let providers = self.providers(); - let transform_op = O::response_transform() - .as_ref() - .map(|t| { - let mut buf = vec![]; - minicbor::encode(t, &mut buf).unwrap(); - buf - }) - .unwrap_or_default(); - let effective_size_estimate = response_size_estimate.get(); - let mut requests = MultiResults::default(); - for provider in providers { - let request = resolve_rpc_service(provider.clone()) - .map_err(RpcError::from) - .and_then(|rpc_service| rpc_service.post(&get_override_provider())) - .map(|builder| { - builder - .max_response_bytes(effective_size_estimate) - .transform_context(TransformContext { - function: TransformFunc(candid::Func { - method: "cleanup_response".to_string(), - principal: ic_cdk::api::canister_self(), - }), - context: transform_op.clone(), - }) - .body(JsonRpcRequest::new(method.clone().name(), params.clone())) - .expect("BUG: invalid request") - }); - requests.insert_once(provider.clone(), request); - } - - let client = http_client(MetricRpcMethod::from(method), true).map_result(|r| { - match r?.into_body().into_result() { - Ok(value) => Ok(value), - Err(json_rpc_error) => Err(RpcError::JsonRpcError(JsonRpcError { - code: json_rpc_error.code, - message: json_rpc_error.message, - })), - } - }); - - let (requests, errors) = requests.into_inner(); - let (_client, mut results) = canhttp::multi::parallel_call(client, requests).await; - results.add_errors(errors); - let now = Timestamp::from_nanos_since_unix_epoch(ic_cdk::api::time()); - results - .ok_results() - .keys() - .filter_map(SupportedRpcService::new) - .for_each(|service| record_ok_result(service, now)); - assert_eq!( - results.len(), - providers.len(), - "BUG: expected 1 result per provider" - ); - results - } - - pub async fn eth_get_logs(&self, params: GetLogsParam) -> ReducedResult> { - self.parallel_call( + params: GetLogsParam, + ) -> MultiRpcRequest<(GetLogsParam,), Vec> { + MultiRpcRequest::new( + self.providers(), RpcMethod::EthGetLogs, - vec![params], + (params,), self.response_size_estimate(1024 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_get_block_by_number(&self, block: BlockSpec) -> ReducedResult { + pub fn eth_get_block_by_number( + &self, + block: BlockSpec, + ) -> MultiRpcRequest { let expected_block_size = match self.chain() { EthereumNetwork::SEPOLIA => 12 * 1024, EthereumNetwork::MAINNET => 24 * 1024, _ => 24 * 1024, // Default for unknown networks }; - - self.parallel_call( + MultiRpcRequest::new( + self.providers(), RpcMethod::EthGetBlockByNumber, GetBlockByNumberParams { block, include_full_transactions: false, }, self.response_size_estimate(expected_block_size + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_get_transaction_receipt( + pub fn eth_get_transaction_receipt( &self, tx_hash: Hash, - ) -> ReducedResult> { - self.parallel_call( + ) -> MultiRpcRequest<(Hash,), Option> { + MultiRpcRequest::new( + self.providers(), RpcMethod::EthGetTransactionReceipt, - vec![tx_hash], + (tx_hash,), self.response_size_estimate(700 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_fee_history(&self, params: FeeHistoryParams) -> ReducedResult { + pub fn eth_fee_history( + &self, + params: FeeHistoryParams, + ) -> MultiRpcRequest { // A typical response is slightly above 300 bytes. - self.parallel_call( + MultiRpcRequest::new( + self.providers(), RpcMethod::EthFeeHistory, params, self.response_size_estimate(512 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_send_raw_transaction( + pub fn eth_send_raw_transaction( &self, raw_signed_transaction_hex: String, - ) -> ReducedResult { + ) -> MultiRpcRequest<(String,), SendRawTransactionResult> { // A successful reply is under 256 bytes, but we expect most calls to end with an error // since we submit the same transaction from multiple nodes. - self.parallel_call( + MultiRpcRequest::new( + self.providers(), RpcMethod::EthSendRawTransaction, - vec![raw_signed_transaction_hex], + (raw_signed_transaction_hex,), self.response_size_estimate(256 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_get_transaction_count( + pub fn eth_get_transaction_count( &self, params: GetTransactionCountParams, - ) -> ReducedResult { - self.parallel_call( + ) -> MultiRpcRequest { + MultiRpcRequest::new( + self.providers(), RpcMethod::EthGetTransactionCount, params, self.response_size_estimate(50 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn eth_call(&self, params: EthCallParams) -> ReducedResult { - self.parallel_call( + pub fn eth_call(&self, params: EthCallParams) -> MultiRpcRequest { + MultiRpcRequest::new( + self.providers(), RpcMethod::EthCall, params, self.response_size_estimate(256 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) } - pub async fn multi_request( + pub fn multi_request<'a>( &self, method: RpcMethod, - params: Option<&Value>, - ) -> ReducedResult { - self.parallel_call( + params: Option<&'a Value>, + ) -> MultiRpcRequest, RawJson> { + MultiRpcRequest::new( + self.providers(), method, params, self.response_size_estimate(256 + HEADER_SIZE_LIMIT), + self.consensus_strategy(), ) - .await - .reduce(self.consensus_strategy()) + } +} + +pub struct MultiRpcRequest<'a, Params, Output> { + providers: &'a BTreeSet, + method: RpcMethod, + params: Params, + response_size_estimate: ResponseSizeEstimate, + reduction_strategy: ReductionStrategy, + _marker: std::marker::PhantomData, +} + +impl<'a, Params, Output> MultiRpcRequest<'a, Params, Output> { + pub fn new( + providers: &'a BTreeSet, + method: RpcMethod, + params: Params, + response_size_estimate: ResponseSizeEstimate, + reduction_strategy: ReductionStrategy, + ) -> MultiRpcRequest<'a, Params, Output> { + MultiRpcRequest { + providers, + method, + params, + response_size_estimate, + reduction_strategy, + _marker: Default::default(), + } + } +} + +impl MultiRpcRequest<'_, Params, Output> { + pub async fn send_and_reduce(self) -> MultiRpcResult + where + Params: Serialize + Clone + Debug, + Output: Debug + Serialize + DeserializeOwned + HttpResponsePayload + PartialEq, + { + let result = self.parallel_call().await.reduce(self.reduction_strategy); + process_result(self.method, result) + } + + /// Query all providers in parallel and return all results. + /// It's up to the caller to decide how to handle the results, which could be inconsistent + /// (e.g., if different providers gave different responses). + /// This method is useful for querying data that is critical for the system to ensure that there is no single point of failure, + /// e.g., ethereum logs upon which ckETH will be minted. + async fn parallel_call(&self) -> MultiResults + where + Params: Serialize + Clone + Debug, + Output: Debug + DeserializeOwned + HttpResponsePayload, + { + let requests = self.create_json_rpc_requests(); + + let client = + http_client(MetricRpcMethod::from(self.method.clone()), true).map_result(|r| match r? + .into_body() + .into_result() + { + Ok(value) => Ok(value), + Err(json_rpc_error) => Err(RpcError::JsonRpcError(JsonRpcError { + code: json_rpc_error.code, + message: json_rpc_error.message, + })), + }); + + let (requests, errors) = requests.into_inner(); + let (_client, mut results) = canhttp::multi::parallel_call(client, requests).await; + results.add_errors(errors); + let now = Timestamp::from_nanos_since_unix_epoch(ic_cdk::api::time()); + results + .ok_results() + .keys() + .filter_map(SupportedRpcService::new) + .for_each(|service| record_ok_result(service, now)); + assert_eq!( + results.len(), + self.providers.len(), + "BUG: expected 1 result per provider" + ); + results + } + + fn create_json_rpc_requests( + &self, + ) -> MultiResults>, RpcError> + where + Params: Clone, + Output: HttpResponsePayload, + { + let transform_op = Output::response_transform() + .as_ref() + .map(|t| { + let mut buf = vec![]; + minicbor::encode(t, &mut buf).unwrap(); + buf + }) + .unwrap_or_default(); + let effective_size_estimate = self.response_size_estimate.get(); + let mut requests = MultiResults::default(); + for provider in self.providers { + let request = resolve_rpc_service(provider.clone()) + .map_err(RpcError::from) + .and_then(|rpc_service| rpc_service.post(&get_override_provider())) + .map(|builder| { + builder + .max_response_bytes(effective_size_estimate) + .transform_context(TransformContext { + function: TransformFunc(candid::Func { + method: "cleanup_response".to_string(), + principal: ic_cdk::api::canister_self(), + }), + context: transform_op.clone(), + }) + .body(JsonRpcRequest::new( + self.method.clone().name(), + self.params.clone(), + )) + .expect("BUG: invalid request") + }); + requests.insert_once(provider.clone(), request); + } + requests } } @@ -459,7 +523,10 @@ impl From for ReductionStrategy { } impl Reduce for ReductionStrategy { - fn reduce(&self, results: MultiResults) -> ReducedResult { + fn reduce( + &self, + results: MultiResults, + ) -> ReducedResult { match self { ReductionStrategy::ByEquality(r) => r.reduce(results), ReductionStrategy::ByThreshold(r) => r.reduce(results), @@ -467,5 +534,36 @@ impl Reduce for ReductionStra } } -pub type MultiCallResults = MultiResults; -pub type ReducedResult = canhttp::multi::ReducedResult; +fn process_result( + method: impl Into + Clone, + result: ReducedResult, +) -> MultiRpcResult { + match result { + Ok(value) => MultiRpcResult::Consistent(Ok(value)), + Err(err) => match err { + ReductionError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)), + ReductionError::InconsistentResults(multi_call_results) => { + let results: Vec<_> = multi_call_results.into_iter().collect(); + results.iter().for_each(|(service, _service_result)| { + if let Ok(ResolvedRpcService::Provider(provider)) = + resolve_rpc_service(service.clone()) + { + add_metric_entry!( + inconsistent_responses, + ( + method.clone().into(), + MetricRpcHost( + provider + .hostname() + .unwrap_or_else(|| "(unknown)".to_string()) + ) + ), + 1 + ) + } + }); + MultiRpcResult::Inconsistent(results) + } + }, + } +} diff --git a/src/rpc_client/tests.rs b/src/rpc_client/tests.rs index e2e0a2de..bad2843a 100644 --- a/src/rpc_client/tests.rs +++ b/src/rpc_client/tests.rs @@ -1,3 +1,8 @@ +use crate::rpc_client::process_result; +use crate::types::RpcMethod; +use canhttp::multi::MultiResults; +use evm_rpc_types::{MultiRpcResult, ProviderError, RpcError}; + mod eth_rpc_client { use crate::rpc_client::EthRpcClient; use canhttp::multi::Timestamp; @@ -365,3 +370,68 @@ mod providers { } } } + +#[test] +fn test_process_result_mapping() { + use evm_rpc_types::{EthMainnetService, RpcService}; + type ReductionError = canhttp::multi::ReductionError; + + assert_eq!( + process_result(RpcMethod::EthGetTransactionCount, Ok(5)), + MultiRpcResult::Consistent(Ok(5)) + ); + assert_eq!( + process_result( + RpcMethod::EthGetTransactionCount, + Err(ReductionError::ConsistentError(RpcError::ProviderError( + ProviderError::MissingRequiredProvider + ))) + ), + MultiRpcResult::Consistent(Err(RpcError::ProviderError( + ProviderError::MissingRequiredProvider + ))) + ); + assert_eq!( + process_result( + RpcMethod::EthGetTransactionCount, + Err(ReductionError::InconsistentResults(MultiResults::default())) + ), + MultiRpcResult::Inconsistent(vec![]) + ); + assert_eq!( + process_result( + RpcMethod::EthGetTransactionCount, + Err(ReductionError::InconsistentResults( + MultiResults::from_non_empty_iter(vec![( + RpcService::EthMainnet(EthMainnetService::Ankr), + Ok(5) + )]) + )) + ), + MultiRpcResult::Inconsistent(vec![( + RpcService::EthMainnet(EthMainnetService::Ankr), + Ok(5) + )]) + ); + assert_eq!( + process_result( + RpcMethod::EthGetTransactionCount, + Err(ReductionError::InconsistentResults( + MultiResults::from_non_empty_iter(vec![ + (RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)), + ( + RpcService::EthMainnet(EthMainnetService::Cloudflare), + Err(RpcError::ProviderError(ProviderError::NoPermission)) + ) + ]) + )) + ), + MultiRpcResult::Inconsistent(vec![ + (RpcService::EthMainnet(EthMainnetService::Ankr), Ok(5)), + ( + RpcService::EthMainnet(EthMainnetService::Cloudflare), + Err(RpcError::ProviderError(ProviderError::NoPermission)) + ) + ]) + ); +} From 15e2636952156881ffd4e1391afcc9c57ac57228 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Fri, 31 Oct 2025 13:32:10 +0100 Subject: [PATCH 3/3] Clean-up lifetimes --- src/candid_rpc/mod.rs | 26 ++++---- src/rpc_client/mod.rs | 134 ++++++++++++++++++++++------------------ src/rpc_client/tests.rs | 6 +- 3 files changed, 90 insertions(+), 76 deletions(-) diff --git a/src/candid_rpc/mod.rs b/src/candid_rpc/mod.rs index c6cb891a..48484022 100644 --- a/src/candid_rpc/mod.rs +++ b/src/candid_rpc/mod.rs @@ -7,7 +7,7 @@ use canhttp::http::json::JsonRpcRequest; use canhttp::multi::Timestamp; use ethers_core::{types::Transaction, utils::rlp}; use evm_rpc_types::{ - BlockTag, GetLogsArgs, Hex, Hex32, MultiRpcResult, Nat256, RpcError, RpcResult, ValidationError, + BlockTag, Hex, Hex32, MultiRpcResult, Nat256, RpcError, RpcResult, ValidationError, }; /// Adapt the `EthRpcClient` to the `Candid` interface used by the EVM-RPC canister. @@ -27,7 +27,7 @@ impl CandidRpcClient { } pub async fn eth_get_logs( - &self, + self, args: evm_rpc_types::GetLogsArgs, ) -> MultiRpcResult> { use crate::candid_rpc::cketh_conversion::{from_log_entries, into_get_logs_param}; @@ -39,7 +39,7 @@ impl CandidRpcClient { } pub async fn eth_get_block_by_number( - &self, + self, block: BlockTag, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_block, into_block_spec}; @@ -51,7 +51,7 @@ impl CandidRpcClient { } pub async fn eth_get_transaction_receipt( - &self, + self, hash: Hex32, ) -> MultiRpcResult> { use crate::candid_rpc::cketh_conversion::{from_transaction_receipt, into_hash}; @@ -63,7 +63,7 @@ impl CandidRpcClient { } pub async fn eth_get_transaction_count( - &self, + self, args: evm_rpc_types::GetTransactionCountArgs, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::into_get_transaction_count_params; @@ -75,7 +75,7 @@ impl CandidRpcClient { } pub async fn eth_fee_history( - &self, + self, args: evm_rpc_types::FeeHistoryArgs, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_fee_history, into_fee_history_params}; @@ -87,7 +87,7 @@ impl CandidRpcClient { } pub async fn eth_send_raw_transaction( - &self, + self, raw_signed_transaction_hex: Hex, ) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::from_send_raw_transaction_result; @@ -99,10 +99,7 @@ impl CandidRpcClient { .map(|result| from_send_raw_transaction_result(transaction_hash.clone(), result)) } - pub async fn eth_call( - &self, - args: evm_rpc_types::CallArgs, - ) -> MultiRpcResult { + pub async fn eth_call(self, args: evm_rpc_types::CallArgs) -> MultiRpcResult { use crate::candid_rpc::cketh_conversion::{from_data, into_eth_call_params}; self.client .eth_call(into_eth_call_params(args)) @@ -111,7 +108,7 @@ impl CandidRpcClient { .map(from_data) } - pub async fn multi_request(&self, json_rpc_payload: String) -> MultiRpcResult { + pub async fn multi_request(self, json_rpc_payload: String) -> MultiRpcResult { let request: JsonRpcRequest = match serde_json::from_str(&json_rpc_payload) { Ok(req) => req, @@ -137,7 +134,10 @@ fn get_transaction_hash(raw_signed_transaction_hex: &Hex) -> Option { Some(Hex32::from(transaction.hash.0)) } -pub fn validate_get_logs_block_range(args: &GetLogsArgs, max_block_range: u32) -> RpcResult<()> { +pub fn validate_get_logs_block_range( + args: &evm_rpc_types::GetLogsArgs, + max_block_range: u32, +) -> RpcResult<()> { if let (Some(BlockTag::Number(from)), Some(BlockTag::Number(to))) = (&args.from_block, &args.to_block) { diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index 40c60cb2..12df1c6e 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -12,7 +12,7 @@ use crate::{ types::{MetricRpcMethod, RpcMethod}, }; use canhttp::{ - http::json::JsonRpcRequest, + http::json::{HttpJsonRpcResponse, JsonRpcRequest}, multi::{ MultiResults, Reduce, ReduceWithEquality, ReduceWithThreshold, ReducedResult, ReductionError, Timestamp, @@ -20,7 +20,7 @@ use canhttp::{ MaxResponseBytesRequestExtension, TransformContextRequestExtension, }; use evm_rpc_types::{ - ConsensusStrategy, JsonRpcError, MultiRpcResult, ProviderError, RpcConfig, RpcError, + ConsensusStrategy, JsonRpcError, MultiRpcResult, ProviderError, RpcConfig, RpcError, RpcResult, RpcService, RpcServices, }; use http::Request; @@ -252,15 +252,11 @@ impl EthRpcClient { self.providers.chain } - fn providers(&self) -> &BTreeSet { - &self.providers.services - } - fn response_size_estimate(&self, estimate: u64) -> ResponseSizeEstimate { ResponseSizeEstimate::new(self.config.response_size_estimate.unwrap_or(estimate)) } - fn consensus_strategy(&self) -> ReductionStrategy { + fn reduction_strategy(&self) -> ReductionStrategy { ReductionStrategy::from( self.config .response_consensus @@ -271,20 +267,22 @@ impl EthRpcClient { } pub fn eth_get_logs( - &self, + self, params: GetLogsParam, ) -> MultiRpcRequest<(GetLogsParam,), Vec> { + let response_size_estimate = self.response_size_estimate(1024 + HEADER_SIZE_LIMIT); + let reduction = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthGetLogs, (params,), - self.response_size_estimate(1024 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction, ) } pub fn eth_get_block_by_number( - &self, + self, block: BlockSpec, ) -> MultiRpcRequest { let expected_block_size = match self.chain() { @@ -292,100 +290,115 @@ impl EthRpcClient { EthereumNetwork::MAINNET => 24 * 1024, _ => 24 * 1024, // Default for unknown networks }; + let response_size_estimate = + self.response_size_estimate(expected_block_size + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthGetBlockByNumber, GetBlockByNumberParams { block, include_full_transactions: false, }, - self.response_size_estimate(expected_block_size + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } pub fn eth_get_transaction_receipt( - &self, + self, tx_hash: Hash, ) -> MultiRpcRequest<(Hash,), Option> { + let response_size_estimate = self.response_size_estimate(700 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthGetTransactionReceipt, (tx_hash,), - self.response_size_estimate(700 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } pub fn eth_fee_history( - &self, + self, params: FeeHistoryParams, ) -> MultiRpcRequest { // A typical response is slightly above 300 bytes. + let response_size_estimate = self.response_size_estimate(512 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthFeeHistory, params, - self.response_size_estimate(512 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } pub fn eth_send_raw_transaction( - &self, + self, raw_signed_transaction_hex: String, ) -> MultiRpcRequest<(String,), SendRawTransactionResult> { // A successful reply is under 256 bytes, but we expect most calls to end with an error // since we submit the same transaction from multiple nodes. + let response_size_estimate = self.response_size_estimate(256 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthSendRawTransaction, (raw_signed_transaction_hex,), - self.response_size_estimate(256 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } pub fn eth_get_transaction_count( - &self, + self, params: GetTransactionCountParams, ) -> MultiRpcRequest { + let response_size_estimate = self.response_size_estimate(50 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthGetTransactionCount, params, - self.response_size_estimate(50 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } - pub fn eth_call(&self, params: EthCallParams) -> MultiRpcRequest { + pub fn eth_call(self, params: EthCallParams) -> MultiRpcRequest { + let response_size_estimate = self.response_size_estimate(256 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, RpcMethod::EthCall, params, - self.response_size_estimate(256 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } - pub fn multi_request<'a>( - &self, + pub fn multi_request( + self, method: RpcMethod, - params: Option<&'a Value>, - ) -> MultiRpcRequest, RawJson> { + params: Option<&Value>, + ) -> MultiRpcRequest, RawJson> { + let response_size_estimate = self.response_size_estimate(256 + HEADER_SIZE_LIMIT); + let reduction_strategy = self.reduction_strategy(); MultiRpcRequest::new( - self.providers(), + self.providers.services, method, params, - self.response_size_estimate(256 + HEADER_SIZE_LIMIT), - self.consensus_strategy(), + response_size_estimate, + reduction_strategy, ) } } -pub struct MultiRpcRequest<'a, Params, Output> { - providers: &'a BTreeSet, +pub struct MultiRpcRequest { + providers: BTreeSet, method: RpcMethod, params: Params, response_size_estimate: ResponseSizeEstimate, @@ -393,14 +406,14 @@ pub struct MultiRpcRequest<'a, Params, Output> { _marker: std::marker::PhantomData, } -impl<'a, Params, Output> MultiRpcRequest<'a, Params, Output> { +impl MultiRpcRequest { pub fn new( - providers: &'a BTreeSet, + providers: BTreeSet, method: RpcMethod, params: Params, response_size_estimate: ResponseSizeEstimate, reduction_strategy: ReductionStrategy, - ) -> MultiRpcRequest<'a, Params, Output> { + ) -> MultiRpcRequest { MultiRpcRequest { providers, method, @@ -412,7 +425,7 @@ impl<'a, Params, Output> MultiRpcRequest<'a, Params, Output> { } } -impl MultiRpcRequest<'_, Params, Output> { +impl MultiRpcRequest { pub async fn send_and_reduce(self) -> MultiRpcResult where Params: Serialize + Clone + Debug, @@ -434,17 +447,8 @@ impl MultiRpcRequest<'_, Params, Output> { { let requests = self.create_json_rpc_requests(); - let client = - http_client(MetricRpcMethod::from(self.method.clone()), true).map_result(|r| match r? - .into_body() - .into_result() - { - Ok(value) => Ok(value), - Err(json_rpc_error) => Err(RpcError::JsonRpcError(JsonRpcError { - code: json_rpc_error.code, - message: json_rpc_error.message, - })), - }); + let client = http_client(MetricRpcMethod::from(self.method.clone()), true) + .map_result(extract_json_rpc_response); let (requests, errors) = requests.into_inner(); let (_client, mut results) = canhttp::multi::parallel_call(client, requests).await; @@ -480,7 +484,7 @@ impl MultiRpcRequest<'_, Params, Output> { .unwrap_or_default(); let effective_size_estimate = self.response_size_estimate.get(); let mut requests = MultiResults::default(); - for provider in self.providers { + for provider in self.providers.iter() { let request = resolve_rpc_service(provider.clone()) .map_err(RpcError::from) .and_then(|rpc_service| rpc_service.post(&get_override_provider())) @@ -506,6 +510,16 @@ impl MultiRpcRequest<'_, Params, Output> { } } +fn extract_json_rpc_response(result: RpcResult>) -> RpcResult { + match result?.into_body().into_result() { + Ok(value) => Ok(value), + Err(json_rpc_error) => Err(RpcError::JsonRpcError(JsonRpcError { + code: json_rpc_error.code, + message: json_rpc_error.message, + })), + } +} + pub enum ReductionStrategy { ByEquality(ReduceWithEquality), ByThreshold(ReduceWithThreshold), diff --git a/src/rpc_client/tests.rs b/src/rpc_client/tests.rs index bad2843a..06bfee83 100644 --- a/src/rpc_client/tests.rs +++ b/src/rpc_client/tests.rs @@ -39,7 +39,7 @@ mod eth_rpc_client { RpcServices::OptimismMainnet(None), ] { let client = EthRpcClient::new(empty_source, None, Timestamp::default()).unwrap(); - assert!(!client.providers().is_empty()); + assert!(!client.providers.services.is_empty()); } } @@ -56,8 +56,8 @@ mod eth_rpc_client { .unwrap(); assert_eq!( - client.providers(), - &btreeset! { + client.providers.services, + btreeset! { RpcService::EthMainnet(provider1), RpcService::EthMainnet(provider2) }