Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 64 additions & 39 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,48 @@
use crate::event::Event;

use futures::{prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use log::{debug, error};
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use std::{
backtrace::Backtrace,
cell::RefCell,
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

/// Log target for this file.
pub const LOG_TARGET: &str = "sub-libp2p::out_events";

/// Creates a new channel that can be associated to a [`OutChannels`].
///
/// The name is used in Prometheus reports, the queue size threshold is used
/// to warn if there are too many unprocessed events in the channel.
pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
let (tx, rx) = async_channel::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender {
inner: tx,
name,
queue_size_warning,
warning_fired: false,
warning_fired: SenderWarningState::NotFired,
creation_backtrace: Backtrace::force_capture(),
metrics: metrics.clone(),
metrics: None,
};
let rx = Receiver { inner: rx, name, metrics };
let rx = Receiver { inner: rx, name, metrics: None };
(tx, rx)
}

/// A state of a sender warning that is used to avoid spamming the logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SenderWarningState {
/// The warning has not been fired yet.
NotFired,
/// The warning has been fired, and the channel is full
FiredFull,
/// The warning has been fired and the channel is not full anymore.
FiredFree,
}

/// Sending side of a channel.
///
/// Must be associated with an [`OutChannels`] before anything can be sent on it
Expand All @@ -78,13 +89,14 @@ pub struct Sender {
name: &'static str,
/// Threshold queue size to generate an error message in the logs.
queue_size_warning: usize,
/// We generate the error message only once to not spam the logs.
warning_fired: bool,
/// We generate the error message only once to not spam the logs after the first error.
/// Subsequently we indicate channel fullness on debug level.
warning_fired: SenderWarningState,
/// Backtrace of a place where the channel was created.
creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
/// [`OutChannels`] with `OutChannels::push()`.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
metrics: Option<Metrics>,
}

impl fmt::Debug for Sender {
Expand All @@ -95,8 +107,7 @@ impl fmt::Debug for Sender {

impl Drop for Sender {
fn drop(&mut self) {
let metrics = self.metrics.lock();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_channels.with_label_values(&[self.name]).dec();
}
}
Expand All @@ -108,21 +119,16 @@ pub struct Receiver {
name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
metrics: Option<Metrics>,
}

impl Stream for Receiver {
type Item = Event;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone();
match metrics.as_ref().map(|m| m.as_ref()) {
Some(Some(metrics)) => metrics.event_out(&ev, self.name),
Some(None) => (), // no registry
None => log::warn!(
"Inconsistency in out_events: event happened before sender associated"
),
if let Some(metrics) = &self.metrics {
metrics.event_out(&ev, self.name);
}
Poll::Ready(Some(ev))
} else {
Expand Down Expand Up @@ -151,7 +157,7 @@ pub struct OutChannels {
event_streams: Vec<Sender>,
/// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this
/// object.
metrics: Arc<Option<Metrics>>,
metrics: Option<Metrics>,
}

impl OutChannels {
Expand All @@ -160,17 +166,15 @@ impl OutChannels {
let metrics =
if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };

Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) })
Ok(Self { event_streams: Vec::new(), metrics })
}

/// Adds a new [`Sender`] to the collection.
pub fn push(&mut self, sender: Sender) {
let mut metrics = sender.metrics.lock();
debug_assert!(metrics.is_none());
*metrics = Some(self.metrics.clone());
drop(metrics);
pub fn push(&mut self, mut sender: Sender) {
debug_assert!(sender.metrics.is_none());
sender.metrics = self.metrics.clone();

if let Some(metrics) = &*self.metrics {
if let Some(metrics) = &self.metrics {
metrics.num_channels.with_label_values(&[sender.name]).inc();
}

Expand All @@ -180,22 +184,42 @@ impl OutChannels {
/// Sends an event.
pub fn send(&mut self, event: Event) {
self.event_streams.retain_mut(|sender| {
if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
error!(
"The number of unprocessed events in channel `{}` exceeded {}.\n\
The channel was created at:\n{:}\n
The last event was sent from:\n{:}",
sender.name,
sender.queue_size_warning,
sender.creation_backtrace,
Backtrace::force_capture(),
let current_pending = sender.inner.len();
if current_pending >= sender.queue_size_warning {
if sender.warning_fired == SenderWarningState::NotFired {
error!(
"The number of unprocessed events in channel `{}` exceeded {}.\n\
The channel was created at:\n{:}\n
The last event was sent from:\n{:}",
sender.name,
sender.queue_size_warning,
sender.creation_backtrace,
Backtrace::force_capture(),
);
} else if sender.warning_fired == SenderWarningState::FiredFree {
// We don't want to spam the logs, so we only log on debug level
debug!(
target: LOG_TARGET,
"Channel `{}` is overflowed. Number of events: {}",
sender.name, current_pending
);
}
sender.warning_fired = SenderWarningState::FiredFull;
} else if sender.warning_fired == SenderWarningState::FiredFull &&
current_pending < sender.queue_size_warning.wrapping_div(2)
{
sender.warning_fired = SenderWarningState::FiredFree;
debug!(
target: LOG_TARGET,
"Channel `{}` is no longer overflowed. Number of events: {}",
sender.name, current_pending
);
}

sender.inner.try_send(event.clone()).is_ok()
});

if let Some(metrics) = &*self.metrics {
if let Some(metrics) = &self.metrics {
for ev in &self.event_streams {
metrics.event_in(&event, ev.name);
}
Expand All @@ -211,6 +235,7 @@ impl fmt::Debug for OutChannels {
}
}

#[derive(Clone)]
struct Metrics {
// This list is ordered alphabetically
events_total: CounterVec<U64>,
Expand Down