Skip to content

Commit deb72c8

Browse files
fix(dcutr): update the DCUtR initiator transport direction to Inbound (#994)
1 parent ce0685c commit deb72c8

13 files changed

Lines changed: 48 additions & 50 deletions

File tree

libp2p/dial.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ method connect*(
2626
addrs: seq[MultiAddress],
2727
forceDial = false,
2828
reuseConnection = true,
29-
upgradeDir = Direction.Out) {.async, base.} =
29+
dir = Direction.Out) {.async, base.} =
3030
## connect remote peer without negotiating
3131
## a protocol
3232
##

libp2p/dialer.nim

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ proc dialAndUpgrade(
5353
peerId: Opt[PeerId],
5454
hostname: string,
5555
address: MultiAddress,
56-
upgradeDir = Direction.Out):
56+
dir = Direction.Out):
5757
Future[Muxer] {.async.} =
5858

5959
for transport in self.transports: # for each transport
@@ -75,23 +75,27 @@ proc dialAndUpgrade(
7575

7676
let mux =
7777
try:
78-
dialed.transportDir = upgradeDir
79-
await transport.upgrade(dialed, upgradeDir, peerId)
78+
# This is for the very specific case of a simultaneous dial during DCUtR. In this case, both sides will have
79+
# an Outbound direction at the transport level. Therefore we update the DCUtR initiator transport direction to Inbound.
80+
# The if below is more general and might handle other use cases in the future.
81+
if dialed.dir != dir:
82+
dialed.dir = dir
83+
await transport.upgrade(dialed, peerId)
8084
except CatchableError as exc:
8185
# If we failed to establish the connection through one transport,
8286
# we won't succeeded through another - no use in trying again
8387
await dialed.close()
8488
debug "Upgrade failed", err = exc.msg, peerId = peerId.get(default(PeerId))
8589
if exc isnot CancelledError:
86-
if upgradeDir == Direction.Out:
90+
if dialed.dir == Direction.Out:
8791
libp2p_failed_upgrades_outgoing.inc()
8892
else:
8993
libp2p_failed_upgrades_incoming.inc()
9094

9195
# Try other address
9296
return nil
9397

94-
doAssert not isNil(mux), "connection died after upgrade " & $upgradeDir
98+
doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir
9599
debug "Dial successful", peerId = mux.connection.peerId
96100
return mux
97101
return nil
@@ -128,7 +132,7 @@ proc dialAndUpgrade(
128132
self: Dialer,
129133
peerId: Opt[PeerId],
130134
addrs: seq[MultiAddress],
131-
upgradeDir = Direction.Out):
135+
dir = Direction.Out):
132136
Future[Muxer] {.async.} =
133137

134138
debug "Dialing peer", peerId = peerId.get(default(PeerId))
@@ -146,7 +150,7 @@ proc dialAndUpgrade(
146150
else: await self.nameResolver.resolveMAddress(expandedAddress)
147151

148152
for resolvedAddress in resolvedAddresses:
149-
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir)
153+
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, dir)
150154
if not isNil(result):
151155
return result
152156

@@ -164,7 +168,7 @@ proc internalConnect(
164168
addrs: seq[MultiAddress],
165169
forceDial: bool,
166170
reuseConnection = true,
167-
upgradeDir = Direction.Out):
171+
dir = Direction.Out):
168172
Future[Muxer] {.async.} =
169173
if Opt.some(self.localPeerId) == peerId:
170174
raise newException(CatchableError, "can't dial self!")
@@ -182,7 +186,7 @@ proc internalConnect(
182186
let slot = self.connManager.getOutgoingSlot(forceDial)
183187
let muxed =
184188
try:
185-
await self.dialAndUpgrade(peerId, addrs, upgradeDir)
189+
await self.dialAndUpgrade(peerId, addrs, dir)
186190
except CatchableError as exc:
187191
slot.release()
188192
raise exc
@@ -209,15 +213,15 @@ method connect*(
209213
addrs: seq[MultiAddress],
210214
forceDial = false,
211215
reuseConnection = true,
212-
upgradeDir = Direction.Out) {.async.} =
216+
dir = Direction.Out) {.async.} =
213217
## connect remote peer without negotiating
214218
## a protocol
215219
##
216220

217221
if self.connManager.connCount(peerId) > 0 and reuseConnection:
218222
return
219223

220-
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir)
224+
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir)
221225

222226
method connect*(
223227
self: Dialer,

libp2p/protocols/connectivity/dcutr/client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs:
6666

6767
if peerDialableAddrs.len > self.maxDialableAddrs:
6868
peerDialableAddrs = peerDialableAddrs[0..<self.maxDialableAddrs]
69-
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.In))
69+
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, dir = Direction.In))
7070
try:
7171
discard await anyCompleted(futs).wait(self.connectTimeout)
7272
debug "Dcutr initiator has directly connected to the remote peer."

libp2p/protocols/connectivity/dcutr/server.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDi
5656

5757
if peerDialableAddrs.len > maxDialableAddrs:
5858
peerDialableAddrs = peerDialableAddrs[0..<maxDialableAddrs]
59-
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.Out))
59+
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, dir = Direction.Out))
6060
try:
6161
discard await anyCompleted(futs).wait(connectTimeout)
6262
debug "Dcutr receiver has directly connected to the remote peer."

libp2p/protocols/secure/secure.nim

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,9 @@ method init*(s: Secure) =
135135

136136
method secure*(s: Secure,
137137
conn: Connection,
138-
initiator: bool,
139138
peerId: Opt[PeerId]):
140139
Future[Connection] {.base.} =
141-
s.handleConn(conn, initiator, peerId)
140+
s.handleConn(conn, conn.dir == Direction.Out, peerId)
142141

