Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
06eb54a
feat: introduce metrics crate
karlem Jul 4, 2024
46629ad
feat: improve library with macros, add top down events
karlem Jul 5, 2024
89106eb
feat: add domain filter
karlem Jul 5, 2024
f756bd1
feat: emit metrics
karlem Jul 8, 2024
3780b2d
feat: add remaining top down events
karlem Jul 9, 2024
2c9f5f4
feat: fix test
karlem Jul 9, 2024
487560f
feat: remove unused code & fix build
karlem Jul 9, 2024
5ac87a1
feat: keep file journal disabled
karlem Jul 9, 2024
2dcec96
feat: address comments
karlem Jul 10, 2024
90e092b
feat: use latency wrapper instead
karlem Jul 10, 2024
3ef23f3
feat: register metrics
karlem Jul 10, 2024
31dc460
feat: add config filters
karlem Jul 11, 2024
a6bc67a
feat: add config as a cmd flags
karlem Jul 12, 2024
92b9d5f
feat: fix comments
karlem Jul 15, 2024
b693a80
Introduce New Observability - Configuration (#1063)
karlem Jul 15, 2024
366069c
feat: add consensus traces
karlem Jul 12, 2024
deae3bd
feat: add execution metrics
karlem Jul 12, 2024
f78e40c
feat: add proposals metrics & reason
karlem Jul 15, 2024
9086667
feat: add mempool event
karlem Jul 16, 2024
dd6e49f
feat: remove
karlem Jul 16, 2024
43e7d51
feat: add metrics
karlem Jul 16, 2024
4eb611f
feat: address comments
karlem Jul 16, 2024
d4e8a54
feat: address comments
karlem Jul 16, 2024
053d614
feat: fix clippy
karlem Jul 16, 2024
71a57cf
Introduce New Observability - Remaining events (#1064)
karlem Jul 16, 2024
2093d38
minor cleanup.
raulk Jul 16, 2024
8c63b8c
Merge branch 'main' into new-observability
raulk Jul 16, 2024
e590217
fix code typo.
raulk Jul 16, 2024
1ecb44f
feat: fix typo
karlem Jul 16, 2024
2a0b425
feat: standartize mpool trace
karlem Jul 16, 2024
e7e7cb5
feat: address comments
karlem Jul 18, 2024
0e7c1f1
lint: clippy
karlem Jul 18, 2024
3a94fed
feat: revert risky changes & small cleanup
karlem Jul 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 14 additions & 18 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use fendermint_vm_interpreter::fvm::store::ReadOnlyBlockstore;
use fendermint_vm_interpreter::fvm::{FvmApplyRet, FvmGenesisOutput, PowerUpdates};
use fendermint_vm_interpreter::signed::InvalidSignature;
use fendermint_vm_interpreter::{
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};
use fendermint_vm_message::query::FvmQueryHeight;
use fendermint_vm_snapshot::{SnapshotClient, SnapshotError};
Expand All @@ -44,7 +45,8 @@ use tendermint::abci::{request, response};
use tracing::instrument;

use crate::observe::{
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, MpoolReceived,
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, Message,
MpoolReceived,
};
use crate::AppExitCode;
use crate::BlockHeight;
Expand Down Expand Up @@ -629,23 +631,14 @@ where
Err(IllegalMessage) => invalid_check_tx(AppError::IllegalMessage, "".to_owned()),
Ok(Err(InvalidSignature(d))) => invalid_check_tx(AppError::InvalidSignature, d),
Ok(Ok(ret)) => {
mpool_received_trace.from = Some(ret.message.from);
mpool_received_trace.to = Some(ret.message.to);
mpool_received_trace.value = Some(ret.message.value.clone());
mpool_received_trace.param_len = ret.message.params.len();
mpool_received_trace.gas_limit = ret.message.gas_limit;
mpool_received_trace.fee_cap = Some(ret.message.gas_fee_cap.clone());
mpool_received_trace.premium = Some(ret.message.gas_premium.clone());

mpool_received_trace.message = Some(Message::from(&ret.message));
to_check_tx(ret)
}
},
};

