diff --git a/Cargo.lock b/Cargo.lock index 889fddf1b7..3bf6b8d8d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5349,6 +5349,7 @@ dependencies = [ "fvm_shared", "gcra", "ipc-api", + "ipc-observability", "ipc_ipld_resolver", "lazy_static", "libipld", diff --git a/docs/fendermint/observability.md b/docs/fendermint/observability.md index c962124695..03001e8e17 100644 --- a/docs/fendermint/observability.md +++ b/docs/fendermint/observability.md @@ -49,6 +49,29 @@ This is achieved through the use of the `ipc-observability` crate/library, which - `ipc_topdown_parent_finality_voting_quorum_height` (IntGauge): Sets the height of the parent finality quorum. - `ipc_topdown_parent_finality_voting_quorum_weight` (IntGauge): Sets the weight of the parent finality quorum. - `ipc_topdown_parent_finality_committed_height` (IntGauge): Sets the height of the committed parent finality. +- `ipld_resolver_ping_rtt` (Histogram): Records a ping roundtrip time. +- `ipld_resolver_ping_timeouts` (IntCounter): Incremented when a ping timed out. +- `ipld_resolver_ping_failure` (IntCounter): Incremented when a ping failed. +- `ipld_resolver_ping_success` (IntCounter): Incremented when a ping succeeded. +- `ipld_resolver_identify_failure` (IntCounter): Incremented when an identify failed. +- `ipld_resolver_identify_received` (IntCounter): Incremented when an identify info received. +- `ipld_resolver_discovery_background_lookup` (IntCounter): Incremented when a discovery background lookup started. +- `ipld_resolver_discovery_connected_peers` (IntGauge): Sets the number of discovery connected peers. +- `ipld_resolver_membership_skipped_peers` (IntCounter): Incremented when a membership provider skipped. +- `ipld_resolver_membership_routable_peers` (IntGauge): Sets the number of routable peers. +- `ipld_resolver_membership_provider_peers` (IntGauge): Sets the number of unique peers. +- `ipld_resolver_membership_unknown_topic` (IntCounter): Incremented when a membership of unknown topic received. 
+- `ipld_resolver_membership_invalid_message` (IntCounter): Incremented when an invalid membership message is received. +- `ipld_resolver_membership_publish_total` (IntCounter): Incremented when a membership message is published. +- `ipld_resolver_membership_publish_failure` (IntCounter): Incremented when a membership publish failed. +- `ipld_resolver_content_resolve_running` (IntGauge): Sets the number of currently running content resolutions. +- `ipld_resolver_content_resolve_no_peers` (IntCounter): Incremented when a resolution had no peer. +- `ipld_resolver_content_resolve_success` (IntCounter): Incremented when a resolution succeeded. +- `ipld_resolver_content_resolve_failure` (IntCounter): Incremented when a resolution failed. +- `ipld_resolver_content_resolve_fallback` (IntCounter): Incremented when a resolution had a fallback. +- `ipld_resolver_content_resolve_peers` (Histogram): Records the number of peers found for a resolution from a subnet. +- `ipld_resolver_content_connected_peers` (Histogram): Records the number of connected peers in a resolution. +- `ipld_resolver_content_rate_limited` (IntCounter): Incremented when a resolution was rate limited. - `ipc_tracing_errors` (IntCounterVec): Increments the count of tracing errors for the affected event. ## Events and corresponding metrics @@ -290,6 +313,76 @@ Represents the commitment of parent finality. 
- `ipc_topdown_parent_finality_committed_height` +### PingEvent + +**Variants and affected metrics:** + +- `Success(PeerId, Duration)`: `ipld_resolver_ping_rtt`, `ipld_resolver_ping_success` + +### PingFailureEvent + +**Variants and affected metrics:** + +- `Timeout(PeerId)`: `ipld_resolver_ping_timeouts` +- `Failure(PeerId, String)`: `ipld_resolver_ping_failure` + +### IdentifyEvent + +**Variants and affected metrics:** + +- `Received(PeerId)`: `ipld_resolver_identify_received` + +### IdentifyFailureEvent + +**Variants and affected metrics:** + +- `Failure(PeerId, String)`: `ipld_resolver_identify_failure` + +### DiscoveryEvent + +**Variants and affected metrics:** + +- `BackgroundLookup(PeerId)`: `ipld_resolver_discovery_background_lookup` +- `ConnectionEstablished(PeerId)`: `ipld_resolver_discovery_connected_peers` +- `ConnectionClosed(PeerId)`: `ipld_resolver_discovery_connected_peers` + +### MembershipEvent + +**Variants and affected metrics:** + +- `Added(PeerId)`: `ipld_resolver_membership_provider_peers` +- `Removed(PeerId)`: `ipld_resolver_membership_provider_peers` +- `Skipped(PeerId)`: `ipld_resolver_membership_skipped_peers` +- `PublishSuccess`: `ipld_resolver_membership_publish_total` +- `RoutablePeers(i64)`: `ipld_resolver_membership_routable_peers` + +### MembershipFailureEvent + +**Variants and affected metrics:** + +- `PublishFailure(String)`: `ipld_resolver_membership_publish_failure` +- `GossipInvalidProviderRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message` +- `GossipInvalidVoteRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message` +- `GossipUnknownTopic(Option<PeerId>, TopicHash)`: `ipld_resolver_membership_unknown_topic` + +### ResolveEvent + +**Variants and affected metrics:** + +- `Started(Cid)`: `ipld_resolver_content_resolve_running` +- `Success(Cid)`: `ipld_resolver_content_resolve_success` +- `Completed`: `ipld_resolver_content_resolve_running` +- `Peers(usize)`: `ipld_resolver_content_resolve_peers` +- `NoPeers`: 
`ipld_resolver_content_resolve_no_peers` +- `ConnectedPeers(usize)`: `ipld_resolver_content_connected_peers` + +### ResolveFailureEvent + +**Variants and affected metrics:** + +- `Failure(Cid)`: `ipld_resolver_content_resolve_failure` +- `Fallback(Cid)`: `ipld_resolver_content_resolve_fallback` + ### TracingError **Description:** diff --git a/ipld/resolver/Cargo.toml b/ipld/resolver/Cargo.toml index 00565d3951..8c5e8a69ce 100644 --- a/ipld/resolver/Cargo.toml +++ b/ipld/resolver/Cargo.toml @@ -34,6 +34,7 @@ fvm_shared = { workspace = true } fvm_ipld_blockstore = { workspace = true, optional = true } ipc-api = { path = "../../ipc/api", default-features = false } +ipc-observability = { workspace = true } [dev-dependencies] cid = { workspace = true } diff --git a/ipld/resolver/src/behaviour/content.rs b/ipld/resolver/src/behaviour/content.rs index 115c35b6e6..cd6bf12543 100644 --- a/ipld/resolver/src/behaviour/content.rs +++ b/ipld/resolver/src/behaviour/content.rs @@ -7,6 +7,11 @@ use std::{ time::Duration, }; +use crate::{ + limiter::{RateLimit, RateLimiter}, + observe, +}; +use ipc_observability::emit; use libipld::{store::StoreParams, Cid}; use libp2p::{ core::{ConnectedPoint, Endpoint}, @@ -22,11 +27,6 @@ use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, Bits use log::debug; use prometheus::Registry; -use crate::{ - limiter::{RateLimit, RateLimiter}, - stats, -}; - pub type QueryId = libp2p_bitswap::QueryId; // Not much to do here, just hiding the `Progress` event as I don't think we'll need it. @@ -140,7 +140,7 @@ impl Behaviour

{ /// will initiate connections to the peers which aren't connected at the moment. pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId { debug!("resolving {cid} from {peers:?}"); - stats::CONTENT_RESOLVE_RUNNING.inc(); + emit(observe::ResolveEvent::Started(cid)); // Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`. self.inner.sync(cid, peers, [].into_iter()) } @@ -334,7 +334,7 @@ impl NetworkBehaviour for Behaviour

{ ToSwarm::GenerateEvent(ev) => match ev { BitswapEvent::Progress(_, _) => {} BitswapEvent::Complete(id, result) => { - stats::CONTENT_RESOLVE_RUNNING.dec(); + emit(observe::ResolveEvent::Completed); let out = Event::Complete(id, result); return Poll::Ready(ToSwarm::GenerateEvent(out)); } diff --git a/ipld/resolver/src/behaviour/discovery.rs b/ipld/resolver/src/behaviour/discovery.rs index 88e6bbe9a9..a5f2d2efac 100644 --- a/ipld/resolver/src/behaviour/discovery.rs +++ b/ipld/resolver/src/behaviour/discovery.rs @@ -8,6 +8,9 @@ use std::{ time::Duration, }; +use super::NetworkConfig; +use crate::observe; +use ipc_observability::emit; use libp2p::{ core::Endpoint, identify::Info, @@ -23,11 +26,6 @@ use libp2p::{ }; use log::{debug, warn}; use tokio::time::Interval; - -use crate::stats; - -use super::NetworkConfig; - // NOTE: The Discovery behaviour is largely based on what exists in Forest. If it ain't broken... // NOTE: Not sure if emitting events is going to be useful yet, but for now it's an example of having one. 
@@ -178,7 +176,7 @@ impl Behaviour { pub fn background_lookup(&mut self, peer_id: PeerId) { if self.addresses_of_peer(peer_id).is_empty() { if let Some(kademlia) = self.inner.as_mut() { - stats::DISCOVERY_BACKGROUND_LOOKUP.inc(); + emit(observe::DiscoveryEvent::BackgroundLookup(peer_id)); kademlia.get_closest_peers(peer_id); } } @@ -241,13 +239,13 @@ impl NetworkBehaviour for Behaviour { match &event { FromSwarm::ConnectionEstablished(e) => { if e.other_established == 0 { - stats::DISCOVERY_CONNECTED_PEERS.inc(); + emit(observe::DiscoveryEvent::ConnectionEstablished(e.peer_id)); self.num_connections += 1; } } FromSwarm::ConnectionClosed(e) => { if e.remaining_established == 0 { - stats::DISCOVERY_CONNECTED_PEERS.dec(); + emit(observe::DiscoveryEvent::ConnectionClosed(e.peer_id)); self.num_connections -= 1; } } diff --git a/ipld/resolver/src/behaviour/membership.rs b/ipld/resolver/src/behaviour/membership.rs index ac1ebd2497..ae3d5c9558 100644 --- a/ipld/resolver/src/behaviour/membership.rs +++ b/ipld/resolver/src/behaviour/membership.rs @@ -5,8 +5,16 @@ use std::marker::PhantomData; use std::task::{Context, Poll}; use std::time::Duration; +use super::NetworkConfig; +use crate::hash::blake2b_256; +use crate::observe; +use crate::provider_cache::{ProviderDelta, SubnetProviderCache}; +use crate::provider_record::{ProviderRecord, SignedProviderRecord}; +use crate::vote_record::{SignedVoteRecord, VoteRecord}; +use crate::Timestamp; use anyhow::anyhow; use ipc_api::subnet_id::SubnetID; +use ipc_observability::emit; use libp2p::core::Endpoint; use libp2p::gossipsub::{ self, IdentTopic, MessageAuthenticity, MessageId, PublishError, Sha256Topic, SubscriptionError, @@ -24,14 +32,6 @@ use serde::de::DeserializeOwned; use serde::Serialize; use tokio::time::{Instant, Interval}; -use crate::hash::blake2b_256; -use crate::provider_cache::{ProviderDelta, SubnetProviderCache}; -use crate::provider_record::{ProviderRecord, SignedProviderRecord}; -use 
crate::vote_record::{SignedVoteRecord, VoteRecord}; -use crate::{stats, Timestamp}; - -use super::NetworkConfig; - /// `Gossipsub` topic identifier for subnet membership. const PUBSUB_MEMBERSHIP: &str = "/ipc/membership"; /// `Gossipsub` topic identifier for voting about content. @@ -326,11 +326,13 @@ where ); match self.inner.publish(self.membership_topic.clone(), data) { Err(e) => { - stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + emit(observe::MembershipFailureEvent::PublishFailure( + e.to_string(), + )); Err(anyhow!(e)) } Ok(_msg_id) => { - stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + emit(observe::MembershipEvent::PublishSuccess); self.last_publish_timestamp = Timestamp::now(); self.next_publish_timestamp = self.last_publish_timestamp + self.publish_interval.period(); @@ -346,11 +348,13 @@ where let data = vote.into_envelope().into_protobuf_encoding(); match self.inner.publish(topic, data) { Err(e) => { - stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + emit(observe::MembershipFailureEvent::PublishFailure( + e.to_string(), + )); Err(anyhow!(e)) } Ok(_msg_id) => { - stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + emit(observe::MembershipEvent::PublishSuccess); Ok(()) } } @@ -363,11 +367,13 @@ where let topic = self.preemptive_topic(&subnet_id); match self.inner.publish(topic, data) { Err(e) => { - stats::MEMBERSHIP_PUBLISH_FAILURE.inc(); + emit(observe::MembershipFailureEvent::PublishFailure( + e.to_string(), + )); Err(anyhow!(e)) } Ok(_msg_id) => { - stats::MEMBERSHIP_PUBLISH_SUCCESS.inc(); + emit(observe::MembershipEvent::PublishSuccess); Ok(()) } } @@ -378,8 +384,9 @@ where /// Call this method when the discovery service learns the address of a peer. 
pub fn set_routable(&mut self, peer_id: PeerId) { self.provider_cache.set_routable(peer_id); - stats::MEMBERSHIP_ROUTABLE_PEERS - .set(self.provider_cache.num_routable().try_into().unwrap()); + + let num_routable = self.provider_cache.num_routable().try_into().unwrap(); + emit(observe::MembershipEvent::RoutablePeers(num_routable)); self.publish_for_new_peer(peer_id); } @@ -406,33 +413,27 @@ where if msg.topic == self.membership_topic.hash() { match SignedProviderRecord::from_bytes(&msg.data).map(|r| r.into_record()) { Ok(record) => self.handle_provider_record(record), - Err(e) => { - stats::MEMBERSHIP_INVALID_MESSAGE.inc(); - warn!( - "Gossip message from peer {:?} could not be deserialized as ProviderRecord: {e}", - msg.source - ); - } + Err(e) => emit( + observe::MembershipFailureEvent::GossipInvalidProviderRecord( + msg.source, + e.to_string(), + ), + ), } } else if self.voting_topics.contains(&msg.topic) { match SignedVoteRecord::from_bytes(&msg.data).map(|r| r.into_record()) { Ok(record) => self.handle_vote_record(record), - Err(e) => { - stats::MEMBERSHIP_INVALID_MESSAGE.inc(); - warn!( - "Gossip message from peer {:?} could not be deserialized as VoteRecord: {e}", - msg.source - ); - } + Err(e) => emit(observe::MembershipFailureEvent::GossipInvalidVoteRecord( + msg.source, + e.to_string(), + )), } } else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) { self.handle_preemptive_data(subnet_id.clone(), msg.data) } else { - stats::MEMBERSHIP_UNKNOWN_TOPIC.inc(); - warn!( - "unknown gossipsub topic in message from {:?}: {}", - msg.source, msg.topic - ); + emit(observe::MembershipFailureEvent::GossipUnknownTopic( + msg.source, msg.topic, + )); } } @@ -444,7 +445,7 @@ where debug!("received provider record: {record:?}"); let (event, publish) = match self.provider_cache.add_provider(&record) { None => { - stats::MEMBERSHIP_SKIPPED_PEERS.inc(); + emit(observe::MembershipEvent::Skipped(record.peer_id)); (Some(Event::Skipped(record.peer_id)), false) } 
Some(d) if d.is_empty() && !d.is_new => (None, false), @@ -459,7 +460,7 @@ where } if publish { - stats::MEMBERSHIP_PROVIDER_PEERS.inc(); + emit(observe::MembershipEvent::Added(record.peer_id)); self.publish_for_new_peer(record.peer_id) } } @@ -514,7 +515,7 @@ where let cutoff_timestamp = Timestamp::now() - self.max_provider_age; let pruned = self.provider_cache.prune_providers(cutoff_timestamp); for peer_id in pruned { - stats::MEMBERSHIP_PROVIDER_PEERS.dec(); + emit(observe::MembershipEvent::Removed(peer_id)); self.outbox.push_back(Event::Removed(peer_id)) } } diff --git a/ipld/resolver/src/lib.rs b/ipld/resolver/src/lib.rs index d2a14cb0bd..3d54127b37 100644 --- a/ipld/resolver/src/lib.rs +++ b/ipld/resolver/src/lib.rs @@ -4,8 +4,8 @@ mod behaviour; mod client; mod hash; mod limiter; +mod observe; mod service; -mod stats; mod timestamp; mod provider_cache; diff --git a/ipld/resolver/src/observe.rs b/ipld/resolver/src/observe.rs new file mode 100644 index 0000000000..20486aa35f --- /dev/null +++ b/ipld/resolver/src/observe.rs @@ -0,0 +1,458 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: MIT +use ipc_observability::{ + impl_traceable, impl_traceables, register_metrics, Recordable, TraceLevel, Traceable, +}; +use lazy_static::lazy_static; +use libipld::cid::Cid; +use libp2p::gossipsub::TopicHash; +use libp2p::PeerId; +use prometheus::{ + register_histogram, register_int_counter, register_int_gauge, Histogram, IntCounter, IntGauge, + Registry, +}; +use std::fmt; +use std::time::Duration; + +register_metrics! 
{ + IPLD_RESOLVER_PING_RTT: Histogram = + register_histogram!("ipld_resolver_ping_rtt", "Ping roundtrip time"); + + IPLD_RESOLVER_PING_TIMEOUT: IntCounter = + register_int_counter!("ipld_resolver_ping_timeouts", "Number of timed out pings"); + + IPLD_RESOLVER_PING_FAILURE: IntCounter = + register_int_counter!("ipld_resolver_ping_failure", "Number of failed pings"); + + IPLD_RESOLVER_PING_SUCCESS: IntCounter = + register_int_counter!("ipld_resolver_ping_success", "Number of successful pings"); + + IPLD_RESOLVER_IDENTIFY_FAILURE: IntCounter = + register_int_counter!("ipld_resolver_identify_failure", "Number of Identify errors"); + + IPLD_RESOLVER_IDENTIFY_RECEIVED: IntCounter = + register_int_counter!("ipld_resolver_identify_received", "Number of Identify infos received"); + + IPLD_RESOLVER_DISCOVERY_BACKGROUND_LOOKUP: IntCounter = + register_int_counter!("ipld_resolver_discovery_background_lookup", "Number of background lookups started"); + + IPLD_RESOLVER_DISCOVERY_CONNECTED_PEERS: IntGauge = + register_int_gauge!("ipld_resolver_discovery_connected_peers", "Number of connections"); + + IPLD_RESOLVER_MEMBERSHIP_SKIPPED_PEERS: IntCounter = + register_int_counter!("ipld_resolver_membership_skipped_peers", "Number of providers skipped"); + + IPLD_RESOLVER_MEMBERSHIP_ROUTABLE_PEERS: IntGauge = + register_int_gauge!("ipld_resolver_membership_routable_peers", "Number of routable peers"); + + IPLD_RESOLVER_MEMBERSHIP_PROVIDER_PEERS: IntGauge = + register_int_gauge!("ipld_resolver_membership_provider_peers", "Number of unique providers"); + + IPLD_RESOLVER_MEMBERSHIP_UNKNOWN_TOPIC: IntCounter = + register_int_counter!("ipld_resolver_membership_unknown_topic", "Number of messages with unknown topic"); + + IPLD_RESOLVER_MEMBERSHIP_INVALID_MESSAGE: IntCounter = + register_int_counter!("ipld_resolver_membership_invalid_message", "Number of invalid messages received"); + + IPLD_RESOLVER_MEMBERSHIP_PUBLISH_SUCCESS: IntCounter = + 
register_int_counter!("ipld_resolver_membership_publish_total", "Number of published messages"); + + IPLD_RESOLVER_MEMBERSHIP_PUBLISH_FAILURE: IntCounter = + register_int_counter!("ipld_resolver_membership_publish_failure", "Number of failed publish attempts"); + + IPLD_RESOLVER_CONTENT_RESOLVE_RUNNING: IntGauge = + register_int_gauge!("ipld_resolver_content_resolve_running", "Number of currently running content resolutions"); + + IPLD_RESOLVER_CONTENT_RESOLVE_NO_PEERS: IntCounter = + register_int_counter!("ipld_resolver_content_resolve_no_peers", "Number of resolutions with no known peers"); + + IPLD_RESOLVER_CONTENT_RESOLVE_SUCCESS: IntCounter = + register_int_counter!("ipld_resolver_content_resolve_success", "Number of successful resolutions"); + + IPLD_RESOLVER_CONTENT_RESOLVE_FAILURE: IntCounter = + register_int_counter!("ipld_resolver_content_resolve_failure", "Number of failed resolutions"); + + IPLD_RESOLVER_CONTENT_RESOLVE_FALLBACK: IntCounter = + register_int_counter!("ipld_resolver_content_resolve_fallback", "Number of resolutions that fall back on secondary peers"); + + IPLD_RESOLVER_CONTENT_RESOLVE_PEERS: Histogram = + register_histogram!("ipld_resolver_content_resolve_peers", "Number of peers found for resolution from a subnet"); + + IPLD_RESOLVER_CONTENT_CONNECTED_PEERS: Histogram = + register_histogram!("ipld_resolver_content_connected_peers", "Number of connected peers in a resolution"); + + IPLD_RESOLVER_CONTENT_RATE_LIMITED: IntCounter = + register_int_counter!("ipld_resolver_content_rate_limited", "Number of rate limited requests"); +} + +const DOMAIN: &str = "IPLD"; + +impl_traceables!(TraceLevel::Info, DOMAIN, PingEvent); +impl_traceables!(TraceLevel::Warn, DOMAIN, PingFailureEvent); +impl_traceables!(TraceLevel::Info, DOMAIN, IdentifyEvent); +impl_traceables!(TraceLevel::Warn, DOMAIN, IdentifyFailureEvent); +impl_traceables!(TraceLevel::Info, DOMAIN, DiscoveryEvent); +impl_traceables!(TraceLevel::Info, DOMAIN, MembershipEvent); 
+impl_traceables!(TraceLevel::Warn, DOMAIN, MembershipFailureEvent); +impl_traceables!(TraceLevel::Info, DOMAIN, ResolveEvent); +impl_traceables!(TraceLevel::Warn, DOMAIN, ResolveFailureEvent); + +#[allow(dead_code)] +pub enum PingEvent { + Success(PeerId, Duration), +} + +impl Recordable for PingEvent { + fn record_metrics(&self) { + match self { + Self::Success(_, rtt) => { + IPLD_RESOLVER_PING_SUCCESS.inc(); + IPLD_RESOLVER_PING_RTT.observe(rtt.as_millis() as f64); + } + } + } +} +impl fmt::Debug for PingEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PingEvent::Success(peer_id, duration) => { + write!(f, "Ping::Success({:?}, {:?})", peer_id, duration) + } + } + } +} + +#[allow(dead_code)] +pub enum PingFailureEvent { + Timeout(PeerId), + Failure(PeerId, String), +} + +impl Recordable for PingFailureEvent { + fn record_metrics(&self) { + match self { + Self::Failure(_, _) => IPLD_RESOLVER_PING_FAILURE.inc(), + Self::Timeout(_) => IPLD_RESOLVER_PING_TIMEOUT.inc(), + } + } +} + +impl fmt::Debug for PingFailureEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PingFailureEvent::Timeout(peer_id) => { + write!(f, "Ping::Timeout({:?})", peer_id) + } + PingFailureEvent::Failure(peer_id, reason) => { + write!(f, "Ping::Failure({:?}, {:?})", peer_id, reason) + } + } + } +} + +#[allow(dead_code)] +pub enum IdentifyEvent { + Received(PeerId), +} + +impl Recordable for IdentifyEvent { + fn record_metrics(&self) { + match self { + Self::Received(_) => IPLD_RESOLVER_IDENTIFY_RECEIVED.inc(), + } + } +} + +impl fmt::Debug for IdentifyEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IdentifyEvent::Received(peer_id) => { + write!(f, "Identify::Received({:?})", peer_id) + } + } + } +} + +#[allow(dead_code)] +pub enum IdentifyFailureEvent { + Failure(PeerId, String), +} + +impl Recordable for IdentifyFailureEvent { + fn record_metrics(&self) { + match self { + 
Self::Failure(_, _) => IPLD_RESOLVER_IDENTIFY_FAILURE.inc(), + } + } +} + +impl fmt::Debug for IdentifyFailureEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IdentifyFailureEvent::Failure(peer_id, reason) => { + write!(f, "Identify::Failure({:?}, {:?})", peer_id, reason) + } + } + } +} + +#[allow(dead_code)] +pub enum DiscoveryEvent { + BackgroundLookup(PeerId), + ConnectionEstablished(PeerId), + ConnectionClosed(PeerId), +} + +impl Recordable for DiscoveryEvent { + fn record_metrics(&self) { + match self { + Self::BackgroundLookup(_) => IPLD_RESOLVER_DISCOVERY_BACKGROUND_LOOKUP.inc(), + Self::ConnectionEstablished(_) => IPLD_RESOLVER_DISCOVERY_CONNECTED_PEERS.inc(), + Self::ConnectionClosed(_) => IPLD_RESOLVER_DISCOVERY_CONNECTED_PEERS.dec(), + } + } +} + +impl fmt::Debug for DiscoveryEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DiscoveryEvent::BackgroundLookup(peer_id) => { + write!(f, "Discovery::BackgroundLookup({:?})", peer_id) + } + DiscoveryEvent::ConnectionEstablished(peer_id) => { + write!(f, "Discovery::ConnectionEstablished({:?})", peer_id) + } + DiscoveryEvent::ConnectionClosed(peer_id) => { + write!(f, "Discovery::ConnectionClosed({:?})", peer_id) + } + } + } +} + +#[allow(dead_code)] +pub enum MembershipEvent { + Added(PeerId), + Removed(PeerId), + Skipped(PeerId), + PublishSuccess, + RoutablePeers(i64), +} + +impl Recordable for MembershipEvent { + fn record_metrics(&self) { + match self { + Self::Added(_) => IPLD_RESOLVER_MEMBERSHIP_PROVIDER_PEERS.inc(), + Self::Removed(_) => IPLD_RESOLVER_MEMBERSHIP_PROVIDER_PEERS.dec(), + Self::Skipped(_) => IPLD_RESOLVER_MEMBERSHIP_SKIPPED_PEERS.inc(), + Self::PublishSuccess => IPLD_RESOLVER_MEMBERSHIP_PUBLISH_SUCCESS.inc(), + Self::RoutablePeers(num_routable) => { + IPLD_RESOLVER_MEMBERSHIP_ROUTABLE_PEERS.set(*num_routable) + } + } + } +} + +impl fmt::Debug for MembershipEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> 
fmt::Result { + match self { + MembershipEvent::Added(peer_id) => { + write!(f, "Membership::Added({:?})", peer_id) + } + MembershipEvent::Removed(peer_id) => { + write!(f, "Membership::Removed({:?})", peer_id) + } + MembershipEvent::Skipped(peer_id) => { + write!(f, "Membership::Skipped({:?})", peer_id) + } + MembershipEvent::PublishSuccess => { + write!(f, "Membership::PublishSuccess") + } + MembershipEvent::RoutablePeers(count) => { + write!(f, "Membership::RoutablePeers({:?})", count) + } + } + } +} + +#[allow(dead_code)] +pub enum MembershipFailureEvent { + PublishFailure(String), + GossipInvalidProviderRecord(Option<PeerId>, String), + GossipInvalidVoteRecord(Option<PeerId>, String), + GossipUnknownTopic(Option<PeerId>, TopicHash), +} + +impl Recordable for MembershipFailureEvent { + fn record_metrics(&self) { + match self { + Self::PublishFailure(_) => IPLD_RESOLVER_MEMBERSHIP_PUBLISH_FAILURE.inc(), + Self::GossipInvalidProviderRecord(_, _) => { + IPLD_RESOLVER_MEMBERSHIP_INVALID_MESSAGE.inc() + } + Self::GossipInvalidVoteRecord(_, _) => IPLD_RESOLVER_MEMBERSHIP_INVALID_MESSAGE.inc(), + Self::GossipUnknownTopic(_, _) => IPLD_RESOLVER_MEMBERSHIP_UNKNOWN_TOPIC.inc(), + } + } +} + +impl fmt::Debug for MembershipFailureEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MembershipFailureEvent::PublishFailure(reason) => { + write!(f, "Membership::PublishFailure({:?})", reason) + } + MembershipFailureEvent::GossipInvalidProviderRecord(peer_id, record) => { + write!( + f, + "Membership::GossipInvalidProviderRecord({:?}, {:?})", + peer_id, record + ) + } + MembershipFailureEvent::GossipInvalidVoteRecord(peer_id, record) => { + write!( + f, + "Membership::GossipInvalidVoteRecord({:?}, {:?})", + peer_id, record + ) + } + MembershipFailureEvent::GossipUnknownTopic(peer_id, topic) => { + write!( + f, + "Membership::GossipUnknownTopic({:?}, {:?})", + peer_id, topic + ) + } + } + } +} + +#[allow(dead_code)] +pub enum ResolveEvent { + Started(Cid), + 
Success(Cid), + Completed, + Peers(usize), + NoPeers, + ConnectedPeers(usize), +} + +impl Recordable for ResolveEvent { + fn record_metrics(&self) { + match self { + Self::Started(_) => IPLD_RESOLVER_CONTENT_RESOLVE_RUNNING.inc(), + Self::Success(_) => IPLD_RESOLVER_CONTENT_RESOLVE_SUCCESS.inc(), + Self::Completed => IPLD_RESOLVER_CONTENT_RESOLVE_RUNNING.dec(), + Self::Peers(num) => IPLD_RESOLVER_CONTENT_RESOLVE_PEERS.observe(*num as f64), + Self::NoPeers => IPLD_RESOLVER_CONTENT_RESOLVE_NO_PEERS.inc(), + Self::ConnectedPeers(num) => IPLD_RESOLVER_CONTENT_CONNECTED_PEERS.observe(*num as f64), + } + } +} + +impl fmt::Debug for ResolveEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ResolveEvent::Started(cid) => { + write!(f, "Resolve::Started({:?})", cid) + } + ResolveEvent::Success(cid) => { + write!(f, "Resolve::Success({:?})", cid) + } + ResolveEvent::Completed => { + write!(f, "Resolve::Completed") + } + ResolveEvent::Peers(count) => { + write!(f, "Resolve::Peers({:?})", count) + } + ResolveEvent::NoPeers => { + write!(f, "Resolve::NoPeers") + } + ResolveEvent::ConnectedPeers(count) => { + write!(f, "Resolve::ConnectedPeers({:?})", count) + } + } + } +} + +#[allow(dead_code)] +pub enum ResolveFailureEvent { + Failure(Cid), + Fallback(Cid), +} + +impl Recordable for ResolveFailureEvent { + fn record_metrics(&self) { + match self { + Self::Failure(_) => IPLD_RESOLVER_CONTENT_RESOLVE_FAILURE.inc(), + Self::Fallback(_) => IPLD_RESOLVER_CONTENT_RESOLVE_FALLBACK.inc(), + } + } +} + +impl fmt::Debug for ResolveFailureEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ResolveFailureEvent::Failure(cid) => { + write!(f, "Resolve::Failure({:?})", cid) + } + ResolveFailureEvent::Fallback(cid) => { + write!(f, "Resolve::Fallback({:?})", cid) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_metrics() { + let registry = Registry::new(); + 
register_metrics(&registry).unwrap(); + } + + #[test] + fn test_emit() { + let peer_id = PeerId::random(); + let rtt: Duration = Duration::from_millis(500); + let err_str = "err".to_string(); + let cid = Cid::default(); + + emit(PingEvent::Success(peer_id, rtt)); + emit(PingFailureEvent::Timeout(peer_id)); + emit(PingFailureEvent::Failure(peer_id, err_str.clone())); + emit(IdentifyEvent::Received(peer_id)); + emit(IdentifyFailureEvent::Failure(peer_id, err_str.clone())); + emit(DiscoveryEvent::BackgroundLookup(peer_id)); + emit(DiscoveryEvent::ConnectionEstablished(peer_id)); + emit(DiscoveryEvent::ConnectionClosed(peer_id)); + emit(MembershipEvent::Added(peer_id)); + emit(MembershipEvent::Removed(peer_id)); + emit(MembershipEvent::Skipped(peer_id)); + emit(MembershipEvent::PublishSuccess); + emit(MembershipEvent::RoutablePeers(Default::default())); + emit(MembershipFailureEvent::PublishFailure(err_str.clone())); + emit(MembershipFailureEvent::GossipInvalidProviderRecord( + Some(peer_id), + err_str.clone(), + )); + emit(MembershipFailureEvent::GossipInvalidVoteRecord( + Some(peer_id), + err_str.clone(), + )); + emit(MembershipFailureEvent::GossipUnknownTopic( + Some(peer_id), + TopicHash::from_raw("topic".to_string()), + )); + emit(ResolveEvent::Started(cid)); + emit(ResolveEvent::Success(cid)); + emit(ResolveEvent::Completed); + emit(ResolveEvent::Peers(Default::default())); + emit(ResolveEvent::NoPeers); + emit(ResolveEvent::ConnectedPeers(Default::default())); + emit(ResolveFailureEvent::Failure(cid)); + emit(ResolveFailureEvent::Fallback(cid)); + } +} diff --git a/ipld/resolver/src/service.rs b/ipld/resolver/src/service.rs index 55ca0af7b8..f5a1bb65e2 100644 --- a/ipld/resolver/src/service.rs +++ b/ipld/resolver/src/service.rs @@ -3,9 +3,17 @@ use std::collections::HashMap; use std::time::Duration; +use crate::behaviour::{ + self, content, discovery, membership, Behaviour, BehaviourEvent, ConfigError, ContentConfig, + DiscoveryConfig, MembershipConfig, 
NetworkConfig, +}; +use crate::client::Client; +use crate::observe; +use crate::vote_record::{SignedVoteRecord, VoteRecord}; use anyhow::anyhow; use bloom::{BloomFilter, ASMS}; use ipc_api::subnet_id::SubnetID; +use ipc_observability::emit; use libipld::store::StoreParams; use libipld::Cid; use libp2p::futures::StreamExt; @@ -18,7 +26,7 @@ use libp2p::{ use libp2p::{identify, ping}; use libp2p_bitswap::{BitswapResponse, BitswapStore}; use libp2p_mplex::MplexConfig; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, warn}; use prometheus::Registry; use rand::seq::SliceRandom; use serde::de::DeserializeOwned; @@ -28,14 +36,6 @@ use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot::{self, Sender}; -use crate::behaviour::{ - self, content, discovery, membership, Behaviour, BehaviourEvent, ConfigError, ContentConfig, - DiscoveryConfig, MembershipConfig, NetworkConfig, -}; -use crate::client::Client; -use crate::stats; -use crate::vote_record::{SignedVoteRecord, VoteRecord}; - /// Result of attempting to resolve a CID. pub type ResolveResult = anyhow::Result<()>; @@ -259,7 +259,7 @@ where /// Register Prometheus metrics. 
pub fn register_metrics(&mut self, registry: &Registry) -> anyhow::Result<()> { self.content_mut().register_metrics(registry)?; - stats::register_metrics(registry)?; + observe::register_metrics(registry)?; Ok(()) } @@ -308,27 +308,12 @@ where fn handle_ping_event(&mut self, event: ping::Event) { let peer_id = event.peer.to_base58(); match event.result { - Ok(rtt) => { - stats::PING_SUCCESS.inc(); - stats::PING_RTT.observe(rtt.as_millis() as f64); - trace!( - "PingSuccess::Ping rtt to {} from {} is {} ms", - peer_id, - self.peer_id, - rtt.as_millis() - ); - } - Err(ping::Failure::Timeout) => { - stats::PING_TIMEOUT.inc(); - debug!("PingFailure::Timeout from {peer_id} to {}", self.peer_id); - } - Err(ping::Failure::Other { error }) => { - stats::PING_FAILURE.inc(); - warn!( - "PingFailure::Other from {peer_id} to {}: {error}", - self.peer_id - ); - } + Ok(rtt) => emit(observe::PingEvent::Success(event.peer, rtt)), + Err(ping::Failure::Timeout) => emit(observe::PingFailureEvent::Timeout(event.peer)), + Err(ping::Failure::Other { error }) => emit(observe::PingFailureEvent::Failure( + event.peer, + error.to_string(), + )), Err(ping::Failure::Unsupported) => { warn!("Should ban peer {peer_id} due to protocol error"); // TODO: How do we ban peers in 0.53 ? 
@@ -340,10 +325,12 @@ where fn handle_identify_event(&mut self, event: identify::Event) { if let identify::Event::Error { peer_id, error } = event { - stats::IDENTIFY_FAILURE.inc(); - warn!("Error identifying {peer_id}: {error}") + emit(observe::IdentifyFailureEvent::Failure( + peer_id, + error.to_string(), + )); } else if let identify::Event::Received { peer_id, info } = event { - stats::IDENTIFY_RECEIVED.inc(); + emit(observe::IdentifyEvent::Received(peer_id)); debug!("protocols supported by {peer_id}: {:?}", info.protocols); debug!("adding identified address of {peer_id} to {}", self.peer_id); self.discovery_mut().add_identified(&peer_id, info); @@ -478,10 +465,10 @@ where fn start_query(&mut self, cid: Cid, subnet_id: SubnetID, response_channel: ResponseChannel) { let mut peers = self.membership_mut().providers_of_subnet(&subnet_id); - stats::CONTENT_RESOLVE_PEERS.observe(peers.len() as f64); + emit(observe::ResolveEvent::Peers(peers.len())); if peers.is_empty() { - stats::CONTENT_RESOLVE_NO_PEERS.inc(); + emit(observe::ResolveEvent::NoPeers); send_resolve_result(response_channel, Err(anyhow!(NoKnownPeers(subnet_id)))); } else { // Connect to them in a random order, so as not to overwhelm any specific peer. 
@@ -492,7 +479,7 @@ where .into_iter() .partition::, _>(|id| self.swarm.is_connected(id)); - stats::CONTENT_CONNECTED_PEERS.observe(connected.len() as f64); + emit(observe::ResolveEvent::ConnectedPeers(connected.len())); let peers = [connected, known].into_iter().flatten().collect(); let (peers, fallback) = self.split_peers_for_query(peers); @@ -518,15 +505,15 @@ where fn resolve_query(&mut self, mut query: Query, result: ResolveResult) { match result { Ok(_) => { - stats::CONTENT_RESOLVE_SUCCESS.inc(); + emit(observe::ResolveEvent::Success(query.cid)); send_resolve_result(query.response_channel, result) } Err(_) if query.fallback_peer_ids.is_empty() => { - stats::CONTENT_RESOLVE_FAILURE.inc(); + emit(observe::ResolveFailureEvent::Failure(query.cid)); send_resolve_result(query.response_channel, result) } Err(e) => { - stats::CONTENT_RESOLVE_FALLBACK.inc(); + emit(observe::ResolveFailureEvent::Fallback(query.cid)); debug!( "resolving {} from {} failed with {}, but there are {} fallback peers to try", query.cid, diff --git a/ipld/resolver/src/stats.rs b/ipld/resolver/src/stats.rs deleted file mode 100644 index fe9edaf90b..0000000000 --- a/ipld/resolver/src/stats.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2022-2024 Protocol Labs -// SPDX-License-Identifier: MIT -use lazy_static::lazy_static; -use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry}; - -macro_rules! metrics { - ($($name:ident : $type:ty = $make:expr);* $(;)?) => { - $( - lazy_static! { - pub static ref $name: $type = $make.unwrap(); - } - )* - - pub fn register_metrics(registry: &Registry) -> anyhow::Result<()> { - $(registry.register(Box::new($name.clone()))?;)* - Ok(()) - } - }; -} - -metrics! 
{ - PING_RTT: Histogram = - Histogram::with_opts(HistogramOpts::new("ping_rtt", "Ping roundtrip time")); - - PING_TIMEOUT: IntCounter = - IntCounter::new("ping_timeouts", "Number of timed out pings"); - - PING_FAILURE: IntCounter = - IntCounter::new("ping_failure", "Number of failed pings"); - - PING_SUCCESS: IntCounter = - IntCounter::new("ping_success", "Number of successful pings",); - - IDENTIFY_FAILURE: IntCounter = - IntCounter::new("identify_failure", "Number of Identify errors",); - - IDENTIFY_RECEIVED: IntCounter = - IntCounter::new("identify_received", "Number of Identify infos received",); - - DISCOVERY_BACKGROUND_LOOKUP: IntCounter = IntCounter::new( - "discovery_background_lookup", - "Number of background lookups started", - ); - - DISCOVERY_CONNECTED_PEERS: IntGauge = - IntGauge::new("discovery_connected_peers", "Number of connections",); - - MEMBERSHIP_SKIPPED_PEERS: IntCounter = - IntCounter::new("membership_skipped_peers", "Number of providers skipped",); - - MEMBERSHIP_ROUTABLE_PEERS: IntGauge = - IntGauge::new("membership_routable_peers", "Number of routable peers"); - - MEMBERSHIP_PROVIDER_PEERS: IntGauge = - IntGauge::new("membership_provider_peers", "Number of unique providers"); - - MEMBERSHIP_UNKNOWN_TOPIC: IntCounter = IntCounter::new( - "membership_unknown_topic", - "Number of messages with unknown topic" - ); - - MEMBERSHIP_INVALID_MESSAGE: IntCounter = IntCounter::new( - "membership_invalid_message", - "Number of invalid messages received" - ); - - MEMBERSHIP_PUBLISH_SUCCESS: IntCounter = IntCounter::new( - "membership_publish_total", "Number of published messages" - ); - - MEMBERSHIP_PUBLISH_FAILURE: IntCounter = IntCounter::new( - "membership_publish_failure", - "Number of failed publish attempts" - ); - - CONTENT_RESOLVE_RUNNING: IntGauge = IntGauge::new( - "content_resolve_running", - "Number of currently running content resolutions" - ); - - CONTENT_RESOLVE_NO_PEERS: IntCounter = IntCounter::new( - "content_resolve_no_peers", - 
"Number of resolutions with no known peers" - ); - - CONTENT_RESOLVE_SUCCESS: IntCounter = IntCounter::new( - "content_resolve_success", - "Number of successful resolutions" - ); - - CONTENT_RESOLVE_FAILURE: IntCounter = IntCounter::new( - "content_resolve_failure", - "Number of failed resolutions" - ); - - CONTENT_RESOLVE_FALLBACK: IntCounter = IntCounter::new( - "content_resolve_fallback", - "Number of resolutions that fall back on secondary peers" - ); - - CONTENT_RESOLVE_PEERS: Histogram = Histogram::with_opts(HistogramOpts::new( - "content_resolve_peers", - "Number of peers found for resolution from a subnet" - )); - - CONTENT_CONNECTED_PEERS: Histogram = Histogram::with_opts(HistogramOpts::new( - "content_connected_peers", - "Number of connected peers in a resolution" - )); - - CONTENT_RATE_LIMITED: IntCounter = IntCounter::new( - "content_rate_limited", - "Number of rate limited requests" - ); -}