143142
method readOnce*(s: SecureConn,
144143
pbytes: pointer,

libp2p/switch.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ method connect*(
141141
addrs: seq[MultiAddress],
142142
forceDial = false,
143143
reuseConnection = true,
144-
upgradeDir = Direction.Out): Future[void] {.public.} =
144+
dir = Direction.Out): Future[void] {.public.} =
145145
## Connects to a peer without opening a stream to it
146146

147-
s.dialer.connect(peerId, addrs, forceDial, reuseConnection, upgradeDir)
147+
s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir)
148148

149149
method connect*(
150150
s: Switch,
@@ -213,7 +213,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
213213
s.peerInfo.protocols.add(proto.codec)
214214

215215
proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
216-
let muxed = await trans.upgrade(conn, Direction.In, Opt.none(PeerId))
216+
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
217217
switch.connManager.storeMuxer(muxed)
218218
await switch.peerStore.identify(muxed)
219219
trace "Connection upgrade succeeded"

libp2p/transports/transport.nim

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,12 @@ proc dial*(
8383
method upgrade*(
8484
self: Transport,
8585
conn: Connection,
86-
direction: Direction,
8786
peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} =
8887
## base upgrade method that the transport uses to perform
8988
## transport specific upgrades
9089
##
9190

92-
self.upgrader.upgrade(conn, direction, peerId)
91+
self.upgrader.upgrade(conn, peerId)
9392

9493
method handles*(
9594
self: Transport,

libp2p/upgrademngrs/muxedupgrade.nim

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider =
3232

3333
proc mux*(
3434
self: MuxedUpgrade,
35-
conn: Connection,
36-
direction: Direction): Future[Muxer] {.async, gcsafe.} =
35+
conn: Connection): Future[Muxer] {.async, gcsafe.} =
3736
## mux connection
3837

3938
trace "Muxing connection", conn
@@ -42,7 +41,7 @@ proc mux*(
4241
return
4342

4443
let muxerName =
45-
if direction == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec))
44+
if conn.dir == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec))
4645
else: await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec))
4746

4847
if muxerName.len == 0 or muxerName == "na":
@@ -62,16 +61,15 @@ proc mux*(
6261
method upgrade*(
6362
self: MuxedUpgrade,
6463
conn: Connection,
65-
direction: Direction,
6664
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
67-
trace "Upgrading connection", conn, direction
65+
trace "Upgrading connection", conn, direction = conn.dir
6866

69-
let sconn = await self.secure(conn, direction, peerId) # secure the connection
67+
let sconn = await self.secure(conn, peerId) # secure the connection
7068
if isNil(sconn):
7169
raise newException(UpgradeFailedError,
7270
"unable to secure connection, stopping upgrade")
7371

74-
let muxer = await self.mux(sconn, direction) # mux it if possible
72+
let muxer = await self.mux(sconn) # mux it if possible
7573
if muxer == nil:
7674
raise newException(UpgradeFailedError,
7775
"a muxer is required for outgoing connections")
@@ -84,7 +82,7 @@ method upgrade*(
8482
raise newException(UpgradeFailedError,
8583
"Connection closed or missing peer info, stopping upgrade")
8684

87-
trace "Upgraded connection", conn, sconn, direction
85+
trace "Upgraded connection", conn, sconn, direction = conn.dir
8886
return muxer
8987

9088
proc new*(

libp2p/upgrademngrs/upgrade.nim

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,18 @@ type
4040
method upgrade*(
4141
self: Upgrade,
4242
conn: Connection,
43-
direction: Direction,
4443
peerId: Opt[PeerId]): Future[Muxer] {.base.} =
4544
doAssert(false, "Not implemented!")
4645

4746
proc secure*(
4847
self: Upgrade,
4948
conn: Connection,
50-
direction: Direction,
5149
peerId: Opt[PeerId]): Future[Connection] {.async, gcsafe.} =
5250
if self.secureManagers.len <= 0:
5351
raise newException(UpgradeFailedError, "No secure managers registered!")
5452

5553
let codec =
56-
if direction == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
54+
if conn.dir == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
5755
else: await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec))
5856
if codec.len == 0:
5957
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
@@ -65,4 +63,4 @@ proc secure*(
6563
# let's avoid duplicating checks but detect if it fails to do it properly
6664
doAssert(secureProtocol.len > 0)
6765

68-
return await secureProtocol[0].secure(conn, direction == Out, peerId)
66+
return await secureProtocol[0].secure(conn, peerId)

tests/stubs/switchstub.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@ type
2424
addrs: seq[MultiAddress],
2525
forceDial = false,
2626
reuseConnection = true,
27-
upgradeDir = Direction.Out): Future[void] {.gcsafe, async.}
27+
dir = Direction.Out): Future[void] {.gcsafe, async.}
2828

2929
method connect*(
3030
self: SwitchStub,
3131
peerId: PeerId,
3232
addrs: seq[MultiAddress],
3333
forceDial = false,
3434
reuseConnection = true,
35-
upgradeDir = Direction.Out) {.async.} =
35+
dir = Direction.Out) {.async.} =
3636
if (self.connectStub != nil):
37-
await self.connectStub(self, peerId, addrs, forceDial, reuseConnection, upgradeDir)
37+
await self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir)
3838
else:
39-
await self.switch.connect(peerId, addrs, forceDial, reuseConnection, upgradeDir)
39+
await self.switch.connect(peerId, addrs, forceDial, reuseConnection, dir)
4040

4141
proc new*(T: typedesc[SwitchStub], switch: Switch, connectStub: connectStubType = nil): T =
4242
return SwitchStub(

0 commit comments

Comments
 (0)