From 8ecd5f4905b13ce56bccde68a26b69c38b8a8a58 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 4 May 2021 22:37:35 +0200 Subject: [PATCH 1/6] misc/multistream-select: Implement simultaneous open extension From the multistream-select 1.0 simultaneous open protocol extension specification: > In order to support direct connections through NATs with hole punching, we need to account for simultaneous open. In such cases, there is no single initiator and responder, but instead both peers act as initiators. This breaks protocol negotiation in multistream-select, which assumes a single initator. > This draft proposes a simple extension to the multistream protocol negotiation in order to select a single initator when both peers are acting as such. See https://github.com/libp2p/specs/pull/196/ for details. This commit implements the above specification, available via `Version::V1SimOpen`. --- misc/multistream-select/Cargo.toml | 1 + misc/multistream-select/src/dialer_select.rs | 331 +++++++++++++++++- misc/multistream-select/src/lib.rs | 1 + .../multistream-select/src/listener_select.rs | 33 +- misc/multistream-select/src/protocol.rs | 58 ++- misc/multistream-select/src/tests.rs | 54 ++- 6 files changed, 444 insertions(+), 34 deletions(-) diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 7c86be9e6ac..ef689d5e7ee 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" futures = "0.3" log = "0.4" pin-project = "1.0.0" +rand = "0.7" smallvec = "1.6.1" unsigned-varint = "0.7" diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 482d31a56b3..6d80412e87b 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -21,10 +21,11 @@ //! Protocol negotiation strategies for the peer acting as the dialer. use crate::{Negotiated, NegotiationError, Version}; -use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine}; +use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine, SIM_OPEN_ID}; use futures::{future::Either, prelude::*}; -use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{cmp::Ordering, convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; + /// Returns a `Future` that negotiates a protocol on the given I/O stream /// for a peer acting as the _dialer_ (or _initiator_). @@ -56,11 +57,18 @@ where I::Item: AsRef<[u8]> { let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) - } else { - Either::Right(dialer_select_proto_parallel(inner, iter, version)) + match version { + Version::V1 | Version::V1Lazy => { + // We choose between the "serial" and "parallel" strategies based on the number of protocols. + if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } else { + Either::Right(dialer_select_proto_parallel(inner, iter, version)) + } + }, + Version::V1SimOpen => { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } } } @@ -145,7 +153,16 @@ where R: AsyncRead + AsyncWrite, N: AsRef<[u8]> { - SendHeader { io: MessageIO, }, + SendHeader { io: MessageIO }, + + // Simultaneous open protocol extension + SendSimOpen { io: MessageIO, protocol: Option }, + FlushSimOpen { io: MessageIO, protocol: N }, + AwaitSimOpen { io: MessageIO, protocol: N }, + SimOpenPhase { selection: SimOpenPhase, protocol: N }, + Responder { responder: crate::ListenerSelectFuture }, + + // Standard multistream-select protocol SendProtocol { io: MessageIO, protocol: N }, FlushProtocol { io: MessageIO, protocol: N }, AwaitProtocol { io: MessageIO, protocol: N }, @@ -158,7 +175,8 @@ where // It also makes the implementation considerably easier to write. R: AsyncRead + AsyncWrite + Unpin, I: Iterator, - I::Item: AsRef<[u8]> + // TODO: Clone needed to embed ListenerSelectFuture. Still needed? + I::Item: AsRef<[u8]> + Clone { type Output = Result<(I::Item, Negotiated), NegotiationError>; @@ -181,11 +199,123 @@ where return Poll::Ready(Err(From::from(err))); } - let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + match this.version { + Version::V1 | Version::V1Lazy => { + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + + // The dialer always sends the header and the first protocol + // proposal in one go for efficiency. + *this.state = SeqState::SendProtocol { io, protocol }; + } + Version::V1SimOpen => { + *this.state = SeqState::SendSimOpen { io, protocol: None }; + } + } + } + + SeqState::SendSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + *this.state = SeqState::SendSimOpen { io, protocol }; + return Poll::Pending + }, + } + + match protocol { + None => { + let msg = Message::Protocol(SIM_OPEN_ID); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + *this.state = SeqState::SendSimOpen { io, protocol: Some(protocol) }; + } + Some(protocol) => { + let p = Protocol::try_from(protocol.as_ref())?; + if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { + return Poll::Ready(Err(From::from(err))); + } + log::debug!("Dialer: Proposed protocol: {}", p); + + *this.state = SeqState::FlushSimOpen { io, protocol } + } + } + } + + SeqState::FlushSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => { + *this.state = SeqState::AwaitSimOpen { io, protocol } + }, + Poll::Pending => { + *this.state = SeqState::FlushSimOpen { io, protocol }; + return Poll::Pending + }, + } + } + + SeqState::AwaitSimOpen { mut io, protocol } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + Message::Header(v) if v == HeaderLine::from(*this.version) => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce{ io }, + }; + *this.state = SeqState::SimOpenPhase { selection, protocol }; + } + Message::NotAvailable => { + *this.state = SeqState::AwaitProtocol { io, protocol } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())) + } + } + + SeqState::SimOpenPhase { mut selection, protocol } => { + let (io, selection_res) = match Pin::new(&mut selection).poll(cx)? { + Poll::Ready((io, res)) => (io, res), + Poll::Pending => { + *this.state = SeqState::SimOpenPhase { selection, protocol }; + return Poll::Pending + } + }; + + match selection_res { + SimOpenRole::Initiator => { + *this.state = SeqState::SendProtocol { io, protocol }; + } + SimOpenRole::Responder => { + let protocols: Vec<_> = this.protocols.collect(); + *this.state = SeqState::Responder { + responder: crate::listener_select::listener_select_proto_no_header(io, std::iter::once(protocol).chain(protocols.into_iter())), + } + }, + } + } - // The dialer always sends the header and the first protocol - // proposal in one go for efficiency. - *this.state = SeqState::SendProtocol { io, protocol }; + SeqState::Responder { mut responder } => { + match Pin::new(&mut responder ).poll(cx) { + Poll::Ready(res) => return Poll::Ready(res), + Poll::Pending => { + *this.state = SeqState::Responder { responder }; + return Poll::Pending + } + } } SeqState::SendProtocol { mut io, protocol } => { @@ -207,7 +337,7 @@ where *this.state = SeqState::FlushProtocol { io, protocol } } else { match this.version { - Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, + Version::V1 | Version::V1SimOpen => *this.state = SeqState::FlushProtocol { io, protocol }, // This is the only effect that `V1Lazy` has compared to `V1`: // Optimistically settling on the only protocol that // the dialer supports for this negotiation. Notably, @@ -224,7 +354,9 @@ where SeqState::FlushProtocol { mut io, protocol } => { match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol }, + Poll::Ready(()) => { + *this.state = SeqState::AwaitProtocol { io, protocol } + } , Poll::Pending => { *this.state = SeqState::FlushProtocol { io, protocol }; return Poll::Pending @@ -245,10 +377,17 @@ where Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), }; + match msg { Message::Header(v) if v == HeaderLine::from(*this.version) => { *this.state = SeqState::AwaitProtocol { io, protocol }; } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce{ io }, + }; + *this.state = SeqState::SimOpenPhase { selection, protocol }; + } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); let io = Negotiated::completed(io.into_inner()); @@ -270,6 +409,168 @@ where } } +struct SimOpenPhase { + state: SimOpenState, +} + +enum SimOpenState { + SendNonce { io: MessageIO }, + FlushNonce { io: MessageIO, local_nonce: u64 }, + ReadNonce { io: MessageIO, local_nonce: u64 }, + SendRole { io: MessageIO, local_role: SimOpenRole }, + FlushRole { io: MessageIO, local_role: SimOpenRole }, + ReadRole { io: MessageIO, local_role: SimOpenRole }, + Done, +} + +enum SimOpenRole { + Initiator, + Responder, +} + +impl Future for SimOpenPhase +where + // The Unpin bound here is required because we produce a `Negotiated` as the output. + // It also makes the implementation considerably easier to write. + R: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(MessageIO, SimOpenRole), NegotiationError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + + loop { + match mem::replace(&mut self.state, SimOpenState::Done) { + SimOpenState::SendNonce { mut io } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + self.state = SimOpenState::SendNonce { io }; + return Poll::Pending + }, + } + + let local_nonce = rand::random(); + let msg = Message::Select(local_nonce); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushNonce { + io, + local_nonce, + }; + }, + SimOpenState::FlushNonce { mut io, local_nonce } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadNonce { + io, + local_nonce, + }, + Poll::Pending => { + self.state =SimOpenState::FlushNonce { io, local_nonce }; + return Poll::Pending + }, + } + }, + SimOpenState::ReadNonce { mut io, local_nonce } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + // TODO: Document that this might still be the protocol send by the remote with + // the sim open ID. + Message::Protocol(_) => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + } + Message::Select(remote_nonce) => { + match local_nonce.cmp(&remote_nonce) { + Ordering::Equal => { + // Start over. + self.state = SimOpenState::SendNonce { io }; + }, + Ordering::Greater => { + self.state = SimOpenState::SendRole { + io, + local_role: SimOpenRole::Initiator, + }; + }, + Ordering::Less => { + self.state = SimOpenState::SendRole { + io, + local_role: SimOpenRole::Responder, + }; + } + } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), + } + }, + SimOpenState::SendRole { mut io, local_role } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {}, + Poll::Pending => { + self.state = SimOpenState::SendRole { io, local_role }; + return Poll::Pending + }, + } + + let msg = match local_role { + SimOpenRole::Initiator => Message::Initiator, + SimOpenRole::Responder => Message::Responder, + }; + + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushRole { io, local_role }; + }, + SimOpenState::FlushRole { mut io, local_role } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadRole { io, local_role }, + Poll::Pending => { + self.state =SimOpenState::FlushRole { io, local_role }; + return Poll::Pending + }, + } + }, + SimOpenState::ReadRole { mut io, local_role } => { + let remote_msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadRole { io, local_role }; + return Poll::Pending + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + let result = match local_role { + SimOpenRole::Initiator if remote_msg == Message::Responder => Ok((io, local_role)), + SimOpenRole::Responder if remote_msg == Message::Initiator => Ok((io, local_role)), + + _ => Err(ProtocolError::InvalidMessage.into()) + }; + + return Poll::Ready(result) + }, + SimOpenState::Done => panic!("SimOpenPhase::poll called after completion") + } + } + } +} + /// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates /// a protocol selectively by considering all supported protocols of the remote /// "in parallel". diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 087b2a2cb21..29bc37914b0 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -137,6 +137,7 @@ pub enum Version { /// [1]: https://github.com/multiformats/go-multistream/issues/20 /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 V1Lazy, + V1SimOpen, // Draft: https://github.com/libp2p/specs/pull/95 // V2, } diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 70463fa1cfe..0fd30f22417 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -39,6 +39,35 @@ pub fn listener_select_proto( inner: R, protocols: I, ) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]> +{ + listener_select_proto_with_state(State::RecvHeader { + io: MessageIO::new(inner) + }, protocols) +} + +pub(crate) fn listener_select_proto_no_header( + io: MessageIO, + protocols: I, +) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]> +{ + listener_select_proto_with_state( + State::RecvMessage { io }, + protocols, + ) +} + +fn listener_select_proto_with_state( + state: State, + protocols: I, +) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -55,9 +84,7 @@ where }); ListenerSelectFuture { protocols: SmallVec::from_iter(protocols), - state: State::RecvHeader { - io: MessageIO::new(inner) - }, + state, last_sent_na: false, } } diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 1d056de75ec..f9a9f4b49c1 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -30,7 +30,7 @@ use crate::length_delimited::{LengthDelimited, LengthDelimitedReader}; use bytes::{Bytes, BytesMut, BufMut}; use futures::{prelude::*, io::IoSlice, ready}; -use std::{convert::TryFrom, io, fmt, error::Error, pin::Pin, task::{Context, Poll}}; +use std::{convert::TryFrom, io, fmt, error::Error, pin::Pin, str::FromStr, task::{Context, Poll}}; use unsigned_varint as uvi; /// The maximum number of supported protocols that can be processed. @@ -42,6 +42,19 @@ const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n"; const MSG_PROTOCOL_NA: &[u8] = b"na\n"; /// The encoded form of a multistream-select 'ls' message. const MSG_LS: &[u8] = b"ls\n"; +/// The encoded form of a 'select:' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_SELECT: &[u8] = b"select:"; +/// The encoded form of a 'initiator' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_INITIATOR: &[u8] = b"initiator\n"; +/// The encoded form of a 'responder' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_RESPONDER: &[u8] = b"responder\n"; + +/// The identifier of the multistream-select simultaneous open protocol +/// extension. +pub(crate) const SIM_OPEN_ID: Protocol = Protocol(Bytes::from_static(b"/libp2p/simultaneous-connect")); /// The multistream-select header lines preceeding negotiation. /// @@ -55,7 +68,7 @@ pub enum HeaderLine { impl From for HeaderLine { fn from(v: Version) -> HeaderLine { match v { - Version::V1 | Version::V1Lazy => HeaderLine::V1, + Version::V1 | Version::V1Lazy | Version::V1SimOpen => HeaderLine::V1, } } } @@ -113,6 +126,9 @@ pub enum Message { Protocols(Vec), /// A message signaling that a requested protocol is not available. NotAvailable, + Select(u64), + Initiator, + Responder, } impl Message { @@ -154,6 +170,22 @@ impl Message { dest.put(MSG_PROTOCOL_NA); Ok(()) } + Message::Select(nonce) => { + dest.put(MSG_SELECT); + dest.put(nonce.to_string().as_ref()); + dest.put_u8(b'\n'); + Ok(()) + } + Message::Initiator => { + dest.reserve(MSG_INITIATOR.len()); + dest.put(MSG_INITIATOR); + Ok(()) + } + Message::Responder => { + dest.reserve(MSG_RESPONDER.len()); + dest.put(MSG_RESPONDER); + Ok(()) + } } } @@ -171,6 +203,26 @@ impl Message { return Ok(Message::ListProtocols) } + if msg.len() > MSG_SELECT.len() + 1 /* \n */ + && msg[.. MSG_SELECT.len()] == *MSG_SELECT + && msg.last() == Some(&b'\n') + { + if let Some(nonce) = std::str::from_utf8(&msg[MSG_SELECT.len() .. msg.len() -1]) + .ok() + .and_then(|s| u64::from_str(s).ok()) + { + return Ok(Message::Select(nonce)) + } + } + + if msg == MSG_INITIATOR { + return Ok(Message::Initiator) + } + + if msg == MSG_RESPONDER { + return Ok(Message::Responder) + } + // If it starts with a `/`, ends with a line feed without any // other line feeds in-between, it must be a protocol name. if msg.get(0) == Some(&b'/') && msg.last() == Some(&b'\n') && @@ -238,7 +290,7 @@ impl MessageIO { MessageReader { inner: self.inner.into_reader() } } - /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream. + /// Draops the [`MessageIO`] resource, yielding the underlying I/O stream. /// /// # Panics /// diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index f03d1b1ff75..956c30aa5da 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -35,22 +35,21 @@ fn select_proto_basic() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener_addr = listener.local_addr().unwrap(); - let server = async_std::task::spawn(async move { + let server = async move { let connec = listener.accept().await.unwrap().0; let protos = vec![b"/proto1", b"/proto2"]; let (proto, mut io) = listener_select_proto(connec, protos).await.unwrap(); assert_eq!(proto, b"/proto2"); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"ping"); io.write_all(b"pong").await.unwrap(); io.flush().await.unwrap(); - }); + }; - let client = async_std::task::spawn(async move { + let client = async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), version) @@ -60,18 +59,17 @@ fn select_proto_basic() { io.write_all(b"ping").await.unwrap(); io.flush().await.unwrap(); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"pong"); - }); + }; - server.await; - client.await; + futures::future::join(server, client).await; } async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimOpen)); } /// Tests the expected behaviour of failed negotiations. @@ -165,7 +163,7 @@ fn negotiation_failed() { for (listen_protos, dial_protos) in protos { for dial_payload in payloads.clone() { - for &version in &[Version::V1, Version::V1Lazy] { + for &version in &[Version::V1, Version::V1Lazy, Version::V1SimOpen] { async_std::task::block_on(run(Test { version, listen_protos: listen_protos.clone(), @@ -237,4 +235,34 @@ fn select_proto_serial() { async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimOpen)); +} + +#[test] +fn simultaneous_open() { + async fn run(version: Version) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = async move { + let connec = listener.accept().await.unwrap().0; + let protos = vec![b"/proto1", b"/proto2"]; + let (proto, io) = dialer_select_proto_serial(connec, protos, version).await.unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + let client = async move { + let connec = TcpStream::connect(&listener_addr).await.unwrap(); + let protos = vec![b"/proto3", b"/proto2"]; + let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + .await.unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + futures::future::join(server, client).await; + } + + futures::executor::block_on(run(Version::V1SimOpen)); } From 60cce50c7cfd43d2f90a343522c1f0c8a6af2257 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 14:23:11 +0200 Subject: [PATCH 2/6] Return SimOpenRole from multistream-select. --- misc/multistream-select/src/dialer_select.rs | 15 ++++++++------- misc/multistream-select/src/lib.rs | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index b8b95ce5e02..6835c3e4974 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -169,7 +169,7 @@ where // TODO: Clone needed to embed ListenerSelectFuture. Still needed? I::Item: AsRef<[u8]> + Clone { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, SimOpenRole, Negotiated), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -301,7 +301,8 @@ where SeqState::Responder { mut responder } => { match Pin::new(&mut responder ).poll(cx) { - Poll::Ready(res) => return Poll::Ready(res), + Poll::Ready(Ok((protocol, io))) => return Poll::Ready(Ok((protocol, SimOpenRole::Responder, io))), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => { *this.state = SeqState::Responder { responder }; return Poll::Pending @@ -337,7 +338,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let hl = HeaderLine::from(Version::V1Lazy); let io = Negotiated::expecting(io.into_reader(), p, Some(hl)); - return Poll::Ready(Ok((protocol, io))) + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))) } } } @@ -382,7 +383,7 @@ where Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); let io = Negotiated::completed(io.into_inner()); - return Poll::Ready(Ok((protocol, io))); + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))); } Message::NotAvailable => { log::debug!("Dialer: Received rejection of protocol: {}", @@ -414,7 +415,7 @@ enum SimOpenState { Done, } -enum SimOpenRole { +pub enum SimOpenRole { Initiator, Responder, } @@ -589,7 +590,7 @@ where I: Iterator, I::Item: AsRef<[u8]> { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, SimOpenRole, Negotiated), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -687,7 +688,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let io = Negotiated::expecting(io.into_reader(), p, None); - return Poll::Ready(Ok((protocol, io))) + return Poll::Ready(Ok((protocol, SimOpenRole::Initiator, io))) } ParState::Done => panic!("ParState::poll called after completion") diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 29bc37914b0..2309ebdc943 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -96,7 +96,7 @@ mod tests; pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError}; pub use self::protocol::ProtocolError; -pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; +pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture, SimOpenRole}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; /// Supported multistream-select versions. @@ -146,4 +146,4 @@ impl Default for Version { fn default() -> Self { Version::V1 } -} \ No newline at end of file +} From aa25d237e5a8fc4ed2a5f21b2c536e1bbd45f825 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 14:38:46 +0200 Subject: [PATCH 3/6] Update core/swarm. --- core/src/upgrade.rs | 5 ++--- core/src/upgrade/apply.rs | 7 +++---- core/src/upgrade/denied.rs | 4 ++-- core/src/upgrade/either.rs | 9 ++++----- core/src/upgrade/from_fn.rs | 14 +++++++------- core/src/upgrade/map.rs | 19 +++++++++---------- core/src/upgrade/optional.rs | 6 +++--- core/src/upgrade/select.rs | 9 ++++----- swarm/src/protocols_handler/multi.rs | 6 +++--- swarm/src/upgrade.rs | 12 ++++++------ 10 files changed, 43 insertions(+), 48 deletions(-) diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 9798ae6c27a..50b52b06b75 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -70,7 +70,7 @@ mod transfer; use futures::future::Future; pub use crate::Negotiated; -pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError}; +pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError, SimOpenRole}; pub use self::{ apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, denied::DeniedUpgrade, @@ -195,7 +195,7 @@ pub trait OutboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: C, info: Self::Info, role: SimOpenRole) -> Self::Future; } /// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement @@ -221,4 +221,3 @@ pub trait OutboundUpgradeExt: OutboundUpgrade { } impl> OutboundUpgradeExt for U {} - diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index eaf25e884b3..ce19c112690 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -162,7 +162,7 @@ where upgrade: U }, Upgrade { - future: Pin> + future: Pin>, }, Undefined } @@ -185,7 +185,7 @@ where loop { match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) { OutboundUpgradeApplyState::Init { mut future, upgrade } => { - let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? { + let (info, role, connection) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; @@ -193,7 +193,7 @@ where } }; self.inner = OutboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_outbound(connection, info.0)) + future: Box::pin(upgrade.upgrade_outbound(connection, info.0, role)), }; } OutboundUpgradeApplyState::Upgrade { mut future } => { @@ -230,4 +230,3 @@ impl AsRef<[u8]> for NameWrap { self.0.protocol_name() } } - diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 93438e0bea8..3a36a8dbf3e 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use futures::future; use std::iter; use void::Void; @@ -52,7 +52,7 @@ impl OutboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Pending>; - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, _: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::pending() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 28db987ccd7..afdf404573e 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -20,7 +20,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; /// A type to represent two possible upgrade types (inbound or outbound). @@ -73,10 +73,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { match (self, info) { - (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), - (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), + (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info, role)), + (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info, role)), _ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound") } } @@ -107,4 +107,3 @@ where } } } - diff --git a/core/src/upgrade/from_fn.rs b/core/src/upgrade/from_fn.rs index c6ef52c1e08..08473ea8a33 100644 --- a/core/src/upgrade/from_fn.rs +++ b/core/src/upgrade/from_fn.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}}; +use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, SimOpenRole, UpgradeInfo}}; use futures::prelude::*; use std::iter; @@ -51,7 +51,7 @@ pub fn from_fn(protocol_name: P, fun: F) -> FromFnUpgrad where // Note: these bounds are there in order to help the compiler infer types P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { FromFnUpgrade { protocol_name, fun } @@ -81,7 +81,7 @@ where impl InboundUpgrade for FromFnUpgrade where P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { type Output = Out; @@ -89,21 +89,21 @@ where type Future = Fut; fn upgrade_inbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Listener) + (self.fun)(sock, Endpoint::Listener, SimOpenRole::Responder) } } impl OutboundUpgrade for FromFnUpgrade where P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, + F: FnOnce(C, Endpoint, SimOpenRole) -> Fut, Fut: Future>, { type Output = Out; type Error = Err; type Future = Fut; - fn upgrade_outbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Dialer) + fn upgrade_outbound(self, sock: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + (self.fun)(sock, Endpoint::Dialer, role) } } diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index 2f5ca31e207..4cd051247f2 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use futures::prelude::*; use std::{pin::Pin, task::Context, task::Poll}; @@ -69,8 +69,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info, role) } } @@ -118,9 +118,9 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { MapFuture { - inner: self.upgrade.upgrade_outbound(sock, info), + inner: self.upgrade.upgrade_outbound(sock, info, role), map: Some(self.fun) } } @@ -173,8 +173,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info, role) } } @@ -209,9 +209,9 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { MapErrFuture { - fut: self.upgrade.upgrade_outbound(sock, info), + fut: self.upgrade.upgrade_outbound(sock, info, role), fun: Some(self.fun) } } @@ -283,4 +283,3 @@ where } } } - diff --git a/core/src/upgrade/optional.rs b/core/src/upgrade/optional.rs index 02dc3c48f78..7790029e12f 100644 --- a/core/src/upgrade/optional.rs +++ b/core/src/upgrade/optional.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; /// Upgrade that can be disabled at runtime. /// @@ -76,9 +76,9 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { if let Some(inner) = self.0 { - inner.upgrade_outbound(sock, info) + inner.upgrade_outbound(sock, info, role) } else { panic!("Bad API usage; a protocol has been negotiated while this struct contains None") } diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 8fa4c5b8a7a..5055be98ac9 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -20,7 +20,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; /// Upgrade that combines two upgrades into one. Supports all the protocols supported by either @@ -81,10 +81,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future { match info { - EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), - EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) + EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info, role)), + EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info, role)) } } } @@ -117,4 +117,3 @@ where (min1.saturating_add(min2), max) } } - diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index f23de96c30e..ab34c664867 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -37,7 +37,7 @@ use crate::upgrade::{ }; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; -use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError}; +use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError, SimOpenRole}; use rand::Rng; use std::{ cmp, @@ -437,10 +437,10 @@ where type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info, role: SimOpenRole) -> Self::Future { let IndexedProtoName(index, info) = info; let (key, upgrade) = self.upgrades.remove(index); - upgrade.upgrade_outbound(resource, info) + upgrade.upgrade_outbound(resource, info, role) .map(move |out| { match out { Ok(o) => Ok((key, o)), diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index e544ac3f27f..8e28f4fac3d 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -21,7 +21,7 @@ use crate::NegotiatedSubstream; use futures::prelude::*; -use libp2p_core::upgrade; +use libp2p_core::upgrade::{self, SimOpenRole}; /// Implemented automatically on all types that implement [`UpgradeInfo`](upgrade::UpgradeInfo) /// and `Send + 'static`. @@ -66,7 +66,7 @@ pub trait OutboundUpgradeSend: UpgradeInfoSend { type Future: Future> + Send + 'static; /// Equivalent to [`OutboundUpgrade::upgrade_outbound`](upgrade::OutboundUpgrade::upgrade_outbound). - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info, role: SimOpenRole) -> Self::Future; } impl OutboundUpgradeSend for T @@ -81,8 +81,8 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future { - upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info) + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo, role: SimOpenRole) -> Self::Future { + upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info, role) } } @@ -142,8 +142,8 @@ impl upgrade::OutboundUpgrade for S type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future { - OutboundUpgradeSend::upgrade_outbound(self.0, socket, info) + fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info, role: SimOpenRole) -> Self::Future { + OutboundUpgradeSend::upgrade_outbound(self.0, socket, info, role) } } From f6e25bcb57cb335160ed97bcfba8f0c86e379294 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 15:06:11 +0200 Subject: [PATCH 4/6] Update crates. --- core/src/lib.rs | 2 +- muxers/mplex/src/lib.rs | 4 +- muxers/yamux/src/lib.rs | 18 ++- protocols/floodsub/src/protocol.rs | 4 +- protocols/gossipsub/src/protocol.rs | 4 +- protocols/identify/src/protocol.rs | 6 +- protocols/kad/src/protocol.rs | 4 +- protocols/ping/src/protocol.rs | 4 +- .../relay/src/protocol/outgoing_dst_req.rs | 4 +- .../relay/src/protocol/outgoing_relay_req.rs | 4 +- .../request-response/src/handler/protocol.rs | 4 +- src/simple.rs | 4 +- transports/deflate/src/lib.rs | 4 +- transports/noise/src/lib.rs | 122 +++++++++++++----- transports/plaintext/src/lib.rs | 5 +- 15 files changed, 131 insertions(+), 62 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 844fd2a23bc..2024c7190c2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,7 @@ pub use peer_id::PeerId; pub use identity::PublicKey; pub use transport::Transport; pub use translation::address_translation; -pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; +pub use upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo, UpgradeError, ProtocolName}; pub use connection::{Connected, Endpoint, ConnectedPoint}; pub use network::Network; diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 653c3310ab4..647625788bd 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -30,7 +30,7 @@ use bytes::Bytes; use libp2p_core::{ StreamMuxer, muxing::StreamMuxerEvent, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, + upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}, }; use parking_lot::Mutex; use futures::{prelude::*, future, ready}; @@ -67,7 +67,7 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ready(Ok(Multiplex { io: Mutex::new(io::Multiplexed::new(socket, self)) })) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 0edadb0ebb0..fbf7056ce3c 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -23,7 +23,7 @@ use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use parking_lot::Mutex; use std::{fmt, io, iter, pin::Pin, task::{Context, Poll}}; use thiserror::Error; @@ -334,8 +334,12 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Client); + fn upgrade_outbound(self, io: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + let mode = match role { + SimOpenRole::Initiator => yamux::Mode::Client, + SimOpenRole::Responder => yamux::Mode::Server, + }; + let mode = self.mode.unwrap_or(mode); future::ready(Ok(Yamux::new(io, self.inner, mode))) } } @@ -348,9 +352,13 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, io: C, _: Self::Info, role: SimOpenRole) -> Self::Future { + let mode = match role { + SimOpenRole::Initiator => yamux::Mode::Client, + SimOpenRole::Responder => yamux::Mode::Server, + }; let cfg = self.0; - let mode = cfg.mode.unwrap_or(yamux::Mode::Client); + let mode = cfg.mode.unwrap_or(mode); future::ready(Ok(Yamux::local(io, cfg.inner, mode))) } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index bd0b5b2a646..36375bd8ca8 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -20,7 +20,7 @@ use crate::rpc_proto; use crate::topic::Topic; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo, PeerId, upgrade}; use prost::Message; use std::{error, fmt, io, iter, pin::Pin}; use futures::{Future, io::{AsyncRead, AsyncWrite}}; @@ -163,7 +163,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(async move { let bytes = self.into_bytes(); upgrade::write_one(&mut socket, bytes).await?; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index ac2584107dd..d094d662c57 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -34,7 +34,7 @@ use futures::future; use futures::prelude::*; use asynchronous_codec::{Decoder, Encoder, Framed}; use libp2p_core::{ - identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, + identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, SimOpenRole, UpgradeInfo, }; use log::{debug, warn}; use prost::Message as ProtobufMessage; @@ -152,7 +152,7 @@ where type Error = GossipsubHandlerError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info, _: SimOpenRole) -> Self::Future { let mut length_codec = codec::UviBytes::default(); length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(( diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 2c73a1d627f..745d0e618b1 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -23,7 +23,7 @@ use futures::prelude::*; use libp2p_core::{ Multiaddr, PublicKey, - upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{self, InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo} }; use log::{debug, trace}; use prost::Message; @@ -122,7 +122,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { recv(socket).boxed() } } @@ -157,7 +157,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { send(socket, self.0.0).boxed() } } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 27eab8017c2..d4e1661ece9 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -33,7 +33,7 @@ use crate::record::{self, Record}; use futures::prelude::*; use asynchronous_codec::Framed; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use prost::Message; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; @@ -224,7 +224,7 @@ where type Future = future::Ready>; type Error = io::Error; - fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, incoming: C, _: Self::Info, _: SimOpenRole) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(self.max_packet_size); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index aa63833f651..0940be9269a 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; @@ -74,7 +74,7 @@ impl OutboundUpgrade for Ping { type Error = Void; type Future = future::Ready>; - fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ok(stream) } } diff --git a/protocols/relay/src/protocol/outgoing_dst_req.rs b/protocols/relay/src/protocol/outgoing_dst_req.rs index 0a257da3bab..671cab0950a 100644 --- a/protocols/relay/src/protocol/outgoing_dst_req.rs +++ b/protocols/relay/src/protocol/outgoing_dst_req.rs @@ -24,7 +24,7 @@ use asynchronous_codec::{Framed, FramedParts}; use bytes::Bytes; use futures::future::BoxFuture; use futures::prelude::*; -use libp2p_core::{upgrade, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId, SimOpenRole}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::{fmt, error, iter}; @@ -88,7 +88,7 @@ impl upgrade::OutboundUpgrade for OutgoingDstReq { type Error = OutgoingDstReqError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(MAX_ACCEPTED_MESSAGE_LEN); diff --git a/protocols/relay/src/protocol/outgoing_relay_req.rs b/protocols/relay/src/protocol/outgoing_relay_req.rs index 8f6ed8b3aa3..8526822b1b4 100644 --- a/protocols/relay/src/protocol/outgoing_relay_req.rs +++ b/protocols/relay/src/protocol/outgoing_relay_req.rs @@ -24,7 +24,7 @@ use asynchronous_codec::{Framed, FramedParts}; use futures::channel::oneshot; use futures::future::BoxFuture; use futures::prelude::*; -use libp2p_core::{upgrade, Multiaddr, PeerId}; +use libp2p_core::{upgrade, Multiaddr, PeerId, SimOpenRole}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::{error, fmt, iter}; @@ -72,7 +72,7 @@ impl upgrade::OutboundUpgrade for OutgoingRelayReq { type Error = OutgoingRelayReqError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info, _: SimOpenRole) -> Self::Future { let OutgoingRelayReq { src_id, dst_id, diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 404fbd5c436..0d0f21a9925 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -27,7 +27,7 @@ use crate::RequestId; use crate::codec::RequestResponseCodec; use futures::{channel::oneshot, future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use smallvec::SmallVec; use std::io; @@ -158,7 +158,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info, _: SimOpenRole) -> Self::Future { async move { let write = self.codec.write_request(&protocol, &mut io, self.request); write.await?; diff --git a/src/simple.rs b/src/simple.rs index fb4d3b735d2..5adfdee3851 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use bytes::Bytes; use futures::prelude::*; use std::{iter, sync::Arc}; @@ -89,7 +89,7 @@ where type Error = E; type Future = O; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket) } diff --git a/transports/deflate/src/lib.rs b/transports/deflate/src/lib.rs index d93e6ed2e39..039619f87b6 100644 --- a/transports/deflate/src/lib.rs +++ b/transports/deflate/src/lib.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{prelude::*, ready}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}; use std::{io, iter, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] @@ -65,7 +65,7 @@ where type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, w: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, w: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ok(DeflateOutput::new(w, self.compression)) } } diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index 6b09f636cd1..854838d645c 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -67,7 +67,7 @@ pub use protocol::{Protocol, ProtocolParams, IX, IK, XX}; pub use protocol::{x25519::X25519, x25519_spec::X25519Spec}; use futures::prelude::*; -use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade, SimOpenRole}; use std::pin::Pin; use zeroize::Zeroize; @@ -203,15 +203,35 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt1_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Mutual, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt1_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt1_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + } } } @@ -249,15 +269,35 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt15_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Mutual, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt15_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt15_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Mutual, + self.legacy, + ) + } + } } } @@ -295,16 +335,36 @@ where type Error = NoiseError; type Future = Handshake; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { - let session = self.params.into_builder() - .local_private_key(self.dh_keys.secret().as_ref()) - .remote_public_key(self.remote.0.as_ref()) - .build_initiator() - .map_err(NoiseError::from); - handshake::rt1_initiator(socket, session, - self.dh_keys.into_identity(), - IdentityExchange::Send { remote: self.remote.1 }, - self.legacy) + fn upgrade_outbound(self, socket: T, _: Self::Info, role: SimOpenRole) -> Self::Future { + match role { + SimOpenRole::Initiator => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .remote_public_key(self.remote.0.as_ref()) + .build_initiator() + .map_err(NoiseError::from); + handshake::rt1_initiator( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Send { remote: self.remote.1 }, + self.legacy, + ) + } + SimOpenRole::Responder => { + let session = self.params.into_builder() + .local_private_key(self.dh_keys.secret().as_ref()) + .build_responder() + .map_err(NoiseError::from); + handshake::rt1_responder( + socket, + session, + self.dh_keys.into_identity(), + IdentityExchange::Receive, + self.legacy, + ) + } + } } } @@ -374,8 +434,8 @@ where type Error = NoiseError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future { - Box::pin(self.config.upgrade_outbound(socket, info) + fn upgrade_outbound(self, socket: T, info: Self::Info, role: SimOpenRole) -> Self::Future { + Box::pin(self.config.upgrade_outbound(socket, info, role) .and_then(|(remote, io)| match remote { RemoteIdentity::IdentityKey(pk) => future::ok((pk.into_peer_id(), io)), _ => future::err(NoiseError::AuthenticationFailed) diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index 0f3a4c9e585..1de49d8c075 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -28,6 +28,7 @@ use libp2p_core::{ identity, InboundUpgrade, OutboundUpgrade, + SimOpenRole, UpgradeInfo, PeerId, PublicKey, @@ -96,7 +97,7 @@ impl OutboundUpgrade for PlainText1Config { type Error = Void; type Future = Ready>; - fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, i: C, _: Self::Info, _: SimOpenRole) -> Self::Future { future::ready(Ok(i)) } } @@ -138,7 +139,7 @@ where type Error = PlainTextError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(self.handshake(socket)) } } From 65994177b986e000d1fdd80d07acdf2dc4c8a8a2 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 15:14:48 +0200 Subject: [PATCH 5/6] Fix tests. --- core/tests/transport_upgrade.rs | 5 ++--- misc/multistream-select/src/tests.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index eecace3e46f..da6e71f2614 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -23,7 +23,7 @@ mod util; use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{Transport, MemoryTransport}; -use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade, SimOpenRole}; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; use multiaddr::{Multiaddr, Protocol}; @@ -68,7 +68,7 @@ where type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future { Box::pin(async move { socket.write_all(b"hello").await.unwrap(); socket.flush().await.unwrap(); @@ -136,4 +136,3 @@ fn upgrade_pipeline() { async_std::task::spawn(server); async_std::task::block_on(client); } - diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index 956c30aa5da..49bcba8a552 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -52,7 +52,7 @@ fn select_proto_basic() { let client = async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), version) + let (proto, _, mut io) = dialer_select_proto(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); @@ -103,7 +103,7 @@ fn negotiation_failed() { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let mut io = match dialer_select_proto(connec, dial_protos.into_iter(), version).await { Err(NegotiationError::Failed) => return, - Ok((_, io)) => io, + Ok((_, _, io)) => io, Err(_) => panic!() }; // The dialer may write a payload that is even sent before it @@ -192,7 +192,7 @@ fn select_proto_parallel() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) + let (proto, _, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); @@ -223,7 +223,7 @@ fn select_proto_serial() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, _, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); @@ -247,7 +247,7 @@ fn simultaneous_open() { let server = async move { let connec = listener.accept().await.unwrap().0; let protos = vec![b"/proto1", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos, version).await.unwrap(); + let (proto, _, io) = dialer_select_proto_serial(connec, protos, version).await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); }; @@ -255,7 +255,7 @@ fn simultaneous_open() { let client = async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, _, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) .await.unwrap(); assert_eq!(proto, b"/proto2"); io.complete().await.unwrap(); From 5faa3df3c86c5521d4b8a3fe9d1469e893b159e0 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 16:44:31 +0200 Subject: [PATCH 6/6] Fix tests. --- core/src/upgrade/from_fn.rs | 2 +- misc/multistream-select/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/upgrade/from_fn.rs b/core/src/upgrade/from_fn.rs index 08473ea8a33..cd0b119fc75 100644 --- a/core/src/upgrade/from_fn.rs +++ b/core/src/upgrade/from_fn.rs @@ -33,7 +33,7 @@ use std::iter; /// # use std::io; /// let _transport = MemoryTransport::default() /// .and_then(move |out, cp| { -/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move { +/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint, _| async move { /// if endpoint.is_dialer() { /// upgrade::write_one(&mut sock, "some handshake data").await?; /// } else { diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 2309ebdc943..837fbec6303 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -79,7 +79,7 @@ //! let socket = TcpStream::connect("127.0.0.1:10333").await.unwrap(); //! //! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; -//! let (protocol, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); +//! let (protocol, _, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); //! //! println!("Negotiated protocol: {:?}", protocol); //! // You can now use `_io` to communicate with the remote.