diff --git a/core/src/lib.rs b/core/src/lib.rs index 844fd2a23bc..2024c7190c2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,7 @@ pub use peer_id::PeerId; pub use identity::PublicKey; pub use transport::Transport; pub use translation::address_translation; -pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; +pub use upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo, UpgradeError, ProtocolName}; pub use connection::{Connected, Endpoint, ConnectedPoint}; pub use network::Network; diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 9798ae6c27a..50b52b06b75 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -70,7 +70,7 @@ mod transfer; use futures::future::Future; pub use crate::Negotiated; -pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError}; +pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError, SimOpenRole}; pub use self::{ apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, denied::DeniedUpgrade, @@ -195,7 +195,7 @@ pub trait OutboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: C, info: Self::Info, role: SimOpenRole) -> Self::Future; } /// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement @@ -221,4 +221,3 @@ pub trait OutboundUpgradeExt: OutboundUpgrade { } impl> OutboundUpgradeExt for U {} - diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index eaf25e884b3..ce19c112690 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -162,7 +162,7 @@ where upgrade: U }, Upgrade { - future: Pin> + future: Pin>, }, Undefined } @@ -185,7 +185,7 @@ where loop { match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) { OutboundUpgradeApplyState::Init { mut future, upgrade } => { - let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? { + let (info, role, connection) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; @@ -193,7 +193,7 @@ where } }; self.inner = OutboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_outbound(connection, info.0)) + future: Box::pin(upgrade.upgrade_outbound(connection, info.0, role)), }; } OutboundUpgradeApplyState::Upgrade { mut future } => { @@ -230,4 +230,3 @@ impl AsRef<[u8]> for NameWrap { self.0.protocol_name() } } - diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 93438e0bea8..3a36a8dbf3e 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use futures::future; use std::iter; use void::Void; @@ -52,7 +52,7 @@ impl OutboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Pending>; - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, _: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::pending() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 28db987ccd7..afdf404573e 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -20,7 +20,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; /// A type to represent two possible upgrade types (inbound or outbound). @@ -73,10 +73,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { match (self, info) { - (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), - (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), + (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info, role)), + (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info, role)), _ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound") } } @@ -107,4 +107,3 @@ where } } } - diff --git a/core/src/upgrade/from_fn.rs b/core/src/upgrade/from_fn.rs index c6ef52c1e08..cd0b119fc75 100644 --- a/core/src/upgrade/from_fn.rs +++ b/core/src/upgrade/from_fn.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}}; +use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, SimOpenRole, UpgradeInfo}}; use futures::prelude::*; use std::iter; @@ -33,7 +33,7 @@ use std::iter; /// # use std::io; /// let _transport = MemoryTransport::default() /// .and_then(move |out, cp| { -/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move { +/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint, _| async move { /// if endpoint.is_dialer() { /// upgrade::write_one(&mut sock, "some handshake data").await?; /// } else { @@ -51,7 +51,7 @@ pub fn from_fn(protocol_name: P, fun: F) -> FromFnUpgrad where // Note: these bounds are there in order to help the compiler infer types P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { FromFnUpgrade { protocol_name, fun } @@ -81,7 +81,7 @@ where impl InboundUpgrade for FromFnUpgrade where P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { type Output = Out; @@ -89,21 +89,21 @@ where type Future = Fut; fn upgrade_inbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Listener) + (self.fun)(sock, Endpoint::Listener, SimOpenRole::Responder) } } impl OutboundUpgrade for FromFnUpgrade where P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { type Output = Out; type Error = Err; type Future = Fut; - fn upgrade_outbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Dialer) + fn upgrade_outbound(self, sock: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + (self.fun)(sock, Endpoint::Dialer, role) } } diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index 2f5ca31e207..4cd051247f2 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use futures::prelude::*; use std::{pin::Pin, task::Context, task::Poll}; @@ -69,8 +69,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info, role) } } @@ -118,9 +118,9 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { MapFuture { - inner: self.upgrade.upgrade_outbound(sock, info), + inner: self.upgrade.upgrade_outbound(sock, info, role), map: Some(self.fun) } } @@ -173,8 +173,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info, role) } } @@ -209,9 +209,9 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { MapErrFuture { - fut: self.upgrade.upgrade_outbound(sock, info), + fut: self.upgrade.upgrade_outbound(sock, info, role), fun: Some(self.fun) } } @@ -283,4 +283,3 @@ where } } } - diff --git a/core/src/upgrade/optional.rs b/core/src/upgrade/optional.rs index 02dc3c48f78..7790029e12f 100644 --- a/core/src/upgrade/optional.rs +++ b/core/src/upgrade/optional.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; /// Upgrade that can be disabled at runtime. /// @@ -76,9 +76,9 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { if let Some(inner) = self.0 { - inner.upgrade_outbound(sock, info) + inner.upgrade_outbound(sock, info, role) } else { panic!("Bad API usage; a protocol has been negotiated while this struct contains None") } diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 8fa4c5b8a7a..5055be98ac9 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -20,7 +20,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; /// Upgrade that combines two upgrades into one. Supports all the protocols supported by either @@ -81,10 +81,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { match info { - EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), - EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) + EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info, role)), + EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info, role)) } } } @@ -117,4 +117,3 @@ where (min1.saturating_add(min2), max) } } - diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index eecace3e46f..da6e71f2614 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -23,7 +23,7 @@ mod util; use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{Transport, MemoryTransport}; -use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade, SimOpenRole}; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; use multiaddr::{Multiaddr, Protocol}; @@ -68,7 +68,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(async move { socket.write_all(b"hello").await.unwrap(); socket.flush().await.unwrap(); @@ -136,4 +136,3 @@ fn upgrade_pipeline() { async_std::task::spawn(server); async_std::task::block_on(client); } - diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 7c86be9e6ac..ef689d5e7ee 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" futures = "0.3" log = "0.4" pin-project = "1.0.0" +rand = "0.7" smallvec = "1.6.1" unsigned-varint = "0.7" diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 34344bcd556..6835c3e4974 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -21,10 +21,11 @@ //! Protocol negotiation strategies for the peer acting as the dialer. use crate::{Negotiated, NegotiationError, Version}; -use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine}; +use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine, SIM_OPEN_ID}; use futures::{future::Either, prelude::*}; -use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{cmp::Ordering, convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; + /// Returns a `Future` that negotiates a protocol on the given I/O stream /// for a peer acting as the _dialer_ (or _initiator_). @@ -56,11 +57,18 @@ where I::Item: AsRef<[u8]> { let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) - } else { - Either::Right(dialer_select_proto_parallel(inner, iter, version)) + match version { + Version::V1 | Version::V1Lazy => { + // We choose between the "serial" and "parallel" strategies based on the number of protocols. + if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } else { + Either::Right(dialer_select_proto_parallel(inner, iter, version)) + } + }, + Version::V1SimOpen => { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } } } @@ -136,7 +144,16 @@ pub struct DialerSelectSeq { } enum SeqState { - SendHeader { io: MessageIO, }, + SendHeader { io: MessageIO }, + + // Simultaneous open protocol extension + SendSimOpen { io: MessageIO, protocol: Option }, + FlushSimOpen { io: MessageIO, protocol: N }, + AwaitSimOpen { io: MessageIO, protocol: N }, + SimOpenPhase { selection: SimOpenPhase, protocol: N }, + Responder { responder: crate::ListenerSelectFuture }, + + // Standard multistream-select protocol SendProtocol { io: MessageIO, protocol: N }, FlushProtocol { io: MessageIO, protocol: N }, AwaitProtocol { io: MessageIO, protocol: N }, @@ -149,9 +166,10 @@ where // It also makes the implementation considerably easier to write. R: AsyncRead + AsyncWrite + Unpin, I: Iterator, - I::Item: AsRef<[u8]> + // TODO: Clone needed to embed ListenerSelectFuture. Still needed? + I::Item: AsRef<[u8]> + Clone { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, SimOpenRole, Negotiated), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -172,11 +190,124 @@ where return Poll::Ready(Err(From::from(err))); } - let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + match this.version { + Version::V1 | Version::V1Lazy => { + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + + // The dialer always sends the header and the first protocol + // proposal in one go for efficiency. + *this.state = SeqState::SendProtocol { io, protocol }; + } + Version::V1SimOpen => { + *this.state = SeqState::SendSimOpen { io, protocol: None }; + } + } + } + + SeqState::SendSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + *this.state = SeqState::SendSimOpen { io, protocol }; + return Poll::Pending + }, + } + + match protocol { + None => { + let msg = Message::Protocol(SIM_OPEN_ID); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + *this.state = SeqState::SendSimOpen { io, protocol: Some(protocol) }; + } + Some(protocol) => { + let p = Protocol::try_from(protocol.as_ref())?; + if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { + return Poll::Ready(Err(From::from(err))); + } + log::debug!("Dialer: Proposed protocol: {}", p); + + *this.state = SeqState::FlushSimOpen { io, protocol } + } + } + } + + SeqState::FlushSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => { + *this.state = SeqState::AwaitSimOpen { io, protocol } + }, + Poll::Pending => { + *this.state = SeqState::FlushSimOpen { io, protocol }; + return Poll::Pending + }, + } + } + + SeqState::AwaitSimOpen { mut io, protocol } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + Message::Header(v) if v == HeaderLine::from(*this.version) => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce{ io }, + }; + *this.state = SeqState::SimOpenPhase { selection, protocol }; + } + Message::NotAvailable => { + *this.state = SeqState::AwaitProtocol { io, protocol } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())) + } + } + + SeqState::SimOpenPhase { mut selection, protocol } => { + let (io, selection_res) = match Pin::new(&mut selection).poll(cx)? { + Poll::Ready((io, res)) => (io, res), + Poll::Pending => { + *this.state = SeqState::SimOpenPhase { selection, protocol }; + return Poll::Pending + } + }; + + match selection_res { + SimOpenRole::Initiator => { + *this.state = SeqState::SendProtocol { io, protocol }; + } + SimOpenRole::Responder => { + let protocols: Vec<_> = this.protocols.collect(); + *this.state = SeqState::Responder { + responder: crate::listener_select::listener_select_proto_no_header(io, std::iter::once(protocol).chain(protocols.into_iter())), + } + }, + } + } - // The dialer always sends the header and the first protocol - // proposal in one go for efficiency. - *this.state = SeqState::SendProtocol { io, protocol }; + SeqState::Responder { mut responder } => { + match Pin::new(&mut responder ).poll(cx) { + Poll::Ready(Ok((protocol, io))) => return Poll::Ready(Ok((protocol, SimOpenRole::Responder, io))), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => { + *this.state = SeqState::Responder { responder }; + return Poll::Pending + } + } } SeqState::SendProtocol { mut io, protocol } => { @@ -198,7 +329,7 @@ where *this.state = SeqState::FlushProtocol { io, protocol } } else { match this.version { - Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, + Version::V1 | Version::V1SimOpen => *this.state = SeqState::FlushProtocol { io, protocol }, // This is the only effect that `V1Lazy` has compared to `V1`: // Optimistically settling on the only protocol that // the dialer supports for this negotiation. Notably, @@ -207,7 +338,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let hl = HeaderLine::from(Version::V1Lazy); let io = Negotiated::expecting(io.into_reader(), p, Some(hl)); - return Poll::Ready(Ok((protocol, io))) + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))) } } } @@ -215,7 +346,9 @@ where SeqState::FlushProtocol { mut io, protocol } => { match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol }, + Poll::Ready(()) => { + *this.state = SeqState::AwaitProtocol { io, protocol } + } , Poll::Pending => { *this.state = SeqState::FlushProtocol { io, protocol }; return Poll::Pending @@ -236,14 +369,21 @@ where Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), }; + match msg { Message::Header(v) if v == HeaderLine::from(*this.version) => { *this.state = SeqState::AwaitProtocol { io, protocol }; } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce{ io }, + }; + *this.state = SeqState::SimOpenPhase { selection, protocol }; + } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); let io = Negotiated::completed(io.into_inner()); - return Poll::Ready(Ok((protocol, io))); + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))); } Message::NotAvailable => { log::debug!("Dialer: Received rejection of protocol: {}", @@ -261,6 +401,168 @@ where } } +struct SimOpenPhase { + state: SimOpenState, +} + +enum SimOpenState { + SendNonce { io: MessageIO }, + FlushNonce { io: MessageIO, local_nonce: u64 }, + ReadNonce { io: MessageIO, local_nonce: u64 }, + SendRole { io: MessageIO, local_role: SimOpenRole }, + FlushRole { io: MessageIO, local_role: SimOpenRole }, + ReadRole { io: MessageIO, local_role: SimOpenRole }, + Done, +} + +pub enum SimOpenRole { + Initiator, + Responder, +} + +impl Future for SimOpenPhase +where + // The Unpin bound here is required because we produce a `Negotiated` as the output. + // It also makes the implementation considerably easier to write. + R: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(MessageIO, SimOpenRole), NegotiationError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + + loop { + match mem::replace(&mut self.state, SimOpenState::Done) { + SimOpenState::SendNonce { mut io } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + self.state = SimOpenState::SendNonce { io }; + return Poll::Pending + }, + } + + let local_nonce = rand::random(); + let msg = Message::Select(local_nonce); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushNonce { + io, + local_nonce, + }; + }, + SimOpenState::FlushNonce { mut io, local_nonce } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadNonce { + io, + local_nonce, + }, + Poll::Pending => { + self.state =SimOpenState::FlushNonce { io, local_nonce }; + return Poll::Pending + }, + } + }, + SimOpenState::ReadNonce { mut io, local_nonce } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + // TODO: Document that this might still be the protocol send by the remote with + // the sim open ID. + Message::Protocol(_) => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + } + Message::Select(remote_nonce) => { + match local_nonce.cmp(&remote_nonce) { + Ordering::Equal => { + // Start over. + self.state = SimOpenState::SendNonce { io }; + }, + Ordering::Greater => { + self.state = SimOpenState::SendRole { + io, + local_role: SimOpenRole::Initiator, + }; + }, + Ordering::Less => { + self.state = SimOpenState::SendRole { + io, + local_role: SimOpenRole::Responder, + }; + } + } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), + } + }, + SimOpenState::SendRole { mut io, local_role } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + self.state = SimOpenState::SendRole { io, local_role }; + return Poll::Pending + }, + } + + let msg = match local_role { + SimOpenRole::Initiator => Message::Initiator, + SimOpenRole::Responder => Message::Responder, + }; + + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushRole { io, local_role }; + }, + SimOpenState::FlushRole { mut io, local_role } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadRole { io, local_role }, + Poll::Pending => { + self.state =SimOpenState::FlushRole { io, local_role }; + return Poll::Pending + }, + } + }, + SimOpenState::ReadRole { mut io, local_role } => { + let remote_msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadRole { io, local_role }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + let result = match local_role { + SimOpenRole::Initiator if remote_msg == Message::Responder => Ok((io, local_role)), + SimOpenRole::Responder if remote_msg == Message::Initiator => Ok((io, local_role)), + + _ => Err(ProtocolError::InvalidMessage.into()) + }; + + return Poll::Ready(result) + }, + SimOpenState::Done => panic!("SimOpenPhase::poll called after completion") + } + } + } +} + /// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates /// a protocol selectively by considering all supported protocols of the remote /// "in parallel". @@ -288,7 +590,7 @@ where I: Iterator, I::Item: AsRef<[u8]> { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, SimOpenRole, Negotiated), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -386,7 +688,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let io = Negotiated::expecting(io.into_reader(), p, None); - return Poll::Ready(Ok((protocol, io))) + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))) } ParState::Done => panic!("ParState::poll called after completion") diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 087b2a2cb21..837fbec6303 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -79,7 +79,7 @@ //! let socket = TcpStream::connect("127.0.0.1:10333").await.unwrap(); //! //! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; -//! let (protocol, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); +//! let (protocol, _, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); //! //! println!("Negotiated protocol: {:?}", protocol); //! // You can now use `_io` to communicate with the remote. @@ -96,7 +96,7 @@ mod tests; pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError}; pub use self::protocol::ProtocolError; -pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; +pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture, SimOpenRole}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; /// Supported multistream-select versions. @@ -137,6 +137,7 @@ pub enum Version { /// [1]: https://github.com/multiformats/go-multistream/issues/20 /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 V1Lazy, + V1SimOpen, // Draft: https://github.com/libp2p/specs/pull/95 // V2, } @@ -145,4 +146,4 @@ impl Default for Version { fn default() -> Self { Version::V1 } -} \ No newline at end of file +} diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 7cf07c5fb02..01347b04bef 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -39,6 +39,35 @@ pub fn listener_select_proto( inner: R, protocols: I, ) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]> +{ + listener_select_proto_with_state(State::RecvHeader { + io: MessageIO::new(inner) + }, protocols) +} + +pub(crate) fn listener_select_proto_no_header( + io: MessageIO, + protocols: I, +) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]> +{ + listener_select_proto_with_state( + State::RecvMessage { io }, + protocols, + ) +} + +fn listener_select_proto_with_state( + state: State, + protocols: I, +) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -55,9 +84,7 @@ where }); ListenerSelectFuture { protocols: SmallVec::from_iter(protocols), - state: State::RecvHeader { - io: MessageIO::new(inner) - }, + state, last_sent_na: false, } } diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 1d056de75ec..f9a9f4b49c1 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -30,7 +30,7 @@ use crate::length_delimited::{LengthDelimited, LengthDelimitedReader}; use bytes::{Bytes, BytesMut, BufMut}; use futures::{prelude::*, io::IoSlice, ready}; -use std::{convert::TryFrom, io, fmt, error::Error, pin::Pin, task::{Context, Poll}}; +use std::{convert::TryFrom, io, fmt, error::Error, pin::Pin, str::FromStr, task::{Context, Poll}}; use unsigned_varint as uvi; /// The maximum number of supported protocols that can be processed. @@ -42,6 +42,19 @@ const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n"; const MSG_PROTOCOL_NA: &[u8] = b"na\n"; /// The encoded form of a multistream-select 'ls' message. const MSG_LS: &[u8] = b"ls\n"; +/// The encoded form of a 'select:' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_SELECT: &[u8] = b"select:"; +/// The encoded form of a 'initiator' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_INITIATOR: &[u8] = b"initiator\n"; +/// The encoded form of a 'responder' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_RESPONDER: &[u8] = b"responder\n"; + +/// The identifier of the multistream-select simultaneous open protocol +/// extension. +pub(crate) const SIM_OPEN_ID: Protocol = Protocol(Bytes::from_static(b"/libp2p/simultaneous-connect")); /// The multistream-select header lines preceeding negotiation. /// @@ -55,7 +68,7 @@ pub enum HeaderLine { impl From for HeaderLine { fn from(v: Version) -> HeaderLine { match v { - Version::V1 | Version::V1Lazy => HeaderLine::V1, + Version::V1 | Version::V1Lazy | Version::V1SimOpen => HeaderLine::V1, } } } @@ -113,6 +126,9 @@ pub enum Message { Protocols(Vec), /// A message signaling that a requested protocol is not available. NotAvailable, + Select(u64), + Initiator, + Responder, } impl Message { @@ -154,6 +170,22 @@ impl Message { dest.put(MSG_PROTOCOL_NA); Ok(()) } + Message::Select(nonce) => { + dest.put(MSG_SELECT); + dest.put(nonce.to_string().as_ref()); + dest.put_u8(b'\n'); + Ok(()) + } + Message::Initiator => { + dest.reserve(MSG_INITIATOR.len()); + dest.put(MSG_INITIATOR); + Ok(()) + } + Message::Responder => { + dest.reserve(MSG_RESPONDER.len()); + dest.put(MSG_RESPONDER); + Ok(()) + } } } @@ -171,6 +203,26 @@ impl Message { return Ok(Message::ListProtocols) } + if msg.len() > MSG_SELECT.len() + 1 /* \n */ + && msg[.. MSG_SELECT.len()] == *MSG_SELECT + && msg.last() == Some(&b'\n') + { + if let Some(nonce) = std::str::from_utf8(&msg[MSG_SELECT.len() .. msg.len() -1]) + .ok() + .and_then(|s| u64::from_str(s).ok()) + { + return Ok(Message::Select(nonce)) + } + } + + if msg == MSG_INITIATOR { + return Ok(Message::Initiator) + } + + if msg == MSG_RESPONDER { + return Ok(Message::Responder) + } + // If it starts with a `/`, ends with a line feed without any // other line feeds in-between, it must be a protocol name. if msg.get(0) == Some(&b'/') && msg.last() == Some(&b'\n') && @@ -238,7 +290,7 @@ impl MessageIO { MessageReader { inner: self.inner.into_reader() } } - /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream. + /// Draops the [`MessageIO`] resource, yielding the underlying I/O stream. /// /// # Panics /// diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index f03d1b1ff75..49bcba8a552 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -35,43 +35,41 @@ fn select_proto_basic() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener_addr = listener.local_addr().unwrap(); - let server = async_std::task::spawn(async move { + let server = async move { let connec = listener.accept().await.unwrap().0; let protos = vec![b"/proto1", b"/proto2"]; let (proto, mut io) = listener_select_proto(connec, protos).await.unwrap(); assert_eq!(proto, b"/proto2"); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"ping"); io.write_all(b"pong").await.unwrap(); io.flush().await.unwrap(); - }); + }; - let client = async_std::task::spawn(async move { + let client = async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), version) + let (proto, _, mut io) = dialer_select_proto(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.write_all(b"ping").await.unwrap(); io.flush().await.unwrap(); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"pong"); - }); + }; - server.await; - client.await; + futures::future::join(server, client).await; } async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimOpen)); } /// Tests the expected behaviour of failed negotiations. @@ -105,7 +103,7 @@ fn negotiation_failed() { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let mut io = match dialer_select_proto(connec, dial_protos.into_iter(), version).await { Err(NegotiationError::Failed) => return, - Ok((_, io)) => io, + Ok((_, _, io)) => io, Err(_) => panic!() }; // The dialer may write a payload that is even sent before it @@ -165,7 +163,7 @@ fn negotiation_failed() { for (listen_protos, dial_protos) in protos { for dial_payload in payloads.clone() { - for &version in &[Version::V1, Version::V1Lazy] { + for &version in &[Version::V1, Version::V1Lazy, Version::V1SimOpen] { async_std::task::block_on(run(Test { version, listen_protos: listen_protos.clone(), @@ -194,7 +192,7 @@ fn select_proto_parallel() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) + let (proto, _, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); @@ -225,7 +223,7 @@ fn select_proto_serial() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, _, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); @@ -237,4 +235,34 @@ fn select_proto_serial() { async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimOpen)); +} + +#[test] +fn simultaneous_open() { + async fn run(version: Version) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = async move { + let connec = listener.accept().await.unwrap().0; + let protos = vec![b"/proto1", b"/proto2"]; + let (proto, _, io) = dialer_select_proto_serial(connec, protos, version).await.unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + let client = async move { + let connec = TcpStream::connect(&listener_addr).await.unwrap(); + let protos = vec![b"/proto3", b"/proto2"]; + let (proto, _, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + .await.unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + futures::future::join(server, client).await; + } + + futures::executor::block_on(run(Version::V1SimOpen)); } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 653c3310ab4..647625788bd 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -30,7 +30,7 @@ use bytes::Bytes; use libp2p_core::{ StreamMuxer, muxing::StreamMuxerEvent, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}, }; use parking_lot::Mutex; use futures::{prelude::*, future, ready}; @@ -67,7 +67,7 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ready(Ok(Multiplex { io: Mutex::new(io::Multiplexed::new(socket, self)) })) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 0edadb0ebb0..fbf7056ce3c 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -23,7 +23,7 @@ use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use parking_lot::Mutex; use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}}; use thiserror::Error; @@ -334,8 +334,12 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Client); + fn upgrade_outbound(self, io: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + let mode = match role { + SimOpenRole::Initiator => yamux::Mode::Client, + SimOpenRole::Responder => yamux::Mode::Server, + }; + let mode = self.mode.unwrap_or(mode); future::ready(Ok(Yamux::new(io, self.inner, mode))) } } @@ -348,9 +352,13 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, io: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + let mode = match role { + SimOpenRole::Initiator => yamux::Mode::Client, + SimOpenRole::Responder => yamux::Mode::Server, + }; let cfg = self.0; - let mode = cfg.mode.unwrap_or(yamux::Mode::Client); + let mode = cfg.mode.unwrap_or(mode); future::ready(Ok(Yamux::local(io, cfg.inner, mode))) } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index bd0b5b2a646..36375bd8ca8 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -20,7 +20,7 @@ use crate::rpc_proto; use crate::topic::Topic; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo, PeerId, upgrade}; use prost::Message; use std::{error, fmt, io, iter, pin::Pin}; use futures::{Future, io::{AsyncRead, AsyncWrite}}; @@ -163,7 +163,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(async move { let bytes = self.into_bytes(); upgrade::write_one(&mut socket, bytes).await?; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index ac2584107dd..d094d662c57 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -34,7 +34,7 @@ use futures::future; use futures::prelude::*; use asynchronous_codec::{Decoder, Encoder, Framed}; use libp2p_core::{ - identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, + identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, SimOpenRole, UpgradeInfo, }; use log::{debug, warn}; use prost::Message as ProtobufMessage; @@ -152,7 +152,7 @@ where type Error = GossipsubHandlerError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info, _: SimOpenRole) -> Self::Future { let mut length_codec = codec::UviBytes::default(); length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(( diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 2c73a1d627f..745d0e618b1 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -23,7 +23,7 @@ use futures::prelude::*; use libp2p_core::{ Multiaddr, PublicKey, - upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{self, InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; use log::{debug, trace}; use prost::Message; @@ -122,7 +122,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { recv(socket).boxed() } } @@ -157,7 +157,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { send(socket, self.0.0).boxed() } } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 27eab8017c2..d4e1661ece9 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -33,7 +33,7 @@ use crate::record::{self, Record}; use futures::prelude::*; use asynchronous_codec::Framed; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use prost::Message; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; @@ -224,7 +224,7 @@ where type Future = future::Ready>; type Error = io::Error; - fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, incoming: C, _: Self::Info, _: SimOpenRole) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(self.max_packet_size); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index aa63833f651..0940be9269a 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; @@ -74,7 +74,7 @@ impl OutboundUpgrade for Ping { type Error = Void; type Future = future::Ready>; - fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ok(stream) } } diff --git a/protocols/relay/src/protocol/outgoing_dst_req.rs b/protocols/relay/src/protocol/outgoing_dst_req.rs index 0a257da3bab..671cab0950a 100644 --- a/protocols/relay/src/protocol/outgoing_dst_req.rs +++ b/protocols/relay/src/protocol/outgoing_dst_req.rs @@ -24,7 +24,7 @@ use asynchronous_codec::{Framed, FramedParts}; use bytes::Bytes; use futures::future::BoxFuture; use futures::prelude::*; -use libp2p_core::{upgrade, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId, SimOpenRole}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::{fmt, error, iter}; @@ -88,7 +88,7 @@ impl upgrade::OutboundUpgrade for OutgoingDstReq { type Error = OutgoingDstReqError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(MAX_ACCEPTED_MESSAGE_LEN); diff --git a/protocols/relay/src/protocol/outgoing_relay_req.rs b/protocols/relay/src/protocol/outgoing_relay_req.rs index 8f6ed8b3aa3..8526822b1b4 100644 --- a/protocols/relay/src/protocol/outgoing_relay_req.rs +++ b/protocols/relay/src/protocol/outgoing_relay_req.rs @@ -24,7 +24,7 @@ use asynchronous_codec::{Framed, FramedParts}; use futures::channel::oneshot; use futures::future::BoxFuture; use futures::prelude::*; -use libp2p_core::{upgrade, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId, SimOpenRole}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::{error, fmt, iter}; @@ -72,7 +72,7 @@ impl upgrade::OutboundUpgrade for OutgoingRelayReq { type Error = OutgoingRelayReqError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { let OutgoingRelayReq { src_id, dst_id, diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 404fbd5c436..0d0f21a9925 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -27,7 +27,7 @@ use crate::RequestId; use crate::codec::RequestResponseCodec; use futures::{channel::oneshot, future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use smallvec::SmallVec; use std::io; @@ -158,7 +158,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info, _: SimOpenRole) -> Self::Future { async move { let write = self.codec.write_request(&protocol, &mut io, self.request); write.await?; diff --git a/src/simple.rs b/src/simple.rs index fb4d3b735d2..5adfdee3851 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use bytes::Bytes; use futures::prelude::*; use std::{iter, sync::Arc}; @@ -89,7 +89,7 @@ where type Error = E; type Future = O; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket) } diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index f23de96c30e..ab34c664867 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -37,7 +37,7 @@ use crate::upgrade::{ }; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; -use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError}; +use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError, SimOpenRole}; use rand::Rng; use std::{ cmp, @@ -437,10 +437,10 @@ where type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info, role: SimOpenRole) -> Self::Future { let IndexedProtoName(index, info) = info; let (key, upgrade) = self.upgrades.remove(index); - upgrade.upgrade_outbound(resource, info) + upgrade.upgrade_outbound(resource, info, role) .map(move |out| { match out { Ok(o) => Ok((key, o)), diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index e544ac3f27f..8e28f4fac3d 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -21,7 +21,7 @@ use crate::NegotiatedSubstream; use futures::prelude::*; -use libp2p_core::upgrade; +use libp2p_core::upgrade::{self, SimOpenRole}; /// Implemented automatically on all types that implement [`UpgradeInfo`](upgrade::UpgradeInfo) /// and `Send + 'static`. @@ -66,7 +66,7 @@ pub trait OutboundUpgradeSend: UpgradeInfoSend { type Future: Future> + Send + 'static; /// Equivalent to [`OutboundUpgrade::upgrade_outbound`](upgrade::OutboundUpgrade::upgrade_outbound). - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info, role: SimOpenRole) -> Self::Future; } impl OutboundUpgradeSend for T @@ -81,8 +81,8 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future { - upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info) + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo, role: SimOpenRole) -> Self::Future { + upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info, role) } } @@ -142,8 +142,8 @@ impl upgrade::OutboundUpgrade for S type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future { - OutboundUpgradeSend::upgrade_outbound(self.0, socket, info) + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info, role: SimOpenRole) -> Self::Future { + OutboundUpgradeSend::upgrade_outbound(self.0, socket, info, role) } } diff --git a/transports/deflate/src/lib.rs b/transports/deflate/src/lib.rs index d93e6ed2e39..039619f87b6 100644 --- a/transports/deflate/src/lib.rs +++ b/transports/deflate/src/lib.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{prelude::*, ready}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use std::{io, iter, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] @@ -65,7 +65,7 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, w: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, w: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ok(DeflateOutput::new(w, self.compression)) } } diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index 6b09f636cd1..854838d645c 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -67,7 +67,7 @@ pub use protocol::{Protocol, ProtocolParams, IX, IK, XX}; pub use protocol::{x25519::X25519, x25519_spec::X25519Spec}; use futures::prelude::*; -use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade, SimOpenRole}; use std::pin::Pin; use zeroize::Zeroize; @@ -203,15 +203,35 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt1_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Mutual, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt1_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt1_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + } } } @@ -249,15 +269,35 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt15_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Mutual, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt15_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt15_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + } } } @@ -295,16 +335,36 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .remote_public_key(self.remote.0.as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt1_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Send { remote: self.remote.1 }, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .remote_public_key(self.remote.0.as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt1_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Send { remote: self.remote.1 }, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt1_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Receive, + self.legacy, + ) + } + } } } @@ -374,8 +434,8 @@ where type Error = NoiseError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future { - Box::pin(self.config.upgrade_outbound(socket, info) + fn upgrade_outbound(self, socket: T, info: Self::Info, role: SimOpenRole) -> Self::Future { + Box::pin(self.config.upgrade_outbound(socket, info, role) .and_then(|(remote, io)| match remote { RemoteIdentity::IdentityKey(pk) => future::ok((pk.into_peer_id(), io)), _ => future::err(NoiseError::AuthenticationFailed) diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index 0f3a4c9e585..1de49d8c075 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -28,6 +28,7 @@ use libp2p_core::{ identity, InboundUpgrade, OutboundUpgrade, + SimOpenRole, UpgradeInfo, PeerId, PublicKey, @@ -96,7 +97,7 @@ impl OutboundUpgrade for PlainText1Config { type Error = Void; type Future = Ready>; - fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, i: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ready(Ok(i)) } } @@ -138,7 +139,7 @@ where type Error = PlainTextError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(self.handshake(socket)) } }