3131//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
3232//! collection.
3333
34- use backtrace:: Backtrace ;
35- use futures:: { channel:: mpsc, prelude:: * , ready, stream:: FusedStream } ;
34+ use futures:: { prelude:: * , ready, stream:: FusedStream } ;
3635use log:: error;
3736use parking_lot:: Mutex ;
3837use prometheus_endpoint:: { register, CounterVec , GaugeVec , Opts , PrometheusError , Registry , U64 } ;
3938use sc_network_common:: protocol:: event:: Event ;
4039use std:: {
40+ backtrace:: Backtrace ,
4141 cell:: RefCell ,
4242 fmt,
4343 pin:: Pin ,
44- sync:: {
45- atomic:: { AtomicI64 , Ordering } ,
46- Arc ,
47- } ,
44+ sync:: Arc ,
4845 task:: { Context , Poll } ,
4946} ;
5047
5148/// Creates a new channel that can be associated to a [`OutChannels`].
5249///
5350/// The name is used in Prometheus reports, the queue size threshold is used
5451/// to warn if there are too many unprocessed events in the channel.
55- pub fn channel ( name : & ' static str , queue_size_warning : i64 ) -> ( Sender , Receiver ) {
56- let ( tx, rx) = mpsc :: unbounded ( ) ;
52+ pub fn channel ( name : & ' static str , queue_size_warning : usize ) -> ( Sender , Receiver ) {
53+ let ( tx, rx) = async_channel :: unbounded ( ) ;
5754 let metrics = Arc :: new ( Mutex :: new ( None ) ) ;
58- let queue_size = Arc :: new ( AtomicI64 :: new ( 0 ) ) ;
5955 let tx = Sender {
6056 inner : tx,
6157 name,
62- queue_size : queue_size. clone ( ) ,
6358 queue_size_warning,
6459 warning_fired : false ,
65- creation_backtrace : Backtrace :: new_unresolved ( ) ,
60+ creation_backtrace : Backtrace :: force_capture ( ) ,
6661 metrics : metrics. clone ( ) ,
6762 } ;
68- let rx = Receiver { inner : rx, name, queue_size , metrics } ;
63+ let rx = Receiver { inner : rx, name, metrics } ;
6964 ( tx, rx)
7065}
7166
@@ -77,16 +72,11 @@ pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver
7772/// implement the `Clone` trait e.g. in Order to not complicate the logic keeping the metrics in
7873/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
7974pub struct Sender {
80- inner : mpsc :: UnboundedSender < Event > ,
75+ inner : async_channel :: Sender < Event > ,
8176 /// Name to identify the channel (e.g., in Prometheus and logs).
8277 name : & ' static str ,
83- /// Number of events in the queue. Clone of [`Receiver::in_transit`].
84- // To not bother with ordering and possible underflow errors of the unsigned counter
85- // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate.
86- // It can turn < 0 though.
87- queue_size : Arc < AtomicI64 > ,
8878 /// Threshold queue size to generate an error message in the logs.
89- queue_size_warning : i64 ,
79+ queue_size_warning : usize ,
9080 /// We generate the error message only once to not spam the logs.
9181 warning_fired : bool ,
9282 /// Backtrace of a place where the channel was created.
@@ -113,9 +103,8 @@ impl Drop for Sender {
113103
114104/// Receiving side of a channel.
115105pub struct Receiver {
116- inner : mpsc :: UnboundedReceiver < Event > ,
106+ inner : async_channel :: Receiver < Event > ,
117107 name : & ' static str ,
118- queue_size : Arc < AtomicI64 > ,
119108 /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
120109 /// is assigned to an instance of [`OutChannels`].
121110 metrics : Arc < Mutex < Option < Arc < Option < Metrics > > > > > ,
@@ -126,7 +115,6 @@ impl Stream for Receiver {
126115
127116 fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Event > > {
128117 if let Some ( ev) = ready ! ( Pin :: new( & mut self . inner) . poll_next( cx) ) {
129- let _ = self . queue_size . fetch_sub ( 1 , Ordering :: Relaxed ) ;
130118 let metrics = self . metrics . lock ( ) . clone ( ) ;
131119 match metrics. as_ref ( ) . map ( |m| m. as_ref ( ) ) {
132120 Some ( Some ( metrics) ) => metrics. event_out ( & ev, self . name ) ,
@@ -191,17 +179,19 @@ impl OutChannels {
191179 /// Sends an event.
192180 pub fn send ( & mut self , event : Event ) {
193181 self . event_streams . retain_mut ( |sender| {
194- let queue_size = sender. queue_size . fetch_add ( 1 , Ordering :: Relaxed ) ;
195- if queue_size == sender. queue_size_warning && !sender. warning_fired {
182+ if sender. inner . len ( ) >= sender. queue_size_warning && !sender. warning_fired {
196183 sender. warning_fired = true ;
197- sender. creation_backtrace . resolve ( ) ;
198184 error ! (
199- "The number of unprocessed events in channel `{}` reached {}.\n \
200- The channel was created at:\n {:?}",
201- sender. name, sender. queue_size_warning, sender. creation_backtrace,
185+ "The number of unprocessed events in channel `{}` exceeded {}.\n \
186+ The channel was created at:\n {:}\n
187+ The last event was sent from:\n {:}" ,
188+ sender. name,
189+ sender. queue_size_warning,
190+ sender. creation_backtrace,
191+ Backtrace :: force_capture( ) ,
202192 ) ;
203193 }
204- sender. inner . unbounded_send ( event. clone ( ) ) . is_ok ( )
194+ sender. inner . try_send ( event. clone ( ) ) . is_ok ( )
205195 } ) ;
206196
207197 if let Some ( metrics) = & * self . metrics {
0 commit comments