Skip to content

Commit e8dbdec

Browse files
committed
Use new dialer trait in network and swarm.
1 parent 8206140 commit e8dbdec

File tree

6 files changed

+138
-29
lines changed

6 files changed

+138
-29
lines changed

core/src/connection/listeners.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
//! Manage listening on multiple multiaddresses at once.
2222
23-
use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
23+
use crate::{address_translation, Dialer, Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
2424
use futures::{prelude::*, task::Context, task::Poll};
2525
use log::debug;
2626
use smallvec::SmallVec;
@@ -232,6 +232,42 @@ where
232232
self.listeners.iter().flat_map(|l| l.addresses.iter())
233233
}
234234

235+
/// Perform address translation.
236+
pub fn address_translation(&self, observed_addr: &Multiaddr) -> Vec<Multiaddr> {
237+
let mut addrs = Vec::with_capacity(4 * self.listeners.len());
238+
for listener in &self.listeners {
239+
if listener.dialer.requires_address_translation() {
240+
for addr in &listener.addresses {
241+
if let Some(new_addr) = address_translation(addr, observed_addr) {
242+
if addrs.iter().find(|addr| *addr == &new_addr).is_none() {
243+
addrs.push(new_addr);
244+
}
245+
}
246+
}
247+
} else {
248+
if addrs.iter().find(|addr| *addr == observed_addr).is_none() {
249+
addrs.push(observed_addr.clone());
250+
}
251+
}
252+
}
253+
addrs
254+
}
255+
256+
/// Returns a dialer for an address.
257+
pub fn dialer_for_addr(&self, addr: &Multiaddr) -> TTrans::Dialer {
258+
if self.listen_addrs().find(|addr2| *addr2 == addr).is_some() {
259+
return self.transport.dialer();
260+
}
261+
262+
self.listeners.iter().filter_map(|listener| {
263+
listener.addresses.iter().find(|address| {
264+
address.can_dial(addr)
265+
}).map(|_| listener.dialer.clone())
266+
})
267+
.next()
268+
.unwrap_or_else(|| self.transport.dialer())
269+
}
270+
235271
/// Provides an API similar to `Stream`, except that it cannot end.
236272
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
237273
// We remove each element from `listeners` one by one and add them back.

