Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions crates/anvil-polkadot/src/api_server/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use anvil_rpc::{error::RpcError, response::ResponseResult};
use serde::Serialize;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Rpc Endpoint not implemented")]
RpcUnimplemented,
}

pub type Result<T> = std::result::Result<T, Error>;

/// Helper trait to easily convert results to rpc results
pub(crate) trait ToRpcResponseResult {
fn to_rpc_result(self) -> ResponseResult;
}

/// Converts a serializable value into a `ResponseResult`.
fn to_rpc_result<T: Serialize>(val: T) -> ResponseResult {
match serde_json::to_value(val) {
Ok(success) => ResponseResult::Success(success),
Err(err) => {
error!(%err, "Failed serialize rpc response");
ResponseResult::error(RpcError::internal_error())
}
}
}

impl<T: Serialize> ToRpcResponseResult for Result<T> {
fn to_rpc_result(self) -> ResponseResult {
match self {
Ok(val) => to_rpc_result(val),
Err(err) => RpcError::internal_error_with(err.to_string()).into(),
}
}
}
9 changes: 6 additions & 3 deletions crates/anvil-polkadot/src/api_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anvil_rpc::response::ResponseResult;
use futures::channel::{mpsc, oneshot};
use server::ApiServer;

mod error;
mod server;

pub type ApiHandle = mpsc::Sender<ApiRequest>;
Expand All @@ -16,10 +17,12 @@ pub struct ApiRequest {
pub fn spawn(substrate_service: &Service, logging_manager: LoggingManager) -> ApiHandle {
let (api_handle, receiver) = mpsc::channel(100);

let api_server = ApiServer::new(substrate_service, receiver, logging_manager);

let spawn_handle = substrate_service.task_manager.spawn_essential_handle();
spawn_handle.spawn("anvil-api-server", "anvil", api_server.run());
let rpc_handlers = substrate_service.rpc_handlers.clone();
spawn_handle.spawn("anvil-api-server", "anvil", async move {
let api_server = ApiServer::new(rpc_handlers, receiver, logging_manager).await;
api_server.run().await;
});

api_handle
}
185 changes: 174 additions & 11 deletions crates/anvil-polkadot/src/api_server/server.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,124 @@
use super::ApiRequest;
use crate::{logging::LoggingManager, macros::node_info, substrate_node::service::Service};
use crate::{
api_server::{
error::{Error, Result, ToRpcResponseResult},
ApiRequest,
},
logging::LoggingManager,
macros::node_info,
};
use alloy_primitives::{B256, U256, U64};
use alloy_rpc_types::request::TransactionRequest;
use alloy_serde::WithOtherFields;
use anvil_core::eth::EthRequest;
use anvil_rpc::{error::RpcError, response::ResponseResult};
use futures::{channel::mpsc, StreamExt};
use polkadot_sdk::{
pallet_revive::evm::{Account, ReceiptInfo},
pallet_revive_eth_rpc::{
client::Client as EthRpcClient, subxt_client::SrcChainConfig, ReceiptExtractor,
ReceiptProvider, SubxtBlockInfoProvider,
},
sc_service::RpcHandlers,
};
use sqlx::sqlite::SqlitePoolOptions;
use subxt::{
backend::rpc::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClient, RpcClientT},
ext::{
jsonrpsee::core::traits::ToRpcParams,
subxt_rpcs::{Error as SubxtRpcError, LegacyRpcMethods},
},
OnlineClient,
};

pub struct Wallet {
accounts: Vec<Account>,
}

pub struct ApiServer {
req_receiver: mpsc::Receiver<ApiRequest>,
logging_manager: LoggingManager,
eth_rpc_client: EthRpcClient,
wallet: Wallet,
}

struct InMemoryRpcClient(RpcHandlers);

struct Params(Option<Box<RawValue>>);

impl ToRpcParams for Params {
fn to_rpc_params(self) -> std::result::Result<Option<Box<RawValue>>, serde_json::Error> {
Ok(self.0)
}
}

impl RpcClientT for InMemoryRpcClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
self.0
.handle()
.call(method, Params(params))
.await
.map_err(|err| SubxtRpcError::Client(Box::new(err)))
})
}

fn subscribe_raw<'a>(
&'a self,
_sub: &'a str,
_params: Option<Box<RawValue>>,
_unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
unimplemented!("Not needed")
}
}

