diff --git a/Cargo.lock b/Cargo.lock index 821f92df61..cb40600c18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3343,11 +3343,13 @@ dependencies = [ "fvm_shared", "hex", "ipc-api", + "ipc-observability", "ipc_actors_abis", "libipld", "multihash 0.18.1", "num-traits", "pin-project", + "prometheus", "quickcheck", "quickcheck_macros", "rand", diff --git a/fendermint/abci/examples/kvstore.rs b/fendermint/abci/examples/kvstore.rs index 9477c502f9..41695750ea 100644 --- a/fendermint/abci/examples/kvstore.rs +++ b/fendermint/abci/examples/kvstore.rs @@ -75,7 +75,8 @@ impl Application for KVStore { &self, request: request::PrepareProposal, ) -> Result { - let mut txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let mut txs = txs.clone(); // Enfore transaciton limit so that we don't have a problem with buffering. txs.truncate(MAX_TXNS); diff --git a/fendermint/abci/src/application.rs b/fendermint/abci/src/application.rs index 0cb08164f5..58d3b33aee 100644 --- a/fendermint/abci/src/application.rs +++ b/fendermint/abci/src/application.rs @@ -64,7 +64,7 @@ pub trait Application { &self, request: request::PrepareProposal, ) -> AbciResult { - let txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); Ok(response::PrepareProposal { txs }) } diff --git a/fendermint/abci/src/util.rs b/fendermint/abci/src/util.rs index b40d26d0dc..119fb15ced 100644 --- a/fendermint/abci/src/util.rs +++ b/fendermint/abci/src/util.rs @@ -4,7 +4,7 @@ /// Take the first transactions until the first one that would exceed the maximum limit. /// /// The function does not skip or reorder transaction even if a later one would stay within the limit. -pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> Vec { +pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> (Vec, usize) { let mut size: usize = 0; let mut out = Vec::new(); for tx in txs { @@ -15,5 +15,5 @@ pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> size += bz.len(); out.push(tx); } - out + (out, size) } diff --git a/fendermint/app/options/src/lib.rs b/fendermint/app/options/src/lib.rs index 1718963b17..64d7c2b77e 100644 --- a/fendermint/app/options/src/lib.rs +++ b/fendermint/app/options/src/lib.rs @@ -117,7 +117,6 @@ pub struct Options { pub max_log_files: Option, #[arg( - short = 'l', long, default_value = "daily", value_enum, diff --git a/fendermint/app/src/app.rs b/fendermint/app/src/app.rs index 81d67073f8..fab3e331ac 100644 --- a/fendermint/app/src/app.rs +++ b/fendermint/app/src/app.rs @@ -13,7 +13,6 @@ use fendermint_abci::{AbciResult, Application}; use fendermint_storage::{ Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite, }; -use fendermint_tracing::emit; use fendermint_vm_core::Timestamp; use fendermint_vm_interpreter::bytes::{ BytesMessageApplyRes, BytesMessageCheckRes, BytesMessageQuery, BytesMessageQueryRes, @@ -37,13 +36,17 @@ use fvm_shared::chainid::ChainID; use fvm_shared::clock::ChainEpoch; use fvm_shared::econ::TokenAmount; use fvm_shared::version::NetworkVersion; +use ipc_observability::emit; use num_traits::Zero; use serde::{Deserialize, Serialize}; use tendermint::abci::request::CheckTxKind; use tendermint::abci::{request, response}; use tracing::instrument; -use crate::events::{NewBlock, ProposalProcessed}; +use crate::observe::{ + BlockCommitted, BlockProposalAccepted, BlockProposalReceived, BlockProposalRejected, + BlockProposalSent, HexEncodableBlockHash, MpoolReceived, MpoolReceivedInvalidMessage, +}; use crate::AppExitCode; use crate::BlockHeight; use crate::{tmconv::*, VERSION}; @@ -620,11 +623,42 @@ where *guard = Some(state); let response = match result { - Err(e) => invalid_check_tx(AppError::InvalidEncoding, e.description), + Err(e) => { + emit(MpoolReceivedInvalidMessage { + reason: "InvalidEncoding", + description: e.description.as_ref(), + }); + invalid_check_tx(AppError::InvalidEncoding, e.description) + } Ok(result) => match result { - Err(IllegalMessage) => invalid_check_tx(AppError::IllegalMessage, "".to_owned()), - Ok(Err(InvalidSignature(d))) => invalid_check_tx(AppError::InvalidSignature, d), - Ok(Ok(ret)) => to_check_tx(ret), + Err(IllegalMessage) => { + emit(MpoolReceivedInvalidMessage { + reason: "IllegalMessage", + description: "", + }); + + invalid_check_tx(AppError::IllegalMessage, "".to_owned()) + } + Ok(Err(InvalidSignature(d))) => { + emit(MpoolReceivedInvalidMessage { + reason: "InvalidSignature", + description: d.as_ref(), + }); + invalid_check_tx(AppError::InvalidSignature, d) + } + Ok(Ok(ret)) => { + emit(MpoolReceived { + from: ret.message.from.to_string().as_str(), + to: ret.message.to.to_string().as_str(), + value: ret.message.value.to_string().as_str(), + param_len: ret.message.params.len(), + gas_limit: ret.message.gas_limit, + fee_cap: ret.message.gas_fee_cap.to_string().as_str(), + premium: ret.message.gas_premium.to_string().as_str(), + accept: ret.exit_code.is_success(), + }); + to_check_tx(ret) + } }, }; @@ -650,7 +684,13 @@ where .context("failed to prepare proposal")?; let txs = txs.into_iter().map(bytes::Bytes::from).collect(); - let txs = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, size) = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap()); + + emit(BlockProposalSent { + height: request.height.value(), + tx_count: txs.len(), + size, + }); Ok(response::PrepareProposal { txs }) } @@ -666,26 +706,40 @@ where "process proposal" ); let txs: Vec<_> = request.txs.into_iter().map(|tx| tx.to_vec()).collect(); + let size_txs = txs.iter().map(|tx| tx.len()).sum::(); let num_txs = txs.len(); - let accept = self - .interpreter - .process(self.chain_env.clone(), txs) - .await - .context("failed to process proposal")?; - - emit!(ProposalProcessed { - is_accepted: accept, - block_height: request.height.value(), - block_hash: request.hash.to_string().as_str(), - num_txs, - proposer: request.proposer_address.to_string().as_str() + let process_result = self.interpreter.process(self.chain_env.clone(), txs).await; + + emit(BlockProposalReceived { + height: request.height.value(), + hash: HexEncodableBlockHash(request.hash.into()), + size: size_txs, + tx_count: num_txs, + validator: request.proposer_address.to_string().as_str(), }); - if accept { - Ok(response::ProcessProposal::Accept) - } else { - Ok(response::ProcessProposal::Reject) + match process_result { + Ok(_) => { + emit(BlockProposalAccepted { + height: request.height.value(), + hash: HexEncodableBlockHash(request.hash.into()), + size: size_txs, + tx_count: num_txs, + validator: request.proposer_address.to_string().as_str(), + }); + Ok(response::ProcessProposal::Accept) + } + Err(e) => { + emit(BlockProposalRejected { + height: request.height.value(), + size: size_txs, + tx_count: num_txs, + validator: request.proposer_address.to_string().as_str(), + reason: e.to_string().as_str(), + }); + Ok(response::ProcessProposal::Reject) + } } } @@ -867,12 +921,15 @@ where // Commit app state to the datastore. self.set_committed_state(state)?; - emit!(NewBlock { block_height }); - // Reset check state. let mut guard = self.check_state.lock().await; *guard = None; + emit(BlockCommitted { + height: block_height, + app_hash: HexEncodableBlockHash(app_hash.clone().into()), + }); + Ok(response::Commit { data: app_hash.into(), retain_height: retain_height.try_into().expect("height is valid"), diff --git a/fendermint/app/src/events.rs b/fendermint/app/src/events.rs deleted file mode 100644 index 84fc2c8cdb..0000000000 --- a/fendermint/app/src/events.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2022-2024 Protocol Labs -// SPDX-License-Identifier: Apache-2.0, MIT - -use crate::BlockHeight; - -/// Re-export other events, just to provide the visibility of where they are. -pub use fendermint_vm_event::{ - NewBottomUpCheckpoint, ParentFinalityCommitted, ParentFinalityMissingQuorum, -}; - -/// Hex encoded block hash. -pub type BlockHashHex<'a> = &'a str; - -#[derive(Debug, Default)] -pub struct ProposalProcessed<'a> { - pub is_accepted: bool, - pub block_height: BlockHeight, - pub block_hash: BlockHashHex<'a>, - pub num_txs: usize, - pub proposer: &'a str, -} - -#[derive(Debug, Default)] -pub struct NewBlock { - pub block_height: BlockHeight, -} diff --git a/fendermint/app/src/lib.rs b/fendermint/app/src/lib.rs index 29c83a384a..0b529a9f23 100644 --- a/fendermint/app/src/lib.rs +++ b/fendermint/app/src/lib.rs @@ -1,9 +1,9 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT mod app; -pub mod events; pub mod ipc; pub mod metrics; +pub mod observe; mod store; mod tmconv; diff --git a/fendermint/app/src/metrics/mod.rs b/fendermint/app/src/metrics/mod.rs index abe06cdcdc..34459e81cd 100644 --- a/fendermint/app/src/metrics/mod.rs +++ b/fendermint/app/src/metrics/mod.rs @@ -2,8 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod prometheus; -mod tracing; pub use prometheus::app::register_metrics as register_app_metrics; pub use prometheus::eth::register_metrics as register_eth_metrics; -pub use tracing::layer; diff --git a/fendermint/app/src/metrics/tracing.rs b/fendermint/app/src/metrics/tracing.rs deleted file mode 100644 index 397092396e..0000000000 --- a/fendermint/app/src/metrics/tracing.rs +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2022-2024 Protocol Labs -// SPDX-License-Identifier: Apache-2.0, MIT -//! Subscribing to tracing events and turning them into metrics. - -use std::marker::PhantomData; - -use tracing::{Event, Subscriber}; -use tracing_subscriber::{filter, layer, registry::LookupSpan, Layer}; - -use super::prometheus::app as am; -use crate::events::*; - -/// Create a layer that handles events by incrementing metrics. -pub fn layer() -> impl Layer -where - S: Subscriber, - for<'a> S: LookupSpan<'a>, -{ - MetricsLayer::new().with_filter(filter::filter_fn(|md| md.name().starts_with("event::"))) -} - -struct MetricsLayer { - _subscriber: PhantomData, -} - -impl MetricsLayer { - pub fn new() -> Self { - Self { - _subscriber: PhantomData, - } - } -} - -/// Check that the field exist on a type; if it doesn't this won't compile. -/// This ensures that we're mapping fields with the correct name. -macro_rules! check_field { - ($event_ty:ident :: $field:ident) => {{ - if false { - #[allow(clippy::needless_update)] - let _event = $event_ty { - $field: Default::default(), - ..Default::default() - }; - } - }}; -} - -/// Set a gague to an absolute value based on a field in an event. -macro_rules! set_gauge { - ($event:ident, $event_ty:ident :: $field:ident, $gauge:expr) => { - check_field!($event_ty::$field); - let mut fld = visitors::FindU64::new(stringify!($field)); - $event.record(&mut fld); - $gauge.set(fld.value as i64); - }; -} - -/// Increment a counter by the value of a field in the event. -macro_rules! inc_counter { - ($event:ident, $event_ty:ident :: $field:ident, $counter:expr) => { - check_field!($event_ty::$field); - let mut fld = visitors::FindU64::new(stringify!($field)); - $event.record(&mut fld); - $counter.inc_by(fld.value); - }; -} - -/// Produce the prefixed event name from the type name. -macro_rules! event_name { - ($event_ty:ident) => { - concat!("event::", stringify!($event_ty)) - }; -} - -/// Call one of the macros that set values on a metric. -macro_rules! event_mapping { - ($op:ident, $event:ident, $event_ty:ident :: $field:ident, $metric:expr) => { - $op!($event, $event_ty::$field, $metric); - }; -} - -/// Match the event name to event DTO types and within the map fields to metrics. -macro_rules! event_match { - ($event:ident { $( $event_ty:ident { $( $field:ident => $op:ident ! $metric:expr ),* $(,)? } ),* } ) => { - match $event.metadata().name() { - $( - event_name!($event_ty) => { - $( - event_mapping!($op, $event, $event_ty :: $field, $metric); - )* - } - )* - _ => {} - } - }; -} - -impl Layer for MetricsLayer { - fn on_event(&self, event: &Event<'_>, _ctx: layer::Context<'_, S>) { - event_match!(event { - NewBottomUpCheckpoint { - block_height => set_gauge ! &am::BOTTOMUP_CKPT_BLOCK_HEIGHT, - next_configuration_number => set_gauge ! &am::BOTTOMUP_CKPT_CONFIG_NUM, - num_msgs => inc_counter ! &am::BOTTOMUP_CKPT_NUM_MSGS, - }, - NewBlock { - block_height => set_gauge ! &am::ABCI_COMMITTED_BLOCK_HEIGHT - } - }); - } -} - -mod visitors { - use tracing::field::{Field, Visit}; - - pub struct FindU64<'a> { - pub name: &'a str, - pub value: u64, - } - - impl<'a> FindU64<'a> { - pub fn new(name: &'a str) -> Self { - Self { name, value: 0 } - } - } - - // Looking for multiple values because the callsite might be passed as a literal which turns into an i64 for example. - impl<'a> Visit for FindU64<'a> { - fn record_u64(&mut self, field: &Field, value: u64) { - if field.name() == self.name { - self.value = value; - } - } - - fn record_i64(&mut self, field: &Field, value: i64) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_i128(&mut self, field: &Field, value: i128) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_u128(&mut self, field: &Field, value: u128) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {} - } -} diff --git a/fendermint/app/src/observe.rs b/fendermint/app/src/observe.rs new file mode 100644 index 0000000000..dd50b6e3e8 --- /dev/null +++ b/fendermint/app/src/observe.rs @@ -0,0 +1,215 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use hex; +use std::fmt; + +use ipc_observability::{ + impl_traceable, impl_traceables, lazy_static, register_metrics, Recordable, TraceLevel, + Traceable, +}; + +use prometheus::{register_counter_vec, CounterVec, Registry}; + +register_metrics! { + PROPOSALS_BLOCK_PROPOSAL_RECEIVED: CounterVec + = register_counter_vec!("proposals_block_proposal_received", "Block proposal received", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_SENT: CounterVec + = register_counter_vec!("proposals_block_proposal_sent", "Block proposal sent", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_ACCEPTED: CounterVec + = register_counter_vec!("proposals_block_proposal_accepted", "Block proposal accepted", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_REJECTED: CounterVec + = register_counter_vec!("proposals_block_proposal_rejected", "Block proposal rejected", &["height"]); + PROPOSALS_BLOCK_COMMITTED: CounterVec + = register_counter_vec!("proposals_block_committed", "Block committed", &["height"]); + MPOOL_RECEIVED: CounterVec = register_counter_vec!("mpool_received", "Mpool received", &["accept", "from", "to"]); + MPOOL_RECEIVED_INVALID_MESSAGE: CounterVec + = register_counter_vec!("mpool_received_invalid_message", "Mpool received invalid message", &["reason"]); +} + +impl_traceables!( + TraceLevel::Info, + "Proposals", + BlockProposalReceived<'a>, + BlockProposalSent, + BlockProposalAccepted<'a>, + BlockProposalRejected<'a>, + BlockCommitted +); + +impl_traceables!( + TraceLevel::Info, + "Mpool", + MpoolReceived<'a>, + MpoolReceivedInvalidMessage<'a> +); + +pub type BlockHeight = u64; +/// Hex encodable block hash. +pub struct HexEncodableBlockHash(pub Vec); + +impl fmt::Debug for HexEncodableBlockHash { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +} + +#[derive(Debug)] +pub struct BlockProposalReceived<'a> { + pub height: BlockHeight, + pub hash: HexEncodableBlockHash, + pub size: usize, + pub tx_count: usize, + pub validator: &'a str, +} + +impl Recordable for BlockProposalReceived<'_> { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_RECEIVED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockProposalSent { + pub height: BlockHeight, + pub size: usize, + pub tx_count: usize, +} + +impl Recordable for BlockProposalSent { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_SENT + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockProposalAccepted<'a> { + pub height: BlockHeight, + pub hash: HexEncodableBlockHash, + pub size: usize, + pub tx_count: usize, + pub validator: &'a str, +} + +impl Recordable for BlockProposalAccepted<'_> { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_ACCEPTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockProposalRejected<'a> { + pub height: BlockHeight, + pub size: usize, + pub tx_count: usize, + pub validator: &'a str, + pub reason: &'a str, +} + +impl Recordable for BlockProposalRejected<'_> { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_REJECTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockCommitted { + pub height: BlockHeight, + pub app_hash: HexEncodableBlockHash, +} + +impl Recordable for BlockCommitted { + fn record_metrics(&self) { + PROPOSALS_BLOCK_COMMITTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct MpoolReceived<'a> { + // TODO - add cid later on + // pub message_cid: &'a str, + pub from: &'a str, + pub to: &'a str, + pub value: &'a str, + pub param_len: usize, + pub gas_limit: u64, + pub fee_cap: &'a str, + pub premium: &'a str, + pub accept: bool, +} + +impl Recordable for MpoolReceived<'_> { + fn record_metrics(&self) { + MPOOL_RECEIVED + .with_label_values(&[&self.accept.to_string(), self.from, self.to]) + .inc(); + } +} + +#[derive(Debug)] +pub struct MpoolReceivedInvalidMessage<'a> { + pub reason: &'a str, + pub description: &'a str, +} + +impl Recordable for MpoolReceivedInvalidMessage<'_> { + fn record_metrics(&self) { + MPOOL_RECEIVED_INVALID_MESSAGE + .with_label_values(&[self.reason]) + .inc(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_emit() { + emit(BlockProposalReceived { + height: 1, + hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + size: 100, + tx_count: 10, + validator: "validator", + }); + + emit(BlockProposalSent { + height: 1, + size: 100, + tx_count: 10, + }); + + emit(BlockProposalAccepted { + height: 1, + hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + size: 100, + tx_count: 10, + validator: "validator", + }); + + emit(BlockProposalRejected { + height: 1, + size: 100, + tx_count: 10, + validator: "validator", + reason: "reason", + }); + + emit(BlockCommitted { + height: 1, + app_hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + }); + } +} diff --git a/fendermint/eth/api/src/apis/mod.rs b/fendermint/eth/api/src/apis/mod.rs index ca7e1968a0..e948728f58 100644 --- a/fendermint/eth/api/src/apis/mod.rs +++ b/fendermint/eth/api/src/apis/mod.rs @@ -121,7 +121,6 @@ pub fn register_methods(server: ServerBuilder) -> ServerBuilder) -> anyhow::Result { + async fn process( + &self, + state: Self::State, + msgs: Vec, + ) -> anyhow::Result { if msgs.len() > self.max_msgs { tracing::warn!( block_msgs = msgs.len(), "rejecting block: too many messages" ); - return Ok(false); + return Err(ProcessError::TooManyMessages(msgs.len())); } let mut chain_msgs = Vec::new(); @@ -152,7 +157,7 @@ where "failed to decode message in proposal as ChainMessage" ); if self.reject_malformed_proposal { - return Ok(false); + return Err(ProcessError::FailedToDecodeMessage(e.to_string())); } } Ok(msg) => chain_msgs.push(msg), diff --git a/fendermint/vm/interpreter/src/chain.rs b/fendermint/vm/interpreter/src/chain.rs index 79136138f8..dae3dc951c 100644 --- a/fendermint/vm/interpreter/src/chain.rs +++ b/fendermint/vm/interpreter/src/chain.rs @@ -1,5 +1,6 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT +use crate::errors::ProcessError; use crate::fvm::state::ipc::GatewayCaller; use crate::fvm::{topdown, FvmApplyRet, PowerUpdates}; use crate::{ @@ -175,7 +176,11 @@ where } /// Perform finality checks on top-down transactions and availability checks on bottom-up transactions. - async fn process(&self, env: Self::State, msgs: Vec) -> anyhow::Result { + async fn process( + &self, + env: Self::State, + msgs: Vec, + ) -> anyhow::Result { for msg in msgs { match msg { ChainMessage::Ipc(IpcMessage::BottomUpExec(msg)) => { @@ -194,7 +199,7 @@ where .await; if !is_resolved { - return Ok(false); + return Err(ProcessError::CheckpointNotResolved); } } ChainMessage::Ipc(IpcMessage::TopDownExec(ParentFinality { @@ -208,7 +213,7 @@ where let is_final = atomically(|| env.parent_finality_provider.check_proposal(&prop)).await; if !is_final { - return Ok(false); + return Err(ProcessError::ParentFinalityNotAvailable); } } _ => {} diff --git a/fendermint/vm/interpreter/src/errors.rs b/fendermint/vm/interpreter/src/errors.rs new file mode 100644 index 0000000000..62e7363955 --- /dev/null +++ b/fendermint/vm/interpreter/src/errors.rs @@ -0,0 +1,16 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ProcessError { + #[error("checkpoint not resolved")] + CheckpointNotResolved, + #[error("parent finality not available")] + ParentFinalityNotAvailable, + #[error("too many messages: {0}")] + TooManyMessages(usize), + #[error("failed to decode message in proposal as ChainMessage: {0}")] + FailedToDecodeMessage(String), +} diff --git a/fendermint/vm/interpreter/src/fvm/check.rs b/fendermint/vm/interpreter/src/fvm/check.rs index 1329cccedd..ee5b5a1398 100644 --- a/fendermint/vm/interpreter/src/fvm/check.rs +++ b/fendermint/vm/interpreter/src/fvm/check.rs @@ -6,10 +6,16 @@ use async_trait::async_trait; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::RawBytes; use fvm_shared::{address::Address, error::ExitCode}; +use ipc_observability::emit; use crate::CheckInterpreter; -use super::{state::FvmExecState, store::ReadOnlyBlockstore, FvmMessage, FvmMessageInterpreter}; +use super::{ + observe::MsgExecCheck, + state::{ElapsedExecution, FvmExecState}, + store::ReadOnlyBlockstore, + FvmMessage, FvmMessageInterpreter, +}; /// Transaction check results are expressed by the exit code, so that hopefully /// they would result in the same error code if they were applied. @@ -19,6 +25,7 @@ pub struct FvmCheckRet { pub exit_code: ExitCode, pub return_data: Option, pub info: Option, + pub message: FvmMessage, } #[async_trait] @@ -64,6 +71,7 @@ where exit_code, return_data, info, + message: msg.clone(), }; Ok((state, ret)) }; @@ -108,9 +116,23 @@ where } else if self.exec_in_check { // Instead of modifying just the partial state, we will execute the call in earnest. // This is required for fully supporting the Ethereum API "pending" queries, if that's needed. + let (apply_ret, _, latency) = + ElapsedExecution::new(&mut state).execute_explicit(msg.clone())?; - // This will stack the effect for subsequent transactions added to the mempool. - let (apply_ret, _) = state.execute_explicit(msg.clone())?; + emit(MsgExecCheck { + height: state.block_height(), + from: msg.from.to_string().as_str(), + to: msg.to.to_string().as_str(), + value: msg.value.to_string().as_str(), + method_num: msg.method_num, + gas_limit: msg.gas_limit, + gas_price: msg.gas_premium.to_string().as_str(), + // TODO Karel - this should be the serialized params + params: msg.params.clone().bytes(), + nonce: msg.sequence, + duration: latency.as_secs_f64(), + exit_code: apply_ret.msg_receipt.exit_code.value(), + }); return checked( state, diff --git a/fendermint/vm/interpreter/src/fvm/exec.rs b/fendermint/vm/interpreter/src/fvm/exec.rs index c5f845e1f3..aa468a6cb7 100644 --- a/fendermint/vm/interpreter/src/fvm/exec.rs +++ b/fendermint/vm/interpreter/src/fvm/exec.rs @@ -9,13 +9,15 @@ use fendermint_vm_actor_interface::{chainmetadata, cron, system}; use fvm::executor::ApplyRet; use fvm_ipld_blockstore::Blockstore; use fvm_shared::{address::Address, ActorID, MethodNum, BLOCK_GAS_LIMIT}; +use ipc_observability::emit; use tendermint_rpc::Client; use crate::ExecInterpreter; use super::{ checkpoint::{self, PowerUpdates}, - state::FvmExecState, + observe::MsgExecApply, + state::{ElapsedExecution, FvmExecState}, FvmMessage, FvmMessageInterpreter, }; @@ -153,22 +155,32 @@ where let to = msg.to; let method_num = msg.method_num; let gas_limit = msg.gas_limit; - - let (apply_ret, emitters) = if from == system::SYSTEM_ACTOR_ADDR { - state.execute_implicit(msg)? + let sequence = msg.sequence; + let params = msg.params.clone(); + let value = msg.value.to_string(); + // TODO Karel - this should probably not be gas_premium but something else?? + let gas_price = msg.gas_premium.to_string(); + + let (apply_ret, emitters, latency) = if from == system::SYSTEM_ACTOR_ADDR { + ElapsedExecution::new(&mut state).execute_implicit(msg)? } else { - state.execute_explicit(msg)? + ElapsedExecution::new(&mut state).execute_explicit(msg)? }; - tracing::info!( - height = state.block_height(), - from = from.to_string(), - to = to.to_string(), - method_num = method_num, - exit_code = apply_ret.msg_receipt.exit_code.value(), - gas_used = apply_ret.msg_receipt.gas_used, - "tx delivered" - ); + emit(MsgExecApply { + height: state.block_height(), + from: from.to_string().as_str(), + to: to.to_string().as_str(), + value: value.as_str(), + method_num, + gas_limit, + gas_price: gas_price.as_str(), + // TODO Karel - this should be the serialized params + params: params.bytes(), + nonce: sequence, + duration: latency.as_secs_f64(), + exit_code: apply_ret.msg_receipt.exit_code.value(), + }); let ret = FvmApplyRet { apply_ret, diff --git a/fendermint/vm/interpreter/src/fvm/mod.rs b/fendermint/vm/interpreter/src/fvm/mod.rs index 0aa31b69cd..83cb934d9c 100644 --- a/fendermint/vm/interpreter/src/fvm/mod.rs +++ b/fendermint/vm/interpreter/src/fvm/mod.rs @@ -8,6 +8,7 @@ mod checkpoint; mod exec; mod externs; mod genesis; +mod observe; mod query; pub mod state; pub mod store; diff --git a/fendermint/vm/interpreter/src/fvm/observe.rs b/fendermint/vm/interpreter/src/fvm/observe.rs new file mode 100644 index 0000000000..d59f6513b0 --- /dev/null +++ b/fendermint/vm/interpreter/src/fvm/observe.rs @@ -0,0 +1,147 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use ipc_observability::{ + impl_traceable, impl_traceables, lazy_static, register_metrics, Recordable, TraceLevel, + Traceable, +}; +use prometheus::{register_histogram, Histogram, Registry}; + +register_metrics! { + EXEC_FVM_CHECK_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_check_execution_time_secs", "Execution time of FVM check in seconds"); + EXEC_FVM_ESTIMATE_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_estimate_execution_time_secs", "Execution time of FVM estimate in seconds"); + EXEC_FVM_APPLY_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_apply_execution_time_secs", "Execution time of FVM apply in seconds"); + EXEC_FVM_CALL_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_call_execution_time_secs", "Execution time of FVM call in seconds"); +} + +impl_traceables!( + TraceLevel::Info, + "Execution", + MsgExecCheck<'a>, + MsgExecEstimate<'a>, + MsgExecApply<'a>, + MsgExecCall<'a> +); + +macro_rules! message_exec_struct { + ($($name:ident),*) => { + $( + #[derive(std::fmt::Debug)] + #[allow(dead_code)] + pub struct $name<'a> { + pub height: i64, + pub from: &'a str, + pub to: &'a str, + pub value: &'a str, + pub method_num: u64, + pub gas_limit: u64, + pub gas_price: &'a str, + pub params: &'a [u8], + pub nonce: u64, + pub duration: f64, + pub exit_code: u32, + } + )* + }; +} + +impl Recordable for MsgExecCheck<'_> { + fn record_metrics(&self) { + EXEC_FVM_CHECK_EXECUTION_TIME_SECS.observe(self.duration); + } +} + +impl Recordable for MsgExecEstimate<'_> { + fn record_metrics(&self) { + EXEC_FVM_ESTIMATE_EXECUTION_TIME_SECS.observe(self.duration); + } +} + +impl Recordable for MsgExecApply<'_> { + fn record_metrics(&self) { + EXEC_FVM_APPLY_EXECUTION_TIME_SECS.observe(self.duration); + } +} + +impl Recordable for MsgExecCall<'_> { + fn record_metrics(&self) { + EXEC_FVM_CALL_EXECUTION_TIME_SECS.observe(self.duration); + } +} + +message_exec_struct!(MsgExecCheck, MsgExecEstimate, MsgExecApply, MsgExecCall); + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_metrics() { + let registry = Registry::new(); + register_metrics(®istry).unwrap(); + } + + #[test] + fn test_emit() { + emit(MsgExecCheck { + height: 1, + from: "from", + to: "to", + value: "value", + method_num: 1, + gas_limit: 1, + gas_price: "gas_price", + params: &[1, 2, 3], + nonce: 1, + duration: 1.0, + exit_code: 1, + }); + + emit(MsgExecEstimate { + height: 1, + from: "from", + to: "to", + value: "value", + method_num: 1, + gas_limit: 1, + gas_price: "gas_price", + params: &[1, 2, 3], + nonce: 1, + duration: 1.0, + exit_code: 1, + }); + + emit(MsgExecApply { + height: 1, + from: "from", + to: "to", + value: "value", + method_num: 1, + gas_limit: 1, + gas_price: "gas_price", + params: &[1, 2, 3], + nonce: 1, + duration: 1.0, + exit_code: 1, + }); + + emit(MsgExecCall { + height: 1, + from: "from", + to: "to", + value: "value", + method_num: 1, + gas_limit: 1, + gas_price: "gas_price", + params: &[1, 2, 3], + nonce: 1, + duration: 1.0, + exit_code: 1, + }) + } +} diff --git a/fendermint/vm/interpreter/src/fvm/query.rs b/fendermint/vm/interpreter/src/fvm/query.rs index 945cea65c6..237ba2b9bf 100644 --- a/fendermint/vm/interpreter/src/fvm/query.rs +++ b/fendermint/vm/interpreter/src/fvm/query.rs @@ -1,5 +1,7 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT +use std::time::Instant; + use async_trait::async_trait; use cid::Cid; use fendermint_vm_message::query::{ActorState, FvmQuery, GasEstimate, StateParams}; @@ -8,8 +10,10 @@ use fvm_ipld_encoding::RawBytes; use fvm_shared::{ bigint::BigInt, econ::TokenAmount, error::ExitCode, message::Message, ActorID, BLOCK_GAS_LIMIT, }; +use ipc_observability::emit; use num_traits::Zero; +use super::observe::{MsgExecCall, MsgExecEstimate}; use crate::QueryInterpreter; use super::{state::FvmQueryState, FvmApplyRet, FvmMessageInterpreter}; @@ -79,24 +83,25 @@ where let method_num = msg.method_num; let gas_limit = msg.gas_limit; + let start = Instant::now(); // Do not stack effects - let (state, (apply_ret, emitters)) = state.call(*msg).await?; + let (state, (apply_ret, emitters)) = state.call(*msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); + let exit_code = apply_ret.msg_receipt.exit_code.value(); - tracing::info!( - height = state.block_height(), - pending = state.pending(), - to = to.to_string(), - from = from.to_string(), - method_num, - exit_code = apply_ret.msg_receipt.exit_code.value(), - data = hex::encode(apply_ret.msg_receipt.return_data.bytes()), - info = apply_ret - .failure_info - .as_ref() - .map(|i| i.to_string()) - .unwrap_or_default(), - "query call" - ); + emit(MsgExecCall { + height: state.block_height(), + from: msg.from.to_string().as_str(), + to: msg.to.to_string().as_str(), + value: msg.value.to_string().as_str(), + method_num: msg.method_num, + gas_limit: msg.gas_limit, + gas_price: msg.gas_premium.to_string().as_str(), + params: &msg.params, + nonce: msg.sequence, + duration: latency, + exit_code, + }); let ret = FvmApplyRet { apply_ret, @@ -177,15 +182,25 @@ where msg.gas_premium = TokenAmount::zero(); msg.gas_fee_cap = TokenAmount::zero(); + let start = Instant::now(); // estimate the gas limit and assign it to the message // revert any changes because we'll repeat the estimation let (state, (ret, _)) = state.call(msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); - tracing::debug!( - gas_used = ret.msg_receipt.gas_used, - exit_code = ret.msg_receipt.exit_code.value(), - "estimated gassed message" - ); + emit(MsgExecEstimate { + height: state.block_height(), + from: msg.from.to_string().as_str(), + to: msg.to.to_string().as_str(), + value: msg.value.to_string().as_str(), + method_num: msg.method_num, + gas_limit: msg.gas_limit, + gas_price: msg.gas_premium.to_string().as_str(), + params: &msg.params, + nonce: msg.sequence, + duration: latency, + exit_code: ret.msg_receipt.exit_code.value(), + }); if !ret.msg_receipt.exit_code.is_success() { // if the message fail we can't estimate the gas. @@ -278,7 +293,9 @@ where // set message nonce to zero so the right one is picked up msg.sequence = 0; - let (state, (apply_ret, _)) = state.call(msg).await?; + let start = Instant::now(); + let (state, (apply_ret, _)) = state.call(msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); let ret = GasEstimate { exit_code: apply_ret.msg_receipt.exit_code, @@ -290,6 +307,20 @@ where gas_limit: apply_ret.msg_receipt.gas_used, }; + emit(MsgExecEstimate { + height: state.block_height(), + from: msg.from.to_string().as_str(), + to: msg.to.to_string().as_str(), + value: msg.value.to_string().as_str(), + method_num: msg.method_num, + gas_limit: msg.gas_limit, + gas_price: msg.gas_premium.to_string().as_str(), + params: &msg.params, + nonce: msg.sequence, + duration: latency, + exit_code: ret.exit_code.value(), + }); + // if the message succeeded or failed with a different error than `SYS_OUT_OF_GAS`, // immediately return as we either succeeded finding the right gas estimation, // or something non-related happened. diff --git a/fendermint/vm/interpreter/src/fvm/state/exec.rs b/fendermint/vm/interpreter/src/fvm/state/exec.rs index 7d5f10dda4..46e291eae6 100644 --- a/fendermint/vm/interpreter/src/fvm/state/exec.rs +++ b/fendermint/vm/interpreter/src/fvm/state/exec.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; use anyhow::Ok; use cid::Cid; @@ -350,3 +351,39 @@ fn check_error(e: anyhow::Error) -> (ApplyRet, ActorAddressMap) { }; (ret, Default::default()) } + +pub struct ElapsedExecution<'a, DB> +where + DB: Blockstore + Clone + 'static, +{ + state: &'a mut FvmExecState, +} + +impl<'a, DB> ElapsedExecution<'a, DB> +where + DB: Blockstore + Clone + 'static, +{ + pub fn new(state: &'a mut FvmExecState) -> Self { + Self { state } + } + + pub fn execute_implicit( + &mut self, + msg: Message, + ) -> anyhow::Result<(ApplyRet, ActorAddressMap, Duration)> { + let start = Instant::now(); + let result = self.state.execute_implicit(msg)?; + let duration = start.elapsed(); + Ok((result.0, result.1, duration)) + } + + pub fn execute_explicit( + &mut self, + msg: Message, + ) -> anyhow::Result<(ApplyRet, ActorAddressMap, Duration)> { + let start = Instant::now(); + let result = self.state.execute_explicit(msg)?; + let duration = start.elapsed(); + Ok((result.0, result.1, duration)) + } +} diff --git a/fendermint/vm/interpreter/src/fvm/state/mod.rs b/fendermint/vm/interpreter/src/fvm/state/mod.rs index 22a1504e4c..0547ce9d80 100644 --- a/fendermint/vm/interpreter/src/fvm/state/mod.rs +++ b/fendermint/vm/interpreter/src/fvm/state/mod.rs @@ -12,7 +12,7 @@ pub mod snapshot; use std::sync::Arc; pub use check::FvmCheckState; -pub use exec::{BlockHash, FvmExecState, FvmStateParams, FvmUpdatableParams}; +pub use exec::{BlockHash, ElapsedExecution, FvmExecState, FvmStateParams, FvmUpdatableParams}; pub use genesis::{empty_state_tree, FvmGenesisState}; pub use query::FvmQueryState; diff --git a/fendermint/vm/interpreter/src/lib.rs b/fendermint/vm/interpreter/src/lib.rs index 42dab76b8c..49ac252cac 100644 --- a/fendermint/vm/interpreter/src/lib.rs +++ b/fendermint/vm/interpreter/src/lib.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; pub mod bytes; pub mod chain; +pub mod errors; pub mod fvm; pub mod signed; @@ -54,7 +55,11 @@ pub trait ProposalInterpreter: Sync + Send { /// This is our chance check whether CIDs proposed for execution are available. /// /// Return `true` if we can accept this block, `false` to reject it. - async fn process(&self, state: Self::State, msgs: Vec) -> anyhow::Result; + async fn process( + &self, + state: Self::State, + msgs: Vec, + ) -> anyhow::Result; } /// The `ExecInterpreter` applies messages on some state, which is