Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/protocol/notification/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,14 @@ impl Config {
let (notif_tx, notif_rx) = channel(DEFAULT_CHANNEL_SIZE);
let (command_tx, command_rx) = channel(DEFAULT_CHANNEL_SIZE);
let handshake = Arc::new(RwLock::new(handshake));
let handle =
NotificationHandle::new(event_rx, notif_rx, command_tx, Arc::clone(&handshake));
let handle = NotificationHandle::new(
protocol_name.clone(),
sync_channel_size,
event_rx,
notif_rx,
command_tx,
Arc::clone(&handshake),
);

(
Self {
Expand Down
60 changes: 52 additions & 8 deletions src/protocol/notification/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ impl NotificationSink {
/// Handle allowing the user protocol to interact with the notification protocol.
#[derive(Debug)]
pub struct NotificationHandle {
/// Protocol name served by this handle.
protocol_name: ProtocolName,

/// Configured synchronous channel size.
sync_channel_size: usize,

/// RX channel for receiving events from the notification protocol.
event_rx: Receiver<InnerNotificationEvent>,

Expand All @@ -195,12 +201,16 @@ pub struct NotificationHandle {
impl NotificationHandle {
/// Create new [`NotificationHandle`].
pub(crate) fn new(
protocol_name: ProtocolName,
sync_channel_size: usize,
event_rx: Receiver<InnerNotificationEvent>,
notif_rx: Receiver<(PeerId, BytesMut)>,
command_tx: Sender<NotificationCommand>,
handshake: Arc<RwLock<Vec<u8>>>,
) -> Self {
Self {
protocol_name,
sync_channel_size,
event_rx,
notif_rx,
command_tx,
Expand Down Expand Up @@ -401,9 +411,34 @@ impl NotificationHandle {
Err(error) => match error {
NotificationError::NoConnection => Err(NotificationError::NoConnection),
NotificationError::ChannelClogged => {
let _ = self.clogged.insert(peer).then(|| {
self.command_tx.try_send(NotificationCommand::ForceClose { peer })
});
if self.clogged.insert(peer) {
match self.command_tx.try_send(NotificationCommand::ForceClose { peer })
{
Ok(()) => tracing::warn!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol_name,
sync_channel_size = self.sync_channel_size,
"sync notification channel clogged, queueing force close",
),
Err(error) => tracing::warn!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol_name,
sync_channel_size = self.sync_channel_size,
?error,
"sync notification channel clogged, failed to queue force close",
),
}
} else {
tracing::debug!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol_name,
sync_channel_size = self.sync_channel_size,
"sync notification channel still clogged, force close already queued",
);
}

Err(NotificationError::ChannelClogged)
}
Expand Down Expand Up @@ -479,7 +514,14 @@ impl Stream for NotificationHandle {
}
InnerNotificationEvent::NotificationStreamClosed { peer } => {
self.peers.remove(&peer);
self.clogged.remove(&peer);
if self.clogged.remove(&peer) {
tracing::debug!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol_name,
"cleared clogged state after notification stream closed",
);
}

return Poll::Ready(Some(NotificationEvent::NotificationStreamClosed {
peer,
Expand All @@ -501,22 +543,24 @@ impl Stream for NotificationHandle {
handshake,
}));
}
InnerNotificationEvent::NotificationStreamOpenFailure { peer, error } =>
InnerNotificationEvent::NotificationStreamOpenFailure { peer, error } => {
return Poll::Ready(Some(
NotificationEvent::NotificationStreamOpenFailure { peer, error },
)),
))
}
},
}

match futures::ready!(self.notif_rx.poll_recv(cx)) {
None => return Poll::Ready(None),
Some((peer, notification)) =>
Some((peer, notification)) => {
if self.peers.contains_key(&peer) {
return Poll::Ready(Some(NotificationEvent::NotificationReceived {
peer,
notification,
}));
},
}
}
}
}
}
Expand Down
22 changes: 19 additions & 3 deletions src/protocol/notification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,9 @@ impl NotificationProtocol {
let (tx, rx) = oneshot::channel();
self.pending_validations.push(Box::pin(async move {
match rx.await {
Ok(ValidationResult::Accept) =>
(peer, ValidationResult::Accept),
Ok(ValidationResult::Accept) => {
(peer, ValidationResult::Accept)
}
_ => (peer, ValidationResult::Reject),
}
}));
Expand Down Expand Up @@ -1827,7 +1828,22 @@ impl NotificationProtocol {
}
}
NotificationCommand::ForceClose { peer } => {
let _ = self.service.force_close(peer);
tracing::warn!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol,
"processing force close command after notification channel clog",
);

if let Err(error) = self.service.force_close(peer) {
tracing::warn!(
target: LOG_TARGET,
?peer,
protocol = %self.protocol,
?error,
"failed to force close connection after notification channel clog",
);
}
}
#[cfg(feature = "fuzz")]
NotificationCommand::SendNotification{ .. } => unreachable!()
Expand Down
Loading