impl ApiServer {
pub fn new(
_substrate_service: &Service,
pub async fn new(
rpc_handlers: RpcHandlers,
req_receiver: mpsc::Receiver<ApiRequest>,
logging_manager: LoggingManager,
) -> Self {
Self { req_receiver, logging_manager }
let rpc_client = RpcClient::new(InMemoryRpcClient(rpc_handlers));
let api =
OnlineClient::<SrcChainConfig>::from_rpc_client(rpc_client.clone()).await.unwrap();
let rpc = LegacyRpcMethods::<SrcChainConfig>::new(rpc_client.clone());

let block_provider = SubxtBlockInfoProvider::new(api.clone(), rpc.clone()).await.unwrap();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block_info_provider


let (pool, keep_latest_n_blocks) = {
// see sqlite in-memory issue: https://github.com/launchbadge/sqlx/issues/2510
let pool = SqlitePoolOptions::new()
.max_connections(1)
.idle_timeout(None)
.max_lifetime(None)
.connect("sqlite::memory:")
.await
.unwrap();

(pool, Some(100))
};

let receipt_extractor = ReceiptExtractor::new(api.clone(), None).await.unwrap();

let receipt_provider = ReceiptProvider::new(
pool,
block_provider.clone(),
receipt_extractor.clone(),
keep_latest_n_blocks,
)
.await
.unwrap();

let eth_rpc_client =
EthRpcClient::new(api, rpc_client, rpc, block_provider, receipt_provider)
.await
.unwrap();

Self { req_receiver, logging_manager, eth_rpc_client, wallet: Wallet { accounts: vec![] } }
}

pub async fn run(mut self) {
Expand All @@ -27,13 +130,73 @@ impl ApiServer {
}

pub async fn execute(&mut self, req: EthRequest) -> ResponseResult {
match req {
EthRequest::SetLogging(enabled) => {
node_info!("anvil_setLoggingEnabled");
self.logging_manager.set_enabled(enabled);
ResponseResult::Success(serde_json::Value::Bool(true))
let res = match req.clone() {
EthRequest::SetLogging(enabled) => self.set_logging(enabled).to_rpc_result(),
EthRequest::EthChainId(()) => self.eth_chain_id().to_rpc_result(),
EthRequest::EthNetworkId(()) => self.network_id().to_rpc_result(),
EthRequest::NetListening(()) => self.net_listening().to_rpc_result(),
EthRequest::EthSyncing(()) => self.syncing().to_rpc_result(),
EthRequest::EthGetTransactionReceipt(tx_hash) => {
self.transaction_receipt(tx_hash).await.to_rpc_result()
}
_ => ResponseResult::Error(RpcError::internal_error()),
EthRequest::EthEstimateGas(call, block, _overrides) => {
self.estimate_gas(call, block).await.to_rpc_result()
}

_ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
};

if let ResponseResult::Error(err) = &res {
node_info!("\nRPC request failed:");
node_info!(" Request: {:?}", res);
node_info!(" Error: {}\n", err);
}

res
}

fn set_logging(&self, enabled: bool) -> Result<()> {
node_info!("anvil_setLoggingEnabled");
self.logging_manager.set_enabled(enabled);
Ok(())
}

fn eth_chain_id(&self) -> Result<U64> {
node_info!("eth_chainId");
Ok(U256::from(self.eth_rpc_client.chain_id()).to::<U64>())
}

fn network_id(&self) -> Result<u64> {
node_info!("eth_networkId");
Ok(self.eth_rpc_client.chain_id())
}

fn net_listening(&self) -> Result<bool> {
node_info!("net_listening");
Ok(true)
}

fn syncing(&self) -> Result<bool> {
node_info!("eth_syncing");
Ok(false)
}

async fn transaction_receipt(&self, tx_hash: B256) -> Result<Option<ReceiptInfo>> {
node_info!("eth_getTransactionReceipt");
// TODO: do we really need to return Ok(None) if the transaction is still in the pool?
Ok(self.eth_rpc_client.receipt(&(tx_hash.0.into())).await)
}

async fn estimate_gas(
&self,
request: WithOtherFields<TransactionRequest>,
block: Option<alloy_rpc_types::BlockId>,
) -> Result<U256> {
node_info!("eth_estimateGas");

let hash = self.eth_rpc_client.block_hash_for_tag(block.unwrap_or_default().into()).await?;
let runtime_api = self.eth_rpc_client.runtime_api(hash);
let dry_run = runtime_api.dry_run(transaction).await?;
Ok(dry_run.eth_gas)
}
}