Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ proc withMplex*(
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
b

proc withYamux*(b: SwitchBuilder): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn)
proc withYamux*(b: SwitchBuilder, windowSize: int = DefaultWindowSize): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize)

assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
Expand Down
28 changes: 19 additions & 9 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ logScope:
const
YamuxCodec* = "/yamux/1.0.0"
YamuxVersion = 0.uint8
DefaultWindowSize = 256000
DefaultWindowSize* = 256000
MaxChannelCount = 200

when defined(libp2p_yamux_metrics):
Expand Down Expand Up @@ -351,7 +351,13 @@ proc open*(channel: YamuxChannel) {.async, gcsafe.} =
trace "Try to open channel twice"
return
channel.opened = true
await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack}))
let delta =
if channel.maxRecvWindow < DefaultWindowSize: 0'u32
else: channel.maxRecvWindow.uint32 - DefaultWindowSize
await channel.conn.write(YamuxHeader.windowUpdate(
channel.id,
delta,
{if channel.isSrc: Syn else: Ack}))

method getWrapped*(channel: YamuxChannel): Connection = channel.conn

Expand All @@ -362,6 +368,7 @@ type
currentId: uint32
isClosed: bool
maxChannCount: int
windowSize: int

proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
Expand All @@ -375,11 +382,11 @@ proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
if channel.isReset and channel.recvWindow > 0:
m.flushed[channel.id] = channel.recvWindow

proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
proc createStream(m: Yamux, id: uint32, isSrc: bool, recvWindow: int): YamuxChannel =
result = YamuxChannel(
id: id,
maxRecvWindow: DefaultWindowSize,
recvWindow: DefaultWindowSize,
maxRecvWindow: recvWindow,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can maxRecvWindow be smaller than recvWindow?

Copy link
Copy Markdown
Contributor Author

@lchenut lchenut Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because maxRecvWindow could be, for example, 64k but the peer you're connected to will always assume your recvWindow is 256k (or larger if you enlarge it during the SYN/ACK). In this case, the recvWindow will be larger than maxRecvWindow until it receives 192k. Then it'll not update to something larger than 64k.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec does not really exclude the case that the desired recvWindow size is below 256.
So, you could set maxRecvWindow to something lower than 256, which would be lower than the initial recvWindow size of 256K.
Once the recvWindow falls below maxRecvWindow, it will only ever grow back to maxRecvWindow.

That being said, we should either

  1. explain this fact as a comment in the code
  2. or exclude that case and force maxRecvWindow to be at least 256k (== recvWindow).

I'd prefer 2), but I do not have a strong opinion here.

@diegomrsantos @lchenut wdyt?
Once we have consensus here, we can merge this PR :).

Copy link
Copy Markdown
Contributor

@diegomrsantos diegomrsantos Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My interpretation is that other peers will always assume your initial recvWindow is 256k. The point seems to be if we find it valuable to allow its max size to be lower than this. This IMHO is non-intuitive and requires a comment in the code, at least. In general, I haven't seen convincing practical evidence about why we should do it, but I don't have a strong opinion as well.

recvWindow: if recvWindow > DefaultWindowSize: recvWindow else: DefaultWindowSize,
sendWindow: DefaultWindowSize,
isSrc: isSrc,
conn: m.connection,
Expand Down Expand Up @@ -455,7 +462,7 @@ method handle*(m: Yamux) {.async, gcsafe.} =
m.flushed.del(header.streamId)
if header.streamId mod 2 == m.currentId mod 2:
raise newException(YamuxError, "Peer used our reserved stream id")
let newStream = m.createStream(header.streamId, false)
let newStream = m.createStream(header.streamId, false, m.windowSize)
if m.channels.len >= m.maxChannCount:
await newStream.reset()
continue
Expand Down Expand Up @@ -515,15 +522,18 @@ method newStream*(

if m.channels.len > m.maxChannCount - 1:
raise newException(TooManyChannels, "max allowed channel count exceeded")
let stream = m.createStream(m.currentId, true)
let stream = m.createStream(m.currentId, true, m.windowSize)
m.currentId += 2
if not lazy:
await stream.open()
return stream

proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T =
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = DefaultWindowSize): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,
maxChannCount: maxChannCount
maxChannCount: maxChannCount,
windowSize: windowSize
)
62 changes: 60 additions & 2 deletions tests/testyamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ suite "Yamux":
teardown:
checkTrackers()

template mSetup {.inject.} =
template mSetup(ws: int = DefaultWindowSize) {.inject.} =
#TODO in a template to avoid threadvar
let
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
(yamuxa {.inject.}, yamuxb {.inject.}) = (Yamux.new(conna), Yamux.new(connb))
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws)
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())

defer:
Expand Down Expand Up @@ -179,6 +180,63 @@ suite "Yamux":
writerBlocker.complete()
await streamA.close()

asyncTest "Increase window size":
mSetup(512000)
let readerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
await readerBlocker
var buffer: array[260000, byte]
discard await conn.readOnce(addr buffer[0], 260000)
await conn.close()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]

await wait(streamA.write(newSeq[byte](512000)), 1.seconds) # shouldn't block

let secondWriter = streamA.write(newSeq[byte](10000))
await sleepAsync(10.milliseconds)
check: not secondWriter.finished()

readerBlocker.complete()
await wait(secondWriter, 1.seconds)

await streamA.close()

asyncTest "Reduce window size":
mSetup(64000)
let readerBlocker1 = newFuture[void]()
let readerBlocker2 = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
await readerBlocker1
var buffer: array[256000, byte]
# For the first roundtrip, the send window size is assumed to be 256k
discard await conn.readOnce(addr buffer[0], 256000)
await readerBlocker2
discard await conn.readOnce(addr buffer[0], 40000)

await conn.close()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]

await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block

let secondWriter = streamA.write(newSeq[byte](64000))
await sleepAsync(10.milliseconds)
check: not secondWriter.finished()

readerBlocker1.complete()
await wait(secondWriter, 1.seconds)

let thirdWriter = streamA.write(newSeq[byte](10))
await sleepAsync(10.milliseconds)
check: not thirdWriter.finished()

readerBlocker2.complete()
await wait(thirdWriter, 1.seconds)
await streamA.close()

suite "Exception testing":
asyncTest "Local & Remote close":
mSetup()
Expand Down