Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions prdoc/pr_7724.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
title: Terminate libp2p the outbound notification substream on io errors

doc:
- audience: [Node Dev, Node Operator]
description: |
This PR handles a case where we called the poll_next on an outbound substream notification to check if the stream is closed.
It is entirely possible that the poll_next would return an io::error, for example end of file.
This PR ensures that we make the distinction between unexpected incoming data, and error originated from poll_next.
While at it, the bulk of the PR change propagates the PeerID from the network behavior, through the notification handler, to the notification outbound stream for logging purposes.

crates:
- name: sc-network
bump: patch
24 changes: 18 additions & 6 deletions substrate/client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
});
*connec_state = ConnectionState::Opening;
*occ_entry.into_mut() = PeerState::Enabled { connections };
Expand Down Expand Up @@ -1062,7 +1062,10 @@ impl Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: incoming.set_id.into(),
peer_id: incoming.peer_id,
},
});
*connec_state = ConnectionState::Opening;
}
Expand Down Expand Up @@ -1260,7 +1263,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});

let mut connections = SmallVec::new();
Expand Down Expand Up @@ -1762,7 +1768,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});
*connec_state = ConnectionState::Opening;
} else {
Expand Down Expand Up @@ -1849,7 +1858,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});
*connec_state = ConnectionState::Opening;

Expand Down Expand Up @@ -2336,7 +2348,7 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
});
*connec_state = ConnectionState::Opening;
*peer_state = PeerState::Enabled { connections: mem::take(connections) };
Expand Down
37 changes: 30 additions & 7 deletions substrate/client/network/src/protocol/notifications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ pub enum NotifsHandlerIn {
Open {
/// Index of the protocol in the list of protocols passed at initialization.
protocol_index: usize,

/// The peer id of the remote.
peer_id: PeerId,
},

/// Instruct the handler to close the notification substreams, or reject any pending incoming
Expand Down Expand Up @@ -632,7 +635,7 @@ impl ConnectionHandler for NotifsHandler {

fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
match message {
NotifsHandlerIn::Open { protocol_index } => {
NotifsHandlerIn::Open { protocol_index, peer_id } => {
let protocol_info = &mut self.protocols[protocol_index];
match &mut protocol_info.state {
State::Closed { pending_opening } => {
Expand All @@ -642,6 +645,7 @@ impl ConnectionHandler for NotifsHandler {
protocol_info.config.fallback_names.clone(),
protocol_info.config.handshake.read().clone(),
protocol_info.config.max_notification_size,
peer_id,
);

self.events_queue.push_back(
Expand All @@ -663,6 +667,7 @@ impl ConnectionHandler for NotifsHandler {
protocol_info.config.fallback_names.clone(),
handshake_message.clone(),
protocol_info.config.max_notification_size,
peer_id,
);

self.events_queue.push_back(
Expand Down Expand Up @@ -1202,7 +1207,10 @@ pub mod tests {
.await;

// move the handler state to 'Opening'
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1273,7 +1281,10 @@ pub mod tests {
.await;

// move the handler state to 'Opening'
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1362,7 +1373,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1421,7 +1435,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1502,7 +1519,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1550,7 +1570,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ use crate::types::ProtocolName;
use asynchronous_codec::Framed;
use bytes::BytesMut;
use futures::prelude::*;
use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p::{
core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
PeerId,
};
use log::{error, warn};
use unsigned_varint::codec::UviBytes;

Expand Down Expand Up @@ -78,6 +81,8 @@ pub struct NotificationsOut {
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
/// The peerID of the remote.
peer_id: PeerId,
}

/// A substream for incoming notification messages.
Expand Down Expand Up @@ -114,12 +119,15 @@ pub struct NotificationsOutSubstream<TSubstream> {
/// Substream where to send messages.
#[pin]
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,

/// The remote peer.
peer_id: PeerId,
}

#[cfg(test)]
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
Self { socket }
Self { socket, peer_id: PeerId::random() }
}
}

Expand Down Expand Up @@ -349,6 +357,7 @@ impl NotificationsOut {
fallback_names: Vec<ProtocolName>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
peer_id: PeerId,
) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
Expand All @@ -358,7 +367,7 @@ impl NotificationsOut {
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());

Self { protocol_names, initial_message, max_notification_size }
Self { protocol_names, initial_message, max_notification_size, peer_id }
}
}

Expand Down Expand Up @@ -414,7 +423,10 @@ where
} else {
Some(negotiated_name)
},
substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
substream: NotificationsOutSubstream {
socket: Framed::new(socket, codec),
peer_id: self.peer_id,
},
})
})
}
Expand Down Expand Up @@ -465,11 +477,25 @@ where
// even if we don't write anything into it.
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Pending => {},
Poll::Ready(Some(_)) => {
error!(
target: LOG_TARGET,
"Unexpected incoming data in `NotificationsOutSubstream`",
);
Poll::Ready(Some(result)) => match result {
Ok(bytes) => {
error!(
target: "sub-libp2p",
"Unexpected incoming data in `NotificationsOutSubstream` peer={:?} bytes={bytes:?}",
this.peer_id
);
},
Err(error) => {
warn!(
target: "sub-libp2p",
"Error while reading from `NotificationsOutSubstream` peer={:?} error={error:?}",
this.peer_id
);

// The expectation is that the remote has closed the substream.
// This is similar to the `Poll::Ready(None)` branch below.
return Poll::Ready(Err(NotificationsOutError::Terminated));
},
},
Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
}
Expand Down Expand Up @@ -537,7 +563,10 @@ mod tests {
NotificationsOutSubstream,
};
use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p::{
core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo},
PeerId,
};
use std::{pin::Pin, task::Poll};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;
Expand All @@ -557,7 +586,13 @@ mod tests {
NotificationsHandshakeError,
> {
let socket = TcpStream::connect(addr).await.unwrap();
let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
let notifs_out = NotificationsOut::new(
"/test/proto/1",
Vec::new(),
handshake,
1024 * 1024,
PeerId::random(),
);
let (_, substream) = multistream_select::dialer_select_proto(
socket.compat(),
notifs_out.protocol_info(),
Expand Down Expand Up @@ -721,7 +756,13 @@ mod tests {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound(
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
NotificationsOut::new(
PROTO_NAME,
Vec::new(),
&b"initial message"[..],
1024 * 1024,
PeerId::random(),
),
socket.compat(),
ProtocolName::Static(PROTO_NAME),
)
Expand Down Expand Up @@ -766,6 +807,7 @@ mod tests {
Vec::new(),
&b"initial message"[..],
1024 * 1024,
PeerId::random(),
),
socket.compat(),
ProtocolName::Static(PROTO_NAME),
Expand Down
Loading