Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

93 changes: 93 additions & 0 deletions docs/fendermint/observability.md
@@ -49,6 +49,29 @@ This is achieved through the use of the `ipc-observability` crate/library, which
- `ipc_topdown_parent_finality_voting_quorum_height` (IntGauge): Sets the height of the parent finality quorum.
- `ipc_topdown_parent_finality_voting_quorum_weight` (IntGauge): Sets the weight of the parent finality quorum.
- `ipc_topdown_parent_finality_committed_height` (IntGauge): Sets the height of the committed parent finality.
- `ipld_resolver_ping_rtt` (Histogram): Records a ping roundtrip time.
- `ipld_resolver_ping_timeouts` (IntCounter): Incremented when a ping timed out.
- `ipld_resolver_ping_failure` (IntCounter): Incremented when a ping failed.
- `ipld_resolver_ping_success` (IntCounter): Incremented when a ping succeeded.
- `ipld_resolver_identify_failure` (IntCounter): Incremented when an identify failed.
- `ipld_resolver_identify_received` (IntCounter): Incremented when identify info is received.
- `ipld_resolver_discovery_background_lookup` (IntCounter): Incremented when a discovery background lookup started.
- `ipld_resolver_discovery_connected_peers` (IntGauge): Sets the number of discovery connected peers.
- `ipld_resolver_membership_skipped_peers` (IntCounter): Incremented when a membership provider is skipped.
- `ipld_resolver_membership_routable_peers` (IntGauge): Sets the number of routable peers.
- `ipld_resolver_membership_provider_peers` (IntGauge): Sets the number of unique peers.
- `ipld_resolver_membership_unknown_topic` (IntCounter): Incremented when a membership message with an unknown topic is received.
- `ipld_resolver_membership_invalid_message` (IntCounter): Incremented when an invalid membership message is received.
- `ipld_resolver_membership_publish_total` (IntCounter): Incremented when a membership message is published.
- `ipld_resolver_membership_publish_failure` (IntCounter): Incremented when a membership publish failed.
- `ipld_resolver_content_resolve_running` (IntGauge): Sets the number of currently running content resolutions.
- `ipld_resolver_content_resolve_no_peers` (IntCounter): Incremented when a resolution had no peers.
- `ipld_resolver_content_resolve_success` (IntCounter): Incremented when a resolution succeeded.
- `ipld_resolver_content_resolve_failure` (IntCounter): Incremented when a resolution failed.
- `ipld_resolver_content_resolve_fallback` (IntCounter): Incremented when a resolution had a fallback.
- `ipld_resolver_content_resolve_peers` (Histogram): Records the number of peers found for a resolution from a subnet.
- `ipld_resolver_content_connected_peers` (Histogram): Records the number of connected peers in a resolution.
- `ipld_resolver_content_rate_limited` (IntCounter): Incremented when a resolution was rate limited.
- `ipc_tracing_errors` (IntCounterVec): Increments the count of tracing errors for the affected event.

## Events and corresponding metrics
@@ -290,6 +313,76 @@ Represents the commitment of parent finality.

- `ipc_topdown_parent_finality_committed_height`

### PingEvent

**Variants and affected metrics:**

- `Success(PeerId, Duration)`: `ipld_resolver_ping_rtt`, `ipld_resolver_ping_success`

### PingFailureEvent

**Variants and affected metrics:**

- `Timeout(PeerId)`: `ipld_resolver_ping_timeouts`
- `Failure(PeerId, Duration)`: `ipld_resolver_ping_failure`
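
The two ping event types above can be read as a mapping from each variant to the metrics it updates. The following is a minimal, std-only sketch of that mapping, not the `ipc-observability` API: `PeerId` is a placeholder alias, `PingMetrics` and the `record` methods are hypothetical names, and plain fields stand in for the Prometheus histogram and counters. Variant payloads are copied from the sections above.

```rust
use std::time::Duration;

/// Placeholder for `libp2p::PeerId` so the sketch stays std-only (assumption).
type PeerId = String;

/// Hypothetical stand-ins for the ping metrics listed earlier.
#[derive(Debug, Default)]
pub struct PingMetrics {
    /// Stand-in for `ipld_resolver_ping_rtt`: raw observations in seconds.
    pub rtt_seconds: Vec<f64>,
    /// Stand-in for `ipld_resolver_ping_success`.
    pub success: u64,
    /// Stand-in for `ipld_resolver_ping_timeouts`.
    pub timeouts: u64,
    /// Stand-in for `ipld_resolver_ping_failure`.
    pub failure: u64,
}

pub enum PingEvent {
    Success(PeerId, Duration),
}

pub enum PingFailureEvent {
    Timeout(PeerId),
    Failure(PeerId, Duration),
}

impl PingEvent {
    /// A successful ping touches two metrics: the RTT histogram and the success counter.
    pub fn record(&self, m: &mut PingMetrics) {
        match self {
            PingEvent::Success(_, rtt) => {
                m.rtt_seconds.push(rtt.as_secs_f64());
                m.success += 1;
            }
        }
    }
}

impl PingFailureEvent {
    /// Each failure variant touches exactly one counter.
    pub fn record(&self, m: &mut PingMetrics) {
        match self {
            PingFailureEvent::Timeout(_) => m.timeouts += 1,
            PingFailureEvent::Failure(_, _) => m.failure += 1,
        }
    }
}
```

Keeping success and failure in separate enums means a caller cannot accidentally record an RTT for a ping that never completed.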

### IdentifyEvent

**Variants and affected metrics:**

- `Received(PeerId)`: `ipld_resolver_identify_received`

### IdentifyFailureEvent

**Variants and affected metrics:**

- `Failure(PeerId, String)`: `ipld_resolver_identify_failure`

### DiscoveryEvent

**Variants and affected metrics:**

- `BackgroundLookup(PeerId)`: `ipld_resolver_discovery_background_lookup`
- `ConnectionEstablished(PeerId)`: `ipld_resolver_discovery_connected_peers`
- `ConnectionClosed(PeerId)`: `ipld_resolver_discovery_connected_peers`

### MembershipEvent

**Variants and affected metrics:**

- `Added(PeerId)`: `ipld_resolver_membership_provider_peers`
- `Removed(PeerId)`: `ipld_resolver_membership_provider_peers`
- `Skipped(PeerId)`: `ipld_resolver_membership_skipped_peers`
- `PublishSuccess`: `ipld_resolver_membership_publish_total`
- `RoutablePeers(i64)`: `ipld_resolver_membership_routable_peers`

### MembershipFailureEvent

**Variants and affected metrics:**

- `PublishFailure(String)`: `ipld_resolver_membership_publish_failure`
- `GossipInvalidProviderRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message`
- `GossipInvalidVoteRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message`
- `GossipUnknownTopic(Option<PeerId>, TopicHash)`: `ipld_resolver_membership_unknown_topic`
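
As a rough illustration of the membership mappings above, the sketch below treats `Added`/`Removed` as increments and decrements of one gauge, `RoutablePeers(i64)` as an absolute gauge value, and the two invalid-record variants as sharing one counter. It assumes placeholder types (`PeerId` as `u32`, `TopicHash` as `String`) and hypothetical names (`MembershipMetrics`, `record_membership`, `record_membership_failure`); it is not the crate's implementation.

```rust
/// Placeholder for `libp2p::PeerId` (assumption, keeps the sketch std-only).
type PeerId = u32;

/// Hypothetical stand-ins for the membership metrics; gauges are `i64`, counters `u64`.
#[derive(Debug, Default, PartialEq)]
pub struct MembershipMetrics {
    pub provider_peers: i64,
    pub routable_peers: i64,
    pub skipped_peers: u64,
    pub publish_total: u64,
    pub publish_failure: u64,
    pub invalid_message: u64,
    pub unknown_topic: u64,
}

pub enum MembershipEvent {
    Added(PeerId),
    Removed(PeerId),
    Skipped(PeerId),
    PublishSuccess,
    RoutablePeers(i64),
}

pub enum MembershipFailureEvent {
    PublishFailure(String),
    GossipInvalidProviderRecord(Option<PeerId>, String),
    GossipInvalidVoteRecord(Option<PeerId>, String),
    /// `TopicHash` is replaced by `String` in this sketch.
    GossipUnknownTopic(Option<PeerId>, String),
}

pub fn record_membership(ev: &MembershipEvent, m: &mut MembershipMetrics) {
    match ev {
        // Added and Removed move the same gauge in opposite directions.
        MembershipEvent::Added(_) => m.provider_peers += 1,
        MembershipEvent::Removed(_) => m.provider_peers -= 1,
        MembershipEvent::Skipped(_) => m.skipped_peers += 1,
        MembershipEvent::PublishSuccess => m.publish_total += 1,
        // RoutablePeers carries the absolute value, so the gauge is set, not adjusted.
        MembershipEvent::RoutablePeers(n) => m.routable_peers = *n,
    }
}

pub fn record_membership_failure(ev: &MembershipFailureEvent, m: &mut MembershipMetrics) {
    match ev {
        MembershipFailureEvent::PublishFailure(_) => m.publish_failure += 1,
        // Both invalid-record variants feed the same counter.
        MembershipFailureEvent::GossipInvalidProviderRecord(..)
        | MembershipFailureEvent::GossipInvalidVoteRecord(..) => m.invalid_message += 1,
        MembershipFailureEvent::GossipUnknownTopic(..) => m.unknown_topic += 1,
    }
}
```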

### ResolveEvent

**Variants and affected metrics:**

- `Started(Cid)`: `ipld_resolver_content_resolve_running`
- `Success(Cid)`: `ipld_resolver_content_resolve_success`
- `Completed`: `ipld_resolver_content_resolve_running`
- `Peers(usize)`: `ipld_resolver_content_resolve_peers`
- `NoPeers`: `ipld_resolver_content_resolve_no_peers`
- `ConnectedPeers(usize)`: `ipld_resolver_content_connected_peers`

### ResolveFailureEvent

**Variants and affected metrics:**

- `Failure(Cid)`: `ipld_resolver_content_resolve_failure`
- `Fallback(Cid)`: `ipld_resolver_content_resolve_fallback`
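
`Peers(usize)` and `ConnectedPeers(usize)` are the only resolve variants backed by histograms. To show what "records the number of peers" could mean in practice, here is a hedged, std-only sketch of a histogram with cumulative upper-bound buckets, which is the bucket style Prometheus uses. The `Histogram` and `ResolveHistograms` types and the bucket bounds are assumptions made for illustration; the real metrics use the `prometheus` crate and whatever buckets the project configures.

```rust
/// Hypothetical std-only histogram with cumulative upper-bound buckets.
#[derive(Debug)]
pub struct Histogram {
    /// Ascending upper bounds; an observation counts toward every bound it does not exceed.
    bounds: Vec<f64>,
    cumulative: Vec<u64>,
    count: u64,
    sum: f64,
}

impl Histogram {
    pub fn new(bounds: Vec<f64>) -> Self {
        let cumulative = vec![0; bounds.len()];
        Self { bounds, cumulative, count: 0, sum: 0.0 }
    }

    pub fn observe(&mut self, v: f64) {
        for (bound, slot) in self.bounds.iter().zip(self.cumulative.iter_mut()) {
            if v <= *bound {
                *slot += 1;
            }
        }
        self.count += 1;
        self.sum += v;
    }

    pub fn cumulative(&self) -> &[u64] {
        &self.cumulative
    }

    pub fn count(&self) -> u64 {
        self.count
    }

    pub fn sum(&self) -> f64 {
        self.sum
    }
}

/// Only the two histogram-backed variants of `ResolveEvent` are modelled here.
pub enum ResolveEvent {
    Peers(usize),
    ConnectedPeers(usize),
}

#[derive(Debug)]
pub struct ResolveHistograms {
    /// Stand-in for `ipld_resolver_content_resolve_peers`.
    pub resolve_peers: Histogram,
    /// Stand-in for `ipld_resolver_content_connected_peers`.
    pub connected_peers: Histogram,
}

impl ResolveHistograms {
    pub fn record(&mut self, ev: &ResolveEvent) {
        match ev {
            ResolveEvent::Peers(n) => self.resolve_peers.observe(*n as f64),
            ResolveEvent::ConnectedPeers(n) => self.connected_peers.observe(*n as f64),
        }
    }
}
```

With bounds of 1, 5 and 10, an observation of 20 peers lands in no explicit bucket and is visible only through the total count and sum.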

### TracingError

**Description:**
1 change: 1 addition & 0 deletions ipld/resolver/Cargo.toml
@@ -34,6 +34,7 @@ fvm_shared = { workspace = true }
 fvm_ipld_blockstore = { workspace = true, optional = true }

 ipc-api = { path = "../../ipc/api", default-features = false }
+ipc-observability = { workspace = true }

 [dev-dependencies]
 cid = { workspace = true }
14 changes: 7 additions & 7 deletions ipld/resolver/src/behaviour/content.rs
@@ -7,6 +7,11 @@ use std::{
     time::Duration,
 };

+use crate::{
+    limiter::{RateLimit, RateLimiter},
+    observe,
+};
+use ipc_observability::emit;
 use libipld::{store::StoreParams, Cid};
 use libp2p::{
     core::{ConnectedPoint, Endpoint},
@@ -22,11 +27,6 @@ use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, Bits
 use log::debug;
 use prometheus::Registry;

-use crate::{
-    limiter::{RateLimit, RateLimiter},
-    stats,
-};
-
 pub type QueryId = libp2p_bitswap::QueryId;

 // Not much to do here, just hiding the `Progress` event as I don't think we'll need it.
@@ -140,7 +140,7 @@ impl<P: StoreParams> Behaviour<P> {
     /// will initiate connections to the peers which aren't connected at the moment.
     pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId {
         debug!("resolving {cid} from {peers:?}");
-        stats::CONTENT_RESOLVE_RUNNING.inc();
+        emit(observe::ResolveEvent::Started(cid));
         // Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`.
         self.inner.sync(cid, peers, [].into_iter())
     }
@@ -334,7 +334,7 @@ impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
                 ToSwarm::GenerateEvent(ev) => match ev {
                     BitswapEvent::Progress(_, _) => {}
                     BitswapEvent::Complete(id, result) => {
-                        stats::CONTENT_RESOLVE_RUNNING.dec();
+                        emit(observe::ResolveEvent::Completed);
                         let out = Event::Complete(id, result);
                         return Poll::Ready(ToSwarm::GenerateEvent(out));
                     }
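
The two `content.rs` hunks swap direct `stats::CONTENT_RESOLVE_RUNNING` calls for `emit(observe::ResolveEvent::...)`. The pattern can be sketched with the standard library alone, under loud assumptions: the `Recordable` trait, this `emit` function, and the `String` stand-in for `Cid` are hypothetical and only approximate what `ipc_observability::emit` might do.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicI64, Ordering};

/// Stand-in for the `ipld_resolver_content_resolve_running` gauge (assumption).
pub static CONTENT_RESOLVE_RUNNING: AtomicI64 = AtomicI64::new(0);

/// Hypothetical trait: an event knows which metrics it updates.
pub trait Recordable {
    fn record_metrics(&self);
}

#[derive(Debug)]
pub enum ResolveEvent {
    /// `String` replaces `Cid` to keep the sketch std-only.
    Started(String),
    Completed,
}

impl Recordable for ResolveEvent {
    fn record_metrics(&self) {
        match self {
            // Started and Completed bracket a resolution, so the gauge tracks in-flight work.
            ResolveEvent::Started(_) => {
                CONTENT_RESOLVE_RUNNING.fetch_add(1, Ordering::Relaxed);
            }
            ResolveEvent::Completed => {
                CONTENT_RESOLVE_RUNNING.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }
}

/// Hypothetical `emit`: a single call site both traces the event and records its metrics.
pub fn emit<E: Recordable + Debug>(event: E) {
    eprintln!("event: {event:?}");
    event.record_metrics();
}
```

The call site then names what happened (`Started`, `Completed`) rather than which metric to touch, which keeps the metric bookkeeping in one place.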
14 changes: 6 additions & 8 deletions ipld/resolver/src/behaviour/discovery.rs
@@ -8,6 +8,9 @@ use std::{
     time::Duration,
 };

+use super::NetworkConfig;
+use crate::observe;
+use ipc_observability::emit;
 use libp2p::{
     core::Endpoint,
     identify::Info,
@@ -23,11 +26,6 @@ use libp2p::{
 };
 use log::{debug, warn};
 use tokio::time::Interval;
-
-use crate::stats;
-
-use super::NetworkConfig;
-
 // NOTE: The Discovery behaviour is largely based on what exists in Forest. If it ain't broken...
 // NOTE: Not sure if emitting events is going to be useful yet, but for now it's an example of having one.

@@ -178,7 +176,7 @@ impl Behaviour {
     pub fn background_lookup(&mut self, peer_id: PeerId) {
         if self.addresses_of_peer(peer_id).is_empty() {
             if let Some(kademlia) = self.inner.as_mut() {
-                stats::DISCOVERY_BACKGROUND_LOOKUP.inc();
+                emit(observe::DiscoveryEvent::BackgroundLookup(peer_id));
                 kademlia.get_closest_peers(peer_id);
             }
         }
@@ -241,13 +239,13 @@ impl NetworkBehaviour for Behaviour {
         match &event {
             FromSwarm::ConnectionEstablished(e) => {
                 if e.other_established == 0 {
-                    stats::DISCOVERY_CONNECTED_PEERS.inc();
+                    emit(observe::DiscoveryEvent::ConnectionEstablished(e.peer_id));
                     self.num_connections += 1;
                 }
             }
             FromSwarm::ConnectionClosed(e) => {
                 if e.remaining_established == 0 {
-                    stats::DISCOVERY_CONNECTED_PEERS.dec();
+                    emit(observe::DiscoveryEvent::ConnectionClosed(e.peer_id));
                     self.num_connections -= 1;
                 }
             }
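
The guards `other_established == 0` and `remaining_established == 0` mean the connected-peers gauge counts peers, not individual connections. The sketch below reproduces that bookkeeping with the standard library only; `PeerTracker` is a hypothetical type, `PeerId` is a placeholder, and the per-peer counts are computed locally here, whereas in the hunk above they arrive on the swarm events.

```rust
use std::collections::HashMap;

/// Placeholder for `libp2p::PeerId` (assumption).
type PeerId = u32;

/// Hypothetical tracker: counts open connections per peer and exposes a peer-level gauge.
#[derive(Debug, Default)]
pub struct PeerTracker {
    connections: HashMap<PeerId, usize>,
    /// Stand-in for `ipld_resolver_discovery_connected_peers`.
    pub connected_peers: i64,
}

impl PeerTracker {
    pub fn connection_established(&mut self, peer: PeerId) {
        let n = self.connections.entry(peer).or_insert(0);
        // Connections to this peer that existed before the new one.
        let other_established = *n;
        *n += 1;
        if other_established == 0 {
            // First connection to this peer: the peer count goes up.
            self.connected_peers += 1;
        }
    }

    pub fn connection_closed(&mut self, peer: PeerId) {
        let remaining_established = match self.connections.get_mut(&peer) {
            Some(n) => {
                *n -= 1;
                *n
            }
            // Unknown peer: nothing to close.
            None => return,
        };
        if remaining_established == 0 {
            // Last connection to this peer is gone: the peer count goes down.
            self.connections.remove(&peer);
            self.connected_peers -= 1;
        }
    }
}
```

Opening a second connection to an already connected peer leaves the gauge unchanged, and so does closing one of two connections.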
77 changes: 39 additions & 38 deletions ipld/resolver/src/behaviour/membership.rs
@@ -5,8 +5,16 @@ use std::marker::PhantomData;
 use std::task::{Context, Poll};
 use std::time::Duration;

+use super::NetworkConfig;
+use crate::hash::blake2b_256;
+use crate::observe;
+use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
+use crate::provider_record::{ProviderRecord, SignedProviderRecord};
+use crate::vote_record::{SignedVoteRecord, VoteRecord};
+use crate::Timestamp;
 use anyhow::anyhow;
 use ipc_api::subnet_id::SubnetID;
+use ipc_observability::emit;
 use libp2p::core::Endpoint;
 use libp2p::gossipsub::{
     self, IdentTopic, MessageAuthenticity, MessageId, PublishError, Sha256Topic, SubscriptionError,
@@ -24,14 +32,6 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 use tokio::time::{Instant, Interval};

-use crate::hash::blake2b_256;
-use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
-use crate::provider_record::{ProviderRecord, SignedProviderRecord};
-use crate::vote_record::{SignedVoteRecord, VoteRecord};
-use crate::{stats, Timestamp};
-
-use super::NetworkConfig;
-
 /// `Gossipsub` topic identifier for subnet membership.
 const PUBSUB_MEMBERSHIP: &str = "/ipc/membership";
 /// `Gossipsub` topic identifier for voting about content.
@@ -326,11 +326,13 @@ where
         );
         match self.inner.publish(self.membership_topic.clone(), data) {
             Err(e) => {
-                stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
+                emit(observe::MembershipFailureEvent::PublishFailure(
+                    e.to_string(),
+                ));
                 Err(anyhow!(e))
             }
             Ok(_msg_id) => {
-                stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
+                emit(observe::MembershipEvent::PublishSuccess);
                 self.last_publish_timestamp = Timestamp::now();
                 self.next_publish_timestamp =
                     self.last_publish_timestamp + self.publish_interval.period();
@@ -346,11 +348,13 @@ where
         let data = vote.into_envelope().into_protobuf_encoding();
         match self.inner.publish(topic, data) {
             Err(e) => {
-                stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
+                emit(observe::MembershipFailureEvent::PublishFailure(
+                    e.to_string(),
+                ));
                 Err(anyhow!(e))
             }
             Ok(_msg_id) => {
-                stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
+                emit(observe::MembershipEvent::PublishSuccess);
                 Ok(())
             }
         }
@@ -363,11 +367,13 @@ where
         let topic = self.preemptive_topic(&subnet_id);
         match self.inner.publish(topic, data) {
             Err(e) => {
-                stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
+                emit(observe::MembershipFailureEvent::PublishFailure(
+                    e.to_string(),
+                ));
                 Err(anyhow!(e))
             }
             Ok(_msg_id) => {
-                stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
+                emit(observe::MembershipEvent::PublishSuccess);
                 Ok(())
             }
         }
@@ -378,8 +384,9 @@ where
     /// Call this method when the discovery service learns the address of a peer.
     pub fn set_routable(&mut self, peer_id: PeerId) {
         self.provider_cache.set_routable(peer_id);
-        stats::MEMBERSHIP_ROUTABLE_PEERS
-            .set(self.provider_cache.num_routable().try_into().unwrap());
+
+        let num_routable = self.provider_cache.num_routable().try_into().unwrap();
+        emit(observe::MembershipEvent::RoutablePeers(num_routable));
         self.publish_for_new_peer(peer_id);
     }
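
In `set_routable`, the target type of `try_into()` is inferred from `RoutablePeers(i64)`, and `unwrap()` panics if the count does not fit. Assuming `num_routable()` returns a `usize` (the diff does not show its signature), the conversion can be sketched as follows. Both helper names are hypothetical, and the saturating variant is an alternative shown for comparison, not what the diff does.

```rust
use std::convert::TryFrom;

/// Mirrors `try_into().unwrap()`: panics if the count does not fit in an `i64`.
pub fn routable_peers_strict(count: usize) -> i64 {
    i64::try_from(count).unwrap()
}

/// Alternative, not used in the diff: clamp to `i64::MAX` instead of panicking.
pub fn routable_peers_saturating(count: usize) -> i64 {
    i64::try_from(count).unwrap_or(i64::MAX)
}
```

For realistic peer counts the two behave identically; they differ only for values above `i64::MAX`, which a `usize` can hold on 64-bit targets.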

@@ -406,33 +413,27 @@ where
         if msg.topic == self.membership_topic.hash() {
             match SignedProviderRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
                 Ok(record) => self.handle_provider_record(record),
-                Err(e) => {
-                    stats::MEMBERSHIP_INVALID_MESSAGE.inc();
-                    warn!(
-                        "Gossip message from peer {:?} could not be deserialized as ProviderRecord: {e}",
-                        msg.source
-                    );
-                }
+                Err(e) => emit(
+                    observe::MembershipFailureEvent::GossipInvalidProviderRecord(
+                        msg.source,
+                        e.to_string(),
+                    ),
+                ),
             }
         } else if self.voting_topics.contains(&msg.topic) {
             match SignedVoteRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
                 Ok(record) => self.handle_vote_record(record),
-                Err(e) => {
-                    stats::MEMBERSHIP_INVALID_MESSAGE.inc();
-                    warn!(
-                        "Gossip message from peer {:?} could not be deserialized as VoteRecord: {e}",
-                        msg.source
-                    );
-                }
+                Err(e) => emit(observe::MembershipFailureEvent::GossipInvalidVoteRecord(
+                    msg.source,
+                    e.to_string(),
+                )),
             }
         } else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) {
             self.handle_preemptive_data(subnet_id.clone(), msg.data)
         } else {
-            stats::MEMBERSHIP_UNKNOWN_TOPIC.inc();
-            warn!(
-                "unknown gossipsub topic in message from {:?}: {}",
-                msg.source, msg.topic
-            );
+            emit(observe::MembershipFailureEvent::GossipUnknownTopic(
+                msg.source, msg.topic,
+            ));
         }
     }
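
In this hunk, the `warn!` calls and the emitted events carry the same data: the optional source peer plus either an error string or the topic. One way an event type could reproduce the old log text is a `Display` implementation, sketched below. This is an assumption for illustration only: the diff does not show how `ipc-observability` formats events, and `PeerId` and the `String` topic are placeholders.

```rust
use std::fmt;

/// Placeholder for `libp2p::PeerId` (assumption).
type PeerId = u32;

/// Gossip-related failure variants only; `TopicHash` is replaced by `String`.
pub enum MembershipFailureEvent {
    GossipInvalidProviderRecord(Option<PeerId>, String),
    GossipInvalidVoteRecord(Option<PeerId>, String),
    GossipUnknownTopic(Option<PeerId>, String),
}

impl fmt::Display for MembershipFailureEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MembershipFailureEvent::GossipInvalidProviderRecord(source, err) => write!(
                f,
                "Gossip message from peer {source:?} could not be deserialized as ProviderRecord: {err}"
            ),
            MembershipFailureEvent::GossipInvalidVoteRecord(source, err) => write!(
                f,
                "Gossip message from peer {source:?} could not be deserialized as VoteRecord: {err}"
            ),
            MembershipFailureEvent::GossipUnknownTopic(source, topic) => write!(
                f,
                "unknown gossipsub topic in message from {source:?}: {topic}"
            ),
        }
    }
}
```

Because the context travels inside the event, the call site no longer needs its own log statement to preserve the diagnostic.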

@@ -444,7 +445,7 @@ where
         debug!("received provider record: {record:?}");
         let (event, publish) = match self.provider_cache.add_provider(&record) {
             None => {
-                stats::MEMBERSHIP_SKIPPED_PEERS.inc();
+                emit(observe::MembershipEvent::Skipped(record.peer_id));
                 (Some(Event::Skipped(record.peer_id)), false)
             }
             Some(d) if d.is_empty() && !d.is_new => (None, false),
@@ -459,7 +460,7 @@ where
         }

         if publish {
-            stats::MEMBERSHIP_PROVIDER_PEERS.inc();
+            emit(observe::MembershipEvent::Added(record.peer_id));
             self.publish_for_new_peer(record.peer_id)
         }
     }
@@ -514,7 +515,7 @@ where
         let cutoff_timestamp = Timestamp::now() - self.max_provider_age;
         let pruned = self.provider_cache.prune_providers(cutoff_timestamp);
         for peer_id in pruned {
-            stats::MEMBERSHIP_PROVIDER_PEERS.dec();
+            emit(observe::MembershipEvent::Removed(peer_id));
             self.outbox.push_back(Event::Removed(peer_id))
         }
     }
2 changes: 1 addition & 1 deletion ipld/resolver/src/lib.rs
@@ -4,8 +4,8 @@ mod behaviour;
 mod client;
 mod hash;
 mod limiter;
+mod observe;
 mod service;
-mod stats;
 mod timestamp;

 mod provider_cache;