core/src/network.rs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use crate::{
2929
Executor,
3030
Multiaddr,
3131
PeerId,
32-
address_translation,
3332
connection::{
3433
ConnectionId,
3534
ConnectionLimit,
@@ -199,21 +198,8 @@ where
199198
/// * `observed_addr` - should be an address a remote observes you as, which can be obtained for
200199
/// example with the identify protocol.
201200
///
202-
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
203-
-> impl Iterator<Item = Multiaddr> + 'a
204-
where
205-
TMuxer: 'a,
206-
THandler: 'a,
207-
{
208-
let mut addrs: Vec<_> = self.listen_addrs()
209-
.filter_map(move |server| address_translation(server, observed_addr))
210-
.collect();
211-
212-
// remove duplicates
213-
addrs.sort_unstable();
214-
addrs.dedup();
215-
216-
addrs.into_iter()
201+
pub fn address_translation(&self, observed_addr: &Multiaddr) -> Vec<Multiaddr> {
202+
self.listeners.address_translation(observed_addr)
217203
}
218204

219205
/// Returns the peer id of the local node.
@@ -240,7 +226,7 @@ where
240226
TPeerId: Send + 'static,
241227
{
242228
let info = OutgoingInfo { address, peer_id: None };
243-
match self.transport().dialer().dial(address.clone()) {
229+
match self.listeners.dialer_for_addr(address).dial(address.clone()) {
244230
Ok(f) => {
245231
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
246232
self.pool.add_outgoing(f, handler, info)
@@ -435,8 +421,8 @@ where
435421
let dialing = &mut self.dialing;
436422
let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
437423
if let Some(dial) = next {
438-
let transport = self.listeners.transport().clone();
439-
if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
424+
let dialer = self.listeners.dialer_for_addr(&dial.address);
425+
if let Err(e) = dial_peer_impl(dialer, pool, dialing, dial) {
440426
log::warn!("Dialing aborted: {:?}", e);
441427
}
442428
}
@@ -481,7 +467,8 @@ where
481467
TOutEvent: Send + 'static,
482468
TPeerId: Send + 'static,
483469
{
484-
dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
470+
let dialer = self.listeners.dialer_for_addr(&opts.address);
471+
dial_peer_impl(dialer, &mut self.pool, &mut self.dialing, opts)
485472
}
486473
}
487474

@@ -495,9 +482,9 @@ struct DialingOpts<TPeerId, THandler> {
495482
}
496483

497484
/// Standalone implementation of `Network::dial_peer` for more granular borrowing.
498-
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPeerId>(
499-
transport: TTrans,
500-
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
485+
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TDialer, TConnInfo, TPeerId>(
486+
dialer: TDialer,
487+
pool: &mut Pool<TInEvent, TOutEvent, THandler, TDialer::Error,
501488
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
502489
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
503490
opts: DialingOpts<TPeerId, THandler>
@@ -511,17 +498,17 @@ where
511498
InEvent = TInEvent,
512499
OutEvent = TOutEvent,
513500
> + Send + 'static,
514-
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
515-
TTrans::Dial: Send + 'static,
516-
TTrans::Error: error::Error + Send + 'static,
501+
TDialer: Dialer<Output = (TConnInfo, TMuxer)>,
502+
TDialer::Dial: Send + 'static,
503+
TDialer::Error: error::Error + Send + 'static,
517504
TMuxer: StreamMuxer + Send + Sync + 'static,
518505
TMuxer::OutboundSubstream: Send + 'static,
519506
TInEvent: Send + 'static,
520507
TOutEvent: Send + 'static,
521508
TPeerId: Eq + Hash + Send + Clone + 'static,
522509
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
523510
{
524-
let result = match transport.dialer().dial(opts.address.clone()) {
511+
let result = match dialer.dial(opts.address.clone()) {
525512
Ok(fut) => {
526513
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
527514
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };

core/src/transport.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub use self::memory::MemoryTransport;
4949
pub use self::optional::OptionalTransport;
5050
pub use self::upgrade::Upgrade;
5151

52-
pub trait Dialer {
52+
pub trait Dialer: Clone {
5353
/// The result of a connection setup process, including protocol upgrades.
5454
///
5555
/// Typically the output contains at least a handle to a data stream (i.e. a
@@ -71,6 +71,8 @@ pub trait Dialer {
7171
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
7272
where
7373
Self: Sized;
74+
75+
fn requires_address_translation(&self) -> bool { false }
7476
}
7577

7678
/// A transport provides connection-oriented communication between two peers

core/tests/network_dial_error.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,85 @@ fn new_network(cfg: NetworkConfig) -> TestNetwork {
5959
TestNetwork::new(transport, local_public_key.into(), cfg)
6060
}
6161

62+
#[test]
63+
fn port_reuse() {
64+
// Checks whether reusing a port on a swarm works.
65+
66+
let mut swarm1 = new_network(NetworkConfig::default());
67+
let mut swarm2 = new_network(NetworkConfig::default());
68+
69+
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
70+
swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
71+
72+
let address1 = async_std::task::block_on(future::poll_fn(|cx| {
73+
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
74+
Poll::Ready(listen_addr)
75+
} else {
76+
panic!("Was expecting the listen address to be reported")
77+
}
78+
}));
79+
80+
let address2 = async_std::task::block_on(future::poll_fn(|cx| {
81+
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) {
82+
Poll::Ready(listen_addr)
83+
} else {
84+
panic!("Was expecting the listen address to be reported")
85+
}
86+
}));
87+
88+
swarm2
89+
.peer(swarm1.local_peer_id().clone())
90+
.dial(address1.clone(), Vec::new(), TestHandler())
91+
.unwrap();
92+
93+
let mut got_incoming = false;
94+
let mut got_outgoing = false;
95+
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
96+
loop {
97+
match swarm1.poll(cx) {
98+
Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
99+
assert_eq!(&connection.local_addr, &address1);
100+
assert_eq!(&connection.send_back_addr, &address2);
101+
swarm1.accept(connection, TestHandler()).unwrap();
102+
},
103+
Poll::Ready(NetworkEvent::ConnectionEstablished {
104+
connection,
105+
..
106+
}) => {
107+
assert_eq!(connection.peer_id(), swarm2.local_peer_id());
108+
got_incoming = true;
109+
if got_outgoing {
110+
return Poll::Ready(Ok(()));
111+
}
112+
},
113+
Poll::Ready(NetworkEvent::ConnectionClosed { .. }) => continue,
114+
Poll::Ready(e) => panic!("{:?}", e),
115+
Poll::Pending => break,
116+
}
117+
}
118+
119+
loop {
120+
match swarm2.poll(cx) {
121+
Poll::Ready(NetworkEvent::ConnectionEstablished {
122+
connection,
123+
..
124+
}) => {
125+
assert_eq!(connection.peer_id(), swarm1.local_peer_id());
126+
got_outgoing = true;
127+
if got_incoming {
128+
return Poll::Ready(Ok(()));
129+
}
130+
},
131+
Poll::Ready(NetworkEvent::ConnectionClosed { .. }) => continue,
132+
Poll::Ready(e) => panic!("{:?}", e),
133+
Poll::Pending => break,
134+
}
135+
}
136+
137+
Poll::Pending
138+
})).unwrap();
139+
}
140+
62141
#[test]
63142
fn deny_incoming_connec() {
64143
// Checks whether refusing an incoming connection on a swarm triggers the correct events.

transports/dns/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ mod tests {
303303
}
304304
}
305305

306+
#[derive(Clone)]
306307
struct CustomDialer;
307308

308309
impl Dialer for CustomDialer {

transports/tcp/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,10 @@ impl Dialer for TcpDialer {
243243

244244
Ok(Box::pin(self.do_dial(socket_addr)))
245245
}
246+
247+
fn requires_address_translation(&self) -> bool {
248+
self.local_socket_addr.is_none()
249+
}
246250
}
247251

248252
impl Transport for TcpConfig {

0 commit comments

Comments
 (0)