Skip to content

Commit 73e7878

Browse files
authored
Configurable multistream-select protocol. Add V1Lazy variant. (#1245)
Make the multistream-select protocol (version) configurable on transport upgrades as well as for individual substreams. Add a "lazy" variant of multistream-select 1.0 that delays sending of negotiation protocol frames as much as possible but is only safe to use under additional assumptions that go beyond what is required by the multistream-select v1 specification.
1 parent 8c11926 commit 73e7878

File tree

29 files changed

+374
-282
lines changed

29 files changed

+374
-282
lines changed

core/src/transport/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,12 @@ pub trait Transport {
209209
}
210210

211211
/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
212-
fn upgrade(self) -> upgrade::Builder<Self>
212+
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
213213
where
214214
Self: Sized,
215215
Self::Error: 'static
216216
{
217-
upgrade::Builder::new(self)
217+
upgrade::Builder::new(self, version)
218218
}
219219
}
220220

core/src/transport/upgrade.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
//! Configuration of transport protocol upgrades.
2222
23+
pub use crate::upgrade::Version;
24+
2325
use crate::{
2426
ConnectedPoint,
2527
ConnectionInfo,
@@ -68,7 +70,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
6870
///
6971
/// [`Network`]: crate::nodes::Network
7072
pub struct Builder<T> {
71-
inner: T
73+
inner: T,
74+
version: upgrade::Version,
7275
}
7376

7477
impl<T> Builder<T>
@@ -77,8 +80,8 @@ where
7780
T::Error: 'static,
7881
{
7982
/// Creates a `Builder` over the given (base) `Transport`.
80-
pub fn new(transport: T) -> Builder<T> {
81-
Builder { inner: transport }
83+
pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
84+
Builder { inner, version }
8285
}
8386

8487
/// Upgrades the transport to perform authentication of the remote.
@@ -105,11 +108,12 @@ where
105108
U: OutboundUpgrade<C, Output = (I, D), Error = E> + Clone,
106109
E: Error + 'static,
107110
{
111+
let version = self.version;
108112
Builder::new(self.inner.and_then(move |conn, endpoint| {
109113
Authenticate {
110-
inner: upgrade::apply(conn, upgrade, endpoint)
114+
inner: upgrade::apply(conn, upgrade, endpoint, version)
111115
}
112-
}))
116+
}), version)
113117
}
114118

115119
/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
@@ -133,7 +137,7 @@ where
133137
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
134138
E: Error + 'static,
135139
{
136-
Builder::new(Upgrade::new(self.inner, upgrade))
140+
Builder::new(Upgrade::new(self.inner, upgrade), self.version)
137141
}
138142

139143
/// Upgrades the transport with a (sub)stream multiplexer.
@@ -158,8 +162,9 @@ where
158162
U: OutboundUpgrade<C, Output = M, Error = E> + Clone,
159163
E: Error + 'static,
160164
{
165+
let version = self.version;
161166
self.inner.and_then(move |(i, c), endpoint| {
162-
let upgrade = upgrade::apply(c, upgrade, endpoint);
167+
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
163168
Multiplex { info: Some(i), upgrade }
164169
})
165170
}
@@ -332,7 +337,7 @@ where
332337
future::Either::A(ref mut up) => {
333338
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
334339
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
335-
future::Either::B((Some(i), apply_outbound(c, u)))
340+
future::Either::B((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
336341
}
337342
future::Either::B((ref mut i, ref mut up)) => {
338343
let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade));

core/src/upgrade/apply.rs

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,17 @@
2020

2121
use crate::ConnectedPoint;
2222
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
23-
use crate::upgrade::{ProtocolName, NegotiatedComplete};
23+
use crate::upgrade::ProtocolName;
2424
use futures::{future::Either, prelude::*};
2525
use log::debug;
2626
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
2727
use std::{iter, mem};
2828
use tokio_io::{AsyncRead, AsyncWrite};
2929

30+
pub use multistream_select::Version;
31+
3032
/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
31-
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint)
33+
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint, v: Version)
3234
-> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
3335
where
3436
C: AsyncRead + AsyncWrite,
@@ -37,7 +39,7 @@ where
3739
if cp.is_listener() {
3840
Either::A(apply_inbound(conn, up))
3941
} else {
40-
Either::B(apply_outbound(conn, up))
42+
Either::B(apply_outbound(conn, up, v))
4143
}
4244
}
4345

@@ -55,13 +57,13 @@ where
5557
}
5658

