Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 216 additions & 31 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
crypto::tls::make_client_config,
error::{AddressError, DialError, Error, QuicError},
transport::{
common::listener::{AddressType, DnsType},
manager::TransportHandle,
quic::{config::Config as QuicConfig, connection::QuicConnection, listener::QuicListener},
Endpoint as Litep2pEndpoint, Transport, TransportBuilder, TransportEvent,
Expand Down Expand Up @@ -102,6 +103,9 @@ pub(crate) struct QuicTransport {
/// QUIC listener.
listener: QuicListener,

/// DNS resolver.
resolver: Arc<TokioResolver>,

/// Pending dials.
pending_dials: HashMap<ConnectionId, Multiaddr>,

Expand Down Expand Up @@ -129,6 +133,73 @@ pub(crate) struct QuicTransport {
}

impl QuicTransport {
/// Parse a QUIC multiaddress, supporting both IP and DNS addresses.
///
/// Accepted formats:
/// - `/ip4/<addr>/udp/<port>/quic-v1[/p2p/<peer-id>]`
/// - `/ip6/<addr>/udp/<port>/quic-v1[/p2p/<peer-id>]`
/// - `/dns/<host>/udp/<port>/quic-v1[/p2p/<peer-id>]`
/// - `/dns4/<host>/udp/<port>/quic-v1[/p2p/<peer-id>]`
/// - `/dns6/<host>/udp/<port>/quic-v1[/p2p/<peer-id>]`
fn parse_quic_address(
address: &Multiaddr,
) -> Result<(AddressType, Option<PeerId>), AddressError> {
tracing::trace!(target: LOG_TARGET, ?address, "parse quic multi address");

let mut iter = address.iter();

let address_type = match iter.next() {
Some(Protocol::Ip4(addr)) => match iter.next() {
Some(Protocol::Udp(port)) =>
AddressType::Socket(SocketAddr::new(IpAddr::V4(addr), port)),
_ => return Err(AddressError::InvalidProtocol),
},
Some(Protocol::Ip6(addr)) => match iter.next() {
Some(Protocol::Udp(port)) =>
AddressType::Socket(SocketAddr::new(IpAddr::V6(addr), port)),
_ => return Err(AddressError::InvalidProtocol),
},
Some(Protocol::Dns(host)) => match iter.next() {
Some(Protocol::Udp(port)) => AddressType::Dns {
address: host.to_string(),
port,
dns_type: DnsType::Dns,
},
_ => return Err(AddressError::InvalidProtocol),
},
Some(Protocol::Dns4(host)) => match iter.next() {
Some(Protocol::Udp(port)) => AddressType::Dns {
address: host.to_string(),
port,
dns_type: DnsType::Dns4,
},
_ => return Err(AddressError::InvalidProtocol),
},
Some(Protocol::Dns6(host)) => match iter.next() {
Some(Protocol::Udp(port)) => AddressType::Dns {
address: host.to_string(),
port,
dns_type: DnsType::Dns6,
},
_ => return Err(AddressError::InvalidProtocol),
},
_ => return Err(AddressError::InvalidProtocol),
};

match iter.next() {
Some(Protocol::QuicV1) => {}
_ => return Err(AddressError::InvalidProtocol),
}

let maybe_peer = match iter.next() {
Some(Protocol::P2p(multihash)) => Some(PeerId::from_multihash(multihash)?),
None => None,
_ => return Err(AddressError::PeerIdMissing),
};

Ok((address_type, maybe_peer))
}

/// Attempt to extract `PeerId` from connection certificates.
fn extract_peer_id(connection: &Connection) -> Option<PeerId> {
let certificates: Box<Vec<rustls::Certificate>> =
Expand Down Expand Up @@ -217,7 +288,7 @@ impl TransportBuilder for QuicTransport {
fn new(
context: TransportHandle,
mut config: Self::Config,
_resolver: Arc<TokioResolver>,
resolver: Arc<TokioResolver>,
) -> crate::Result<(Self, Vec<Multiaddr>)>
where
Self: Sized,
Expand All @@ -238,6 +309,7 @@ impl TransportBuilder for QuicTransport {
context,
config,
listener,
resolver,
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand All @@ -253,42 +325,60 @@ impl TransportBuilder for QuicTransport {

impl Transport for QuicTransport {
fn dial(&mut self, connection_id: ConnectionId, address: Multiaddr) -> crate::Result<()> {
let Ok((socket_address, Some(peer))) = QuicListener::get_socket_address(&address) else {
let (address_type, Some(peer)) =
Self::parse_quic_address(&address).map_err(Error::AddressError)?
else {
return Err(Error::AddressError(AddressError::PeerIdMissing));
};

let crypto_config =
Arc::new(make_client_config(&self.context.keypair, Some(peer)).expect("to succeed"));
let mut transport_config = quinn::TransportConfig::default();
let timeout =
IdleTimeout::try_from(self.config.connection_open_timeout).expect("to succeed");
transport_config.max_idle_timeout(Some(timeout));
let mut client_config = ClientConfig::new(crypto_config);
client_config.transport_config(Arc::new(transport_config));

let client_listen_address = match address.iter().next() {
Some(Protocol::Ip6(_)) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
Some(Protocol::Ip4(_)) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
_ => return Err(Error::AddressError(AddressError::InvalidProtocol)),
};

let client = Endpoint::client(client_listen_address)
.map_err(|error| Error::Other(error.to_string()))?;
let connection = client
.connect_with(client_config, socket_address, "l")
.map_err(|error| Error::Other(error.to_string()))?;
let keypair = self.context.keypair.clone();
let connection_open_timeout = self.config.connection_open_timeout;
let resolver = self.resolver.clone();

tracing::trace!(
target: LOG_TARGET,
?address,
?peer,
?client_listen_address,
"dial peer",
);

self.pending_dials.insert(connection_id, address);

self.pending_connections.push(Box::pin(async move {
let socket_address = match tokio::time::timeout(
connection_open_timeout,
address_type.lookup_ip(resolver),
)
.await
{
Err(_) => return (connection_id, Err(DialError::Timeout)),
Ok(Err(error)) => return (connection_id, Err(error.into())),
Ok(Ok(address)) => address,
};

let crypto_config =
Arc::new(make_client_config(&keypair, Some(peer)).expect("to succeed"));
let mut transport_config = quinn::TransportConfig::default();
let timeout = IdleTimeout::try_from(connection_open_timeout).expect("to succeed");
transport_config.max_idle_timeout(Some(timeout));
let mut client_config = ClientConfig::new(crypto_config);
client_config.transport_config(Arc::new(transport_config));

let client_listen_address = if socket_address.is_ipv4() {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
} else {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
};

let client = match Endpoint::client(client_listen_address) {
Ok(client) => client,
Err(error) => return (connection_id, Err(DialError::from(error))),
};
let connection = match client.connect_with(client_config, socket_address, "l") {
Ok(connection) => connection,
Err(error) => return (connection_id, Err(DialError::from(error))),
};

let connection = match connection.await {
Ok(connection) => connection,
Err(error) => return (connection_id, Err(DialError::from(error))),
Expand Down Expand Up @@ -372,14 +462,26 @@ impl Transport for QuicTransport {
.map(|address| {
let keypair = self.context.keypair.clone();
let connection_open_timeout = self.config.connection_open_timeout;
let resolver = self.resolver.clone();
let addr = address.clone();

let future = async move {
let (socket_address, peer) = QuicListener::get_socket_address(&address)
.map_err(DialError::AddressError)?;
let (address_type, peer) =
Self::parse_quic_address(&address).map_err(DialError::AddressError)?;
let peer =
peer.ok_or_else(|| DialError::AddressError(AddressError::PeerIdMissing))?;

let socket_address = match tokio::time::timeout(
connection_open_timeout,
address_type.lookup_ip(resolver),
)
.await
{
Err(_) => return Err(DialError::Timeout),
Ok(Err(error)) => return Err(error.into()),
Ok(Ok(address)) => address,
};

let crypto_config =
Arc::new(make_client_config(&keypair, Some(peer)).expect("to succeed"));
let mut transport_config = quinn::TransportConfig::default();
Expand All @@ -389,12 +491,10 @@ impl Transport for QuicTransport {
let mut client_config = ClientConfig::new(crypto_config);
client_config.transport_config(Arc::new(transport_config));

let client_listen_address = match address.iter().next() {
Some(Protocol::Ip6(_)) =>
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
Some(Protocol::Ip4(_)) =>
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
_ => return Err(AddressError::InvalidProtocol.into()),
let client_listen_address = if socket_address.is_ipv4() {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
} else {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
};

let client = match Endpoint::client(client_listen_address) {
Expand Down Expand Up @@ -592,6 +692,7 @@ mod tests {
use crate::{
codec::ProtocolCodec,
crypto::ed25519::Keypair,
error::AddressError,
executor::DefaultExecutor,
transport::manager::{ProtocolContext, TransportHandle},
types::protocol::ProtocolName,
Expand All @@ -600,6 +701,90 @@ mod tests {
use multihash::Multihash;
use tokio::sync::mpsc::channel;

fn peer_id() -> PeerId {
PeerId::from_public_key(&Keypair::generate().public().into())
}

#[test]
fn parse_quic_address() {
let peer = peer_id();

// IPv4 with peer ID
let address: Multiaddr =
format!("/ip4/192.168.1.1/udp/5000/quic-v1/p2p/{peer}").parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(
matches!(address_type, AddressType::Socket(addr) if addr.port() == 5000 && addr.ip().to_string() == "192.168.1.1")
);
assert_eq!(parsed_peer, Some(peer));

// IPv6 with peer ID
let address: Multiaddr = format!("/ip6/::1/udp/9000/quic-v1/p2p/{peer}").parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(matches!(address_type, AddressType::Socket(addr) if addr.port() == 9000));
assert_eq!(parsed_peer, Some(peer));

// DNS with peer ID
let address: Multiaddr =
format!("/dns/example.com/udp/5000/quic-v1/p2p/{peer}").parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(matches!(
address_type,
AddressType::Dns { ref address, port: 5000, dns_type: DnsType::Dns }
if address == "example.com"
));
assert_eq!(parsed_peer, Some(peer));

// DNS4 with peer ID
let address: Multiaddr =
format!("/dns4/example.com/udp/8080/quic-v1/p2p/{peer}").parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(matches!(
address_type,
AddressType::Dns { ref address, port: 8080, dns_type: DnsType::Dns4 }
if address == "example.com"
));
assert_eq!(parsed_peer, Some(peer));

// DNS6 with peer ID
let address: Multiaddr =
format!("/dns6/example.com/udp/3000/quic-v1/p2p/{peer}").parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(matches!(
address_type,
AddressType::Dns { ref address, port: 3000, dns_type: DnsType::Dns6 }
if address == "example.com"
));
assert_eq!(parsed_peer, Some(peer));

// Without peer ID
let address: Multiaddr = "/ip4/192.168.1.1/udp/5000/quic-v1".parse().unwrap();
let (address_type, parsed_peer) = QuicTransport::parse_quic_address(&address).unwrap();
assert!(matches!(address_type, AddressType::Socket(_)));
assert_eq!(parsed_peer, None);

// Invalid: missing quic-v1
let address: Multiaddr = "/ip4/192.168.1.1/udp/5000".parse().unwrap();
assert!(matches!(
QuicTransport::parse_quic_address(&address),
Err(AddressError::InvalidProtocol)
));

// Invalid: TCP instead of UDP
let address: Multiaddr = "/ip4/192.168.1.1/tcp/5000/quic-v1".parse().unwrap();
assert!(matches!(
QuicTransport::parse_quic_address(&address),
Err(AddressError::InvalidProtocol)
));

// Invalid: DNS with TCP instead of UDP
let address: Multiaddr = "/dns4/example.com/tcp/5000/quic-v1".parse().unwrap();
assert!(matches!(
QuicTransport::parse_quic_address(&address),
Err(AddressError::InvalidProtocol)
));
}

#[tokio::test]
async fn test_quinn() {
let _ = tracing_subscriber::fmt()
Expand Down