diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index aefaaeec933..ccccac6f887 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -26,7 +26,7 @@ use futures::future; use futures::future::{BoxFuture, FutureExt}; use instant::Instant; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; +use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, @@ -229,27 +229,23 @@ impl Handler { }, )); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. self.keep_alive = KeepAlive::No; self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::InboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: ConnectionHandlerUpgrErr::NegotiationFailed, }, )); } _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| { - e.map_err(|e| match e { - Either::Left(e) => Either::Left(e), - Either::Right(v) => void::unreachable(v), - }) + self.pending_error = Some(error.map_upgrade_err(|e| match e { + Either::Left(e) => Either::Left(e), + Either::Right(v) => void::unreachable(v), })); } } @@ -272,22 +268,20 @@ impl Handler { }, )); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::OutboundNegotiationFailed { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: ConnectionHandlerUpgrErr::NegotiationFailed, }, )); } _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(Either::Right))); + self.pending_error = Some(error.map_upgrade_err(Either::Right)); } } } @@ -388,7 +382,7 @@ impl ConnectionHandler for Handler { } Err(e) => { return Poll::Ready(ConnectionHandlerEvent::Close( - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))), + ConnectionHandlerUpgrErr::Upgrade(Either::Left(e)), )) } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 39a65e7455a..2e0975f68d4 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -25,7 +25,6 @@ use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, @@ -299,28 +298,21 @@ impl ConnectionHandler for Handler { Some(HandlerError::NegotiationTimeout) } // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - if !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom( - HandlerEvent::PeerKind(PeerKind::NotSupported), - )); - } else { - None - } - } - NegotiationError::ProtocolError(e) => { - Some(HandlerError::NegotiationProtocolError(e)) - } + ConnectionHandlerUpgrErr::Upgrade(e) => Some(e), + ConnectionHandlerUpgrErr::NegotiationFailed => { + // The protocol is not supported + self.protocol_unsupported = true; + if !self.peer_kind_sent { + self.peer_kind_sent = true; + // clear all substreams so the keep alive returns false + self.inbound_substream = None; + self.outbound_substream = None; + self.keep_alive = KeepAlive::No; + return Poll::Ready(ConnectionHandlerEvent::Custom( + HandlerEvent::PeerKind(PeerKind::NotSupported), + )); + } else { + None } } }; diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 7b5f4b580e7..925140ce65f 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -254,12 +254,9 @@ impl Handler { ::OutboundProtocol, >, ) { - use libp2p_core::upgrade::UpgradeError; - let err = err.map_upgrade_err(|e| match e { - UpgradeError::Select(e) => UpgradeError::Select(e), - UpgradeError::Apply(Either::Left(ioe)) => UpgradeError::Apply(ioe), - UpgradeError::Apply(Either::Right(ioe)) => UpgradeError::Apply(ioe), + Either::Left(ioe) => ioe, + Either::Right(ioe) => ioe, }); self.events .push(ConnectionHandlerEvent::Custom(Event::IdentificationError( @@ -366,9 +363,7 @@ impl ConnectionHandler for Handler { Event::Identification(peer_id), )), Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom( - Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - )), + Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(err)), )), Poll::Ready(None) | Poll::Pending => Poll::Pending, } diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 2703b274c77..302fbe53567 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -23,7 +23,6 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; -use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; @@ -238,7 +237,7 @@ impl Handler { self.outbound = None; // Request a new substream on the next `poll`. let error = match error { - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { debug_assert_eq!(self.state, State::Active); self.state = State::Inactive { reported: false }; diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index a1aaf0fa9d7..c5cf701ae51 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -30,7 +30,7 @@ use futures::io::AsyncWriteExt; use futures::stream::{FuturesUnordered, StreamExt}; use futures_timer::Delay; use instant::Instant; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, @@ -506,25 +506,9 @@ impl Handler { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), - )); - return; - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - inbound_hop::UpgradeError::Fatal(error), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Left(error)), - )); + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, + ConnectionHandlerUpgrErr::Upgrade(inbound_hop::UpgradeError::Fatal(error)) => { + self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(Either::Left(error))); return; } }; @@ -553,29 +537,16 @@ impl Handler { ConnectionHandlerUpgrErr::Timer => { (ConnectionHandlerUpgrErr::Timer, Status::ConnectionFailed) } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { // The remote has previously done a reservation. Doing a reservation but not // supporting the stop protocol is pointless, thus disconnecting. - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed), - )); + self.pending_error = Some(ConnectionHandlerUpgrErr::NegotiationFailed); return; } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), - )); - return; - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error { + ConnectionHandlerUpgrErr::Upgrade(error) => match error { outbound_stop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); + self.pending_error = + Some(ConnectionHandlerUpgrErr::Upgrade(Either::Right(error))); return; } outbound_stop::UpgradeError::CircuitFailed(error) => { @@ -587,10 +558,7 @@ impl Handler { Status::PermissionDenied } }; - ( - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)), - status, - ) + (ConnectionHandlerUpgrErr::Upgrade(error), status) } }, }; diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 9bf09e3d178..20859886e86 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -29,7 +29,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use futures_timer::Delay; use instant::Instant; use libp2p_core::multiaddr::Protocol; -use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, @@ -356,25 +356,9 @@ impl Handler { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), - )); - return; - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - inbound_stop::UpgradeError::Fatal(error), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Left(error)), - )); + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, + ConnectionHandlerUpgrErr::Upgrade(inbound_stop::UpgradeError::Fatal(error)) => { + self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade(Either::Left(error))); return; } }; @@ -401,41 +385,20 @@ impl Handler { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), - )); - return; - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); - return; - } - outbound_hop::UpgradeError::ReservationFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::CircuitFailed(_) => { - unreachable!( - "Do not emitt `CircuitFailed` for outgoing reservation." - ) - } + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, + ConnectionHandlerUpgrErr::Upgrade(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = + Some(ConnectionHandlerUpgrErr::Upgrade(Either::Right(error))); + return; } - } + outbound_hop::UpgradeError::ReservationFailed(error) => { + ConnectionHandlerUpgrErr::Upgrade(error) + } + outbound_hop::UpgradeError::CircuitFailed(_) => { + unreachable!("Do not emitt `CircuitFailed` for outgoing reservation.") + } + }, }; if self.pending_error.is_none() { @@ -464,41 +427,20 @@ impl Handler { let non_fatal_error = match error { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )) => ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::Failed, - )), - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - )) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Select( - upgrade::NegotiationError::ProtocolError(e), - ), - )); - return; - } - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_hop::UpgradeError::Fatal(error) => { - self.pending_error = Some(ConnectionHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(Either::Right(error)), - )); - return; - } - outbound_hop::UpgradeError::CircuitFailed(error) => { - ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( - error, - )) - } - outbound_hop::UpgradeError::ReservationFailed(_) => { - unreachable!( - "Do not emitt `ReservationFailed` for outgoing circuit." - ) - } + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, + ConnectionHandlerUpgrErr::Upgrade(error) => match error { + outbound_hop::UpgradeError::Fatal(error) => { + self.pending_error = + Some(ConnectionHandlerUpgrErr::Upgrade(Either::Right(error))); + return; } - } + outbound_hop::UpgradeError::CircuitFailed(error) => { + ConnectionHandlerUpgrErr::Upgrade(error) + } + outbound_hop::UpgradeError::ReservationFailed(_) => { + unreachable!("Do not emitt `ReservationFailed` for outgoing circuit.") + } + }, }; let _ = send_back.send(Err(())); diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 50cd6adb055..b80cfade866 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -31,7 +31,6 @@ pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; use instant::Instant; -use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::{ handler::{ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive}, SubstreamProtocol, @@ -148,7 +147,7 @@ where ConnectionHandlerUpgrErr::Timeout => { self.pending_events.push_back(Event::OutboundTimeout(info)); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { // The remote merely doesn't support the protocol(s) we requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. @@ -175,7 +174,7 @@ where ConnectionHandlerUpgrErr::Timeout => { self.pending_events.push_back(Event::InboundTimeout(info)) } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { // The local peer merely doesn't support the protocol(s) requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 2c6f116571e..097aa761b46 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -71,6 +71,11 @@ - Remove `ConnectionId::new`. Manually creating `ConnectionId`s is now unsupported. See [PR 3327]. +- Close connections on `multistream-select` protocol violations. + As a consequence, `ConnectionHandlerUpgrErr::Upgrade` directly contains the upgrade error now. + Additionally, `ConnectionHandlerUpgrErr::NegotiationFailed` is introduced to represent nodes failing to agree on a protocol for a stream. + See [PR XXXX]. + [PR 3364]: https://github.com/libp2p/rust-libp2p/pull/3364 [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 @@ -84,6 +89,7 @@ [PR 3373]: https://github.com/libp2p/rust-libp2p/pull/3373 [PR 3374]: https://github.com/libp2p/rust-libp2p/pull/3374 [PR 3375]: https://github.com/libp2p/rust-libp2p/pull/3375 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX # 0.41.1 diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 1546a16bc86..58920075154 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -211,12 +211,11 @@ where let err = match err { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, - ConnectionHandlerUpgrErr::Upgrade(err) => { - ConnectionHandlerUpgrErr::Upgrade(err.map_err(|err| match err { - Either::Left(e) => e, - Either::Right(v) => void::unreachable(v), - })) + ConnectionHandlerUpgrErr::Upgrade(Either::Left(e)) => { + ConnectionHandlerUpgrErr::Upgrade(e) } + ConnectionHandlerUpgrErr::Upgrade(Either::Right(never)) => void::unreachable(never), + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, }; inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index baa3896462c..a8d83272ec7 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -41,7 +41,9 @@ use instant::Instant; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox}; -use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply}; +use libp2p_core::upgrade::{ + InboundUpgradeApply, NegotiationError, OutboundUpgradeApply, ProtocolError, +}; use libp2p_core::{upgrade, UpgradeError}; use libp2p_core::{Endpoint, PeerId}; use std::future::Future; @@ -238,36 +240,42 @@ where // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. match negotiating_out.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} - Poll::Ready(Some((info, Ok(protocol)))) => { + Poll::Ready(Some((info, Ok(Ok(protocol))))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( FullyNegotiatedOutbound { protocol, info }, )); continue; } - Poll::Ready(Some((info, Err(error)))) => { + Poll::Ready(Some((info, Ok(Err(error))))) => { handler.on_connection_event(ConnectionEvent::DialUpgradeError( DialUpgradeError { info, error }, )); continue; } + Poll::Ready(Some((_, Err(e)))) => { + return Poll::Ready(Err(ConnectionError::Protocol(e))) + } } // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not // make any more progress, poll the negotiating inbound streams. match negotiating_in.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} - Poll::Ready(Some((info, Ok(protocol)))) => { + Poll::Ready(Some((info, Ok(Ok(protocol))))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound( FullyNegotiatedInbound { protocol, info }, )); continue; } - Poll::Ready(Some((info, Err(error)))) => { + Poll::Ready(Some((info, Ok(Err(error))))) => { handler.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info, error }, )); continue; } + Poll::Ready(Some((_, Err(e)))) => { + return Poll::Ready(Err(ConnectionError::Protocol(e))) + } } // Ask the handler whether it wants the connection (and the handler itself) @@ -464,7 +472,7 @@ where { type Output = ( UserData, - Result>, + Result>, ProtocolError>, ); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -474,7 +482,7 @@ where self.user_data .take() .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Timeout), + Ok(Err(ConnectionHandlerUpgrErr::Timeout)), )) } @@ -486,14 +494,28 @@ where self.user_data .take() .expect("Future not to be polled again once ready."), - Ok(upgrade), + Ok(Ok(upgrade)), + )), + Poll::Ready(Err(UpgradeError::Apply(err))) => Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Ok(Err(ConnectionHandlerUpgrErr::Upgrade(err))), )), - Poll::Ready(Err(err)) => Poll::Ready(( + Poll::Ready(Err(UpgradeError::Select(NegotiationError::Failed))) => Poll::Ready(( self.user_data .take() .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Upgrade(err)), + Ok(Err(ConnectionHandlerUpgrErr::NegotiationFailed)), )), + Poll::Ready(Err(UpgradeError::Select(NegotiationError::ProtocolError(err)))) => { + Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Err(err), + )) + } Poll::Pending => Poll::Pending, } } diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index a5a6136f0b1..8fc7fefb057 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -21,6 +21,7 @@ use crate::transport::TransportError; use crate::Multiaddr; use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; +use libp2p_core::upgrade::ProtocolError; use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. @@ -35,6 +36,11 @@ pub enum ConnectionError { /// The connection handler produced an error. Handler(THandlerErr), + + /// A unrecoverable [`ProtocolError`] occurred on one of the streams of this connection. + /// + /// A protocol violation results in the entire connection being closed. + Protocol(ProtocolError), } impl fmt::Display for ConnectionError @@ -48,6 +54,7 @@ where write!(f, "Connection closed due to expired keep-alive timeout.") } ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {err}"), + ConnectionError::Protocol(_) => write!(f, "Unrecoverable protocol error occurred"), } } } @@ -61,6 +68,7 @@ where ConnectionError::IO(err) => Some(err), ConnectionError::KeepAliveTimeout => None, ConnectionError::Handler(err) => Some(err), + ConnectionError::Protocol(err) => Some(err), } } } diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 2a7c1a9481f..97b8be95f4c 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -9,7 +9,6 @@ use crate::{ }; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::PeerId; -use libp2p_core::UpgradeError; use std::task::{Context, Poll}; use void::Void; @@ -117,10 +116,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { ConnectionEvent::DialUpgradeError(DialUpgradeError { info: _, error }) => match error { ConnectionHandlerUpgrErr::Timeout => unreachable!(), ConnectionHandlerUpgrErr::Timer => unreachable!(), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => void::unreachable(e), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)) => { - unreachable!("Denied upgrade does not support any protocols") - } + ConnectionHandlerUpgrErr::Upgrade(e) => void::unreachable(e), + ConnectionHandlerUpgrErr::NegotiationFailed => unreachable!(), }, ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index f60e94e2465..c989529a01c 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -49,7 +49,7 @@ mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; use instant::Instant; -use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; pub use map_in::MapInEvent; @@ -438,19 +438,22 @@ pub enum ConnectionHandlerUpgrErr { /// There was an error in the timer used. Timer, /// Error while upgrading the substream to the protocol we want. - Upgrade(UpgradeError), + Upgrade(TUpgrErr), + /// Protocol negotiation failed, in other words, the two nodes could not agree on a protocol for this stream. + NegotiationFailed, } impl ConnectionHandlerUpgrErr { /// Map the inner [`UpgradeError`] type. pub fn map_upgrade_err(self, f: F) -> ConnectionHandlerUpgrErr where - F: FnOnce(UpgradeError) -> UpgradeError, + F: FnOnce(TUpgrErr) -> E, { match self { ConnectionHandlerUpgrErr::Timeout => ConnectionHandlerUpgrErr::Timeout, ConnectionHandlerUpgrErr::Timer => ConnectionHandlerUpgrErr::Timer, ConnectionHandlerUpgrErr::Upgrade(e) => ConnectionHandlerUpgrErr::Upgrade(f(e)), + ConnectionHandlerUpgrErr::NegotiationFailed => ConnectionHandlerUpgrErr::NegotiationFailed, } } } @@ -468,6 +471,7 @@ where write!(f, "Timer error while opening a substream") } ConnectionHandlerUpgrErr::Upgrade(err) => write!(f, "{err}"), + ConnectionHandlerUpgrErr::NegotiationFailed => write!(f, "Protocol negotiation failed"), } } } @@ -481,6 +485,7 @@ where ConnectionHandlerUpgrErr::Timeout => None, ConnectionHandlerUpgrErr::Timer => None, ConnectionHandlerUpgrErr::Upgrade(err) => Some(err), + ConnectionHandlerUpgrErr::NegotiationFailed => None, } } } diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index e2c72bb3a10..7a6abd37694 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -26,7 +26,6 @@ use crate::handler::{ use crate::upgrade::SendWrapper; use either::Either; use futures::future; -use libp2p_core::upgrade::UpgradeError; use libp2p_core::{ConnectedPoint, PeerId}; use std::task::{Context, Poll}; @@ -122,31 +121,17 @@ where fn transpose(self) -> Either, ListenUpgradeError> { match self { ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(error))), + error: ConnectionHandlerUpgrErr::Upgrade(Either::Left(error)), info: Either::Left(info), } => Either::Left(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), + error: ConnectionHandlerUpgrErr::Upgrade(error), info, }), ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(error))), + error: ConnectionHandlerUpgrErr::Upgrade(Either::Right(error)), info: Either::Right(info), } => Either::Right(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)), - info, - }), - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Left(info), - } => Either::Left(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info, - }), - ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), - info: Either::Right(info), - } => Either::Right(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)), + error: ConnectionHandlerUpgrErr::Upgrade(error), info, }), ListenUpgradeError { diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 35f0ebd7995..2301011d6d1 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -29,7 +29,7 @@ use crate::handler::{ use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::NegotiatedSubstream; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError}; +use libp2p_core::upgrade::ProtocolName; use libp2p_core::{ConnectedPoint, PeerId}; use rand::Rng; use std::{ @@ -116,94 +116,25 @@ where } } } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { for (k, h) in &mut self.handlers { if let Some(i) = info.take(k) { h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: ConnectionHandlerUpgrErr::NegotiationFailed, }, )); } } } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )) => match e { - ProtocolError::IoError(e) => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - let e = NegotiationError::ProtocolError(ProtocolError::IoError( - e.kind().into(), - )); - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), - }, - )); - } - } - } - ProtocolError::InvalidMessage => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), - }, - )); - } - } - } - ProtocolError::InvalidProtocol => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), - }, - )); - } - } - } - ProtocolError::TooManyProtocols => { - for (k, h) in &mut self.handlers { - if let Some(i) = info.take(k) { - let e = - NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); - h.on_connection_event(ConnectionEvent::ListenUpgradeError( - ListenUpgradeError { - info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - e, - )), - }, - )); - } - } - } - }, - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => { + ConnectionHandlerUpgrErr::Upgrade((k, e)) => { if let Some(h) = self.handlers.get_mut(&k) { if let Some(i) = info.take(&k) { h.on_connection_event(ConnectionEvent::ListenUpgradeError( ListenUpgradeError { info: i, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: ConnectionHandlerUpgrErr::Upgrade(e), }, )); } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 8c38ffe747d..aa158c0a27d 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -28,10 +28,7 @@ use crate::upgrade::SendWrapper; use either::Either; use futures::future; -use libp2p_core::{ - upgrade::{NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, - ConnectedPoint, PeerId, -}; +use libp2p_core::{upgrade::SelectUpgrade, ConnectedPoint, PeerId}; use std::{cmp, task::Context, task::Poll}; /// Implementation of `IntoConnectionHandler` that combines two protocols into one. @@ -175,17 +172,10 @@ where }), DialUpgradeError { info: Either::Left(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - } => Either::Left(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - }), - DialUpgradeError { - info: Either::Left(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(err))), + error: ConnectionHandlerUpgrErr::Upgrade(Either::Left(err)), } => Either::Left(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + error: ConnectionHandlerUpgrErr::Upgrade(err), }), DialUpgradeError { info: Either::Right(info), @@ -203,17 +193,10 @@ where }), DialUpgradeError { info: Either::Right(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - } => Either::Right(DialUpgradeError { - info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - }), - DialUpgradeError { - info: Either::Right(info), - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(err))), + error: ConnectionHandlerUpgrErr::Upgrade(Either::Right(err)), } => Either::Right(DialUpgradeError { info, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), + error: ConnectionHandlerUpgrErr::Upgrade(err), }), _ => panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"), } @@ -262,70 +245,31 @@ where error: ConnectionHandlerUpgrErr::Timeout, })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + ConnectionHandlerUpgrErr::NegotiationFailed => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), + error: ConnectionHandlerUpgrErr::NegotiationFailed, })); self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )), - })); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )) => { - let (e1, e2); - match e { - ProtocolError::IoError(e) => { - e1 = NegotiationError::ProtocolError(ProtocolError::IoError( - e.kind().into(), - )); - e2 = NegotiationError::ProtocolError(ProtocolError::IoError(e)) - } - ProtocolError::InvalidMessage => { - e1 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); - e2 = NegotiationError::ProtocolError(ProtocolError::InvalidMessage) - } - ProtocolError::InvalidProtocol => { - e1 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); - e2 = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol) - } - ProtocolError::TooManyProtocols => { - e1 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); - e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) - } - } - self.proto1 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)), - })); - self.proto2 - .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { - info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)), + error: ConnectionHandlerUpgrErr::NegotiationFailed, })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Left(e))) => { + ConnectionHandlerUpgrErr::Upgrade(Either::Left(e)) => { self.proto1 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i1, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: ConnectionHandlerUpgrErr::Upgrade(e), })); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(Either::Right(e))) => { + ConnectionHandlerUpgrErr::Upgrade(Either::Right(e)) => { self.proto2 .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: i2, - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + error: ConnectionHandlerUpgrErr::Upgrade(e), })); } }