5759
/// Tries to perform an upgrade on an outbound connection or substream.
58-
pub fn apply_outbound<C, U>(conn: C, up: U) -> OutboundUpgradeApply<C, U>
60+
pub fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
5961
where
6062
C: AsyncRead + AsyncWrite,
6163
U: OutboundUpgrade<C>
6264
{
6365
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
64-
let future = multistream_select::dialer_select_proto(conn, iter);
66+
let future = multistream_select::dialer_select_proto(conn, iter, v);
6567
OutboundUpgradeApply {
6668
inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
6769
}
@@ -155,11 +157,6 @@ where
155157
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
156158
upgrade: U
157159
},
158-
AwaitNegotiated {
159-
io: NegotiatedComplete<C>,
160-
upgrade: U,
161-
protocol: U::Info
162-
},
163160
Upgrade {
164161
future: U::Future
165162
},
@@ -185,24 +182,8 @@ where
185182
return Ok(Async::NotReady)
186183
}
187184
};
188-
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
189-
io: connection.complete(),
190-
protocol: info.0,
191-
upgrade
192-
};
193-
}
194-
OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => {
195-
let io = match io.poll()? {
196-
Async::NotReady => {
197-
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
198-
io, protocol, upgrade
199-
};
200-
return Ok(Async::NotReady)
201-
}
202-
Async::Ready(io) => io
203-
};
204185
self.inner = OutboundUpgradeApplyState::Upgrade {
205-
future: upgrade.upgrade_outbound(io, protocol)
186+
future: upgrade.upgrade_outbound(connection, info.0)
206187
};
207188
}
208189
OutboundUpgradeApplyState::Upgrade { mut future } => {

core/src/upgrade/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ mod transfer;
6868

6969
use futures::future::Future;
7070

71-
pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
71+
pub use multistream_select::{Version, Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
7272
pub use self::{
7373
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
7474
denied::DeniedUpgrade,

core/tests/network_dial_error.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ fn deny_incoming_connec() {
9595
let local_key = identity::Keypair::generate_ed25519();
9696
let local_public_key = local_key.public();
9797
let transport = libp2p_tcp::TcpConfig::new()
98-
.upgrade()
98+
.upgrade(upgrade::Version::V1)
9999
.authenticate(libp2p_secio::SecioConfig::new(local_key))
100100
.multiplex(libp2p_mplex::MplexConfig::new());
101101
Network::new(transport, local_public_key.into())
@@ -105,7 +105,7 @@ fn deny_incoming_connec() {
105105
let local_key = identity::Keypair::generate_ed25519();
106106
let local_public_key = local_key.public();
107107
let transport = libp2p_tcp::TcpConfig::new()
108-
.upgrade()
108+
.upgrade(upgrade::Version::V1)
109109
.authenticate(libp2p_secio::SecioConfig::new(local_key))
110110
.multiplex(libp2p_mplex::MplexConfig::new());
111111
Network::new(transport, local_public_key.into())
@@ -170,7 +170,7 @@ fn dial_self() {
170170
let local_key = identity::Keypair::generate_ed25519();
171171
let local_public_key = local_key.public();
172172
let transport = libp2p_tcp::TcpConfig::new()
173-
.upgrade()
173+
.upgrade(upgrade::Version::V1)
174174
.authenticate(libp2p_secio::SecioConfig::new(local_key))
175175
.multiplex(libp2p_mplex::MplexConfig::new())
176176
.and_then(|(peer, mplex), _| {
@@ -249,7 +249,7 @@ fn dial_self_by_id() {
249249
let local_key = identity::Keypair::generate_ed25519();
250250
let local_public_key = local_key.public();
251251
let transport = libp2p_tcp::TcpConfig::new()
252-
.upgrade()
252+
.upgrade(upgrade::Version::V1)
253253
.authenticate(libp2p_secio::SecioConfig::new(local_key))
254254
.multiplex(libp2p_mplex::MplexConfig::new());
255255
Network::new(transport, local_public_key.into())
@@ -267,7 +267,7 @@ fn multiple_addresses_err() {
267267
let local_key = identity::Keypair::generate_ed25519();
268268
let local_public_key = local_key.public();
269269
let transport = libp2p_tcp::TcpConfig::new()
270-
.upgrade()
270+
.upgrade(upgrade::Version::V1)
271271
.authenticate(libp2p_secio::SecioConfig::new(local_key))
272272
.multiplex(libp2p_mplex::MplexConfig::new());
273273
Network::new(transport, local_public_key.into())

core/tests/network_simult.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ fn raw_swarm_simultaneous_connect() {
110110
let local_key = identity::Keypair::generate_ed25519();
111111
let local_public_key = local_key.public();
112112
let transport = libp2p_tcp::TcpConfig::new()
113-
.upgrade()
113+
.upgrade(upgrade::Version::V1Lazy)
114114
.authenticate(libp2p_secio::SecioConfig::new(local_key))
115115
.multiplex(libp2p_mplex::MplexConfig::new())
116116
.and_then(|(peer, mplex), _| {
@@ -125,7 +125,7 @@ fn raw_swarm_simultaneous_connect() {
125125
let local_key = identity::Keypair::generate_ed25519();
126126
let local_public_key = local_key.public();
127127
let transport = libp2p_tcp::TcpConfig::new()
128-
.upgrade()
128+
.upgrade(upgrade::Version::V1Lazy)
129129
.authenticate(libp2p_secio::SecioConfig::new(local_key))
130130
.multiplex(libp2p_mplex::MplexConfig::new())
131131
.and_then(|(peer, mplex), _| {

core/tests/transport_upgrade.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use futures::future::Future;
2424
use futures::stream::Stream;
2525
use libp2p_core::identity;
2626
use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent};
27-
use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
27+
use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
2828
use libp2p_mplex::MplexConfig;
2929
use libp2p_secio::SecioConfig;
3030
use multiaddr::Multiaddr;
@@ -78,7 +78,7 @@ fn upgrade_pipeline() {
7878
let listener_keys = identity::Keypair::generate_ed25519();
7979
let listener_id = listener_keys.public().into_peer_id();
8080
let listener_transport = MemoryTransport::default()
81-
.upgrade()
81+
.upgrade(upgrade::Version::V1)
8282
.authenticate(SecioConfig::new(listener_keys))
8383
.apply(HelloUpgrade {})
8484
.apply(HelloUpgrade {})
@@ -93,7 +93,7 @@ fn upgrade_pipeline() {
9393
let dialer_keys = identity::Keypair::generate_ed25519();
9494
let dialer_id = dialer_keys.public().into_peer_id();
9595
let dialer_transport = MemoryTransport::default()
96-
.upgrade()
96+
.upgrade(upgrade::Version::V1)
9797
.authenticate(SecioConfig::new(dialer_keys))
9898
.apply(HelloUpgrade {})
9999
.apply(HelloUpgrade {})

0 commit comments

Comments
 (0)