Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use peer_id::PeerId;
pub use identity::PublicKey;
pub use transport::Transport;
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo, UpgradeError, ProtocolName};
pub use connection::{Connected, Endpoint, ConnectedPoint};
pub use network::Network;

Expand Down
5 changes: 2 additions & 3 deletions core/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ mod transfer;
use futures::future::Future;

pub use crate::Negotiated;
pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError};
pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError, SimOpenRole};
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,
Expand Down Expand Up @@ -195,7 +195,7 @@ pub trait OutboundUpgrade<C>: UpgradeInfo {
/// method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future;
fn upgrade_outbound(self, socket: C, info: Self::Info, role: SimOpenRole) -> Self::Future;
}

/// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement
Expand All @@ -221,4 +221,3 @@ pub trait OutboundUpgradeExt<C>: OutboundUpgrade<C> {
}

impl<C, U: OutboundUpgrade<C>> OutboundUpgradeExt<C> for U {}

7 changes: 3 additions & 4 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ where
upgrade: U
},
Upgrade {
future: Pin<Box<U::Future>>
future: Pin<Box<U::Future>>,
},
Undefined
}
Expand All @@ -185,15 +185,15 @@ where
loop {
match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) {
OutboundUpgradeApplyState::Init { mut future, upgrade } => {
let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? {
let (info, role, connection) = match Future::poll(Pin::new(&mut future), cx)? {
Poll::Ready(x) => x,
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
return Poll::Pending
}
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: Box::pin(upgrade.upgrade_outbound(connection, info.0))
future: Box::pin(upgrade.upgrade_outbound(connection, info.0, role)),
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {
Expand Down Expand Up @@ -230,4 +230,3 @@ impl<N: ProtocolName> AsRef<[u8]> for NameWrap<N> {
self.0.protocol_name()
}
}

4 changes: 2 additions & 2 deletions core/src/upgrade/denied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo};
use futures::future;
use std::iter;
use void::Void;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl<C> OutboundUpgrade<C> for DeniedUpgrade {
type Error = Void;
type Future = future::Pending<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, _: C, _: Self::Info, _: SimOpenRole) -> Self::Future {
future::pending()
}
}
9 changes: 4 additions & 5 deletions core/src/upgrade/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
either::{EitherOutput, EitherError, EitherFuture2, EitherName},
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}
};

/// A type to represent two possible upgrade types (inbound or outbound).
Expand Down Expand Up @@ -73,10 +73,10 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
match (self, info) {
(EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)),
(EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)),
(EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info, role)),
(EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info, role)),
_ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound")
}
}
Expand Down Expand Up @@ -107,4 +107,3 @@ where
}
}
}

16 changes: 8 additions & 8 deletions core/src/upgrade/from_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}};
use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, SimOpenRole, UpgradeInfo}};

use futures::prelude::*;
use std::iter;
Expand All @@ -33,7 +33,7 @@ use std::iter;
/// # use std::io;
/// let _transport = MemoryTransport::default()
/// .and_then(move |out, cp| {
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move {
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint, _| async move {
/// if endpoint.is_dialer() {
/// upgrade::write_one(&mut sock, "some handshake data").await?;
/// } else {
Expand All @@ -51,7 +51,7 @@ pub fn from_fn<P, F, C, Fut, Out, Err>(protocol_name: P, fun: F) -> FromFnUpgrad
where
// Note: these bounds are there in order to help the compiler infer types
P: ProtocolName + Clone,
F: FnOnce(C, Endpoint) -> Fut,
F: FnOnce(C, Endpoint, SimOpenRole) -> Fut,
Fut: Future<Output = Result<Out, Err>>,
{
FromFnUpgrade { protocol_name, fun }
Expand Down Expand Up @@ -81,29 +81,29 @@ where
impl<C, P, F, Fut, Err, Out> InboundUpgrade<C> for FromFnUpgrade<P, F>
where
P: ProtocolName + Clone,
F: FnOnce(C, Endpoint) -> Fut,
F: FnOnce(C, Endpoint, SimOpenRole) -> Fut,
Fut: Future<Output = Result<Out, Err>>,
{
type Output = Out;
type Error = Err;
type Future = Fut;

fn upgrade_inbound(self, sock: C, _: Self::Info) -> Self::Future {
(self.fun)(sock, Endpoint::Listener)
(self.fun)(sock, Endpoint::Listener, SimOpenRole::Responder)
}
}

impl<C, P, F, Fut, Err, Out> OutboundUpgrade<C> for FromFnUpgrade<P, F>
where
P: ProtocolName + Clone,
F: FnOnce(C, Endpoint) -> Fut,
F: FnOnce(C, Endpoint, SimOpenRole) -> Fut,
Fut: Future<Output = Result<Out, Err>>,
{
type Output = Out;
type Error = Err;
type Future = Fut;

fn upgrade_outbound(self, sock: C, _: Self::Info) -> Self::Future {
(self.fun)(sock, Endpoint::Dialer)
fn upgrade_outbound(self, sock: C, _: Self::Info, role: SimOpenRole) -> Self::Future {
(self.fun)(sock, Endpoint::Dialer, role)
}
}
19 changes: 9 additions & 10 deletions core/src/upgrade/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo};
use futures::prelude::*;
use std::{pin::Pin, task::Context, task::Poll};

Expand Down Expand Up @@ -69,8 +69,8 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info)
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info, role)
}
}

