From 493f27132dc57bf0099285a1eec0a17b9f055e1a Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 27 Nov 2023 18:49:03 +0100 Subject: [PATCH 1/5] Set the client transport dir to In during dcutr --- libp2p/dial.nim | 3 ++- libp2p/dialer.nim | 23 ++++++++++++------- .../protocols/connectivity/dcutr/client.nim | 2 +- libp2p/switch.nim | 5 ++-- tests/stubs/switchstub.nim | 10 ++++---- tests/testdcutr.nim | 12 ++++++---- tests/testhpservice.nim | 3 ++- 7 files changed, 37 insertions(+), 21 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 422af7018f..5485a29b14 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -26,7 +26,8 @@ method connect*( addrs: seq[MultiAddress], forceDial = false, reuseConnection = true, - upgradeDir = Direction.Out) {.async, base.} = + upgradeDir = Direction.Out, + transportDir = Direction.Out) {.async, base.} = ## connect remote peer without negotiating ## a protocol ## diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 9fabd3f3d0..e0e1c4a986 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -53,7 +53,8 @@ proc dialAndUpgrade( peerId: Opt[PeerId], hostname: string, address: MultiAddress, - upgradeDir = Direction.Out): + upgradeDir = Direction.Out, + transportDir = Direction.Out): Future[Muxer] {.async.} = for transport in self.transports: # for each transport @@ -75,7 +76,10 @@ proc dialAndUpgrade( let mux = try: - dialed.transportDir = upgradeDir + # This is for the very specific case of a simultaneous dial during DCUtR. In this case, both sides will have + # an outbound dir at the transport level. Therefore we update the peer acting as the client to have an inbound dir. + if transportDir == Direction.In and dialed.dir == Direction.Out and upgradeDir == Direction.In: + dialed.dir = Direction.In await transport.upgrade(dialed, upgradeDir, peerId) except CatchableError as exc: # If we failed to establish the connection through one transport, @@ -128,7 +132,8 @@ proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress], - upgradeDir = Direction.Out): + upgradeDir = Direction.Out, + transportDir = Direction.Out): Future[Muxer] {.async.} = debug "Dialing peer", peerId = peerId.get(default(PeerId)) @@ -146,7 +151,7 @@ proc dialAndUpgrade( else: await self.nameResolver.resolveMAddress(expandedAddress) for resolvedAddress in resolvedAddresses: - result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir) + result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir, transportDir) if not isNil(result): return result @@ -164,7 +169,8 @@ proc internalConnect( addrs: seq[MultiAddress], forceDial: bool, reuseConnection = true, - upgradeDir = Direction.Out): + upgradeDir = Direction.Out, + transportDir = Direction.Out): Future[Muxer] {.async.} = if Opt.some(self.localPeerId) == peerId: raise newException(CatchableError, "can't dial self!") @@ -182,7 +188,7 @@ proc internalConnect( let slot = self.connManager.getOutgoingSlot(forceDial) let muxed = try: - await self.dialAndUpgrade(peerId, addrs, upgradeDir) + await self.dialAndUpgrade(peerId, addrs, upgradeDir, transportDir) except CatchableError as exc: slot.release() raise exc @@ -209,7 +215,8 @@ method connect*( addrs: seq[MultiAddress], forceDial = false, reuseConnection = true, - upgradeDir = Direction.Out) {.async.} = + upgradeDir = Direction.Out, + transportDir = Direction.Out) {.async.} = ## connect remote peer without negotiating ## a protocol ## @@ -217,7 +224,7 @@ method connect*( if self.connManager.connCount(peerId) > 0 and reuseConnection: return - discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir) + discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir, transportDir) method connect*( self: Dialer, diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim index 19dda0cef7..91748ab436 100644 --- a/libp2p/protocols/connectivity/dcutr/client.nim +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -66,7 +66,7 @@ proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: if peerDialableAddrs.len > self.maxDialableAddrs: peerDialableAddrs = peerDialableAddrs[0.. Date: Tue, 28 Nov 2023 17:04:41 +0100 Subject: [PATCH 2/5] Replace upgradeDir and transportDir by dir in the dialer --- libp2p/dial.nim | 3 +- libp2p/dialer.nim | 30 +++++++++---------- .../protocols/connectivity/dcutr/client.nim | 2 +- .../protocols/connectivity/dcutr/server.nim | 2 +- libp2p/switch.nim | 5 ++-- tests/stubs/switchstub.nim | 10 +++---- tests/testdcutr.nim | 12 +++----- tests/testhpservice.nim | 3 +- 8 files changed, 28 insertions(+), 39 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 5485a29b14..5d46cf5fc4 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -26,8 +26,7 @@ method connect*( addrs: seq[MultiAddress], forceDial = false, reuseConnection = true, - upgradeDir = Direction.Out, - transportDir = Direction.Out) {.async, base.} = + dir = Direction.Out) {.async, base.} = ## connect remote peer without negotiating ## a protocol ## diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index e0e1c4a986..4effaff5cd 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -53,8 +53,7 @@ proc dialAndUpgrade( peerId: Opt[PeerId], hostname: string, address: MultiAddress, - upgradeDir = Direction.Out, - transportDir = Direction.Out): + dir = Direction.Out): Future[Muxer] {.async.} = for transport in self.transports: # for each transport @@ -78,16 +77,18 @@ proc dialAndUpgrade( try: # This is for the very specific case of a simultaneous dial during DCUtR. In this case, both sides will have # an outbound dir at the transport level. Therefore we update the peer acting as the client to have an inbound dir. - if transportDir == Direction.In and dialed.dir == Direction.Out and upgradeDir == Direction.In: - dialed.dir = Direction.In - await transport.upgrade(dialed, upgradeDir, peerId) + # if transportDir == Direction.In and dialed.dir == Direction.Out and upgradeDir == Direction.In: + # dialed.dir = Direction.In + if dialed.dir != dir: + dialed.dir = dir + await transport.upgrade(dialed, dialed.dir, peerId) except CatchableError as exc: # If we failed to establish the connection through one transport, # we won't succeeded through another - no use in trying again await dialed.close() debug "Upgrade failed", err = exc.msg, peerId = peerId.get(default(PeerId)) if exc isnot CancelledError: - if upgradeDir == Direction.Out: + if dialed.dir == Direction.Out: libp2p_failed_upgrades_outgoing.inc() else: libp2p_failed_upgrades_incoming.inc() @@ -95,7 +96,7 @@ proc dialAndUpgrade( # Try other address return nil - doAssert not isNil(mux), "connection died after upgrade " & $upgradeDir + doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir debug "Dial successful", peerId = mux.connection.peerId return mux return nil @@ -132,8 +133,7 @@ proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress], - upgradeDir = Direction.Out, - transportDir = Direction.Out): + dir = Direction.Out): Future[Muxer] {.async.} = debug "Dialing peer", peerId = peerId.get(default(PeerId)) @@ -151,7 +151,7 @@ proc dialAndUpgrade( else: await self.nameResolver.resolveMAddress(expandedAddress) for resolvedAddress in resolvedAddresses: - result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir, transportDir) + result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, dir) if not isNil(result): return result @@ -169,8 +169,7 @@ proc internalConnect( addrs: seq[MultiAddress], forceDial: bool, reuseConnection = true, - upgradeDir = Direction.Out, - transportDir = Direction.Out): + dir = Direction.Out): Future[Muxer] {.async.} = if Opt.some(self.localPeerId) == peerId: raise newException(CatchableError, "can't dial self!") @@ -188,7 +187,7 @@ proc internalConnect( let slot = self.connManager.getOutgoingSlot(forceDial) let muxed = try: - await self.dialAndUpgrade(peerId, addrs, upgradeDir, transportDir) + await self.dialAndUpgrade(peerId, addrs, dir) except CatchableError as exc: slot.release() raise exc @@ -215,8 +214,7 @@ method connect*( addrs: seq[MultiAddress], forceDial = false, reuseConnection = true, - upgradeDir = Direction.Out, - transportDir = Direction.Out) {.async.} = + dir = Direction.Out) {.async.} = ## connect remote peer without negotiating ## a protocol ## @@ -224,7 +222,7 @@ method connect*( if self.connManager.connCount(peerId) > 0 and reuseConnection: return - discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir, transportDir) + discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir) method connect*( self: Dialer, diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim index 91748ab436..d760218020 100644 --- a/libp2p/protocols/connectivity/dcutr/client.nim +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -66,7 +66,7 @@ proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: if peerDialableAddrs.len > self.maxDialableAddrs: peerDialableAddrs = peerDialableAddrs[0.. maxDialableAddrs: peerDialableAddrs = peerDialableAddrs[0.. Date: Tue, 28 Nov 2023 17:43:14 +0100 Subject: [PATCH 3/5] Remove direction from upgrade --- libp2p/dialer.nim | 2 +- libp2p/switch.nim | 2 +- libp2p/transports/transport.nim | 3 +-- libp2p/upgrademngrs/muxedupgrade.nim | 14 ++++++-------- libp2p/upgrademngrs/upgrade.nim | 6 ++---- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 4effaff5cd..e5c9c1f485 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -81,7 +81,7 @@ proc dialAndUpgrade( # dialed.dir = Direction.In if dialed.dir != dir: dialed.dir = dir - await transport.upgrade(dialed, dialed.dir, peerId) + await transport.upgrade(dialed, peerId) except CatchableError as exc: # If we failed to establish the connection through one transport, # we won't succeeded through another - no use in trying again diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 1cfd08628f..3fa361e191 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -213,7 +213,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) s.peerInfo.protocols.add(proto.codec) proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} = - let muxed = await trans.upgrade(conn, Direction.In, Opt.none(PeerId)) + let muxed = await trans.upgrade(conn, Opt.none(PeerId)) switch.connManager.storeMuxer(muxed) await switch.peerStore.identify(muxed) trace "Connection upgrade succeeded" diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 5c4a53503b..40b130a8cd 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -83,13 +83,12 @@ proc dial*( method upgrade*( self: Transport, conn: Connection, - direction: Direction, peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} = ## base upgrade method that the transport uses to perform ## transport specific upgrades ## - self.upgrader.upgrade(conn, direction, peerId) + self.upgrader.upgrade(conn, peerId) method handles*( self: Transport, diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index 05d751c94d..55a2c5dc32 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -32,8 +32,7 @@ proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider = proc mux*( self: MuxedUpgrade, - conn: Connection, - direction: Direction): Future[Muxer] {.async, gcsafe.} = + conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux connection trace "Muxing connection", conn @@ -42,7 +41,7 @@ proc mux*( return let muxerName = - if direction == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec)) + if conn.dir == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec)) else: await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec)) if muxerName.len == 0 or muxerName == "na": @@ -62,16 +61,15 @@ proc mux*( method upgrade*( self: MuxedUpgrade, conn: Connection, - direction: Direction, peerId: Opt[PeerId]): Future[Muxer] {.async.} = - trace "Upgrading connection", conn, direction + trace "Upgrading connection", conn, direction = conn.dir - let sconn = await self.secure(conn, direction, peerId) # secure the connection + let sconn = await self.secure(conn, peerId) # secure the connection if isNil(sconn): raise newException(UpgradeFailedError, "unable to secure connection, stopping upgrade") - let muxer = await self.mux(sconn, direction) # mux it if possible + let muxer = await self.mux(sconn) # mux it if possible if muxer == nil: raise newException(UpgradeFailedError, "a muxer is required for outgoing connections") @@ -84,7 +82,7 @@ method upgrade*( raise newException(UpgradeFailedError, "Connection closed or missing peer info, stopping upgrade") - trace "Upgraded connection", conn, sconn, direction + trace "Upgraded connection", conn, sconn, direction = conn.dir return muxer proc new*( diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim index 49e70a900d..0d663cee8b 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -40,20 +40,18 @@ type method upgrade*( self: Upgrade, conn: Connection, - direction: Direction, peerId: Opt[PeerId]): Future[Muxer] {.base.} = doAssert(false, "Not implemented!") proc secure*( self: Upgrade, conn: Connection, - direction: Direction, peerId: Opt[PeerId]): Future[Connection] {.async, gcsafe.} = if self.secureManagers.len <= 0: raise newException(UpgradeFailedError, "No secure managers registered!") let codec = - if direction == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec)) + if conn.dir == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec)) else: await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec)) if codec.len == 0: raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") @@ -65,4 +63,4 @@ proc secure*( # let's avoid duplicating checks but detect if it fails to do it properly doAssert(secureProtocol.len > 0) - return await secureProtocol[0].secure(conn, direction == Out, peerId) + return await secureProtocol[0].secure(conn, conn.dir == Out, peerId) From 9d0cfd555fecd45d9d26995236e6e2c74f8567d2 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 28 Nov 2023 17:48:28 +0100 Subject: [PATCH 4/5] Remove direction from secure --- libp2p/protocols/secure/secure.nim | 3 +-- libp2p/upgrademngrs/upgrade.nim | 2 +- tests/testnoise.nim | 16 ++++++++-------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 96526714dd..9f1cad36e9 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -135,10 +135,9 @@ method init*(s: Secure) = method secure*(s: Secure, conn: Connection, - initiator: bool, peerId: Opt[PeerId]): Future[Connection] {.base.} = - s.handleConn(conn, initiator, peerId) + s.handleConn(conn, conn.dir == Direction.Out, peerId) method readOnce*(s: SecureConn, pbytes: pointer, diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim index 0d663cee8b..ca4195ea6a 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -63,4 +63,4 @@ proc secure*( # let's avoid duplicating checks but detect if it fails to do it properly doAssert(secureProtocol.len > 0) - return await secureProtocol[0].secure(conn, conn.dir == Out, peerId) + return await secureProtocol[0].secure(conn, peerId) diff --git a/tests/testnoise.nim b/tests/testnoise.nim index f598b42029..ef05a14029 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -100,7 +100,7 @@ suite "Noise": proc acceptHandler() {.async.} = let conn = await transport1.accept() - let sconn = await serverNoise.secure(conn, false, Opt.none(PeerId)) + let sconn = await serverNoise.secure(conn, Opt.none(PeerId)) try: await sconn.write("Hello!") finally: @@ -115,7 +115,7 @@ suite "Noise": clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.addrs[0]) - let sconn = await clientNoise.secure(conn, true, Opt.some(serverInfo.peerId)) + let sconn = await clientNoise.secure(conn, Opt.some(serverInfo.peerId)) var msg = newSeq[byte](6) await sconn.readExactly(addr msg[0], 6) @@ -144,7 +144,7 @@ suite "Noise": var conn: Connection try: conn = await transport1.accept() - discard await serverNoise.secure(conn, false, Opt.none(PeerId)) + discard await serverNoise.secure(conn, Opt.none(PeerId)) except CatchableError: discard finally: @@ -160,7 +160,7 @@ suite "Noise": var sconn: Connection = nil expect(NoiseDecryptTagError): - sconn = await clientNoise.secure(conn, true, Opt.some(conn.peerId)) + sconn = await clientNoise.secure(conn, Opt.some(conn.peerId)) await conn.close() await handlerWait @@ -180,7 +180,7 @@ suite "Noise": proc acceptHandler() {.async, gcsafe.} = let conn = await transport1.accept() - let sconn = await serverNoise.secure(conn, false, Opt.none(PeerId)) + let sconn = await serverNoise.secure(conn, Opt.none(PeerId)) defer: await sconn.close() await conn.close() @@ -196,7 +196,7 @@ suite "Noise": clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.addrs[0]) - let sconn = await clientNoise.secure(conn, true, Opt.some(serverInfo.peerId)) + let sconn = await clientNoise.secure(conn, Opt.some(serverInfo.peerId)) await sconn.write("Hello!") await acceptFut @@ -223,7 +223,7 @@ suite "Noise": proc acceptHandler() {.async, gcsafe.} = let conn = await transport1.accept() - let sconn = await serverNoise.secure(conn, false, Opt.none(PeerId)) + let sconn = await serverNoise.secure(conn, Opt.none(PeerId)) defer: await sconn.close() let msg = await sconn.readLp(1024*1024) @@ -237,7 +237,7 @@ suite "Noise": clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) conn = await transport2.dial(transport1.addrs[0]) - let sconn = await clientNoise.secure(conn, true, Opt.some(serverInfo.peerId)) + let sconn = await clientNoise.secure(conn, Opt.some(serverInfo.peerId)) await sconn.writeLp(hugePayload) await readTask From 9d0b90ca74735a4c480bc2b8137c60f04a6bfacc Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 29 Nov 2023 13:46:57 +0100 Subject: [PATCH 5/5] Improve comment --- libp2p/dialer.nim | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index e5c9c1f485..986f4e3701 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -76,9 +76,8 @@ proc dialAndUpgrade( let mux = try: # This is for the very specific case of a simultaneous dial during DCUtR. In this case, both sides will have - # an outbound dir at the transport level. Therefore we update the peer acting as the client to have an inbound dir. - # if transportDir == Direction.In and dialed.dir == Direction.Out and upgradeDir == Direction.In: - # dialed.dir = Direction.In + # an Outbound direction at the transport level. Therefore we update the DCUtR initiator transport direction to Inbound. + # The if below is more general and might handle other use cases in the future. if dialed.dir != dir: dialed.dir = dir await transport.upgrade(dialed, peerId)