Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 30 additions & 27 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@
use crate::event::Event;

use futures::{prelude::*, ready, stream::FusedStream};
use log::error;
use parking_lot::Mutex;
use log::{error, info};
use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
use std::{
backtrace::Backtrace,
cell::RefCell,
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

Expand All @@ -52,16 +50,15 @@ use std::{
/// to warn if there are too many unprocessed events in the channel.
pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
let (tx, rx) = async_channel::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender {
inner: tx,
name,
queue_size_warning,
warning_fired: false,
creation_backtrace: Backtrace::force_capture(),
metrics: metrics.clone(),
metrics: None,
};
let rx = Receiver { inner: rx, name, metrics };
let rx = Receiver { inner: rx, name, metrics: None };
(tx, rx)
}

Expand All @@ -84,7 +81,7 @@ pub struct Sender {
creation_backtrace: Backtrace,
/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
/// [`OutChannels`] with `OutChannels::push()`.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
metrics: Option<Metrics>,
}

impl fmt::Debug for Sender {
Expand All @@ -95,8 +92,8 @@ impl fmt::Debug for Sender {

impl Drop for Sender {
fn drop(&mut self) {
let metrics = self.metrics.lock();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
let metrics = self.metrics.clone();
if let Some(metrics) = metrics.as_ref() {
metrics.num_channels.with_label_values(&[self.name]).dec();
}
}
Expand All @@ -108,21 +105,18 @@ pub struct Receiver {
name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
metrics: Option<Metrics>,
}

impl Stream for Receiver {
type Item = Event;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone();
match metrics.as_ref().map(|m| m.as_ref()) {
Some(Some(metrics)) => metrics.event_out(&ev, self.name),
Some(None) => (), // no registry
None => log::warn!(
"Inconsistency in out_events: event happened before sender associated"
),
let metrics = self.metrics.clone();
match metrics.as_ref() {
Some(metrics) => metrics.event_out(&ev, self.name),
None => (), // no registry
}
Poll::Ready(Some(ev))
} else {
Expand Down Expand Up @@ -151,7 +145,7 @@ pub struct OutChannels {
event_streams: Vec<Sender>,
/// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this
/// object.
metrics: Arc<Option<Metrics>>,
metrics: Option<Metrics>,
}

impl OutChannels {
Expand All @@ -160,17 +154,15 @@ impl OutChannels {
let metrics =
if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };

Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) })
Ok(Self { event_streams: Vec::new(), metrics })
}

/// Adds a new [`Sender`] to the collection.
pub fn push(&mut self, sender: Sender) {
let mut metrics = sender.metrics.lock();
debug_assert!(metrics.is_none());
*metrics = Some(self.metrics.clone());
drop(metrics);
pub fn push(&mut self, mut sender: Sender) {
debug_assert!(sender.metrics.is_none());
sender.metrics = self.metrics.clone();

if let Some(metrics) = &*self.metrics {
if let Some(metrics) = self.metrics.as_ref() {
metrics.num_channels.with_label_values(&[sender.name]).inc();
}

Expand All @@ -180,7 +172,8 @@ impl OutChannels {
/// Sends an event.
pub fn send(&mut self, event: Event) {
self.event_streams.retain_mut(|sender| {
if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired {
let current_pending = sender.inner.len();
if current_pending >= sender.queue_size_warning && !sender.warning_fired {
sender.warning_fired = true;
error!(
"The number of unprocessed events in channel `{}` exceeded {}.\n\
Expand All @@ -191,11 +184,20 @@ impl OutChannels {
sender.creation_backtrace,
Backtrace::force_capture(),
);
} else if sender.warning_fired &&
current_pending < sender.queue_size_warning.wrapping_div(2)
{
sender.warning_fired = false;
info!(
"Channel `{}` is no longer overflowed. Number of events: {}",
sender.name, current_pending
);
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this. Otherwise it will get really spammy. This is just about saying "hey, there is somehing" and not on/off/on/off/on/off

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The information like on/off would be very useful when we want to debug networking issue. However, I agree that it could be quite spammy. What about emitting error! on the first trigger, and then indicate all subsequent free/busy events on info or even debug level?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug sounds reasonable


sender.inner.try_send(event.clone()).is_ok()
});

if let Some(metrics) = &*self.metrics {
if let Some(metrics) = self.metrics.as_ref() {
for ev in &self.event_streams {
metrics.event_in(&event, ev.name);
}
Expand All @@ -211,6 +213,7 @@ impl fmt::Debug for OutChannels {
}
}

#[derive(Clone)]
struct Metrics {
// This list is ordered alphabetically
events_total: CounterVec<U64>,
Expand Down