diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 6cc9dc7bb30..c864bb9c394 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -209,12 +209,12 @@ pub trait Transport { } /// Begins a series of protocol upgrades via an [`upgrade::Builder`]. - fn upgrade(self) -> upgrade::Builder + fn upgrade(self, version: upgrade::Version) -> upgrade::Builder where Self: Sized, Self::Error: 'static { - upgrade::Builder::new(self) + upgrade::Builder::new(self, version) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 4a4535ff381..aad6fa5fe3e 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -20,6 +20,8 @@ //! Configuration of transport protocol upgrades. +pub use crate::upgrade::Version; + use crate::{ ConnectedPoint, ConnectionInfo, @@ -68,7 +70,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// /// [`Network`]: crate::nodes::Network pub struct Builder { - inner: T + inner: T, + version: upgrade::Version, } impl Builder @@ -77,8 +80,8 @@ where T::Error: 'static, { /// Creates a `Builder` over the given (base) `Transport`. - pub fn new(transport: T) -> Builder { - Builder { inner: transport } + pub fn new(inner: T, version: upgrade::Version) -> Builder { + Builder { inner, version } } /// Upgrades the transport to perform authentication of the remote. @@ -105,11 +108,12 @@ where U: OutboundUpgrade + Clone, E: Error + 'static, { + let version = self.version; Builder::new(self.inner.and_then(move |conn, endpoint| { Authenticate { - inner: upgrade::apply(conn, upgrade, endpoint) + inner: upgrade::apply(conn, upgrade, endpoint, version) } - })) + }), version) } /// Applies an arbitrary upgrade on an authenticated, non-multiplexed @@ -133,7 +137,7 @@ where U: OutboundUpgrade + Clone, E: Error + 'static, { - Builder::new(Upgrade::new(self.inner, upgrade)) + Builder::new(Upgrade::new(self.inner, upgrade), self.version) } /// Upgrades the transport with a (sub)stream multiplexer. @@ -158,8 +162,9 @@ where U: OutboundUpgrade + Clone, E: Error + 'static, { + let version = self.version; self.inner.and_then(move |(i, c), endpoint| { - let upgrade = upgrade::apply(c, upgrade, endpoint); + let upgrade = upgrade::apply(c, upgrade, endpoint, version); Multiplex { info: Some(i), upgrade } }) } @@ -332,7 +337,7 @@ where future::Either::A(ref mut up) => { let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport)); let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some)."); - future::Either::B((Some(i), apply_outbound(c, u))) + future::Either::B((Some(i), apply_outbound(c, u, upgrade::Version::V1))) } future::Either::B((ref mut i, ref mut up)) => { let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade)); diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 787ec4c4574..13912c33c9b 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -20,15 +20,17 @@ use crate::ConnectedPoint; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; -use crate::upgrade::{ProtocolName, NegotiatedComplete}; +use crate::upgrade::ProtocolName; use futures::{future::Either, prelude::*}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::{iter, mem}; use tokio_io::{AsyncRead, AsyncWrite}; +pub use multistream_select::Version; + /// Applies an upgrade to the inbound and outbound direction of a connection or substream. -pub fn apply(conn: C, up: U, cp: ConnectedPoint) +pub fn apply(conn: C, up: U, cp: ConnectedPoint, v: Version) -> Either, OutboundUpgradeApply> where C: AsyncRead + AsyncWrite, @@ -37,7 +39,7 @@ where if cp.is_listener() { Either::A(apply_inbound(conn, up)) } else { - Either::B(apply_outbound(conn, up)) + Either::B(apply_outbound(conn, up, v)) } } @@ -55,13 +57,13 @@ where } /// Tries to perform an upgrade on an outbound connection or substream. -pub fn apply_outbound(conn: C, up: U) -> OutboundUpgradeApply +pub fn apply_outbound(conn: C, up: U, v: Version) -> OutboundUpgradeApply where C: AsyncRead + AsyncWrite, U: OutboundUpgrade { let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::dialer_select_proto(conn, iter); + let future = multistream_select::dialer_select_proto(conn, iter, v); OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { future, upgrade: up } } @@ -155,11 +157,6 @@ where future: DialerSelectFuture::IntoIter>>, upgrade: U }, - AwaitNegotiated { - io: NegotiatedComplete, - upgrade: U, - protocol: U::Info - }, Upgrade { future: U::Future }, @@ -185,24 +182,8 @@ where return Ok(Async::NotReady) } }; - self.inner = OutboundUpgradeApplyState::AwaitNegotiated { - io: connection.complete(), - protocol: info.0, - upgrade - }; - } - OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => { - let io = match io.poll()? { - Async::NotReady => { - self.inner = OutboundUpgradeApplyState::AwaitNegotiated { - io, protocol, upgrade - }; - return Ok(Async::NotReady) - } - Async::Ready(io) => io - }; self.inner = OutboundUpgradeApplyState::Upgrade { - future: upgrade.upgrade_outbound(io, protocol) + future: upgrade.upgrade_outbound(connection, info.0) }; } OutboundUpgradeApplyState::Upgrade { mut future } => { diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index 7403655f513..d9bb7b33154 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -68,7 +68,7 @@ mod transfer; use futures::future::Future; -pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError}; +pub use multistream_select::{Version, Negotiated, NegotiatedComplete, NegotiationError, ProtocolError}; pub use self::{ apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, denied::DeniedUpgrade, diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index cc9c3dfac53..f5306647508 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -95,7 +95,7 @@ fn deny_incoming_connec() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) @@ -105,7 +105,7 @@ fn deny_incoming_connec() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) @@ -170,7 +170,7 @@ fn dial_self() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { @@ -249,7 +249,7 @@ fn dial_self_by_id() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) @@ -267,7 +267,7 @@ fn multiple_addresses_err() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) diff --git a/core/tests/network_simult.rs b/core/tests/network_simult.rs index 958631b5142..2db3152e752 100644 --- a/core/tests/network_simult.rs +++ b/core/tests/network_simult.rs @@ -110,7 +110,7 @@ fn raw_swarm_simultaneous_connect() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1Lazy) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { @@ -125,7 +125,7 @@ fn raw_swarm_simultaneous_connect() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .upgrade() + .upgrade(upgrade::Version::V1Lazy) .authenticate(libp2p_secio::SecioConfig::new(local_key)) .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 61b96f35e80..1620e8fc14c 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -24,7 +24,7 @@ use futures::future::Future; use futures::stream::Stream; use libp2p_core::identity; use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent}; -use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; use libp2p_mplex::MplexConfig; use libp2p_secio::SecioConfig; use multiaddr::Multiaddr; @@ -78,7 +78,7 @@ fn upgrade_pipeline() { let listener_keys = identity::Keypair::generate_ed25519(); let listener_id = listener_keys.public().into_peer_id(); let listener_transport = MemoryTransport::default() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(SecioConfig::new(listener_keys)) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) @@ -93,7 +93,7 @@ fn upgrade_pipeline() { let dialer_keys = identity::Keypair::generate_ed25519(); let dialer_id = dialer_keys.public().into_peer_id(); let dialer_transport = MemoryTransport::default() - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(SecioConfig::new(dialer_keys)) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index dc39f753230..c17d9d80659 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -42,18 +42,16 @@ use crate::{Negotiated, NegotiationError}; /// determined through the `size_hint` of the given iterator and thus /// an inaccurate size estimate may result in a suboptimal choice. /// -/// > **Note**: When multiple `DialerSelectFuture`s are composed, i.e. a -/// > dialer performs multiple, nested protocol negotiations with just a -/// > single supported protocol (0-RTT negotiations), a listener that -/// > does not support one of the intermediate protocols may still process -/// > the request data associated with a supported follow-up protocol. -/// > See \[[1]\]. To avoid this behaviour, a dialer should ensure completion -/// > of the previous negotiation before starting the next negotiation, -/// > which can be accomplished by waiting for the future returned by -/// > [`Negotiated::complete`] to resolve. -/// -/// [1]: https://github.com/multiformats/go-multistream/issues/20 -pub fn dialer_select_proto(inner: R, protocols: I) -> DialerSelectFuture +/// Within the scope of this library, a dialer always commits to a specific +/// multistream-select protocol [`Version`], whereas a listener always supports +/// all versions supported by this library. Frictionless multistream-select +/// protocol upgrades may thus proceed by deployments with updated listeners, +/// eventually followed by deployments of dialers choosing the newer protocol. +pub fn dialer_select_proto( + inner: R, + protocols: I, + version: Version +) -> DialerSelectFuture where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -62,9 +60,9 @@ where let iter = protocols.into_iter(); // We choose between the "serial" and "parallel" strategies based on the number of protocols. if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::A(dialer_select_proto_serial(inner, iter)) + Either::A(dialer_select_proto_serial(inner, iter, version)) } else { - Either::B(dialer_select_proto_parallel(inner, iter)) + Either::B(dialer_select_proto_parallel(inner, iter, version)) } } @@ -80,7 +78,11 @@ pub type DialerSelectFuture = Either, DialerSelectPa /// trying the given list of supported protocols one-by-one. /// /// This strategy is preferable if the dialer only supports a few protocols. -pub fn dialer_select_proto_serial(inner: R, protocols: I) -> DialerSelectSeq +pub fn dialer_select_proto_serial( + inner: R, + protocols: I, + version: Version +) -> DialerSelectSeq where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -88,9 +90,10 @@ where { let protocols = protocols.into_iter().peekable(); DialerSelectSeq { + version, protocols, state: SeqState::SendHeader { - io: MessageIO::new(inner) + io: MessageIO::new(inner), } } } @@ -104,7 +107,11 @@ where /// /// This strategy may be beneficial if the dialer supports many protocols /// and it is unclear whether the remote supports one of the first few. -pub fn dialer_select_proto_parallel(inner: R, protocols: I) -> DialerSelectPar +pub fn dialer_select_proto_parallel( + inner: R, + protocols: I, + version: Version +) -> DialerSelectPar where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -112,6 +119,7 @@ where { let protocols = protocols.into_iter(); DialerSelectPar { + version, protocols, state: ParState::SendHeader { io: MessageIO::new(inner) @@ -129,7 +137,8 @@ where { // TODO: It would be nice if eventually N = I::Item = Protocol. protocols: iter::Peekable, - state: SeqState + state: SeqState, + version: Version, } enum SeqState @@ -157,7 +166,7 @@ where loop { match mem::replace(&mut self.state, SeqState::Done) { SeqState::SendHeader { mut io } => { - if io.start_send(Message::Header(Version::V1))?.is_not_ready() { + if io.start_send(Message::Header(self.version))?.is_not_ready() { self.state = SeqState::SendHeader { io }; return Ok(Async::NotReady) } @@ -174,9 +183,14 @@ where if self.protocols.peek().is_some() { self.state = SeqState::FlushProtocol { io, protocol } } else { - debug!("Dialer: Expecting proposed protocol: {}", p); - let io = Negotiated::expecting(io.into_reader(), p); - return Ok(Async::Ready((protocol, io))) + match self.version { + Version::V1 => self.state = SeqState::FlushProtocol { io, protocol }, + Version::V1Lazy => { + debug!("Dialer: Expecting proposed protocol: {}", p); + let io = Negotiated::expecting(io.into_reader(), p, self.version); + return Ok(Async::Ready((protocol, io))) + } + } } } SeqState::FlushProtocol { mut io, protocol } => { @@ -199,7 +213,7 @@ where }; match msg { - Message::Header(Version::V1) => { + Message::Header(v) if v == self.version => { self.state = SeqState::AwaitProtocol { io, protocol }; } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { @@ -234,7 +248,8 @@ where I::Item: AsRef<[u8]> { protocols: I, - state: ParState + state: ParState, + version: Version, } enum ParState @@ -263,7 +278,7 @@ where loop { match mem::replace(&mut self.state, ParState::Done) { ParState::SendHeader { mut io } => { - if io.start_send(Message::Header(Version::V1))?.is_not_ready() { + if io.start_send(Message::Header(self.version))?.is_not_ready() { self.state = ParState::SendHeader { io }; return Ok(Async::NotReady) } @@ -297,7 +312,7 @@ where }; match &msg { - Message::Header(Version::V1) => { + Message::Header(v) if v == &self.version => { self.state = ParState::RecvProtocols { io } } Message::Protocols(supported) => { @@ -319,7 +334,7 @@ where return Ok(Async::NotReady) } debug!("Dialer: Expecting proposed protocol: {}", p); - let io = Negotiated::expecting(io.into_reader(), p); + let io = Negotiated::expecting(io.into_reader(), p, self.version); return Ok(Async::Ready((protocol, io))) } ParState::Done => panic!("ParState::poll called after completion") diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 9dd89e3cbe2..6ab6eabedbf 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -62,7 +62,6 @@ //! yet have sent the last negotiation message despite having settled on a protocol //! proposed by the dialer that it supports. //! -//! //! This behaviour allows both the dialer and the listener to send data //! relating to the negotiated protocol together with the last negotiation //! message(s), which, in the case of the dialer only supporting a single @@ -79,7 +78,7 @@ //! ```no_run //! # fn main() { //! use bytes::Bytes; -//! use multistream_select::dialer_select_proto; +//! use multistream_select::{dialer_select_proto, Version}; //! use futures::{Future, Sink, Stream}; //! use tokio_tcp::TcpStream; //! use tokio::runtime::current_thread::Runtime; @@ -91,7 +90,7 @@ //! .from_err() //! .and_then(move |io| { //! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; -//! dialer_select_proto(io, protos) // .map(|r| r.0) +//! dialer_select_proto(io, protos, Version::V1) //! }) //! .map(|(protocol, _io)| protocol); //! @@ -110,7 +109,7 @@ mod protocol; mod tests; pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError}; -pub use self::protocol::ProtocolError; +pub use self::protocol::{ProtocolError, Version}; pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index a62581158dd..f6a39bfb0f7 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -36,7 +36,10 @@ use crate::{Negotiated, NegotiationError}; /// computation that performs the protocol negotiation with the remote. The /// returned `Future` resolves with the name of the negotiated protocol and /// a [`Negotiated`] I/O stream. -pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFuture +pub fn listener_select_proto( + inner: R, + protocols: I, +) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -78,7 +81,7 @@ where N: AsRef<[u8]> { RecvHeader { io: MessageIO }, - SendHeader { io: MessageIO }, + SendHeader { io: MessageIO, version: Version }, RecvMessage { io: MessageIO }, SendMessage { io: MessageIO, @@ -102,22 +105,8 @@ where match mem::replace(&mut self.state, State::Done) { State::RecvHeader { mut io } => { match io.poll()? { - Async::Ready(Some(Message::Header(Version::V1))) => { - self.state = State::SendHeader { io } - } - Async::Ready(Some(Message::Header(Version::V2))) => { - // The V2 protocol is not yet supported and not even - // yet fully specified or implemented anywhere. For - // now we just return 'na' to force any dialer to - // fall back to V1, according to the current plans - // for the "transition period". - // - // See: https://github.com/libp2p/specs/pull/95. - self.state = State::SendMessage { - io, - message: Message::NotAvailable, - protocol: None, - } + Async::Ready(Some(Message::Header(version))) => { + self.state = State::SendHeader { io, version } } Async::Ready(Some(_)) => { return Err(ProtocolError::InvalidMessage.into()) @@ -132,11 +121,14 @@ where } } } - State::SendHeader { mut io } => { - if io.start_send(Message::Header(Version::V1))?.is_not_ready() { + State::SendHeader { mut io, version } => { + if io.start_send(Message::Header(version))?.is_not_ready() { return Ok(Async::NotReady) } - self.state = State::RecvMessage { io }; + self.state = match version { + Version::V1 => State::Flush { io }, + Version::V1Lazy => State::RecvMessage { io }, + } } State::RecvMessage { mut io } => { let msg = match io.poll() { diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index 3edc705d7a7..3519d6ccce1 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -70,8 +70,8 @@ impl Negotiated { /// Creates a `Negotiated` in state [`State::Expecting`] that is still /// expecting confirmation of the given `protocol`. - pub(crate) fn expecting(io: MessageReader, protocol: Protocol) -> Self { - Negotiated { state: State::Expecting { io, protocol } } + pub(crate) fn expecting(io: MessageReader, protocol: Protocol, version: Version) -> Self { + Negotiated { state: State::Expecting { io, protocol, version } } } /// Polls the `Negotiated` for completion. @@ -100,27 +100,29 @@ impl Negotiated { // Read outstanding protocol negotiation messages. loop { match mem::replace(&mut self.state, State::Invalid) { - State::Expecting { mut io, protocol } => { + State::Expecting { mut io, protocol, version } => { let msg = match io.poll() { Ok(Async::Ready(Some(msg))) => msg, Ok(Async::NotReady) => { - self.state = State::Expecting { io, protocol }; + self.state = State::Expecting { io, protocol, version }; return Ok(Async::NotReady) } Ok(Async::Ready(None)) => { - self.state = State::Expecting { io, protocol }; + self.state = State::Expecting { io, protocol, version }; return Err(ProtocolError::IoError( io::ErrorKind::UnexpectedEof.into()).into()) } Err(err) => { - self.state = State::Expecting { io, protocol }; + self.state = State::Expecting { io, protocol, version }; return Err(err.into()) } }; - if let Message::Header(Version::V1) = &msg { - self.state = State::Expecting { io, protocol }; - continue + if let Message::Header(v) = &msg { + if v == &version { + self.state = State::Expecting { io, protocol, version }; + continue + } } if let Message::Protocol(p) = &msg { @@ -152,7 +154,14 @@ impl Negotiated { enum State { /// In this state, a `Negotiated` is still expecting to /// receive confirmation of the protocol it as settled on. - Expecting { io: MessageReader, protocol: Protocol }, + Expecting { + /// The underlying I/O stream. + io: MessageReader, + /// The expected protocol (i.e. name and version). + protocol: Protocol, + /// The expected multistream-select protocol version. + version: Version + }, /// In this state, a protocol has been agreed upon and may /// only be pending the sending of the final acknowledgement, diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 8e82ff181e3..a21b80030f8 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -50,21 +50,72 @@ const MAX_PROTOCOL_LEN: usize = 140; /// The encoded form of a multistream-select 1.0.0 header message. const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n"; -/// The encoded form of a multistream-select 2.0.0 header message. -const MSG_MULTISTREAM_2_0: &[u8] = b"/multistream/2.0.0\n"; +/// The encoded form of a multistream-select 1.0.0 header message. +const MSG_MULTISTREAM_1_0_LAZY: &[u8] = b"/multistream-lazy/1\n"; /// The encoded form of a multistream-select 'na' message. const MSG_PROTOCOL_NA: &[u8] = b"na\n"; /// The encoded form of a multistream-select 'ls' message. const MSG_LS: &[u8] = b"ls\n"; -/// The known multistream-select protocol versions. +/// Supported multistream-select protocol versions. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Version { - /// The first and currently still the only deployed version - /// of multistream-select. + /// Version 1 of the multistream-select protocol. See [1] and [2]. + /// + /// [1] https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation + /// [2] https://github.com/multiformats/multistream-select V1, - /// Draft: https://github.com/libp2p/specs/pull/95 - V2, + /// A lazy variant of version 1 that is identical on the wire but delays + /// sending of protocol negotiation data as much as possible. + /// + /// Delaying the sending of protocol negotiation data can result in + /// significantly fewer network roundtrips used for the negotiation, + /// up to 0-RTT negotiation. + /// + /// 0-RTT negotiation is achieved if the dialer supports only a single + /// application protocol. In that case the dialer immedidately settles + /// on that protocol, buffering the negotiation messages to be sent + /// with the first round of application protocol data (or an attempt + /// is made to read from the `Negotiated` I/O stream). + /// + /// A listener receiving a `V1Lazy` header will similarly delay sending + /// of the protocol confirmation. Though typically the listener will need + /// to read the request data before sending its response, thus triggering + /// sending of the protocol confirmation, which, in absence of additional + /// buffering on lower layers will result in at least two response frames + /// to be sent. + /// + /// `V1Lazy` is specific to `rust-libp2p`: While the wire protocol + /// is identical to `V1`, delayed sending of protocol negotiation frames + /// is only safe under the following assumptions: + /// + /// 1. The dialer is assumed to always send the first multistream-select + /// protocol message immediately after the multistream header, without + /// first waiting for confirmation of that header. Since the listener + /// delays sending the protocol confirmation, a deadlock situation may + /// otherwise occurs that is only resolved by a timeout. This assumption + /// is trivially satisfied if both peers support and use `V1Lazy`. + /// + /// 2. When nesting multiple protocol negotiations, the listener is either + /// known to support all of the dialer's optimistically chosen protocols + /// or there is no intermediate protocol without a payload and none of + /// the protocol payloads has the potential for being mistaken for a + /// multistream-select protocol message. This avoids rare edge-cases whereby + /// the listener may not recognize upgrade boundaries and erroneously + /// process a request despite not supporting one of the intermediate + /// protocols that the dialer committed to. See [1] and [2]. + /// + /// [1]: https://github.com/multiformats/go-multistream/issues/20 + /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 + V1Lazy, + // Draft: https://github.com/libp2p/specs/pull/95 + // V2, +} + +impl Default for Version { + fn default() -> Self { + Version::V1 + } } /// A protocol (name) exchanged during protocol negotiation. @@ -131,9 +182,9 @@ impl Message { dest.put(MSG_MULTISTREAM_1_0); Ok(()) } - Message::Header(Version::V2) => { - dest.reserve(MSG_MULTISTREAM_2_0.len()); - dest.put(MSG_MULTISTREAM_2_0); + Message::Header(Version::V1Lazy) => { + dest.reserve(MSG_MULTISTREAM_1_0_LAZY.len()); + dest.put(MSG_MULTISTREAM_1_0_LAZY); Ok(()) } Message::Protocol(p) => { @@ -170,12 +221,12 @@ impl Message { /// Decodes a `Message` from its byte representation. pub fn decode(mut msg: Bytes) -> Result { - if msg == MSG_MULTISTREAM_1_0 { - return Ok(Message::Header(Version::V1)) + if msg == MSG_MULTISTREAM_1_0_LAZY { + return Ok(Message::Header(Version::V1Lazy)) } - if msg == MSG_MULTISTREAM_2_0 { - return Ok(Message::Header(Version::V2)) + if msg == MSG_MULTISTREAM_1_0 { + return Ok(Message::Header(Version::V1)) } if msg.get(0) == Some(&b'/') && msg.last() == Some(&b'\n') && msg.len() <= MAX_PROTOCOL_LEN { diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index 95e7c151849..0f2a33abd00 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -22,7 +22,7 @@ #![cfg(test)] -use crate::NegotiationError; +use crate::{Version, NegotiationError}; use crate::dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; use crate::{dialer_select_proto, listener_select_proto}; use futures::prelude::*; @@ -32,137 +32,157 @@ use tokio_io::io as nio; #[test] fn select_proto_basic() { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map(|s| s.0.unwrap()) - .map_err(|(e, _)| e.into()) - .and_then(move |connec| { - let protos = vec![b"/proto1", b"/proto2"]; - listener_select_proto(connec, protos) - }) - .and_then(|(proto, io)| { - nio::write_all(io, b"pong").from_err().map(move |_| proto) - }); - - let client = TcpStream::connect(&listener_addr) - .from_err() - .and_then(move |connec| { - let protos = vec![b"/proto3", b"/proto2"]; - dialer_select_proto(connec, protos) - }) - .and_then(|(proto, io)| { - nio::write_all(io, b"ping").from_err().map(move |(io, _)| (proto, io)) - }) - .and_then(|(proto, io)| { - nio::read_exact(io, [0; 4]).from_err().map(move |(_, msg)| { - assert_eq!(&msg, b"pong"); - proto + fn run(version: Version) { + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map(|s| s.0.unwrap()) + .map_err(|(e, _)| e.into()) + .and_then(move |connec| { + let protos = vec![b"/proto1", b"/proto2"]; + listener_select_proto(connec, protos) }) - }); - - let mut rt = Runtime::new().unwrap(); - let (dialer_chosen, listener_chosen) = - rt.block_on(client.join(server)).unwrap(); + .and_then(|(proto, io)| { + nio::write_all(io, b"pong").from_err().map(move |_| proto) + }); + + let client = TcpStream::connect(&listener_addr) + .from_err() + .and_then(move |connec| { + let protos = vec![b"/proto3", b"/proto2"]; + dialer_select_proto(connec, protos, version) + }) + .and_then(|(proto, io)| { + nio::write_all(io, b"ping").from_err().map(move |(io, _)| (proto, io)) + }) + .and_then(|(proto, io)| { + nio::read_exact(io, [0; 4]).from_err().map(move |(_, msg)| { + assert_eq!(&msg, b"pong"); + proto + }) + }); + + let mut rt = Runtime::new().unwrap(); + let (dialer_chosen, listener_chosen) = + rt.block_on(client.join(server)).unwrap(); + + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); + } - assert_eq!(dialer_chosen, b"/proto2"); - assert_eq!(listener_chosen, b"/proto2"); + run(Version::V1); + run(Version::V1Lazy); } #[test] fn no_protocol_found() { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map(|s| s.0.unwrap()) - .map_err(|(e, _)| e.into()) - .and_then(move |connec| { - let protos = vec![b"/proto1", b"/proto2"]; - listener_select_proto(connec, protos) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let client = TcpStream::connect(&listener_addr) - .from_err() - .and_then(move |connec| { - let protos = vec![b"/proto3", b"/proto4"]; - dialer_select_proto(connec, protos) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let mut rt = Runtime::new().unwrap(); - match rt.block_on(client.join(server)) { - Err(NegotiationError::Failed) => (), - e => panic!("{:?}", e), + fn run(version: Version) { + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map(|s| s.0.unwrap()) + .map_err(|(e, _)| e.into()) + .and_then(move |connec| { + let protos = vec![b"/proto1", b"/proto2"]; + listener_select_proto(connec, protos) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let client = TcpStream::connect(&listener_addr) + .from_err() + .and_then(move |connec| { + let protos = vec![b"/proto3", b"/proto4"]; + dialer_select_proto(connec, protos, version) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let mut rt = Runtime::new().unwrap(); + match rt.block_on(client.join(server)) { + Err(NegotiationError::Failed) => (), + e => panic!("{:?}", e), + } } + + run(Version::V1); + run(Version::V1Lazy); } #[test] fn select_proto_parallel() { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map(|s| s.0.unwrap()) - .map_err(|(e, _)| e.into()) - .and_then(move |connec| { - let protos = vec![b"/proto1", b"/proto2"]; - listener_select_proto(connec, protos) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let client = TcpStream::connect(&listener_addr) - .from_err() - .and_then(move |connec| { - let protos = vec![b"/proto3", b"/proto2"]; - dialer_select_proto_parallel(connec, protos.into_iter()) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let mut rt = Runtime::new().unwrap(); - let (dialer_chosen, listener_chosen) = - rt.block_on(client.join(server)).unwrap(); - - assert_eq!(dialer_chosen, b"/proto2"); - assert_eq!(listener_chosen, b"/proto2"); + fn run(version: Version) { + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map(|s| s.0.unwrap()) + .map_err(|(e, _)| e.into()) + .and_then(move |connec| { + let protos = vec![b"/proto1", b"/proto2"]; + listener_select_proto(connec, protos) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let client = TcpStream::connect(&listener_addr) + .from_err() + .and_then(move |connec| { + let protos = vec![b"/proto3", b"/proto2"]; + dialer_select_proto_parallel(connec, protos.into_iter(), version) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let mut rt = Runtime::new().unwrap(); + let (dialer_chosen, listener_chosen) = + rt.block_on(client.join(server)).unwrap(); + + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); + } + + run(Version::V1); + run(Version::V1Lazy); } #[test] fn select_proto_serial() { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map(|s| s.0.unwrap()) - .map_err(|(e, _)| e.into()) - .and_then(move |connec| { - let protos = vec![b"/proto1", b"/proto2"]; - listener_select_proto(connec, protos) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let client = TcpStream::connect(&listener_addr) - .from_err() - .and_then(move |connec| { - let protos = vec![b"/proto3", b"/proto2"]; - dialer_select_proto_serial(connec, protos.into_iter()) - }) - .and_then(|(proto, io)| io.complete().map(move |_| proto)); - - let mut rt = Runtime::new().unwrap(); - let (dialer_chosen, listener_chosen) = - rt.block_on(client.join(server)).unwrap(); - - assert_eq!(dialer_chosen, b"/proto2"); - assert_eq!(listener_chosen, b"/proto2"); + fn run(version: Version) { + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map(|s| s.0.unwrap()) + .map_err(|(e, _)| e.into()) + .and_then(move |connec| { + let protos = vec![b"/proto1", b"/proto2"]; + listener_select_proto(connec, protos) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let client = TcpStream::connect(&listener_addr) + .from_err() + .and_then(move |connec| { + let protos = vec![b"/proto3", b"/proto2"]; + dialer_select_proto_serial(connec, protos.into_iter(), version) + }) + .and_then(|(proto, io)| io.complete().map(move |_| proto)); + + let mut rt = Runtime::new().unwrap(); + let (dialer_chosen, listener_chosen) = + rt.block_on(client.join(server)).unwrap(); + + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); + } + + run(Version::V1); + run(Version::V1Lazy); } diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 8d728302437..4fe3c319cb0 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -34,7 +34,8 @@ fn async_write() { let bg_thread = thread::spawn(move || { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -69,7 +70,8 @@ fn async_write() { }); let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let future = transport .dial(rx.recv().unwrap()) diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index aaa4fca2baa..e3e7d5d7fbc 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -37,7 +37,8 @@ fn client_to_server_outbound() { let bg_thread = thread::spawn(move || { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -77,7 +78,8 @@ fn client_to_server_outbound() { }); let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let future = transport .dial(rx.recv().unwrap()) @@ -101,7 +103,8 @@ fn client_to_server_inbound() { let bg_thread = thread::spawn(move || { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -142,7 +145,8 @@ fn client_to_server_inbound() { }); let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); let future = transport .dial(rx.recv().unwrap()) diff --git a/protocols/deflate/tests/test.rs b/protocols/deflate/tests/test.rs index a0b2c07fa8c..dd714836b94 100644 --- a/protocols/deflate/tests/test.rs +++ b/protocols/deflate/tests/test.rs @@ -32,7 +32,8 @@ fn deflate() { let _ = env_logger::try_init(); fn prop(message: Vec) -> bool { - let client = TcpConfig::new().and_then(|c, e| upgrade::apply(c, DeflateConfig {}, e)); + let client = TcpConfig::new().and_then(|c, e| + upgrade::apply(c, DeflateConfig {}, e, upgrade::Version::V1)); let server = client.clone(); run(server, client, message); true diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 7c8b68e4467..699c365aa13 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -262,6 +262,7 @@ mod tests { muxing::StreamMuxer, Multiaddr, Transport, + upgrade }; use libp2p_tcp::TcpConfig; use libp2p_secio::SecioConfig; @@ -282,7 +283,7 @@ mod tests { let pubkey = id_keys.public(); let transport = TcpConfig::new() .nodelay(true) - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(SecioConfig::new(id_keys)) .multiplex(MplexConfig::new()); (pubkey, transport) diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 8b197414f94..adee47a5f04 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -288,7 +288,7 @@ mod tests { identity, Transport, transport::ListenerEvent, - upgrade::{apply_outbound, apply_inbound} + upgrade::{self, apply_outbound, apply_inbound} }; use std::{io, sync::mpsc, thread}; @@ -351,7 +351,7 @@ mod tests { let future = transport.dial(rx.recv().unwrap()) .unwrap() .and_then(|socket| { - apply_outbound(socket, IdentifyProtocolConfig) + apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }) .and_then(|RemoteInfo { info, observed_addr, .. }| { diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index d39a312fe30..5bf8db1ba0b 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -34,6 +34,7 @@ use libp2p_core::{ nodes::Substream, multiaddr::{Protocol, multiaddr}, muxing::StreamMuxerBox, + upgrade }; use libp2p_secio::SecioConfig; use libp2p_swarm::Swarm; @@ -63,7 +64,7 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec::new().into_authentic(&id_keys).unwrap(); //! let noise = NoiseConfig::xx(dh_keys).into_authenticated(); -//! let builder = TcpConfig::new().upgrade().authenticate(noise); +//! let builder = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise); //! // let transport = builder.multiplex(...); //! # } //! ``` diff --git a/protocols/noise/tests/smoke.rs b/protocols/noise/tests/smoke.rs index ff7a9d5a163..2dafaaab5f1 100644 --- a/protocols/noise/tests/smoke.rs +++ b/protocols/noise/tests/smoke.rs @@ -35,7 +35,7 @@ fn core_upgrade_compat() { let id_keys = identity::Keypair::generate_ed25519(); let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); let noise = NoiseConfig::xx(dh_keys).into_authenticated(); - let _ = TcpConfig::new().upgrade().authenticate(noise); + let _ = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise); } #[test] @@ -51,14 +51,14 @@ fn xx() { let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); let server_transport = TcpConfig::new() .and_then(move |output, endpoint| { - upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint) + upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); let client_transport = TcpConfig::new() .and_then(move |output, endpoint| { - upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint) + upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &server_id_public)); @@ -81,14 +81,14 @@ fn ix() { let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); let server_transport = TcpConfig::new() .and_then(move |output, endpoint| { - upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint) + upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); let client_transport = TcpConfig::new() .and_then(move |output, endpoint| { - upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint) + upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &server_id_public)); @@ -115,7 +115,8 @@ fn ik_xx() { if endpoint.is_listener() { Either::A(apply_inbound(output, NoiseConfig::ik_listener(server_dh))) } else { - Either::B(apply_outbound(output, NoiseConfig::xx(server_dh))) + Either::B(apply_outbound(output, NoiseConfig::xx(server_dh), + upgrade::Version::V1)) } }) .and_then(move |out, _| expect_identity(out, &client_id_public)); @@ -126,7 +127,8 @@ fn ik_xx() { .and_then(move |output, endpoint| { if endpoint.is_dialer() { Either::A(apply_outbound(output, - NoiseConfig::ik_dialer(client_dh, server_id_public, server_dh_public))) + NoiseConfig::ik_dialer(client_dh, server_id_public, server_dh_public), + upgrade::Version::V1)) } else { Either::B(apply_inbound(output, NoiseConfig::xx(client_dh))) } diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 926aad03e6d..ffb77a28213 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -217,7 +217,7 @@ mod tests { let client = MemoryTransport.dial(listener_addr).unwrap() .and_then(|c| { - upgrade::apply_outbound(c, Ping::default()) + upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1) .map_err(|e| panic!(e)) }); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 6d6b98c2e6e..7c05ff772ee 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -25,10 +25,9 @@ use libp2p_core::{ PeerId, Negotiated, identity, - muxing::StreamMuxer, transport::{Transport, boxed::Boxed}, either::EitherError, - upgrade::UpgradeError + upgrade::{self, UpgradeError} }; use libp2p_ping::*; use libp2p_yamux::{self as yamux, Yamux}; @@ -36,7 +35,7 @@ use libp2p_secio::{SecioConfig, SecioOutput, SecioError}; use libp2p_swarm::Swarm; use libp2p_tcp::{TcpConfig, TcpTransStream}; use futures::{future, prelude::*}; -use std::{fmt, io, time::Duration, sync::mpsc::sync_channel}; +use std::{io, time::Duration, sync::mpsc::sync_channel}; use tokio::runtime::Runtime; #[test] @@ -114,7 +113,7 @@ fn mk_transport() -> ( let peer_id = id_keys.public().into_peer_id(); let transport = TcpConfig::new() .nodelay(true) - .upgrade() + .upgrade(upgrade::Version::V1) .authenticate(SecioConfig::new(id_keys)) .multiplex(yamux::Config::default()) .boxed(); diff --git a/protocols/secio/src/algo_support.rs b/protocols/secio/src/algo_support.rs index b37b4c856d3..39e58787e02 100644 --- a/protocols/secio/src/algo_support.rs +++ b/protocols/secio/src/algo_support.rs @@ -42,7 +42,7 @@ const SHA_256: &str = "SHA256"; const SHA_512: &str = "SHA512"; pub(crate) const DEFAULT_AGREEMENTS_PROPOSITION: &str = "P-256,P-384"; -pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "AES-128,AES-256,TwofishCTR"; +pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "NULL"; // "AES-128,AES-256,TwofishCTR"; pub(crate) const DEFAULT_DIGESTS_PROPOSITION: &str = "SHA256,SHA512"; /// Return a proposition string from the given sequence of `KeyAgreement` values. diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 2965a92150c..cba09b475b6 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -31,7 +31,7 @@ //! # fn main() { //! use futures::Future; //! use libp2p_secio::{SecioConfig, SecioOutput}; -//! use libp2p_core::{PeerId, Multiaddr, identity}; +//! use libp2p_core::{PeerId, Multiaddr, identity, upgrade}; //! use libp2p_core::transport::Transport; //! use libp2p_mplex::MplexConfig; //! use libp2p_tcp::TcpConfig; @@ -41,7 +41,7 @@ //! //! // Create a `Transport`. //! let transport = TcpConfig::new() -//! .upgrade() +//! .upgrade(upgrade::Version::V1) //! .authenticate(SecioConfig::new(local_keys.clone())) //! .multiplex(MplexConfig::default()); //! diff --git a/src/lib.rs b/src/lib.rs index f69e50a0248..43c26d41161 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -244,7 +244,7 @@ pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair) -> impl Transport> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone { CommonTransport::new() - .upgrade() + .upgrade(core::upgrade::Version::V1) .authenticate(secio::SecioConfig::new(keypair)) .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) diff --git a/swarm/src/protocols_handler/mod.rs b/swarm/src/protocols_handler/mod.rs index 855d95d4252..3ad4d3037a0 100644 --- a/swarm/src/protocols_handler/mod.rs +++ b/swarm/src/protocols_handler/mod.rs @@ -48,7 +48,7 @@ use futures::prelude::*; use libp2p_core::{ ConnectedPoint, PeerId, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}, + upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeError}, }; use std::{cmp::Ordering, error, fmt, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -244,6 +244,7 @@ pub trait ProtocolsHandler { #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct SubstreamProtocol { upgrade: TUpgrade, + upgrade_protocol: upgrade::Version, timeout: Duration, } @@ -255,10 +256,18 @@ impl SubstreamProtocol { pub fn new(upgrade: TUpgrade) -> SubstreamProtocol { SubstreamProtocol { upgrade, + upgrade_protocol: upgrade::Version::V1, timeout: Duration::from_secs(10), } } + /// Sets the multistream-select protocol (version) to use for negotiating + /// protocols upgrades on outbound substreams. + pub fn with_upgrade_protocol(mut self, version: upgrade::Version) -> Self { + self.upgrade_protocol = version; + self + } + /// Maps a function over the protocol upgrade. pub fn map_upgrade(self, f: F) -> SubstreamProtocol where @@ -266,6 +275,7 @@ impl SubstreamProtocol { { SubstreamProtocol { upgrade: f(self.upgrade), + upgrade_protocol: self.upgrade_protocol, timeout: self.timeout, } } @@ -287,8 +297,8 @@ impl SubstreamProtocol { } /// Converts the substream protocol configuration into the contained upgrade. - pub fn into_upgrade(self) -> TUpgrade { - self.upgrade + pub fn into_upgrade(self) -> (upgrade::Version, TUpgrade) { + (self.upgrade_protocol, self.upgrade) } } @@ -460,7 +470,7 @@ where T: ProtocolsHandler } fn inbound_protocol(&self) -> ::InboundProtocol { - self.listen_protocol().into_upgrade() + self.listen_protocol().into_upgrade().1 } } diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 14b2e01cdad..15c9bcc0e87 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -111,7 +111,7 @@ where )>, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). - queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>, + queued_dial_upgrades: Vec<(u64, (upgrade::Version, TProtoHandler::OutboundProtocol))>, /// Unique identifier assigned to each queued dial upgrade. unique_dial_upgrade_id: u64, /// The currently planned connection & handler shutdown. @@ -197,7 +197,7 @@ where NodeHandlerEndpoint::Listener => { let protocol = self.handler.listen_protocol(); let timeout = protocol.timeout().clone(); - let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade()); + let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade().1); let with_timeout = Timeout::new(upgrade, timeout); self.negotiating_in.push(with_timeout); } @@ -214,8 +214,8 @@ where } }; - let (_, proto_upgrade) = self.queued_dial_upgrades.remove(pos); - let upgrade = upgrade::apply_outbound(substream, proto_upgrade); + let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos); + let upgrade = upgrade::apply_outbound(substream, upgrade, version); let with_timeout = Timeout::new(upgrade, timeout); self.negotiating_out.push((user_data, with_timeout)); } diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index 074920b1e64..7e930596eb2 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -126,8 +126,8 @@ where let proto1 = self.proto1.listen_protocol(); let proto2 = self.proto2.listen_protocol(); let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone(); - SubstreamProtocol::new(SelectUpgrade::new(proto1.into_upgrade(), proto2.into_upgrade())) - .with_timeout(timeout) + let choice = SelectUpgrade::new(proto1.into_upgrade().1, proto2.into_upgrade().1); + SubstreamProtocol::new(choice).with_timeout(timeout) } fn inject_fully_negotiated_outbound(&mut self, protocol: >::Output, endpoint: Self::OutboundOpenInfo) {