Expand Down Expand Up @@ -118,9 +118,9 @@ where
type Error = U::Error;
type Future = MapFuture<U::Future, F>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
MapFuture {
inner: self.upgrade.upgrade_outbound(sock, info),
inner: self.upgrade.upgrade_outbound(sock, info, role),
map: Some(self.fun)
}
}
Expand Down Expand Up @@ -173,8 +173,8 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info)
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info, role)
}
}

Expand Down Expand Up @@ -209,9 +209,9 @@ where
type Error = T;
type Future = MapErrFuture<U::Future, F>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
MapErrFuture {
fut: self.upgrade.upgrade_outbound(sock, info),
fut: self.upgrade.upgrade_outbound(sock, info, role),
fun: Some(self.fun)
}
}
Expand Down Expand Up @@ -283,4 +283,3 @@ where
}
}
}

6 changes: 3 additions & 3 deletions core/src/upgrade/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo};

/// Upgrade that can be disabled at runtime.
///
Expand Down Expand Up @@ -76,9 +76,9 @@ where
type Error = T::Error;
type Future = T::Future;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
if let Some(inner) = self.0 {
inner.upgrade_outbound(sock, info)
inner.upgrade_outbound(sock, info, role)
} else {
panic!("Bad API usage; a protocol has been negotiated while this struct contains None")
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/upgrade/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
either::{EitherOutput, EitherError, EitherFuture2, EitherName},
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
upgrade::{InboundUpgrade, OutboundUpgrade, SimOpenRole, UpgradeInfo}
};

/// Upgrade that combines two upgrades into one. Supports all the protocols supported by either
Expand Down Expand Up @@ -81,10 +81,10 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: C, info: Self::Info, role: SimOpenRole) -> Self::Future {
match info {
EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)),
EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info))
EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info, role)),
EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info, role))
}
}
}
Expand Down Expand Up @@ -117,4 +117,3 @@ where
(min1.saturating_add(min2), max)
}
}

5 changes: 2 additions & 3 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod util;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::transport::{Transport, MemoryTransport};
use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade};
use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade, SimOpenRole};
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
use multiaddr::{Multiaddr, Protocol};
Expand Down Expand Up @@ -68,7 +68,7 @@ where
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, mut socket: C, _: Self::Info, _: SimOpenRole) -> Self::Future {
Box::pin(async move {
socket.write_all(b"hello").await.unwrap();
socket.flush().await.unwrap();
Expand Down Expand Up @@ -136,4 +136,3 @@ fn upgrade_pipeline() {
async_std::task::spawn(server);
async_std::task::block_on(client);
}

1 change: 1 addition & 0 deletions misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes = "1"
futures = "0.3"
log = "0.4"
pin-project = "1.0.0"
rand = "0.7"
smallvec = "1.6.1"
unsigned-varint = "0.7"

Expand Down
Loading