if response.code.is_ok() {
mpool_received_trace.accept = true;
} else {
mpool_received_trace.accept = false;
mpool_received_trace.accept = response.code.is_ok();
if !mpool_received_trace.accept {
mpool_received_trace.reason = Some(format!("{:?} - {}", response.code, response.info));
}

Expand Down Expand Up @@ -698,7 +691,10 @@ where
let size_txs = txs.iter().map(|tx| tx.len()).sum::<usize>();
let num_txs = txs.len();

let process_result = self.interpreter.process(self.chain_env.clone(), txs).await;
let process_result = self
.interpreter
.process(self.chain_env.clone(), txs)
.await?;

emit(BlockProposalReceived {
height: request.height.value(),
Expand All @@ -719,10 +715,10 @@ where
};

let process_proposal = match process_result {
Ok(_) => response::ProcessProposal::Accept,
Err(e) => {
ProcessResult::Accepted => response::ProcessProposal::Accept,
ProcessResult::Rejected(reason) => {
proposal_evaluated.accept = false;
proposal_evaluated.reason = Some(e);
proposal_evaluated.reason = Some(reason);
response::ProcessProposal::Reject
}
};
Expand Down
20 changes: 18 additions & 2 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use fendermint_vm_snapshot::{SnapshotManager, SnapshotParams};
use fendermint_vm_topdown::observe::register_metrics as register_topdown_metrics;
use fendermint_vm_topdown::proxy::{IPCProviderProxy, IPCProviderProxyWithLatency};
use fendermint_vm_topdown::sync::launch_polling_syncer;
use fendermint_vm_topdown::voting::{publish_vote_loop, VoteTally};
use fendermint_vm_topdown::voting::{publish_vote_loop, Error as VoteError, VoteTally};
use fendermint_vm_topdown::{CachedFinalityProvider, IPCParentFinality, Toggle};
use fvm_shared::address::{current_network, Address, Network};
use ipc_ipld_resolver::{Event as ResolverEvent, VoteRecord};
Expand Down Expand Up @@ -539,14 +539,30 @@ async fn dispatch_vote(
tracing::debug!("ignoring vote; topdown disabled");
return;
}
let _res = atomically_or_err(|| {
let res = atomically_or_err(|| {
parent_finality_votes.add_vote(
vote.public_key.clone(),
f.height,
f.block_hash.clone(),
)
})
.await;

match res {
Err(e @ VoteError::Equivocation(_, _, _, _)) => {
tracing::warn!(error = e.to_string(), "failed to handle vote");
}
Err(e @ (
VoteError::Uninitialized // early vote, we're not ready yet
| VoteError::UnpoweredValidator(_) // maybe arrived too early or too late, or spam
| VoteError::UnexpectedBlock(_, _) // won't happen here
)) => {
tracing::debug!(error = e.to_string(), "failed to handle vote");
}
_ => {
tracing::debug!("vote handled");
}
};
}
}
}
40 changes: 32 additions & 8 deletions fendermint/app/src/observe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use fvm_shared::address::Address;
use fvm_shared::econ::TokenAmount;

use fendermint_vm_interpreter::errors::ProcessError;
use fendermint_vm_interpreter::fvm::FvmMessage;
use tendermint::account::Id;

use ipc_observability::{
Expand Down Expand Up @@ -113,25 +114,48 @@ impl Recordable for BlockCommitted {
}
}

#[derive(Debug)]
pub struct Message {
pub from: Address,
pub to: Address,
pub value: TokenAmount,
pub gas_limit: u64,
pub fee_cap: TokenAmount,
pub premium: TokenAmount,
}

impl From<&FvmMessage> for Message {
fn from(fvm_message: &FvmMessage) -> Self {
Message {
from: fvm_message.from,
to: fvm_message.to,
value: fvm_message.value.clone(),
gas_limit: fvm_message.gas_limit,
fee_cap: fvm_message.gas_fee_cap.clone(),
premium: fvm_message.gas_premium.clone(),
}
}
}

#[derive(Debug, Default)]
pub struct MpoolReceived {
// TODO - add cid later on
// pub message_cid: &'a str,
pub from: Option<Address>,
pub to: Option<Address>,
pub value: Option<TokenAmount>,
pub param_len: usize,
pub gas_limit: u64,
pub fee_cap: Option<TokenAmount>,
pub premium: Option<TokenAmount>,
pub message: Option<Message>,
pub accept: bool,
pub reason: Option<String>,
}

impl Recordable for MpoolReceived {
fn record_metrics(&self) {
let from = self
.message
.as_ref()
.map(|m| m.from.to_string())
.unwrap_or("".to_string());

MPOOL_RECEIVED
.with_label_values(&[&self.accept.to_string(), self.from.map_or("", |_| "")])
.with_label_values(&[&self.accept.to_string(), &from])
.inc();
}
}
Expand Down
1 change: 1 addition & 0 deletions fendermint/vm/interpreter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tendermint-rpc = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
prometheus = { workspace = true }
strum = { workspace = true }

cid = { workspace = true }
fvm = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions fendermint/vm/interpreter/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{
chain::{ChainMessageApplyRet, ChainMessageCheckRes},
errors::ProcessError,
fvm::{FvmQuery, FvmQueryRet},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};

pub type BytesMessageApplyRes = Result<ChainMessageApplyRet, IpldError>;
Expand Down Expand Up @@ -130,13 +131,15 @@ where
&self,
state: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<bool, ProcessError> {
) -> anyhow::Result<ProcessResult> {
if msgs.len() > self.max_msgs {
tracing::warn!(
block_msgs = msgs.len(),
"rejecting block: too many messages"
);
return Err(ProcessError::TooManyMessages(msgs.len()));
return Ok(ProcessResult::Rejected(ProcessError::TooManyMessages(
msgs.len(),
)));
}

let mut chain_msgs = Vec::new();
Expand All @@ -157,7 +160,9 @@ where
"failed to decode message in proposal as ChainMessage"
);
if self.reject_malformed_proposal {
return Err(ProcessError::FailedToDecodeMessage(e.to_string()));
return Ok(ProcessResult::Rejected(
ProcessError::FailedToDecodeMessage(e.to_string()),
));
}
}
Ok(msg) => chain_msgs.push(msg),
Expand Down
13 changes: 8 additions & 5 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
fvm::state::FvmExecState,
fvm::FvmMessage,
signed::{SignedMessageApplyRes, SignedMessageCheckRes, SyntheticMessage, VerifiableMessage},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};
use anyhow::{bail, Context};
use async_stm::atomically;
Expand Down Expand Up @@ -180,7 +181,7 @@ where
&self,
env: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<bool, ProcessError> {
) -> anyhow::Result<ProcessResult> {
for msg in msgs {
match msg {
ChainMessage::Ipc(IpcMessage::BottomUpExec(msg)) => {
Expand All @@ -199,7 +200,7 @@ where
.await;

if !is_resolved {
return Err(ProcessError::CheckpointNotResolved);
return Ok(ProcessResult::Rejected(ProcessError::CheckpointNotResolved));
}
}
ChainMessage::Ipc(IpcMessage::TopDownExec(ParentFinality {
Expand All @@ -213,13 +214,15 @@ where
let is_final =
atomically(|| env.parent_finality_provider.check_proposal(&prop)).await;
if !is_final {
return Err(ProcessError::ParentFinalityNotAvailable);
return Ok(ProcessResult::Rejected(
ProcessError::ParentFinalityNotAvailable,
));
}
}
_ => {}
};
}
Ok(true)
Ok(ProcessResult::Accepted)
}
}

Expand Down
2 changes: 2 additions & 0 deletions fendermint/vm/interpreter/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ pub enum ProcessError {
TooManyMessages(usize),
#[error("failed to decode message in proposal as ChainMessage: {0}")]
FailedToDecodeMessage(String),
#[error("")]
Empty,
}
26 changes: 11 additions & 15 deletions fendermint/vm/interpreter/src/fvm/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use async_trait::async_trait;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::RawBytes;
use fvm_shared::{address::Address, error::ExitCode};
use ipc_observability::emit;
use ipc_observability::{emit, measure_time};

use crate::CheckInterpreter;

use super::{
observe::MsgExecCheck,
state::{ElapsedExecution, FvmExecState},
observe::{MsgExec, MsgExecPurpose},
state::FvmExecState,
store::ReadOnlyBlockstore,
FvmMessage, FvmMessageInterpreter,
};
Expand Down Expand Up @@ -116,20 +116,16 @@ where
} else if self.exec_in_check {
// Instead of modifying just the partial state, we will execute the call in earnest.
// This is required for fully supporting the Ethereum API "pending" queries, if that's needed.
let (apply_ret, _, latency) =
ElapsedExecution::new(&mut state).execute_explicit(msg.clone())?;

emit(MsgExecCheck {
let (execution_result, latency) =
measure_time(|| state.execute_explicit(msg.clone()));

let (apply_ret, _) = execution_result?;

emit(MsgExec {
purpose: MsgExecPurpose::Check,
height: state.block_height(),
from: msg.from.to_string().as_str(),
to: msg.to.to_string().as_str(),
value: msg.value.to_string().as_str(),
method_num: msg.method_num,
gas_limit: msg.gas_limit,
gas_price: msg.gas_premium.to_string().as_str(),
// TODO Karel - this should be the serialized params
params: msg.params.clone().bytes(),
nonce: msg.sequence,
message: msg.clone(),
duration: latency.as_secs_f64(),
exit_code: apply_ret.msg_receipt.exit_code.value(),
});
Expand Down
Loading