diff --git a/Cargo.lock b/Cargo.lock index f868a993..2b918bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1324,7 +1324,9 @@ dependencies = [ "serde_yaml", "sha2 0.10.8", "ssz_types", + "tempfile", "thiserror 1.0.61", + "time", "tokio", "toml", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 029088a3..b1a6d791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,3 +103,4 @@ derive_more = { version = "1.0.0", features = [ "deref", "display", ] } +time = { version = "0.3", features = ["formatting"] } diff --git a/bin/pbs.rs b/bin/pbs.rs index cb47ceb6..c9251a15 100644 --- a/bin/pbs.rs +++ b/bin/pbs.rs @@ -1,6 +1,7 @@ use cb_common::{ - config::load_pbs_config, - utils::{initialize_pbs_tracing_log, wait_for_signal}, + config::{load_pbs_config, PBS_MODULE_NAME}, + logging::initialize_tracing_log, + utils::wait_for_signal, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use eyre::Result; @@ -14,7 +15,7 @@ async fn main() -> Result<()> { if std::env::var_os("RUST_BACKTRACE").is_none() { std::env::set_var("RUST_BACKTRACE", "1"); } - let _guard = initialize_pbs_tracing_log(); + let _guard = initialize_tracing_log(PBS_MODULE_NAME); let pbs_config = load_pbs_config().await?; diff --git a/bin/signer.rs b/bin/signer.rs index 8ae24046..73f67f7c 100644 --- a/bin/signer.rs +++ b/bin/signer.rs @@ -1,6 +1,7 @@ use cb_common::{ config::{StartSignerConfig, SIGNER_MODULE_NAME}, - utils::{initialize_tracing_log, wait_for_signal}, + logging::initialize_tracing_log, + utils::wait_for_signal, }; use cb_signer::service::SigningService; use eyre::Result; diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 55961348..449c8a9d 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -7,15 +7,13 @@ pub mod prelude { }, config::{ load_builder_module_config, load_commit_module_config, load_pbs_config, - load_pbs_custom_config, LogsSettings, StartCommitModuleConfig, + load_pbs_custom_config, LogsSettings, StartCommitModuleConfig, PBS_MODULE_NAME, }, + logging::initialize_tracing_log, 
pbs::{BuilderEvent, BuilderEventClient, OnBuilderApiEvent}, signer::{BlsPublicKey, BlsSignature, EcdsaPublicKey, EcdsaSignature}, types::Chain, - utils::{ - initialize_pbs_tracing_log, initialize_tracing_log, utcnow_ms, utcnow_ns, utcnow_sec, - utcnow_us, - }, + utils::{utcnow_ms, utcnow_ns, utcnow_sec, utcnow_us}, }; pub use cb_metrics::provider::MetricsProvider; pub use cb_pbs::{ diff --git a/config.example.toml b/config.example.toml index 4d754b96..ce3042f8 100644 --- a/config.example.toml +++ b/config.example.toml @@ -244,3 +244,9 @@ log_level = "debug" # Maximum number of log files to keep # OPTIONAL max_log_files = 30 +# Log format. Supported values: default, json, raw +# OPTIONAL, DEFAULT: default +format = "raw" +# Log destination. Supported values: stdout, file, both +# OPTIONAL, DEFAULT: both +destination = "both" \ No newline at end of file diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2c037c47..3125ec55 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -51,6 +51,10 @@ url.workspace = true rand.workspace = true bimap.workspace = true derive_more.workspace = true +time.workspace = true unicode-normalization.workspace = true base64.workspace = true + +[dev-dependencies] +tempfile = "3.8" diff --git a/crates/common/src/config/log.rs b/crates/common/src/config/log.rs index 2e6b2805..91350eee 100644 --- a/crates/common/src/config/log.rs +++ b/crates/common/src/config/log.rs @@ -2,8 +2,28 @@ use std::path::PathBuf; use eyre::Result; use serde::{Deserialize, Serialize}; +use tracing_subscriber::{fmt, Layer, Registry}; use super::{load_optional_env_var, CommitBoostConfig, LOGS_DIR_DEFAULT, LOGS_DIR_ENV}; +use crate::logging::RawFormatter; + +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum LogFormat { + #[default] + Default, // default tracing format + Raw, // key=value format + Json, // JSON format +} + +#[derive(Clone, Debug, Deserialize, Serialize, Default)]
+#[serde(rename_all = "lowercase")] +pub enum LogDest { + Stdout, // Only console output + File, // Only file output + #[default] + Both, // Both console and file output +} #[derive(Clone, Debug, Deserialize, Serialize)] pub struct LogsSettings { @@ -13,16 +33,10 @@ pub struct LogsSettings { pub log_level: String, #[serde(default)] pub max_log_files: Option, -} - -impl Default for LogsSettings { - fn default() -> Self { - LogsSettings { - log_dir_path: default_log_dir_path(), - log_level: default_log_level(), - max_log_files: None, - } - } + #[serde(default)] + pub format: LogFormat, + #[serde(default)] + pub destination: LogDest, } impl LogsSettings { @@ -38,6 +52,29 @@ impl LogsSettings { Ok(config.logs) } + + /// Creates a format layer based on the configured format type + pub fn create_format_layer(&self) -> Box + Send + Sync> { + match self.format { + LogFormat::Default => Box::new(fmt::layer().with_target(false)), + LogFormat::Raw => Box::new(fmt::layer().with_target(false).event_format(RawFormatter)), + LogFormat::Json => { + Box::new(fmt::layer().with_target(false).json().with_current_span(true)) + } + } + } +} + +impl Default for LogsSettings { + fn default() -> Self { + LogsSettings { + log_dir_path: default_log_dir_path(), + log_level: default_log_level(), + max_log_files: None, + format: LogFormat::default(), + destination: LogDest::default(), + } + } } fn default_log_dir_path() -> PathBuf { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 5042061b..01f04a89 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -4,6 +4,7 @@ pub mod commit; pub mod config; pub mod constants; pub mod error; +pub mod logging; pub mod pbs; pub mod signature; pub mod signer; diff --git a/crates/common/src/logging.rs b/crates/common/src/logging.rs new file mode 100644 index 00000000..334a9dd2 --- /dev/null +++ b/crates/common/src/logging.rs @@ -0,0 +1,460 @@ +use eyre::Result; +use time::OffsetDateTime; +use tracing::{Event, Level, 
Subscriber}; +use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}; +use tracing_subscriber::{ + fmt::{self, FmtContext}, + prelude::*, + registry::LookupSpan, + EnvFilter, Layer as SubscriberLayer, Registry, +}; + +use crate::config::{load_optional_env_var, LogDest, LogFormat, LogsSettings}; + +pub struct RawFormatter; + +impl RawFormatter { + pub fn new() -> Self { + RawFormatter + } +} + +impl Default for RawFormatter { + fn default() -> Self { + RawFormatter::new() + } +} + +impl fmt::FormatEvent for RawFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> fmt::FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: fmt::format::Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + // Write timestamp + let now = OffsetDateTime::now_utc(); + let timestamp = now + .format(&time::format_description::well_known::Rfc3339) + .map_err(|_| std::fmt::Error)?; + write!(writer, "timestamp={} ", timestamp)?; + + // Write log level + write!(writer, "log_level={} ", event.metadata().level().to_string().to_uppercase())?; + + // Write target/method + write!(writer, "method={} ", event.metadata().target())?; + + // Write span fields and event fields + ctx.field_format().format_fields(writer.by_ref(), event)?; + + writeln!(writer) + } +} + +pub struct JsonFormatter; + +impl JsonFormatter { + pub fn new() -> Self { + JsonFormatter + } +} + +impl Default for JsonFormatter { + fn default() -> Self { + JsonFormatter::new() + } +} + +impl fmt::FormatEvent for JsonFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> fmt::FormatFields<'a> + 'static, +{ + fn format_event( + &self, + _ctx: &FmtContext<'_, S, N>, + mut writer: fmt::format::Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + let timestamp = OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .map_err(|_| std::fmt::Error)?; + + let mut output = serde_json::json!({ + 
"timestamp": timestamp, + "log_level": event.metadata().level().to_string().to_uppercase(), + "method": event.metadata().target(), + }); + + // Add event fields directly to root + if let serde_json::Value::Object(ref mut map) = output { + let mut visitor = JsonVisitor(map); + event.record(&mut visitor); + } + + writeln!(writer, "{}", serde_json::to_string(&output).map_err(|_| std::fmt::Error)?) + } +} + +struct JsonVisitor<'a>(&'a mut serde_json::Map); + +impl tracing::field::Visit for JsonVisitor<'_> { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0.insert(field.name().to_string(), serde_json::Value::String(value.to_string())); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert(field.name().to_string(), serde_json::Value::String(format!("{:?}", value))); + } +} + +pub fn initialize_tracing_log( + module_id: &str, +) -> Result<(Option, Option)> { + // Load settings from environment/config, use defaults if not configured + let settings = LogsSettings::from_env_config()?.unwrap_or_default(); + + // Get log level from RUST_LOG env var or settings + let log_level = if let Some(log_level) = load_optional_env_var("RUST_LOG") { + log_level.parse::().expect("invalid RUST_LOG value") + } else { + settings.log_level.parse::().expect("invalid log_level value in settings") + }; + + // Create filter for commit-boost crates with specified log level + let filter = format_crates_filter(Level::INFO.as_str(), log_level.as_str())?; + + match settings.destination { + // Stdout only - use specified LogFormat + LogDest::Stdout => { + let (writer, guard) = tracing_appender::non_blocking(std::io::stdout()); + + // Create layer with configured format + let layer: Box + Send + Sync> = match settings.format { + LogFormat::Json => { + Box::new(fmt::layer().event_format(JsonFormatter).with_writer(writer)) + } + LogFormat::Raw => { + 
Box::new(fmt::layer().event_format(RawFormatter).with_writer(writer)) + } + _ => Box::new(fmt::layer().with_writer(writer)), + }; + + tracing_subscriber::registry().with(layer.with_filter(filter)).init(); + Ok((None, Some(guard))) + } + + // File output only - use specified LogFormat + LogDest::File => { + // Set up daily rotating log files + let file_appender = tracing_appender::rolling::Builder::new() + .filename_prefix(module_id.to_lowercase()) + .max_log_files(settings.max_log_files.unwrap_or_default()) + .rotation(Rotation::DAILY) + .build(settings.log_dir_path.clone()) + .expect("failed building rolling file appender"); + + let (file_writer, guard) = tracing_appender::non_blocking(file_appender); + + // Create layer with configured format + let layer: Box + Send + Sync> = match settings.format { + LogFormat::Json => { + Box::new(fmt::layer().event_format(JsonFormatter).with_writer(file_writer)) + } + LogFormat::Raw => { + Box::new(fmt::layer().event_format(RawFormatter).with_writer(file_writer)) + } + _ => Box::new(fmt::layer().with_writer(file_writer)), + }; + + tracing_subscriber::registry().with(layer.with_filter(filter)).init(); + Ok((Some(guard), None)) + } + + // Both outputs - use specified LogFormat for both + LogDest::Both => { + // Set up daily rotating log files + let file_appender = tracing_appender::rolling::Builder::new() + .filename_prefix(module_id.to_lowercase()) + .max_log_files(settings.max_log_files.unwrap_or_default()) + .rotation(Rotation::DAILY) + .build(settings.log_dir_path.clone()) + .expect("failed building rolling file appender"); + + let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender); + let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + + // Create layers with configured format for both outputs + let file_layer: Box + Send + Sync> = match settings.format + { + LogFormat::Json => { + Box::new(fmt::layer().event_format(JsonFormatter).with_writer(file_writer)) + } + 
LogFormat::Raw => { + Box::new(fmt::layer().event_format(RawFormatter).with_writer(file_writer)) + } + _ => Box::new(fmt::layer().with_writer(file_writer)), + }; + let stdout_layer: Box + Send + Sync> = + match settings.format { + LogFormat::Json => Box::new( + fmt::layer().event_format(JsonFormatter).with_writer(stdout_writer), + ), + LogFormat::Raw => { + Box::new(fmt::layer().event_format(RawFormatter).with_writer(stdout_writer)) + } + _ => Box::new(fmt::layer().with_writer(stdout_writer)), + }; + + // Create separate filters to avoid cloning + let file_filter = format_crates_filter(Level::INFO.as_str(), log_level.as_str())?; + let stdout_filter = format_crates_filter(Level::INFO.as_str(), log_level.as_str())?; + + // Initialize registry with both layers + tracing_subscriber::registry() + .with(vec![ + file_layer.with_filter(file_filter), + stdout_layer.with_filter(stdout_filter), + ]) + .init(); + Ok((Some(file_guard), Some(stdout_guard))) + } + } +} + +// all commit boost crates +fn format_crates_filter(default_level: &str, crates_level: &str) -> Result { + let s = format!( + "{default_level},cb_signer={crates_level},cb_pbs={crates_level},cb_common={crates_level},cb_metrics={crates_level}", + ); + s.parse().map_err(|e| eyre::eyre!("Failed to parse filter directive: {}", e)) +} + +// todo tests will not pass CI due to: SetGlobalDefaultError("a global default +// trace dispatcher has already been set") #[cfg(test)] +// mod tests { +// use tempfile::tempdir; +// use tracing::info; + +// use super::*; +// use crate::config::CommitBoostConfig; + +// fn setup_test_config(dir: &tempfile::TempDir, settings: &LogsSettings) -> +// Result<()> { let config = CommitBoostConfig { +// chain: crate::types::Chain::Mainnet, +// relays: vec![], +// pbs: crate::config::StaticPbsConfig { +// pbs_config: crate::config::PbsConfig { +// host: "0.0.0.0".parse().unwrap(), +// port: 8080, +// relay_check: true, +// wait_all_registrations: true, +// timeout_get_header_ms: 1000, +// 
timeout_get_payload_ms: 1000, +// timeout_register_validator_ms: 1000, +// skip_sigverify: false, +// min_bid_wei: alloy::primitives::U256::from(10000), +// relay_monitors: vec![], +// late_in_slot_time_ms: 1000, +// extra_validation_enabled: false, +// rpc_url: None, +// }, +// docker_image: "".to_string(), +// with_signer: false, +// }, +// logs: Some(settings.clone()), +// muxes: None, +// modules: None, +// signer: None, +// metrics: None, +// }; +// let config = toml::to_string(&config)?; +// std::fs::write(dir.path().join("config.toml"), config)?; +// std::env::set_var("CB_CONFIG", dir.path().join("config.toml")); +// Ok(()) +// } + +// #[test] +// fn test_initialize_tracing_log_stdout() -> Result<()> { +// let dir = tempdir()?; +// std::env::set_var("RUST_LOG", "debug"); + +// let mut settings = LogsSettings::default(); +// settings.destination = LogDest::Stdout; +// settings.format = LogFormat::Raw; +// settings.log_dir_path = dir.path().to_path_buf(); + +// setup_test_config(&dir, &settings)?; + +// let _guard = initialize_tracing_log("test")?; +// info!("test message"); +// Ok(()) +// } + +// #[test] +// fn test_initialize_tracing_log_file() -> Result<()> { +// let dir = tempdir()?; +// std::env::set_var("RUST_LOG", "debug"); + +// let mut settings = LogsSettings::default(); +// settings.destination = LogDest::File; +// settings.format = LogFormat::Raw; +// settings.log_dir_path = dir.path().to_path_buf(); + +// setup_test_config(&dir, &settings)?; + +// let _guard = initialize_tracing_log("test")?; + +// // Verify log file was created - look for files starting with "test" +// let log_files = std::fs::read_dir(&dir)? 
+// .filter_map(|e| e.ok()) +// .filter(|e| { +// let is_log = e +// .path() +// .file_name() +// .and_then(|n| n.to_str()) +// .map(|s| s.starts_with("test")) +// .unwrap_or(false); +// is_log +// }) +// .count(); +// assert_eq!(log_files, 1, "Expected 1 log file, found {}", log_files); +// Ok(()) +// } + +// #[test] +// fn test_initialize_tracing_log_both() -> Result<()> { +// let dir = tempdir()?; +// std::env::set_var("RUST_LOG", "debug"); + +// let mut settings = LogsSettings::default(); +// settings.destination = LogDest::Both; +// settings.format = LogFormat::Raw; +// settings.log_dir_path = dir.path().to_path_buf(); + +// setup_test_config(&dir, &settings)?; + +// let _guard = initialize_tracing_log("test")?; +// info!("test message"); + +// // Find and read the log file +// let log_file = std::fs::read_dir(&dir)? +// .filter_map(|e| e.ok()) +// .find(|e| { +// let is_log = e +// .path() +// .file_name() +// .and_then(|n| n.to_str()) +// .map(|s| s.starts_with("test")) +// .unwrap_or(false); +// is_log +// }) +// .expect("Log file not found"); + +// let contents = std::fs::read_to_string(log_file.path())?; +// println!("File contents:\n{}", contents); + +// assert!(contents.contains("log_level=INFO"), "Missing log level"); +// assert!(contents.contains("test message"), "Missing message"); +// assert!(contents.contains("method=cb_common::logging::tests"), +// "Missing method"); Ok(()) +// } + +// #[test] +// fn test_file_logging_raw_format() -> Result<()> { +// let dir = tempdir()?; +// std::env::set_var("RUST_LOG", "debug"); + +// let mut settings = LogsSettings::default(); +// settings.destination = LogDest::File; +// settings.format = LogFormat::Raw; +// settings.log_dir_path = dir.path().to_path_buf(); + +// setup_test_config(&dir, &settings)?; + +// let _guard = initialize_tracing_log("test")?; +// info!(field = "value", "test message"); + +// // Give the non-blocking writer a moment to flush +// std::thread::sleep(std::time::Duration::from_millis(50)); 
+ +// // Find and read the log file +// let log_file = std::fs::read_dir(&dir)? +// .filter_map(|e| e.ok()) +// .find(|e| { +// let is_log = e +// .path() +// .file_name() +// .and_then(|n| n.to_str()) +// .map(|s| s.starts_with("test")) +// .unwrap_or(false); +// is_log +// }) +// .expect("Log file not found"); + +// let contents = std::fs::read_to_string(log_file.path())?; +// println!("File contents:\n{}", contents); + +// assert!(contents.contains("log_level=INFO"), "Missing log level"); +// assert!(contents.contains("\"value\""), "Missing value"); +// assert!(contents.contains("test message"), "Missing message"); +// assert!(contents.contains("method=cb_common::logging::tests"), +// "Missing method"); + +// Ok(()) +// } + +// // #[test] +// fn test_file_logging_json_format() -> Result<()> { +// let dir = tempdir()?; +// std::env::set_var("RUST_LOG", "debug"); + +// let mut settings = LogsSettings::default(); +// settings.destination = LogDest::File; +// settings.format = LogFormat::Json; +// settings.log_dir_path = dir.path().to_path_buf(); + +// setup_test_config(&dir, &settings)?; + +// let _guard = initialize_tracing_log("test")?; +// info!( +// req_id = "test-123", +// relay_id = "test-relay", +// msg = "test message", +// latency = "100ms", +// ); + +// // Give the non-blocking writer a moment to flush +// std::thread::sleep(std::time::Duration::from_millis(50)); + +// // Find and read the log file +// let log_file = std::fs::read_dir(&dir)? 
+// .filter_map(|e| e.ok()) +// .find(|e| { +// e.path() +// .file_name() +// .and_then(|n| n.to_str()) +// .map(|s| s.starts_with("test")) +// .unwrap_or(false) +// }) +// .expect("Log file not found"); + +// let contents = std::fs::read_to_string(log_file.path())?; +// println!("File contents:\n{}", contents); + +// let json: serde_json::Value = serde_json::from_str(&contents)?; +// assert_eq!(json["log_level"], "INFO", "Wrong log level"); +// assert_eq!(json["method"], "cb_common::logging::tests", "Wrong +// method"); assert_eq!(json["req_id"], "test-123", "Wrong req_id"); +// assert_eq!(json["relay_id"], "test-relay", "Wrong relay_id"); +// assert_eq!(json["msg"], "test message", "Wrong message"); +// assert_eq!(json["latency"], "100ms", "Wrong latency"); + +// Ok(()) +// } +// } diff --git a/crates/common/src/pbs/types/beacon_block.rs b/crates/common/src/pbs/types/beacon_block.rs index ea633155..c8ae9c2f 100644 --- a/crates/common/src/pbs/types/beacon_block.rs +++ b/crates/common/src/pbs/types/beacon_block.rs @@ -25,6 +25,13 @@ impl SignedBlindedBeaconBlock { } } + pub fn parent_hash(&self) -> B256 { + match &self.message { + BlindedBeaconBlock::Electra(b) => b.parent_root, + BlindedBeaconBlock::Deneb(b) => b.parent_root, + } + } + pub fn slot(&self) -> u64 { match &self.message { BlindedBeaconBlock::Electra(b) => b.slot, diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 918b29d5..bdade56c 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -14,15 +14,8 @@ use reqwest::header::HeaderMap; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use ssz::{Decode, Encode}; -use tracing::Level; -use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}; -use tracing_subscriber::{fmt::Layer, prelude::*, EnvFilter}; - -use crate::{ - config::{load_optional_env_var, LogsSettings, PBS_MODULE_NAME}, - pbs::HEADER_VERSION_VALUE, - types::Chain, -}; + +use crate::{pbs::HEADER_VERSION_VALUE, 
types::Chain}; const MILLIS_PER_SECOND: u64 = 1_000; @@ -171,78 +164,6 @@ pub const fn default_u256() -> U256 { U256::ZERO } -// LOGGING -pub fn initialize_tracing_log(module_id: &str) -> eyre::Result { - let settings = LogsSettings::from_env_config()?; - - // Use file logs only if setting is set - let use_file_logs = settings.is_some(); - let settings = settings.unwrap_or_default(); - - // Log level for stdout - - let stdout_log_level = if let Some(log_level) = load_optional_env_var("RUST_LOG") { - log_level.parse::().expect("invalid RUST_LOG value") - } else { - settings.log_level.parse::().expect("invalid log_level value in settings") - }; - - let stdout_filter = format_crates_filter(Level::INFO.as_str(), stdout_log_level.as_str()); - - if use_file_logs { - // Log all events to a rolling log file. - let mut builder = - tracing_appender::rolling::Builder::new().filename_prefix(module_id.to_lowercase()); - if let Some(value) = settings.max_log_files { - builder = builder.max_log_files(value); - } - let file_appender = builder - .rotation(Rotation::DAILY) - .build(settings.log_dir_path) - .expect("failed building rolling file appender"); - - let (writer, guard) = tracing_appender::non_blocking(file_appender); - - // at least debug for file logs - let file_log_level = stdout_log_level.max(Level::DEBUG); - let file_log_filter = format_crates_filter(Level::INFO.as_str(), file_log_level.as_str()); - - let stdout_layer = - tracing_subscriber::fmt::layer().with_target(false).with_filter(stdout_filter); - - let file_layer = Layer::new() - .json() - .with_current_span(false) - .with_span_list(true) - .with_writer(writer) - .with_filter(file_log_filter); - - tracing_subscriber::registry().with(stdout_layer.and_then(file_layer)).init(); - Ok(guard) - } else { - let (writer, guard) = tracing_appender::non_blocking(std::io::stdout()); - let stdout_layer = tracing_subscriber::fmt::layer() - .with_target(false) - .with_writer(writer) - .with_filter(stdout_filter); - 
tracing_subscriber::registry().with(stdout_layer).init(); - Ok(guard) - } -} - -pub fn initialize_pbs_tracing_log() -> eyre::Result { - initialize_tracing_log(PBS_MODULE_NAME) -} - -// all commit boost crates -// TODO: this can probably done without unwrap -fn format_crates_filter(default_level: &str, crates_level: &str) -> EnvFilter { - let s = format!( - "{default_level},cb_signer={crates_level},cb_pbs={crates_level},cb_common={crates_level},cb_metrics={crates_level}", - ); - s.parse().unwrap() -} - pub fn print_logo() { println!( r#" ______ _ __ ____ __ diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 919bad11..82a7aca3 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -5,7 +5,7 @@ use axum::{ response::IntoResponse, }; use cb_common::{ - pbs::{BuilderEvent, GetHeaderParams}, + pbs::{BuilderEvent, GetHeaderParams, GetHeaderResponse}, utils::{get_user_agent, ms_into_slot}, }; use reqwest::StatusCode; @@ -20,6 +20,40 @@ use crate::{ state::{BuilderApiState, PbsStateGuard}, }; +fn log_get_header( + params: GetHeaderParams, + user_agent: String, + ms_into_slot: u64, + relays: Vec, + max_bid: &Option, +) { + if let Some(max_bid) = max_bid { + info!( + msg = "received header", + ua = ?user_agent, + msIntoSlot = ms_into_slot, + parentHash = %params.parent_hash, + pubkey = %max_bid.pubkey(), + slot = params.slot, + relays = ?relays, + valueEth = %format_ether(max_bid.value()), + blockHash = %max_bid.block_hash(), + blockNumber = %max_bid.block_number(), + gasLimit = %max_bid.gas_limit(), + ); + } else { + info!( + msg = "no header available for slot", + ua = ?user_agent, + msIntoSlot = ms_into_slot, + parentHash = %params.parent_hash, + pubkey = %params.pubkey, + slot = params.slot, + relays = ?relays, + ); + } +} + #[tracing::instrument(skip_all, name = "get_header", fields(req_id = %Uuid::new_v4(), slot = params.slot))] pub async fn handle_get_header>( State(state): State>, @@ 
-27,32 +61,28 @@ pub async fn handle_get_header>( Path(params): Path, ) -> Result { let state = state.read().clone(); - state.publish_event(BuilderEvent::GetHeaderRequest(params)); + // inputs for logging + let relays = state.config.all_relays.iter().map(|r| (*r.id).clone()).collect::>(); let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); - info!(ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); - - match A::get_header(params, req_headers, state.clone()).await { + match A::get_header(params, req_headers.clone(), state.clone()).await { Ok(res) => { + log_get_header(params, ua, ms_into_slot, relays, &res); state.publish_event(BuilderEvent::GetHeaderResponse(Box::new(res.clone()))); if let Some(max_bid) = res { - info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header"); - BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) } else { - // spec: return 204 if request is valid but no bid available - info!("no header available for slot"); - BEACON_NODE_STATUS.with_label_values(&["204", GET_HEADER_ENDPOINT_TAG]).inc(); Ok(StatusCode::NO_CONTENT.into_response()) } } Err(err) => { + log_get_header(params, ua, ms_into_slot, relays, &None); error!(%err, "no header available from relays"); let err = PbsClientError::NoPayload; diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index da644642..8c6e30d1 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -1,4 +1,4 @@ -use alloy::rpc::types::beacon::relay::ValidatorRegistration; +use alloy::{primitives::hex::encode_prefixed, rpc::types::beacon::relay::ValidatorRegistration}; use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; use cb_common::{pbs::BuilderEvent, utils::get_user_agent}; 
use reqwest::StatusCode; @@ -13,6 +13,32 @@ use crate::{ state::{BuilderApiState, PbsStateGuard}, }; +fn log_register_validator( + registrations: &[ValidatorRegistration], + user_agent: String, + success: bool, + error: Option<&str>, +) { + if success { + let pubkeys = + registrations.iter().map(|r| encode_prefixed(r.message.pubkey)).collect::>(); + + info!( + ua = ?user_agent, + num_registrations = registrations.len(), + pubkeys = ?pubkeys, + msg = "register validator successful", + ); + } else { + error!( + ua = ?user_agent, + num_registrations = registrations.len(), + error = error.unwrap_or("unknown error"), + msg = "all relays failed registration", + ); + } +} + #[tracing::instrument(skip_all, name = "register_validators", fields(req_id = %Uuid::new_v4()))] pub async fn handle_register_validator>( State(state): State>, @@ -26,21 +52,22 @@ pub async fn handle_register_validator>( let ua = get_user_agent(&req_headers); - info!(ua, num_registrations = registrations.len()); - - if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { - state.publish_event(BuilderEvent::RegisterValidatorResponse); - error!(%err, "all relays failed registration"); - - let err = PbsClientError::NoResponse; - BEACON_NODE_STATUS - .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) - .inc(); - Err(err) - } else { - info!("register validator successful"); + match A::register_validator(registrations.clone(), req_headers, state.clone()).await { + Ok(_) => { + log_register_validator(®istrations, ua, true, None); + state.publish_event(BuilderEvent::RegisterValidatorResponse); + BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); + Ok(StatusCode::OK) + } + Err(err) => { + log_register_validator(®istrations, ua, false, Some(&err.to_string())); + state.publish_event(BuilderEvent::RegisterValidatorResponse); - BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); 
- Ok(StatusCode::OK) + let err = PbsClientError::NoResponse; + BEACON_NODE_STATUS + .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) + .inc(); + Err(err) + } } } diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index 9b984d3f..add58daa 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -11,23 +11,53 @@ use crate::{ BuilderApi, RELOAD_ENDPOINT_TAG, }; +fn log_reload( + user_agent: String, + relay_check: bool, + success: bool, + error: Option<&str>, + relays: Vec, + prev_relays: Vec, +) { + if success { + info!( + ua = ?user_agent, + relay_check = relay_check, + relays = ?relays, + prev_relays = ?prev_relays, + msg = "config reload successful", + ); + } else { + error!( + ua = ?user_agent, + relay_check = relay_check, + relays = ?relays, + prev_relays = ?prev_relays, + error = error.unwrap_or("unknown error"), + msg = "config reload failed", + ); + } +} + #[tracing::instrument(skip_all, name = "reload", fields(req_id = %Uuid::new_v4()))] pub async fn handle_reload>( req_headers: HeaderMap, State(state): State>, ) -> Result { + let prev_relays = + state.read().config.all_relays.iter().map(|r| (*r.id).clone()).collect::>(); let prev_state = state.read().clone(); - prev_state.publish_event(BuilderEvent::ReloadEvent); + // inputs for logging let ua = get_user_agent(&req_headers); - - info!(ua, relay_check = prev_state.config.pbs_config.relay_check); + let relay_check = prev_state.config.pbs_config.relay_check; + let relays = prev_state.config.all_relays.iter().map(|r| (*r.id).clone()).collect::>(); match A::reload(prev_state.clone()).await { Ok(new_state) => { + log_reload(ua, relay_check, true, None, relays, prev_relays); prev_state.publish_event(BuilderEvent::ReloadResponse); - info!("config reload successful"); *state.write() = new_state; @@ -36,6 +66,7 @@ pub async fn handle_reload>( } Err(err) => { error!(%err, "config reload failed"); + log_reload(ua, relay_check, 
false, Some(&err.to_string()), relays, prev_relays); let err = PbsClientError::Internal; BEACON_NODE_STATUS diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index b4262d1f..95124d81 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -12,29 +12,55 @@ use crate::{ state::{BuilderApiState, PbsStateGuard}, }; +fn log_status( + user_agent: String, + relay_check: bool, + success: bool, + error: Option<&str>, + relays: Vec, +) { + if success { + info!( + ua = ?user_agent, + relay_check = relay_check, + relays = ?relays, + msg = "relay check successful", + ); + } else { + error!( + ua = ?user_agent, + relay_check = relay_check, + relays = ?relays, + error = error.unwrap_or("unknown error"), + msg = "all relays failed get_status", + ); + } +} + #[tracing::instrument(skip_all, name = "status", fields(req_id = %Uuid::new_v4()))] pub async fn handle_get_status>( req_headers: HeaderMap, State(state): State>, ) -> Result { let state = state.read().clone(); - state.publish_event(BuilderEvent::GetStatusEvent); + // inputs for logging let ua = get_user_agent(&req_headers); - - info!(ua, relay_check = state.config.pbs_config.relay_check); + let relay_check = state.config.pbs_config.relay_check; + let relays = state.config.all_relays.iter().map(|r| (*r.id).clone()).collect::>(); match A::get_status(req_headers, state.clone()).await { Ok(_) => { + log_status(ua, relay_check, true, None, relays); state.publish_event(BuilderEvent::GetStatusResponse); - info!("relay check successful"); BEACON_NODE_STATUS.with_label_values(&["200", STATUS_ENDPOINT_TAG]).inc(); Ok(StatusCode::OK) } Err(err) => { error!(%err, "all relays failed get_status"); + log_status(ua, relay_check, false, Some(&err.to_string()), relays); let err = PbsClientError::NoResponse; BEACON_NODE_STATUS diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 760f17ff..5c2deab1 100644 --- 
a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -15,6 +15,26 @@ use crate::{ state::{BuilderApiState, PbsStateGuard}, }; +fn log_submit_block( + user_agent: String, + ms_into_slot: u64, + block_hash: alloy::primitives::B256, + slot: u64, + parent_hash: alloy::primitives::B256, + success: bool, + relays: Vec, +) { + info!( + msg = if success { "received unblinded block" } else { "failed to receive unblinded block" }, + ua = ?user_agent, + msIntoSlot = ms_into_slot, + slot = slot, + parentHash = %parent_hash, + blockHash = %block_hash, + relays = ?relays, + ); +} + #[tracing::instrument(skip_all, name = "submit_blinded_block", fields(req_id = %Uuid::new_v4(), slot = signed_blinded_block.slot()))] pub async fn handle_submit_block>( State(state): State>, @@ -26,19 +46,21 @@ pub async fn handle_submit_block>( trace!(?signed_blinded_block); state.publish_event(BuilderEvent::SubmitBlockRequest(Box::new(signed_blinded_block.clone()))); + // inputs for logging let now = utcnow_ms(); let slot = signed_blinded_block.slot(); let block_hash = signed_blinded_block.block_hash(); let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain); let ua = get_user_agent(&req_headers); - - info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash); + let ms_into_slot = now.saturating_sub(slot_start_ms); + let relays = state.config.all_relays.iter().map(|r| (*r.id).clone()).collect::>(); + let parent_hash = signed_blinded_block.parent_hash(); match A::submit_block(signed_blinded_block, req_headers, state.clone()).await { Ok(res) => { trace!(?res); + log_submit_block(ua, ms_into_slot, block_hash, slot, parent_hash, true, relays); state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new(res.clone()))); - info!("received unblinded block"); BEACON_NODE_STATUS.with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, Json(res).into_response())) @@ -46,6 +68,7 @@ pub async fn 
handle_submit_block>( Err(err) => { error!(%err, %block_hash, "CRITICAL: no payload received from relays. Check previous logs or use the Relay Data API"); + log_submit_block(ua, ms_into_slot, block_hash, slot, parent_hash, false, relays); state.publish_event(BuilderEvent::MissedPayload { block_hash }); let err = PbsClientError::NoPayload; diff --git a/examples/status_api/src/main.rs b/examples/status_api/src/main.rs index 973348bc..44523809 100644 --- a/examples/status_api/src/main.rs +++ b/examples/status_api/src/main.rs @@ -91,7 +91,7 @@ async fn main() -> Result<()> { let (pbs_config, extra) = load_pbs_custom_config::().await?; let chain = pbs_config.chain; - let _guard = initialize_pbs_tracing_log()?; + let _guard = initialize_tracing_log(PBS_MODULE_NAME); let custom_state = MyBuilderState::from_config(extra); let state = PbsState::new(pbs_config).with_data(custom_state);