diff --git a/src/crypto/ed25519.rs b/src/crypto/ed25519.rs index af10a20c4..d95aaa395 100644 --- a/src/crypto/ed25519.rs +++ b/src/crypto/ed25519.rs @@ -21,7 +21,10 @@ //! Ed25519 keys. -use crate::{error::Error, PeerId}; +use crate::{ + error::{Error, ParseError}, + PeerId, +}; use ed25519_dalek::{self as ed25519, Signer as _, Verifier as _}; use std::fmt; @@ -131,11 +134,13 @@ impl PublicKey { /// Try to parse a public key from a byte array containing the actual key as produced by /// `to_bytes`. - pub fn try_from_bytes(k: &[u8]) -> crate::Result { - let k = <[u8; 32]>::try_from(k) - .map_err(|e| Error::Other(format!("Failed to parse ed25519 public key: {e}")))?; + pub fn try_from_bytes(k: &[u8]) -> Result { + let k = <[u8; 32]>::try_from(k).map_err(|_| ParseError::InvalidPublicKey)?; + + // The error type of the verifying key is deliberately opaque as to avoid side-channel + // leakage. We can't provide a more specific error type here. ed25519::VerifyingKey::from_bytes(&k) - .map_err(|e| Error::Other(format!("Failed to parse ed25519 public key: {e}"))) + .map_err(|_| ParseError::InvalidPublicKey) .map(PublicKey) } diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index b565639a9..f98ab70f5 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -21,7 +21,7 @@ //! Crypto-related code. -use crate::{error::*, peer_id::*}; +use crate::{error::ParseError, peer_id::*}; pub mod ed25519; pub(crate) mod noise; @@ -65,11 +65,10 @@ impl PublicKey { /// Decode a public key from a protobuf structure, e.g. read from storage /// or received from another node. - pub fn from_protobuf_encoding(bytes: &[u8]) -> crate::Result { + pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { use prost::Message; - let pubkey = keys_proto::PublicKey::decode(bytes) - .map_err(|error| Error::Other(format!("Invalid Protobuf: {error:?}")))?; + let pubkey = keys_proto::PublicKey::decode(bytes)?; pubkey.try_into() } @@ -92,19 +91,16 @@ impl From<&PublicKey> for keys_proto::PublicKey { } impl TryFrom for PublicKey { - type Error = Error; + type Error = ParseError; fn try_from(pubkey: keys_proto::PublicKey) -> Result { let key_type = keys_proto::KeyType::try_from(pubkey.r#type) - .map_err(|_| Error::Other(format!("Unknown key type: {}", pubkey.r#type)))?; - - match key_type { - keys_proto::KeyType::Ed25519 => - Ok(ed25519::PublicKey::try_from_bytes(&pubkey.data).map(PublicKey::Ed25519)?), - _ => Err(Error::Other(format!( - "Unsupported key type: {}", - key_type.as_str_name() - ))), + .map_err(|_| ParseError::UnknownKeyType(pubkey.r#type))?; + + if key_type == keys_proto::KeyType::Ed25519 { + Ok(ed25519::PublicKey::try_from_bytes(&pubkey.data).map(PublicKey::Ed25519)?) + } else { + Err(ParseError::UnknownKeyType(key_type as i32)) } } } diff --git a/src/crypto/noise/mod.rs b/src/crypto/noise/mod.rs index f130042cf..8254121c6 100644 --- a/src/crypto/noise/mod.rs +++ b/src/crypto/noise/mod.rs @@ -24,7 +24,8 @@ use crate::{ config::Role, crypto::{ed25519::Keypair, PublicKey}, - error, PeerId, + error::{NegotiationError, ParseError}, + PeerId, }; use bytes::{Buf, Bytes, BytesMut}; @@ -103,7 +104,7 @@ impl NoiseContext { keypair: snow::Keypair, id_keys: &Keypair, role: Role, - ) -> crate::Result { + ) -> Result { let noise_payload = handshake_schema::NoiseHandshakePayload { identity_key: Some(PublicKey::Ed25519(id_keys.public()).to_protobuf_encoding()), identity_sig: Some( @@ -113,7 +114,7 @@ impl NoiseContext { }; let mut payload = Vec::with_capacity(noise_payload.encoded_len()); - noise_payload.encode(&mut payload)?; + noise_payload.encode(&mut payload).map_err(ParseError::from)?; Ok(Self { noise: NoiseState::Handshake(noise), @@ -123,7 +124,7 @@ impl NoiseContext { }) } - pub fn new(keypair: &Keypair, role: Role) -> crate::Result { + pub fn new(keypair: &Keypair, role: Role) -> Result { tracing::trace!(target: LOG_TARGET, ?role, "create new noise configuration"); let builder: Builder<'_> = Builder::with_resolver( @@ -144,7 +145,7 @@ impl NoiseContext { /// Create new [`NoiseContext`] with prologue. #[cfg(feature = "webrtc")] - pub fn with_prologue(id_keys: &Keypair, prologue: Vec) -> crate::Result { + pub fn with_prologue(id_keys: &Keypair, prologue: Vec) -> Result { let noise: Builder<'_> = Builder::with_resolver( NOISE_PARAMETERS.parse().expect("qed; Valid noise pattern"), Box::new(protocol::Resolver), @@ -162,35 +163,36 @@ impl NoiseContext { /// Get remote public key from the received Noise payload. #[cfg(feature = "webrtc")] - pub fn get_remote_public_key(&mut self, reply: &[u8]) -> crate::Result { + pub fn get_remote_public_key(&mut self, reply: &[u8]) -> Result { let (len_slice, reply) = reply.split_at(2); - let len = u16::from_be_bytes(len_slice.try_into().map_err(|_| error::Error::InvalidData)?) - as usize; + let len = u16::from_be_bytes( + len_slice + .try_into() + .map_err(|_| NegotiationError::ParseError(ParseError::InvalidPublicKey))?, + ) as usize; let mut buffer = vec![0u8; len]; let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the second handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let res = noise.read_message(reply, &mut buffer)?; buffer.truncate(res); - let payload = handshake_schema::NoiseHandshakePayload::decode(buffer.as_slice())?; + let payload = handshake_schema::NoiseHandshakePayload::decode(buffer.as_slice()) + .map_err(|err| NegotiationError::ParseError(err.into()))?; - PublicKey::from_protobuf_encoding(&payload.identity_key.ok_or( - error::Error::NegotiationError(error::NegotiationError::PeerIdMissing), - )?) + let identity = payload.identity_key.ok_or(NegotiationError::PeerIdMissing)?; + PublicKey::from_protobuf_encoding(&identity).map_err(|err| err.into()) } /// Get first message. /// /// Listener only sends one message (the payload) - pub fn first_message(&mut self, role: Role) -> crate::Result> { + pub fn first_message(&mut self, role: Role) -> Result, NegotiationError> { match role { Role::Dialer => { tracing::trace!(target: LOG_TARGET, "get noise dialer first message"); @@ -198,9 +200,7 @@ impl NoiseContext { let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the first handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let mut buffer = vec![0u8; 256]; @@ -220,15 +220,13 @@ impl NoiseContext { /// Get second message. /// /// Only the dialer sends the second message. - pub fn second_message(&mut self) -> crate::Result> { + pub fn second_message(&mut self) -> Result, NegotiationError> { tracing::trace!(target: LOG_TARGET, "get noise paylod message"); let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read the first handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let mut buffer = vec![0u8; 2048]; @@ -246,7 +244,7 @@ impl NoiseContext { async fn read_handshake_message( &mut self, io: &mut T, - ) -> crate::Result { + ) -> Result { let mut size = BytesMut::zeroed(2); io.read_exact(&mut size).await?; let size = size.get_u16(); @@ -260,9 +258,7 @@ impl NoiseContext { let NoiseState::Handshake(ref mut noise) = self.noise else { tracing::error!(target: LOG_TARGET, "invalid state to read handshake message"); debug_assert!(false); - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )); + return Err(NegotiationError::StateMismatch); }; let nread = noise.read_message(&message, &mut out)?; @@ -286,13 +282,10 @@ impl NoiseContext { } /// Convert Noise into transport mode. - fn into_transport(self) -> crate::Result { + fn into_transport(self) -> Result { let transport = match self.noise { NoiseState::Handshake(noise) => noise.into_transport_mode()?, - NoiseState::Transport(_) => - return Err(error::Error::Other( - "Noise state missmatch: expected handshake".into(), - )), + NoiseState::Transport(_) => return Err(NegotiationError::StateMismatch), }; Ok(NoiseContext { @@ -664,15 +657,15 @@ impl AsyncWrite for NoiseSocket { } /// Try to parse `PeerId` from received `NoiseHandshakePayload` -fn parse_peer_id(buf: &[u8]) -> crate::Result { +fn parse_peer_id(buf: &[u8]) -> Result { match handshake_schema::NoiseHandshakePayload::decode(buf) { Ok(payload) => { - let public_key = PublicKey::from_protobuf_encoding(&payload.identity_key.ok_or( - error::Error::NegotiationError(error::NegotiationError::PeerIdMissing), - )?)?; + let identity = payload.identity_key.ok_or(NegotiationError::PeerIdMissing)?; + + let public_key = PublicKey::from_protobuf_encoding(&identity)?; Ok(PeerId::from_public_key(&public_key)) } - Err(err) => Err(From::from(err)), + Err(err) => Err(ParseError::from(err).into()), } } @@ -683,7 +676,7 @@ pub async fn handshake( role: Role, max_read_ahead_factor: usize, max_write_buffer_size: usize, -) -> crate::Result<(NoiseSocket, PeerId)> { +) -> Result<(NoiseSocket, PeerId), NegotiationError> { tracing::debug!(target: LOG_TARGET, ?role, "start noise handshake"); let mut noise = NoiseContext::new(keypair, role)?; @@ -797,7 +790,7 @@ mod tests { #[test] fn invalid_peer_id_schema() { match parse_peer_id(&vec![1, 2, 3, 4]).unwrap_err() { - crate::Error::ParseError(_) => {} + NegotiationError::ParseError(_) => {} _ => panic!("invalid error"), } } diff --git a/src/error.rs b/src/error.rs index 947f22397..f05a83c3c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,15 +48,15 @@ pub enum Error { #[error("Protocol `{0}` not supported")] ProtocolNotSupported(String), #[error("Address error: `{0}`")] - AddressError(AddressError), + AddressError(#[from] AddressError), #[error("Parse error: `{0}`")] ParseError(ParseError), #[error("I/O error: `{0}`")] IoError(ErrorKind), #[error("Negotiation error: `{0}`")] - NegotiationError(NegotiationError), + NegotiationError(#[from] NegotiationError), #[error("Substream error: `{0}`")] - SubstreamError(SubstreamError), + SubstreamError(#[from] SubstreamError), #[error("Substream error: `{0}`")] NotificationError(NotificationError), #[error("Essential task closed")] @@ -127,24 +127,57 @@ pub enum Error { ConnectionLimit(ConnectionLimitsError), } +/// Error type for address parsing. #[derive(Debug, thiserror::Error)] pub enum AddressError { - #[error("Invalid protocol")] + /// The provided address does not correspond to the transport protocol. + /// + /// For example, this can happen when the address used the UDP protocol but + /// the handling transport only allows TCP connections. + #[error("Invalid address for protocol")] InvalidProtocol, + /// The provided address is not a valid URL. + #[error("Invalid URL")] + InvalidUrl, + /// The provided address does not include a peer ID. #[error("`PeerId` missing from the address")] PeerIdMissing, + /// No address is available for the provided peer ID. #[error("Address not available")] AddressNotAvailable, + /// The provided address contains an invalid multihash. + #[error("Multihash does not contain a valid peer ID : `{0:?}`")] + InvalidPeerId(Multihash), } #[derive(Debug, thiserror::Error)] pub enum ParseError { - #[error("Invalid multihash: `{0:?}`")] - InvalidMultihash(Multihash), + /// The provided probuf message cannot be decoded. #[error("Failed to decode protobuf message: `{0:?}`")] - ProstDecodeError(prost::DecodeError), + ProstDecodeError(#[from] prost::DecodeError), + /// The provided protobuf message cannot be encoded. #[error("Failed to encode protobuf message: `{0:?}`")] - ProstEncodeError(prost::EncodeError), + ProstEncodeError(#[from] prost::EncodeError), + /// The protobuf message contains an unexpected key type. + /// + /// This error can happen when: + /// - The provided key type is not recognized. + /// - The provided key type is recognized but not supported. + #[error("Unknown key type from protobuf message: `{0}`")] + UnknownKeyType(i32), + /// The public key bytes are invalid and cannot be parsed. + /// + /// This error can happen when: + /// - The received number of bytes is not equal to the expected number of bytes (32 bytes). + /// - The bytes are not a valid Ed25519 public key. + /// - Length of the public key is not represented by 2 bytes (WebRTC specific). + #[error("Invalid public key")] + InvalidPublicKey, + /// The provided date has an invalid format. + /// + /// This error is protocol specific. + #[error("Invalid data")] + InvalidData, } #[derive(Debug, thiserror::Error)] @@ -152,23 +185,51 @@ pub enum SubstreamError { #[error("Connection closed")] ConnectionClosed, #[error("yamux error: `{0}`")] - YamuxError(crate::yamux::ConnectionError), + YamuxError(crate::yamux::ConnectionError, Direction), #[error("Failed to read from substream, substream id `{0:?}`")] ReadFailure(Option), #[error("Failed to write to substream, substream id `{0:?}`")] WriteFailure(Option), + #[error("Negotiation error: `{0:?}`")] + NegotiationError(#[from] NegotiationError), } +/// Error during the negotiation phase. #[derive(Debug, thiserror::Error)] pub enum NegotiationError { + /// Error occurred during the multistream-select phase of the negotiation. #[error("multistream-select error: `{0:?}`")] - MultistreamSelectError(crate::multistream_select::NegotiationError), + MultistreamSelectError(#[from] crate::multistream_select::NegotiationError), + /// Error occurred during the Noise handshake negotiation. #[error("multistream-select error: `{0:?}`")] - SnowError(snow::Error), - #[error("Connection closed while negotiating")] - ConnectionClosed, + SnowError(#[from] snow::Error), + /// The peer ID was not provided by the noise handshake. #[error("`PeerId` missing from Noise handshake")] PeerIdMissing, + /// The negotiation operation timed out. + #[error("Operation timed out")] + Timeout, + /// The message provided over the wire has an invalid format or is unsupported. + #[error("Parse error: `{0}`")] + ParseError(#[from] ParseError), + /// An I/O error occurred during the negotiation process. + #[error("I/O error: `{0}`")] + IoError(ErrorKind), + /// Expected a different state during the negotiation process. + #[error("Expected a different state")] + StateMismatch, + /// The noise handshake provided a different peer ID than the one expected in the dialing + /// address. + #[error("Peer ID mismatch: expected `{0}`, got `{1}`")] + PeerIdMismatch(PeerId, PeerId), + /// Error specific to the QUIC transport. + #[cfg(feature = "quic")] + #[error("QUIC error: `{0}`")] + Quic(#[from] QuicError), + /// Error specific to the WebSocket transport. + #[cfg(feature = "websocket")] + #[error("WebSocket error: `{0}`")] + WebSocket(#[from] tokio_tungstenite::tungstenite::error::Error), } #[derive(Debug, thiserror::Error)] @@ -183,19 +244,62 @@ pub enum NotificationError { NotificationStreamClosed(PeerId), } +/// The error type for dialing a peer. +/// +/// This error is reported via the litep2p events. #[derive(Debug, thiserror::Error)] pub enum DialError { - #[error("Tried to dial self")] - TriedToDialSelf, - #[error("Already connected to peer")] - AlreadyConnected, - #[error("Peer doens't have any known addresses")] - NoAddressAvailable(PeerId), + /// The dialing operation timed out. + /// + /// This error indicates that the `connection_open_timeout` from the protocol configuration + /// was exceeded. + #[error("Dial timed out")] + Timeout, + /// The provided address for dialing is invalid. + #[error("Address error: `{0}`")] + AddressError(#[from] AddressError), + /// An error occurred during DNS lookup operation. + /// + /// The address provided may be valid, however it failed to resolve to a concrete IP address. + /// This error may be recoverable. + #[error("DNS lookup error for `{0}`")] + DnsError(#[from] DnsError), + /// An error occurred during the negotiation process. + #[error("Negotiation error: `{0}`")] + NegotiationError(#[from] NegotiationError), +} + +/// Error during the QUIC transport negotiation. +#[cfg(feature = "quic")] +#[derive(Debug, thiserror::Error)] +pub enum QuicError { + /// The provided certificate is invalid. + #[error("Invalid certificate")] + InvalidCertificate, + /// The connection was lost. + #[error("Failed to negotiate QUIC: `{0}`")] + ConnectionError(#[from] quinn::ConnectionError), + /// The connection could not be established. + #[error("Failed to connect to peer: `{0}`")] + ConnectError(#[from] quinn::ConnectError), +} + +/// Error during DNS resolution. +#[derive(Debug, thiserror::Error)] +pub enum DnsError { + /// The DNS resolution failed to resolve the provided URL. + #[error("DNS failed to resolve url `{0}`")] + ResolveError(String), + /// The DNS expected a different IP address version. + /// + /// For example, DNSv4 was expected but DNSv6 was provided. + #[error("DNS type is different from the provided IP address")] + IpVersionMismatch, } impl From> for Error { fn from(hash: MultihashGeneric<64>) -> Self { - Error::ParseError(ParseError::InvalidMultihash(hash)) + Error::AddressError(AddressError::InvalidPeerId(hash)) } } @@ -205,6 +309,12 @@ impl From for Error { } } +impl From for DialError { + fn from(error: io::Error) -> Self { + DialError::NegotiationError(NegotiationError::IoError(error.kind())) + } +} + impl From for Error { fn from(error: crate::multistream_select::NegotiationError) -> Error { Error::NegotiationError(NegotiationError::MultistreamSelectError(error)) @@ -241,6 +351,24 @@ impl From for Error { } } +impl From for NegotiationError { + fn from(error: io::Error) -> Self { + NegotiationError::IoError(error.kind()) + } +} + +impl From for Error { + fn from(error: ParseError) -> Self { + Error::ParseError(error) + } +} + +impl From> for AddressError { + fn from(hash: MultihashGeneric<64>) -> Self { + AddressError::InvalidPeerId(hash) + } +} + #[cfg(feature = "quic")] impl From for Error { fn from(error: quinn::ConnectionError) -> Self { @@ -251,6 +379,23 @@ impl From for Error { } } +#[cfg(feature = "quic")] +impl From for DialError { + fn from(error: quinn::ConnectionError) -> Self { + match error { + quinn::ConnectionError::TimedOut => DialError::Timeout, + error => DialError::NegotiationError(NegotiationError::Quic(error.into())), + } + } +} + +#[cfg(feature = "quic")] +impl From for DialError { + fn from(error: quinn::ConnectError) -> Self { + DialError::NegotiationError(NegotiationError::Quic(error.into())) + } +} + impl From for Error { fn from(error: ConnectionLimitsError) -> Self { Error::ConnectionLimit(error) @@ -264,18 +409,6 @@ mod tests { #[tokio::test] async fn try_from_errors() { - tracing::trace!("{:?}", NotificationError::InvalidState); - tracing::trace!("{:?}", DialError::AlreadyConnected); - tracing::trace!( - "{:?}", - SubstreamError::YamuxError(crate::yamux::ConnectionError::Closed) - ); - tracing::trace!("{:?}", AddressError::PeerIdMissing); - tracing::trace!( - "{:?}", - ParseError::InvalidMultihash(Multihash::from(PeerId::random())) - ); - let (tx, rx) = channel(1); drop(rx); diff --git a/src/lib.rs b/src/lib.rs index 6222c2413..c42d6fda2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ use crate::transport::webrtc::WebRtcTransport; #[cfg(feature = "websocket")] use crate::transport::websocket::WebSocketTransport; +use error::DialError; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; use transport::Endpoint; @@ -112,12 +113,22 @@ pub enum Litep2pEvent { }, /// Failed to dial peer. + /// + /// This error can originate from dialing a single peer address. DialFailure { /// Address of the peer. address: Multiaddr, /// Dial error. - error: Error, + error: DialError, + }, + + /// A list of multiple dial failures. + ListDialFailures { + /// List of errors. + /// + /// Depending on the transport, the address might be different for each error. + errors: Vec<(Multiaddr, DialError)>, }, } @@ -489,6 +500,10 @@ impl Litep2p { }), TransportEvent::DialFailure { address, error, .. } => return Some(Litep2pEvent::DialFailure { address, error }), + + TransportEvent::OpenFailure { errors, .. } => { + return Some(Litep2pEvent::ListDialFailures { errors }); + } _ => {} } } diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index d28435a3e..0bd9c259d 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -22,7 +22,7 @@ use crate::{ codec::unsigned_varint::UnsignedVarint, - error::{self, Error}, + error::{self, Error, ParseError}, multistream_select::{ protocol::{ encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, ProtocolError, @@ -354,19 +354,23 @@ impl DialerState { } /// Register response to [`DialerState`]. - pub fn register_response(&mut self, payload: Vec) -> crate::Result { + pub fn register_response( + &mut self, + payload: Vec, + ) -> Result { let Message::Protocols(protocols) = - Message::decode(payload.into()).map_err(|_| Error::InvalidData)? + Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)? else { - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError(NegotiationError::Failed), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); }; let mut protocol_iter = protocols.into_iter(); loop { match (&self.state, protocol_iter.next()) { - (HandshakeState::WaitingResponse, None) => return Err(Error::InvalidState), + (HandshakeState::WaitingResponse, None) => + return Err(crate::error::NegotiationError::StateMismatch), (HandshakeState::WaitingResponse, Some(protocol)) => { let header = Protocol::try_from(&b"/multistream/1.0.0"[..]) .expect("valid multitstream-select header"); @@ -374,10 +378,8 @@ impl DialerState { if protocol == header { self.state = HandshakeState::WaitingProtocol; } else { - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); } } @@ -392,8 +394,8 @@ impl DialerState { } } - return Err(Error::NegotiationError( - error::NegotiationError::MultistreamSelectError(NegotiationError::Failed), + return Err(crate::error::NegotiationError::MultistreamSelectError( + NegotiationError::Failed, )); } (HandshakeState::WaitingProtocol, None) => { @@ -816,9 +818,7 @@ mod tests { DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); match dialer_state.register_response(bytes.freeze().to_vec()) { - Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ))) => {} + Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} event => panic!("invalid event: {event:?}"), } } @@ -837,9 +837,7 @@ mod tests { DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); match dialer_state.register_response(bytes.freeze().to_vec()) { - Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError( - NegotiationError::Failed, - ))) => {} + Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} event => panic!("invalid event: {event:?}"), } } diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 8eee84595..846399aab 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -350,21 +350,17 @@ where } /// Error that can happen when negotiating a protocol with the remote. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum NegotiationError { /// A protocol error occurred during the negotiation. - ProtocolError(ProtocolError), + #[error("A protocol error occurred during the negotiation: `{0:?}`")] + ProtocolError(#[from] ProtocolError), /// Protocol negotiation failed because no protocol could be agreed upon. + #[error("Protocol negotiation failed.")] Failed, } -impl From for NegotiationError { - fn from(err: ProtocolError) -> NegotiationError { - NegotiationError::ProtocolError(err) - } -} - impl From for NegotiationError { fn from(err: io::Error) -> NegotiationError { ProtocolError::from(err).into() @@ -379,22 +375,3 @@ impl From for io::Error { io::Error::new(io::ErrorKind::Other, err) } } - -impl Error for NegotiationError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - NegotiationError::ProtocolError(err) => Some(err), - _ => None, - } - } -} - -impl fmt::Display for NegotiationError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - NegotiationError::ProtocolError(p) => - fmt.write_fmt(format_args!("Protocol error: {p}")), - NegotiationError::Failed => fmt.write_str("Protocol negotiation failed."), - } - } -} diff --git a/src/multistream_select/protocol.rs b/src/multistream_select/protocol.rs index b23df5fc8..3c02f7949 100644 --- a/src/multistream_select/protocol.rs +++ b/src/multistream_select/protocol.rs @@ -422,25 +422,27 @@ where } /// A protocol error. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum ProtocolError { /// I/O error. - IoError(io::Error), + #[error("I/O error: `{0}`")] + IoError(#[from] io::Error), /// Received an invalid message from the remote. + #[error("Received an invalid message from the remote.")] InvalidMessage, /// A protocol (name) is invalid. + #[error("A protocol (name) is invalid.")] InvalidProtocol, /// Too many protocols have been returned by the remote. + #[error("Too many protocols have been returned by the remote.")] TooManyProtocols, -} -impl From for ProtocolError { - fn from(err: io::Error) -> ProtocolError { - ProtocolError::IoError(err) - } + /// The protocol is not supported. + #[error("The protocol is not supported.")] + ProtocolNotSupported, } impl From for io::Error { @@ -457,23 +459,3 @@ impl From for ProtocolError { Self::from(io::Error::new(io::ErrorKind::InvalidData, err.to_string())) } } - -impl Error for ProtocolError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match *self { - ProtocolError::IoError(ref err) => Some(err), - _ => None, - } - } -} - -impl fmt::Display for ProtocolError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - ProtocolError::IoError(e) => write!(fmt, "I/O error: {e}"), - ProtocolError::InvalidMessage => write!(fmt, "Received an invalid message."), - ProtocolError::InvalidProtocol => write!(fmt, "A protocol (name) is invalid."), - ProtocolError::TooManyProtocols => write!(fmt, "Too many protocols received."), - } - } -} diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 9dc2c3477..b69689e62 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -21,7 +21,7 @@ //! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation. use crate::{ - error::Error, + error::{Error, SubstreamError}, protocol::{ libp2p::kademlia::{ bucket::KBucketEntry, @@ -492,7 +492,11 @@ impl Kademlia { } /// Failed to open substream to remote peer. - async fn on_substream_open_failure(&mut self, substream_id: SubstreamId, error: Error) { + async fn on_substream_open_failure( + &mut self, + substream_id: SubstreamId, + error: SubstreamError, + ) { tracing::trace!( target: LOG_TARGET, ?substream_id, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 94043904e..87b92f849 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -22,7 +22,7 @@ use crate::{ codec::ProtocolCodec, - error::Error, + error::SubstreamError, substream::Substream, transport::Endpoint, types::{protocol::ProtocolName, SubstreamId}, @@ -125,7 +125,7 @@ pub enum TransportEvent { substream: SubstreamId, /// Error that occurred when the substream was being opened. - error: Error, + error: SubstreamError, }, } diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index eefe59043..984771b38 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -21,7 +21,7 @@ //! Notification protocol implementation. use crate::{ - error::Error, + error::{Error, SubstreamError}, executor::Executor, protocol::{ self, @@ -813,7 +813,11 @@ impl NotificationProtocol { /// /// If the substream was initiated by the local node, it must be reported that the substream /// failed to open. Otherwise the peer state can silently be converted to `Closed`. - async fn on_substream_open_failure(&mut self, substream_id: SubstreamId, error: Error) { + async fn on_substream_open_failure( + &mut self, + substream_id: SubstreamId, + error: SubstreamError, + ) { tracing::debug!( target: LOG_TARGET, protocol = %self.protocol, diff --git a/src/protocol/notification/tests/notification.rs b/src/protocol/notification/tests/notification.rs index 52b12f07d..f766fc44d 100644 --- a/src/protocol/notification/tests/notification.rs +++ b/src/protocol/notification/tests/notification.rs @@ -31,7 +31,7 @@ use crate::{ ConnectionState, InboundState, NotificationProtocol, OutboundState, PeerContext, PeerState, ValidationResult, }, - InnerTransportEvent, ProtocolCommand, + InnerTransportEvent, ProtocolCommand, SubstreamError, }, substream::Substream, transport::Endpoint, @@ -225,7 +225,9 @@ async fn substream_open_failure_for_unknown_substream() { let (mut notif, _handle, _sender, _tx) = make_notification_protocol(); - notif.on_substream_open_failure(SubstreamId::new(), Error::Unknown).await; + notif + .on_substream_open_failure(SubstreamId::new(), SubstreamError::ConnectionClosed) + .await; } #[tokio::test] @@ -313,7 +315,9 @@ async fn substream_open_failure_for_unknown_peer() { let substream_id = SubstreamId::from(1337usize); notif.pending_outbound.insert(substream_id, peer); - notif.on_substream_open_failure(substream_id, Error::Unknown).await; + notif + .on_substream_open_failure(substream_id, SubstreamError::ConnectionClosed) + .await; } #[tokio::test] @@ -895,7 +899,10 @@ async fn open_failure_reported_once() { notif.pending_outbound.insert(SubstreamId::from(1337usize), peer); notif - .on_substream_open_failure(SubstreamId::from(1337usize), Error::Unknown) + .on_substream_open_failure( + SubstreamId::from(1337usize), + SubstreamError::ConnectionClosed, + ) .await; match handle.next().await { diff --git a/src/protocol/protocol_set.rs b/src/protocol/protocol_set.rs index a04581467..736cb7f75 100644 --- a/src/protocol/protocol_set.rs +++ b/src/protocol/protocol_set.rs @@ -20,7 +20,10 @@ use crate::{ codec::ProtocolCodec, - error::Error, + error::{Error, NegotiationError, SubstreamError}, + multistream_select::{ + NegotiationError as MultiStreamNegotiationError, ProtocolError as MultiStreamProtocolError, + }, protocol::{ connection::{ConnectionHandle, Permit}, Direction, TransportEvent, @@ -131,7 +134,7 @@ pub enum InnerTransportEvent { substream: SubstreamId, /// Error that occurred when the substream was being opened. - error: Error, + error: SubstreamError, }, } @@ -274,7 +277,7 @@ impl ProtocolSet { protocol: ProtocolName, direction: Direction, substream: Substream, - ) -> crate::Result<()> { + ) -> Result<(), SubstreamError> { tracing::debug!(target: LOG_TARGET, %protocol, ?peer, ?direction, "substream opened"); let (protocol, fallback) = match self.fallback_names.get(&protocol) { @@ -282,19 +285,28 @@ impl ProtocolSet { None => (protocol, None), }; - self.protocols - .get_mut(&protocol) - .ok_or(Error::ProtocolNotSupported(protocol.to_string()))? + let Some(protocol_context) = self.protocols.get(&protocol) else { + return Err(NegotiationError::MultistreamSelectError( + MultiStreamNegotiationError::ProtocolError( + MultiStreamProtocolError::ProtocolNotSupported, + ), + ) + .into()); + }; + + let event = InnerTransportEvent::SubstreamOpened { + peer, + protocol: protocol.clone(), + fallback, + direction, + substream, + }; + + protocol_context .tx - .send(InnerTransportEvent::SubstreamOpened { - peer, - protocol: protocol.clone(), - fallback, - direction, - substream, - }) + .send(event) .await - .map_err(From::from) + .map_err(|_| SubstreamError::ConnectionClosed) } /// Get codec used by the protocol. @@ -312,7 +324,7 @@ impl ProtocolSet { &mut self, protocol: ProtocolName, substream: SubstreamId, - error: Error, + error: SubstreamError, ) -> crate::Result<()> { tracing::debug!( target: LOG_TARGET, diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 16b3c4680..3125f71c7 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -21,7 +21,7 @@ //! Request-response protocol implementation. use crate::{ - error::{Error, NegotiationError}, + error::{Error, NegotiationError, SubstreamError}, multistream_select::NegotiationError::Failed as MultistreamFailed, protocol::{ request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand}, @@ -623,7 +623,7 @@ impl RequestResponseProtocol { async fn on_substream_open_failure( &mut self, substream: SubstreamId, - error: Error, + error: SubstreamError, ) -> crate::Result<()> { let Some(RequestContext { request_id, peer, .. @@ -660,7 +660,7 @@ impl RequestResponseProtocol { peer, request_id, error: match error { - Error::NegotiationError(NegotiationError::MultistreamSelectError( + SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError( MultistreamFailed, )) => RequestResponseError::UnsupportedProtocol, _ => RequestResponseError::Rejected, diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 9cb842f62..107b87cab 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -26,7 +26,7 @@ use crate::{ ConfigBuilder, DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle, RequestResponseProtocol, }, - InnerTransportEvent, TransportService, + InnerTransportEvent, SubstreamError, TransportService, }, substream::Substream, transport::{ @@ -138,7 +138,10 @@ async fn unknown_substream_open_failure() { let (mut protocol, _handle, _manager, _tx) = protocol(); match protocol - .on_substream_open_failure(SubstreamId::from(1338usize), Error::Unknown) + .on_substream_open_failure( + SubstreamId::from(1338usize), + SubstreamError::ConnectionClosed, + ) .await { Err(Error::InvalidState) => {} diff --git a/src/transport/common/listener.rs b/src/transport/common/listener.rs index d9afe7297..fbd0ddb61 100644 --- a/src/transport/common/listener.rs +++ b/src/transport/common/listener.rs @@ -20,7 +20,10 @@ //! Shared socket listener between TCP and WebSocket. -use crate::{error::AddressError, Error, PeerId}; +use crate::{ + error::{AddressError, DnsError}, + PeerId, +}; use futures::Stream; use multiaddr::{Multiaddr, Protocol}; @@ -70,7 +73,7 @@ pub enum DnsType { impl AddressType { /// Resolve the address to a concrete IP. - pub async fn lookup_ip(self) -> crate::Result { + pub async fn lookup_ip(self) -> Result { let (url, port, dns_type) = match self { // We already have the IP address. AddressType::Socket(address) => return Ok(address), @@ -95,7 +98,7 @@ impl AddressType { url ); - return Err(Error::Other(format!("Failed to resolve DNS address {url}"))); + return Err(DnsError::ResolveError(url)); } }; @@ -109,10 +112,7 @@ impl AddressType { "Multiaddr DNS type does not match IP version `{}`", url ); - - return Err(Error::Other(format!( - "Miss-match in DNS address IP version {url}" - ))); + return Err(DnsError::IpVersionMismatch); }; Ok(SocketAddr::new(ip, port)) @@ -185,7 +185,7 @@ pub trait GetSocketAddr { /// The `PeerId` is optional and may not be present. fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)>; + ) -> Result<(AddressType, Option), AddressError>; /// Convert concrete `SocketAddr` to `Multiaddr`. fn socket_address_to_multiaddr(address: &SocketAddr) -> Multiaddr; @@ -197,7 +197,7 @@ pub struct TcpAddress; impl GetSocketAddr for TcpAddress { fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)> { + ) -> Result<(AddressType, Option), AddressError> { multiaddr_to_socket_address(address, SocketListenerType::Tcp) } @@ -214,7 +214,7 @@ pub struct WebSocketAddress; impl GetSocketAddr for WebSocketAddress { fn multiaddr_to_socket_address( address: &Multiaddr, - ) -> crate::Result<(AddressType, Option)> { + ) -> Result<(AddressType, Option), AddressError> { multiaddr_to_socket_address(address, SocketListenerType::WebSocket) } @@ -352,7 +352,7 @@ enum SocketListenerType { fn multiaddr_to_socket_address( address: &Multiaddr, ty: SocketListenerType, -) -> crate::Result<(AddressType, Option)> { +) -> Result<(AddressType, Option), AddressError> { tracing::trace!(target: LOG_TARGET, ?address, "parse multi address"); let mut iter = address.iter(); @@ -370,7 +370,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - Err(Error::AddressError(AddressError::InvalidProtocol)) + Err(AddressError::InvalidProtocol) } }; @@ -384,7 +384,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Ip4(address)) => match iter.next() { @@ -396,7 +396,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid transport protocol, expected `Tcp`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Dns(address)) => handle_dns_type(address.into(), DnsType::Dns, iter.next())?, @@ -406,7 +406,7 @@ fn multiaddr_to_socket_address( handle_dns_type(address.into(), DnsType::Dns6, iter.next())?, protocol => { tracing::error!(target: LOG_TARGET, ?protocol, "invalid transport protocol"); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; @@ -423,14 +423,15 @@ fn multiaddr_to_socket_address( ?protocol, "invalid protocol, expected `Ws` or `Wss`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; } } let maybe_peer = match iter.next() { - Some(Protocol::P2p(multihash)) => Some(PeerId::from_multihash(multihash)?), + Some(Protocol::P2p(multihash)) => + Some(PeerId::from_multihash(multihash).map_err(AddressError::InvalidPeerId)?), None => None, protocol => { tracing::error!( @@ -438,7 +439,7 @@ fn multiaddr_to_socket_address( ?protocol, "invalid protocol, expected `P2p` or `None`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; diff --git a/src/transport/dummy.rs b/src/transport/dummy.rs index f8c075711..29e9570df 100644 --- a/src/transport/dummy.rs +++ b/src/transport/dummy.rs @@ -104,7 +104,7 @@ impl Transport for DummyTransport { #[cfg(test)] mod tests { use super::*; - use crate::{transport::Endpoint, Error, PeerId}; + use crate::{error::DialError, transport::Endpoint, PeerId}; use futures::StreamExt; #[tokio::test] @@ -114,7 +114,7 @@ mod tests { transport.inject_event(TransportEvent::DialFailure { connection_id: ConnectionId::from(1338usize), address: Multiaddr::empty(), - error: Error::Unknown, + error: DialError::Timeout, }); let peer = PeerId::random(); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 33cc8f5ba..a47d309eb 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -21,7 +21,7 @@ use crate::{ codec::ProtocolCodec, crypto::ed25519::Keypair, - error::{AddressError, Error}, + error::{AddressError, DialError, Error}, executor::Executor, protocol::{InnerTransportEvent, TransportService}, transport::{ @@ -248,6 +248,9 @@ pub struct TransportManager { /// Connection limits. connection_limits: limits::ConnectionLimits, + + /// Opening connections errors. + opening_errors: HashMap>, } impl TransportManager { @@ -292,6 +295,7 @@ impl TransportManager { next_substream_id: Arc::new(AtomicUsize::new(0usize)), next_connection_id: Arc::new(AtomicUsize::new(0usize)), connection_limits: limits::ConnectionLimits::new(connection_limits_config), + opening_errors: HashMap::new(), }, handle, ) @@ -1603,6 +1607,7 @@ impl TransportManager { } } TransportEvent::ConnectionEstablished { peer, endpoint } => { + self.opening_errors.remove(&endpoint.connection_id()); match self.on_connection_established(peer, &endpoint) { Err(error) => { tracing::debug!( @@ -1655,6 +1660,8 @@ impl TransportManager { } } TransportEvent::ConnectionOpened { connection_id, address } => { + self.opening_errors.remove(&connection_id); + if let Err(error) = self.on_connection_opened(transport, connection_id, address) { tracing::debug!( target: LOG_TARGET, @@ -1664,7 +1671,7 @@ impl TransportManager { ); } } - TransportEvent::OpenFailure { connection_id } => { + TransportEvent::OpenFailure { connection_id, errors } => { match self.on_open_failure(transport, connection_id) { Err(error) => tracing::debug!( target: LOG_TARGET, @@ -1709,13 +1716,19 @@ impl TransportManager { }; } - return Some(TransportEvent::DialFailure { - connection_id, - address: Multiaddr::empty(), - error: Error::Unknown, - }) + let mut grouped_errors = self.opening_errors.remove(&connection_id).unwrap_or_default(); + grouped_errors.extend(errors); + return Some(TransportEvent::OpenFailure { connection_id, errors: grouped_errors }); + } + Ok(None) => { + tracing::trace!( + target: LOG_TARGET, + ?connection_id, + "open failure, but not the last transport", + ); + + self.opening_errors.entry(connection_id).or_default().extend(errors); } - Ok(None) => {} } }, TransportEvent::PendingInboundConnection { connection_id } => { @@ -1764,6 +1777,7 @@ mod tests { transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT}, }; use std::{ + borrow::Cow, net::{Ipv4Addr, Ipv6Addr}, sync::Arc, }; @@ -4067,4 +4081,110 @@ mod tests { assert!(!peer_context.addresses.contains(&second_address)); } } + + #[tokio::test] + async fn opening_errors_are_reported() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + BandwidthSink::new(), + 8usize, + ConnectionLimitsConfig::default(), + ); + let peer = PeerId::random(); + let connection_id = ConnectionId::from(0); + + // Setup TCP transport. + let dial_address_tcp = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8888)) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + let transport = Box::new({ + let mut transport = DummyTransport::new(); + transport.inject_event(TransportEvent::OpenFailure { + connection_id, + errors: vec![(dial_address_tcp.clone(), DialError::Timeout)], + }); + transport + }); + manager.register_transport(SupportedTransport::Tcp, transport); + manager.add_known_address( + peer, + vec![Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5))) + .with(Protocol::Tcp(8888)) + .with(Protocol::P2p(Multihash::from(peer)))] + .into_iter(), + ); + + // Setup WebSockets transport. + let dial_address_ws = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8889)) + .with(Protocol::Ws(Cow::Borrowed("/"))) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + + let transport = Box::new({ + let mut transport = DummyTransport::new(); + transport.inject_event(TransportEvent::OpenFailure { + connection_id, + errors: vec![(dial_address_ws.clone(), DialError::Timeout)], + }); + transport + }); + manager.register_transport(SupportedTransport::WebSocket, transport); + manager.add_known_address( + peer, + vec![Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5))) + .with(Protocol::Tcp(8889)) + .with(Protocol::Ws(Cow::Borrowed("/"))) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + ))] + .into_iter(), + ); + + // Dial the peer on both transports. + assert!(manager.dial(peer).await.is_ok()); + assert!(!manager.pending_connections.is_empty()); + + { + let peers = manager.peers.read(); + + match peers.get(&peer) { + Some(PeerContext { + state: PeerState::Opening { .. }, + .. + }) => {} + state => panic!("invalid state for peer: {state:?}"), + } + } + + match manager.next().await.unwrap() { + TransportEvent::OpenFailure { + connection_id, + errors, + } => { + assert_eq!(connection_id, ConnectionId::from(0)); + assert_eq!(errors.len(), 2); + let tcp = errors.iter().find(|(addr, _)| addr == &dial_address_tcp).unwrap(); + assert!(std::matches!(tcp.1, DialError::Timeout)); + + let ws = errors.iter().find(|(addr, _)| addr == &dial_address_ws).unwrap(); + assert!(std::matches!(ws.1, DialError::Timeout)); + } + event => panic!("invalid event: {event:?}"), + } + assert!(manager.pending_connections.is_empty()); + assert!(manager.opening_errors.is_empty()); + } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 792508cc9..1d61ca9d2 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -20,7 +20,7 @@ //! Transport protocol implementations provided by [`Litep2p`](`crate::Litep2p`). -use crate::{transport::manager::TransportHandle, types::ConnectionId, Error, PeerId}; +use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId}; use futures::Stream; use multiaddr::Multiaddr; @@ -159,13 +159,16 @@ pub(crate) enum TransportEvent { address: Multiaddr, /// Error. - error: Error, + error: DialError, }, /// Open failure for an unnegotiated set of connections. OpenFailure { /// Connection ID. connection_id: ConnectionId, + + /// Errors. + errors: Vec<(Multiaddr, DialError)>, }, } diff --git a/src/transport/quic/connection.rs b/src/transport/quic/connection.rs index ec14d7725..52c198e7c 100644 --- a/src/transport/quic/connection.rs +++ b/src/transport/quic/connection.rs @@ -24,7 +24,7 @@ use std::time::Duration; use crate::{ config::Role, - error::Error, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -63,7 +63,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -138,13 +138,14 @@ impl QuicConnection { stream: S, role: &Role, protocols: Vec<&str>, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); let (protocol, socket) = match role { - Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await?, - Role::Listener => listener_select_proto(stream, protocols).await?, - }; + Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await, + Role::Listener => listener_select_proto(stream, protocols).await, + } + .map_err(NegotiationError::MultistreamSelectError)?; tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -158,12 +159,12 @@ impl QuicConnection { substream_id: SubstreamId, protocol: ProtocolName, fallback_names: Vec, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match handle.open_bi().await { Ok((send_stream, recv_stream)) => NegotiatingSubstream::new(send_stream, recv_stream), - Err(error) => return Err(Error::Quinn(error)), + Err(error) => return Err(NegotiationError::Quic(error.into()).into()), }; // TODO: protocols don't change after they've been initialized so this should be done only @@ -200,7 +201,7 @@ impl QuicConnection { protocols: Vec, substream_id: SubstreamId, permit: Permit, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -258,7 +259,7 @@ impl QuicConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -283,7 +284,7 @@ impl QuicConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) diff --git a/src/transport/quic/listener.rs b/src/transport/quic/listener.rs index 7f6c3ad05..e8550ffb7 100644 --- a/src/transport/quic/listener.rs +++ b/src/transport/quic/listener.rs @@ -20,7 +20,7 @@ use crate::{ crypto::{ed25519::Keypair, tls::make_server_config}, - error::{AddressError, Error}, + error::AddressError, PeerId, }; @@ -101,7 +101,9 @@ impl QuicListener { } /// Extract socket address and `PeerId`, if found, from `address`. - pub fn get_socket_address(address: &Multiaddr) -> crate::Result<(SocketAddr, Option)> { + pub fn get_socket_address( + address: &Multiaddr, + ) -> Result<(SocketAddr, Option), AddressError> { tracing::trace!(target: LOG_TARGET, ?address, "parse multi address"); let mut iter = address.iter(); @@ -114,7 +116,7 @@ impl QuicListener { ?protocol, "invalid transport protocol, expected `QuicV1`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, Some(Protocol::Ip4(address)) => match iter.next() { @@ -125,19 +127,19 @@ impl QuicListener { ?protocol, "invalid transport protocol, expected `QuicV1`", ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }, protocol => { tracing::error!(target: LOG_TARGET, ?protocol, "invalid transport protocol"); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::InvalidProtocol); } }; // verify that quic exists match iter.next() { Some(Protocol::QuicV1) => {} - _ => return Err(Error::AddressError(AddressError::InvalidProtocol)), + _ => return Err(AddressError::InvalidProtocol), } let maybe_peer = match iter.next() { @@ -149,7 +151,7 @@ impl QuicListener { ?protocol, "invalid protocol, expected `P2p` or `None`" ); - return Err(Error::AddressError(AddressError::InvalidProtocol)); + return Err(AddressError::PeerIdMissing); } }; @@ -268,12 +270,10 @@ mod tests { let crypto_config = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config = ClientConfig::new(crypto_config); - let client = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection = client .connect_with(client_config, format!("[::1]:{port}").parse().unwrap(), "l") - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let (res1, res2) = tokio::join!( @@ -318,31 +318,27 @@ mod tests { let crypto_config1 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config1 = ClientConfig::new(crypto_config1); - let client1 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client1 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection1 = client1 .connect_with( client_config1, format!("[::1]:{port1}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let crypto_config2 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config2 = ClientConfig::new(crypto_config2); - let client2 = Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client2 = + Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).unwrap(); let connection2 = client2 .connect_with( client_config2, format!("127.0.0.1:{port2}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); tokio::spawn(async move { @@ -391,31 +387,27 @@ mod tests { let crypto_config1 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config1 = ClientConfig::new(crypto_config1); - let client1 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client1 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection1 = client1 .connect_with( client_config1, format!("[::1]:{port}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); let crypto_config2 = Arc::new(make_client_config(&Keypair::generate(), Some(peer)).expect("to succeed")); let client_config2 = ClientConfig::new(crypto_config2); - let client2 = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) - .map_err(|error| Error::Other(error.to_string())) - .unwrap(); + let client2 = + Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).unwrap(); let connection2 = client2 .connect_with( client_config2, format!("[::1]:{port}").parse().unwrap(), "l", ) - .map_err(|error| Error::Other(error.to_string())) .unwrap(); tokio::spawn(async move { diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index d274a977c..ad03674d5 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -24,7 +24,7 @@ use crate::{ crypto::tls::make_client_config, - error::{AddressError, Error}, + error::{AddressError, DialError, Error, QuicError}, transport::{ manager::TransportHandle, quic::{config::Config as QuicConfig, connection::QuicConnection, listener::QuicListener}, @@ -84,15 +84,22 @@ pub(crate) struct QuicTransport { pending_inbound_connections: HashMap, /// Pending connections. - pending_connections: - FuturesUnordered)>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, (ConnectionId, Result)>, + >, /// Negotiated connections waiting for validation. pending_open: HashMap, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< - BoxFuture<'static, Result<(ConnectionId, Multiaddr, NegotiatedConnection), ConnectionId>>, + BoxFuture< + 'static, + Result< + (ConnectionId, Multiaddr, NegotiatedConnection), + (ConnectionId, Vec<(Multiaddr, DialError)>), + >, + >, >, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. @@ -118,11 +125,14 @@ impl QuicTransport { self.pending_connections.push(Box::pin(async move { let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return (connection_id, Err(DialError::from(error))), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return ( + connection_id, + Err(crate::error::NegotiationError::Quic(QuicError::InvalidCertificate).into()), + ); }; (connection_id, Ok(NegotiatedConnection { peer, connection })) @@ -133,7 +143,7 @@ impl QuicTransport { fn on_connection_established( &mut self, connection_id: ConnectionId, - result: crate::Result, + result: Result, ) -> Option { tracing::debug!(target: LOG_TARGET, ?connection_id, success = result.is_ok(), "connection established"); @@ -257,14 +267,18 @@ impl Transport for QuicTransport { ); self.pending_dials.insert(connection_id, address); + self.pending_connections.push(Box::pin(async move { let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return (connection_id, Err(DialError::from(error))), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return ( + connection_id, + Err(crate::error::NegotiationError::Quic(QuicError::InvalidCertificate).into()), + ); }; (connection_id, Ok(NegotiatedConnection { peer, connection })) @@ -332,21 +346,19 @@ impl Transport for QuicTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { let keypair = self.context.keypair.clone(); let connection_open_timeout = self.config.connection_open_timeout; + let addr = address.clone(); - async move { - let Ok((socket_address, Some(peer))) = - QuicListener::get_socket_address(&address) - else { - return ( - connection_id, - Err(Error::AddressError(AddressError::PeerIdMissing)), - ); - }; + let future = async move { + let (socket_address, peer) = QuicListener::get_socket_address(&address) + .map_err(DialError::AddressError)?; + let peer = + peer.ok_or_else(|| DialError::AddressError(AddressError::PeerIdMissing))?; let crypto_config = Arc::new(make_client_config(&keypair, Some(peer)).expect("to succeed")); @@ -362,59 +374,58 @@ impl Transport for QuicTransport { SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), Some(Protocol::Ip4(_)) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - _ => - return ( - connection_id, - Err(Error::AddressError(AddressError::InvalidProtocol)), - ), + _ => return Err(AddressError::InvalidProtocol.into()), }; let client = match Endpoint::client(client_listen_address) { Ok(client) => client, Err(error) => { - return (connection_id, Err(Error::Other(error.to_string()))); + return Err(DialError::from(error)); } }; let connection = match client.connect_with(client_config, socket_address, "l") { Ok(connection) => connection, - Err(error) => { - return (connection_id, Err(Error::Other(error.to_string()))); - } + Err(error) => return Err(DialError::from(error)), }; let connection = match connection.await { Ok(connection) => connection, - Err(error) => return (connection_id, Err(error.into())), + Err(error) => return Err(DialError::from(error)), }; let Some(peer) = Self::extract_peer_id(&connection) else { - return (connection_id, Err(Error::InvalidCertificate)); + return Err(crate::error::NegotiationError::Quic( + QuicError::InvalidCertificate, + ) + .into()); }; - ( - connection_id, - Ok((address, NegotiatedConnection { peer, connection })), - ) - } + Ok(NegotiatedConnection { peer, connection }) + }; + + async move { future.await.map(|ok| (addr.clone(), ok)).map_err(|err| (addr, err)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { - while let Some(result) = futures.next().await { - let (connection_id, result) = result; + let mut errors = Vec::with_capacity(num_addresses); + while let Some(result) = futures.next().await { match result { Ok((address, connection)) => return Ok((connection_id, address, connection)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -478,9 +489,12 @@ impl Stream for QuicTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 3d3511e0d..cb752244c 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -24,7 +24,7 @@ use crate::{ ed25519::Keypair, noise::{self, NoiseSocket}, }, - error::{Error, NegotiationError}, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -101,7 +101,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -222,7 +222,7 @@ impl TcpConnection { max_write_buffer_size: usize, connection_open_timeout: Duration, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!( target: LOG_TARGET, ?address, @@ -249,7 +249,7 @@ impl TcpConnection { { Err(_) => { tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation"); - Err(Error::Timeout) + Err(NegotiationError::Timeout) } Ok(result) => result, } @@ -263,7 +263,7 @@ impl TcpConnection { protocol: ProtocolName, fallback_names: Vec, open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match control.open_stream().await { @@ -278,7 +278,10 @@ impl TcpConnection { ?error, "failed to open substream" ); - return Err(Error::YamuxError(Direction::Outbound(substream_id), error)); + return Err(SubstreamError::YamuxError( + error, + Direction::Outbound(substream_id), + )); } }; @@ -311,7 +314,7 @@ impl TcpConnection { max_write_buffer_size: usize, connection_open_timeout: Duration, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?address, "accept connection"); match tokio::time::timeout(connection_open_timeout, async move { @@ -331,7 +334,7 @@ impl TcpConnection { }) .await { - Err(_) => Err(Error::Timeout), + Err(_) => Err(NegotiationError::Timeout), Ok(result) => result, } } @@ -343,7 +346,7 @@ impl TcpConnection { substream_id: SubstreamId, protocols: Vec, open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -375,7 +378,7 @@ impl TcpConnection { role: &Role, protocols: Vec<&str>, substream_open_timeout: Duration, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); match tokio::time::timeout(substream_open_timeout, async move { @@ -386,10 +389,8 @@ impl TcpConnection { }) .await { - Err(_) => Err(Error::Timeout), - Ok(Err(error)) => Err(Error::NegotiationError( - NegotiationError::MultistreamSelectError(error), - )), + Err(_) => Err(NegotiationError::Timeout), + Ok(Err(error)) => Err(NegotiationError::MultistreamSelectError(error)), Ok(Ok((protocol, socket))) => { tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -410,7 +411,7 @@ impl TcpConnection { max_read_ahead_factor: usize, max_write_buffer_size: usize, substream_open_timeout: Duration, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?role, @@ -442,7 +443,7 @@ impl TcpConnection { if let Some(dialed_peer) = dialed_peer { if dialed_peer != peer { tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch"); - return Err(Error::PeerIdMismatch(dialed_peer, peer)); + return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); } } @@ -521,7 +522,7 @@ impl TcpConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -561,7 +562,7 @@ impl TcpConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) @@ -717,11 +718,11 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::ProtocolError( crate::multistream_select::ProtocolError::InvalidMessage, ), - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -759,11 +760,11 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::ProtocolError( crate::multistream_select::ProtocolError::InvalidMessage, ), - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -812,9 +813,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -856,9 +857,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -903,7 +904,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -954,7 +955,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1000,7 +1001,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1040,7 +1041,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1095,9 +1096,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("{error:?}"), } } @@ -1155,9 +1156,9 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::NegotiationError(NegotiationError::MultistreamSelectError( + Err(NegotiationError::MultistreamSelectError( crate::multistream_select::NegotiationError::Failed, - ))) => {} + )) => {} Err(error) => panic!("{error:?}"), } } @@ -1208,7 +1209,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } @@ -1265,7 +1266,7 @@ mod tests { .await { Ok(_) => panic!("connection was supposed to fail"), - Err(Error::Timeout) => {} + Err(NegotiationError::Timeout) => {} Err(error) => panic!("invalid error: {error:?}"), } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 35c8da597..856c9410b 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -23,7 +23,7 @@ use crate::{ config::Role, - error::Error, + error::{DialError, Error}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, TcpAddress}, manager::TransportHandle, @@ -91,12 +91,19 @@ pub(crate) struct TcpTransport { pending_inbound_connections: HashMap, /// Pending opening connections. - pending_connections: - FuturesUnordered>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, Result>, + >, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< - BoxFuture<'static, Result<(ConnectionId, Multiaddr, TcpStream), ConnectionId>>, + BoxFuture< + 'static, + Result< + (ConnectionId, Multiaddr, TcpStream), + (ConnectionId, Vec<(Multiaddr, DialError)>), + >, + >, >, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. @@ -145,7 +152,7 @@ impl TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) })); } @@ -155,8 +162,9 @@ impl TcpTransport { dial_addresses: DialAddresses, connection_open_timeout: Duration, nodelay: bool, - ) -> crate::Result<(Multiaddr, TcpStream)> { + ) -> Result<(Multiaddr, TcpStream), DialError> { let (socket_address, _) = TcpAddress::multiaddr_to_socket_address(&address)?; + let remote_address = match tokio::time::timeout(connection_open_timeout, socket_address.lookup_ip()).await { Err(_) => { @@ -166,9 +174,9 @@ impl TcpTransport { ?connection_open_timeout, "failed to resolve address within timeout", ); - return Err(Error::Timeout); + return Err(DialError::Timeout); } - Ok(Err(error)) => return Err(error), + Ok(Err(error)) => return Err(error.into()), Ok(Ok(address)) => address, }; @@ -225,7 +233,7 @@ impl TcpTransport { ?connection_open_timeout, "failed to connect within timeout", ); - Err(Error::Timeout) + Err(DialError::Timeout) } Ok(Err(error)) => Err(error.into()), Ok(Ok((address, stream))) => { @@ -316,7 +324,7 @@ impl Transport for TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) })); Ok(()) @@ -383,6 +391,7 @@ impl Transport for TcpTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { @@ -392,30 +401,35 @@ impl Transport for TcpTransport { async move { TcpTransport::dial_peer( - address, + address.clone(), dial_addresses, connection_open_timeout, nodelay, ) .await + .map_err(|error| (address, error)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { + let mut errors = Vec::with_capacity(num_addresses); while let Some(result) = futures.next().await { match result { Ok((address, stream)) => return Ok((connection_id, address, stream)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -459,11 +473,11 @@ impl Transport for TcpTransport { substream_open_timeout, ) .await - .map_err(|error| (connection_id, error)) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err((connection_id, Error::Timeout)), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(connection)) => Ok(connection), } @@ -528,9 +542,12 @@ impl Stream for TcpTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } @@ -554,6 +571,8 @@ impl Stream for TcpTransport { address, error, })); + } else { + tracing::debug!(target: LOG_TARGET, ?error, ?connection_id, "Pending inbound connection failed"); } } } @@ -900,7 +919,7 @@ mod tests { assert!(!transport.pending_dials.is_empty()); transport.pending_connections.push(Box::pin(async move { - Err((ConnectionId::from(0usize), Error::Unknown)) + Err((ConnectionId::from(0usize), DialError::Timeout)) })); assert!(std::matches!( diff --git a/src/transport/webrtc/connection.rs b/src/transport/webrtc/connection.rs index d7abfb4a8..f31a48b28 100644 --- a/src/transport/webrtc/connection.rs +++ b/src/transport/webrtc/connection.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - error::Error, + error::{Error, ParseError, SubstreamError}, multistream_select::{listener_negotiate, DialerState, HandshakeResult, ListenerSelectResult}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream::Substream, @@ -349,6 +349,7 @@ impl WebRtcConnection { .report_substream_open(self.peer, protocol.clone(), Direction::Inbound, substream) .await .map(|_| (substream_id, handle, permit)) + .map_err(Into::into) } /// Handle data received to an opening outbound channel. @@ -372,7 +373,7 @@ impl WebRtcConnection { data: Vec, mut dialer_state: DialerState, context: ChannelContext, - ) -> crate::Result> { + ) -> Result, SubstreamError> { tracing::trace!( target: LOG_TARGET, peer = ?self.peer, @@ -380,7 +381,12 @@ impl WebRtcConnection { "handle opening outbound substream", ); - let message = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?; + let rtc_message = WebRtcMessage::decode(&data) + .map_err(|err| SubstreamError::NegotiationError(err.into()))?; + let message = rtc_message.payload.ok_or(SubstreamError::NegotiationError( + ParseError::InvalidData.into(), + ))?; + let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else { tracing::trace!( target: LOG_TARGET, diff --git a/src/transport/webrtc/util.rs b/src/transport/webrtc/util.rs index 951434b88..c9d4d141a 100644 --- a/src/transport/webrtc/util.rs +++ b/src/transport/webrtc/util.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{codec::unsigned_varint::UnsignedVarint, error::Error, transport::webrtc::schema}; +use crate::{codec::unsigned_varint::UnsignedVarint, error::ParseError, transport::webrtc::schema}; use prost::Message; use tokio_util::codec::{Decoder, Encoder}; @@ -72,18 +72,21 @@ impl WebRtcMessage { } /// Decode payload into [`WebRtcMessage`]. - pub fn decode(payload: &[u8]) -> crate::Result { + pub fn decode(payload: &[u8]) -> Result { // TODO: set correct size let mut codec = UnsignedVarint::new(None); let mut data = bytes::BytesMut::from(payload); - let result = codec.decode(&mut data)?.ok_or(Error::InvalidData)?; + let result = codec + .decode(&mut data) + .map_err(|_| ParseError::InvalidData)? + .ok_or(ParseError::InvalidData)?; match schema::webrtc::Message::decode(result) { Ok(message) => Ok(Self { payload: message.message, flags: message.flag, }), - Err(_) => Err(Error::InvalidData), + Err(_) => Err(ParseError::InvalidData), } } } diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 19a3e14fd..8c5056070 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -24,7 +24,7 @@ use crate::{ ed25519::Keypair, noise::{self, NoiseSocket}, }, - error::Error, + error::{Error, NegotiationError, SubstreamError}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -93,7 +93,7 @@ enum ConnectionError { substream_id: Option, /// Error. - error: Error, + error: SubstreamError, }, } @@ -195,13 +195,14 @@ impl WebSocketConnection { stream: S, role: &Role, protocols: Vec<&str>, - ) -> crate::Result<(Negotiated, ProtocolName)> { + ) -> Result<(Negotiated, ProtocolName), NegotiationError> { tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols"); let (protocol, socket) = match role { - Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await?, - Role::Listener => listener_select_proto(stream, protocols).await?, - }; + Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await, + Role::Listener => listener_select_proto(stream, protocols).await, + } + .map_err(NegotiationError::MultistreamSelectError)?; tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated"); @@ -219,7 +220,7 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?address, @@ -251,11 +252,13 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { let stream = MaybeTlsStream::Plain(stream); Self::negotiate_connection( - tokio_tungstenite::accept_async(stream).await?, + tokio_tungstenite::accept_async(stream) + .await + .map_err(NegotiationError::WebSocket)?, None, Role::Listener, address, @@ -279,7 +282,7 @@ impl WebSocketConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?connection_id, @@ -310,12 +313,11 @@ impl WebSocketConnection { if let Some(dialed_peer) = dialed_peer { if peer != dialed_peer { - return Err(Error::PeerIdMismatch(dialed_peer, peer)); + return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); } } let stream: NoiseSocket> = stream; - tracing::trace!(target: LOG_TARGET, "noise handshake done"); // negotiate `yamux` @@ -347,7 +349,7 @@ impl WebSocketConnection { permit: Permit, substream_id: SubstreamId, protocols: Vec, - ) -> crate::Result { + ) -> Result { tracing::trace!( target: LOG_TARGET, ?substream_id, @@ -379,7 +381,7 @@ impl WebSocketConnection { substream_id: SubstreamId, protocol: ProtocolName, fallback_names: Vec, - ) -> crate::Result { + ) -> Result { tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream"); let stream = match control.open_stream().await { @@ -394,7 +396,10 @@ impl WebSocketConnection { ?error, "failed to open substream" ); - return Err(Error::YamuxError(Direction::Outbound(substream_id), error)); + return Err(SubstreamError::YamuxError( + error, + Direction::Outbound(substream_id), + )); } }; @@ -441,7 +446,7 @@ impl WebSocketConnection { Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { protocol: None, substream_id: None, - error, + error: SubstreamError::NegotiationError(error), }), Err(_) => Err(ConnectionError::Timeout { protocol: None, @@ -481,7 +486,7 @@ impl WebSocketConnection { let (protocol, substream_id, error) = match error { ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, Error::Timeout) + (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) } ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { (protocol, substream_id, error) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 75360f31c..f7999735c 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -22,7 +22,7 @@ use crate::{ config::Role, - error::{AddressError, Error}, + error::{AddressError, Error, NegotiationError}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, WebSocketAddress}, manager::TransportHandle, @@ -33,7 +33,7 @@ use crate::{ Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, - PeerId, + DialError, PeerId, }; use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; @@ -60,24 +60,6 @@ mod substream; pub mod config; -#[derive(Debug)] -pub(super) struct WebSocketError { - /// Error. - error: Error, - - /// Connection ID. - connection_id: Option, -} - -impl WebSocketError { - pub fn new(error: Error, connection_id: Option) -> Self { - Self { - error, - connection_id, - } - } -} - /// Logging target for the file. const LOG_TARGET: &str = "litep2p::websocket"; @@ -110,8 +92,9 @@ pub(crate) struct WebSocketTransport { pending_inbound_connections: HashMap, /// Pending connections. - pending_connections: - FuturesUnordered>>, + pending_connections: FuturesUnordered< + BoxFuture<'static, Result>, + >, /// Pending raw, unnegotiated connections. pending_raw_connections: FuturesUnordered< @@ -123,7 +106,7 @@ pub(crate) struct WebSocketTransport { Multiaddr, WebSocketStream>, ), - ConnectionId, + (ConnectionId, Vec<(Multiaddr, DialError)>), >, >, >, @@ -168,11 +151,11 @@ impl WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, None)) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err(WebSocketError::new(Error::Timeout, None)), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(result)) => Ok(result), } @@ -180,31 +163,25 @@ impl WebSocketTransport { } /// Convert `Multiaddr` into `url::Url` - fn multiaddr_into_url(address: Multiaddr) -> crate::Result<(Url, PeerId)> { + fn multiaddr_into_url(address: Multiaddr) -> Result<(Url, PeerId), AddressError> { let mut protocol_stack = address.iter(); - let dial_address = match protocol_stack - .next() - .ok_or_else(|| Error::TransportNotSupported(address.clone()))? - { + let dial_address = match protocol_stack.next().ok_or(AddressError::InvalidProtocol)? { Protocol::Ip4(address) => address.to_string(), Protocol::Ip6(address) => format!("[{address}]"), Protocol::Dns(address) | Protocol::Dns4(address) | Protocol::Dns6(address) => address.to_string(), - _ => return Err(Error::TransportNotSupported(address)), + _ => return Err(AddressError::InvalidProtocol), }; - let url = match protocol_stack - .next() - .ok_or_else(|| Error::TransportNotSupported(address.clone()))? - { + let url = match protocol_stack.next().ok_or(AddressError::InvalidProtocol)? { Protocol::Tcp(port) => match protocol_stack.next() { Some(Protocol::Ws(_)) => format!("ws://{dial_address}:{port}/"), Some(Protocol::Wss(_)) => format!("wss://{dial_address}:{port}/"), - _ => return Err(Error::TransportNotSupported(address.clone())), + _ => return Err(AddressError::InvalidProtocol), }, - _ => return Err(Error::TransportNotSupported(address)), + _ => return Err(AddressError::InvalidProtocol), }; let peer = match protocol_stack.next() { @@ -215,13 +192,15 @@ impl WebSocketTransport { ?protocol, "invalid protocol, expected `Protocol::Ws`/`Protocol::Wss`", ); - return Err(Error::AddressError(AddressError::PeerIdMissing)); + return Err(AddressError::PeerIdMissing); } }; tracing::trace!(target: LOG_TARGET, ?url, "parse address"); - url::Url::parse(&url).map(|url| (url, peer)).map_err(|_| Error::InvalidData) + url::Url::parse(&url) + .map(|url| (url, peer)) + .map_err(|_| AddressError::InvalidUrl) } /// Dial remote peer over `address`. @@ -230,14 +209,14 @@ impl WebSocketTransport { dial_addresses: DialAddresses, connection_open_timeout: Duration, nodelay: bool, - ) -> crate::Result<(Multiaddr, WebSocketStream>)> { + ) -> Result<(Multiaddr, WebSocketStream>), DialError> { let (url, _) = Self::multiaddr_into_url(address.clone())?; let (socket_address, _) = WebSocketAddress::multiaddr_to_socket_address(&address)?; let remote_address = match tokio::time::timeout(connection_open_timeout, socket_address.lookup_ip()).await { - Err(_) => return Err(Error::Timeout), - Ok(Err(error)) => return Err(error), + Err(_) => return Err(DialError::Timeout), + Ok(Err(error)) => return Err(error.into()), Ok(Ok(address)) => address, }; @@ -274,27 +253,26 @@ impl WebSocketTransport { Ok(()) => {} Err(error) if error.raw_os_error() == Some(libc::EINPROGRESS) => {} Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {} - Err(error) => return Err(Error::Other(error.to_string())), + Err(err) => return Err(DialError::from(err)), } - let stream = TcpStream::try_from(Into::::into(socket)) - .map_err(|error| Error::Other(error.to_string()))?; - stream.writable().await.map_err(|error| Error::Other(error.to_string()))?; - - if let Some(error) = - stream.take_error().map_err(|error| Error::Other(error.to_string()))? - { - return Err(Error::Other(error.to_string())); + let stream = TcpStream::try_from(Into::::into(socket))?; + stream.writable().await?; + if let Some(e) = stream.take_error()? { + return Err(DialError::from(e)); } Ok(( address, - tokio_tungstenite::client_async_tls(url, stream).await?.0, + tokio_tungstenite::client_async_tls(url, stream) + .await + .map_err(NegotiationError::WebSocket)? + .0, )) }; match tokio::time::timeout(connection_open_timeout, future).await { - Err(_) => Err(Error::Timeout), + Err(_) => Err(DialError::Timeout), Ok(Err(error)) => Err(error), Ok(Ok((address, stream))) => Ok((address, stream)), } @@ -366,7 +344,7 @@ impl Transport for WebSocketTransport { nodelay, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id)))?; + .map_err(|error| (connection_id, error))?; WebSocketConnection::open_connection( connection_id, @@ -380,12 +358,12 @@ impl Transport for WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id))) + .map_err(|error| (connection_id, error.into())) }; self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, future).await { - Err(_) => Err(WebSocketError::new(Error::Timeout, Some(connection_id))), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(result)) => Ok(result), } @@ -459,6 +437,7 @@ impl Transport for WebSocketTransport { connection_id: ConnectionId, addresses: Vec, ) -> crate::Result<()> { + let num_addresses = addresses.len(); let mut futures: FuturesUnordered<_> = addresses .into_iter() .map(|address| { @@ -468,30 +447,36 @@ impl Transport for WebSocketTransport { async move { WebSocketTransport::dial_peer( - address, + address.clone(), dial_addresses, connection_open_timeout, nodelay, ) .await + .map_err(|error| (address, error)) } }) .collect(); self.pending_raw_connections.push(Box::pin(async move { + let mut errors = Vec::with_capacity(num_addresses); + while let Some(result) = futures.next().await { match result { Ok((address, stream)) => return Ok((connection_id, address, stream)), - Err(error) => tracing::debug!( - target: LOG_TARGET, - ?connection_id, - ?error, - "failed to open connection", - ), + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error) + } } } - Err(connection_id) + Err((connection_id, errors)) })); Ok(()) @@ -536,11 +521,11 @@ impl Transport for WebSocketTransport { max_write_buffer_size, ) .await - .map_err(|error| WebSocketError::new(error, Some(connection_id))) + .map_err(|error| (connection_id, error.into())) }) .await { - Err(_) => Err(WebSocketError::new(Error::Timeout, Some(connection_id))), + Err(_) => Err((connection_id, DialError::Timeout)), Ok(Err(error)) => Err(error), Ok(Ok(connection)) => Ok(connection), } @@ -599,9 +584,12 @@ impl Stream for WebSocketTransport { })); } } - Err(connection_id) => + Err((connection_id, errors)) => if !self.canceled.remove(&connection_id) { - return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id })); + return Poll::Ready(Some(TransportEvent::OpenFailure { + connection_id, + errors, + })); }, } } @@ -618,22 +606,17 @@ impl Stream for WebSocketTransport { endpoint, })); } - Err(error) => match error.connection_id { - Some(connection_id) => match self.pending_dials.remove(&connection_id) { - Some(address) => - return Poll::Ready(Some(TransportEvent::DialFailure { - connection_id, - address, - error: error.error, - })), - None => { - tracing::debug!(target: LOG_TARGET, ?error, "failed to establish connection") - } - }, - None => { - tracing::debug!(target: LOG_TARGET, ?error, "failed to establish connection") + Err((connection_id, error)) => { + if let Some(address) = self.pending_dials.remove(&connection_id) { + return Poll::Ready(Some(TransportEvent::DialFailure { + connection_id, + address, + error, + })); + } else { + tracing::debug!(target: LOG_TARGET, ?error, ?connection_id, "Pending inbound connection failed"); } - }, + } } } diff --git a/tests/connection/mod.rs b/tests/connection/mod.rs index 769a161f9..8f037e623 100644 --- a/tests/connection/mod.rs +++ b/tests/connection/mod.rs @@ -21,7 +21,7 @@ use litep2p::{ config::ConfigBuilder, crypto::ed25519::Keypair, - error::Error, + error::{DialError, Error, NegotiationError}, protocol::libp2p::ping::{Config as PingConfig, PingEvent}, transport::tcp::config::Config as TcpConfig, Litep2p, Litep2pEvent, PeerId, @@ -366,7 +366,11 @@ async fn connection_timeout(transport: Transport, address: Multiaddr) { assert_eq!(dial_address, address); println!("{error:?}"); - assert!(std::matches!(error, Error::Timeout)); + match error { + DialError::Timeout => {} + DialError::NegotiationError(NegotiationError::Timeout) => {} + _ => panic!("unexpected error {error:?}"), + } } #[cfg(feature = "quic")]