Skip to content

Commit 43a44ec

Browse files
webtransport: use the rcmgr to control flow control window increases
1 parent fd82f74 commit 43a44ec

File tree

6 files changed

+187
-16
lines changed

6 files changed

+187
-16
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/libp2p/zeroconf/v2 v2.2.0
3434
github.com/lucas-clemente/quic-go v0.30.0
3535
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
36-
github.com/marten-seemann/webtransport-go v0.1.1
36+
github.com/marten-seemann/webtransport-go v0.2.0
3737
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
3838
github.com/minio/sha256-simd v1.0.0
3939
github.com/mr-tron/base58 v1.2.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sN
333333
github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
334334
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
335335
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
336-
github.com/marten-seemann/webtransport-go v0.1.1 h1:TnyKp3pEXcDooTaNn4s9dYpMJ7kMnTp7k5h+SgYP/mc=
337-
github.com/marten-seemann/webtransport-go v0.1.1/go.mod h1:kBEh5+RSvOA4troP1vyOVBWK4MIMzDICXVrvCPrYcrM=
336+
github.com/marten-seemann/webtransport-go v0.2.0 h1:987jPVqcyE3vF+CHNIxDhT0P21O+bI4fVF+0NoRujSo=
337+
github.com/marten-seemann/webtransport-go v0.2.0/go.mod h1:XmnWYsWXaxUF7kjeIIzLWPyS+q0OcBY5vA64NuyK0ps=
338338
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
339339
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
340340
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=

p2p/transport/webtransport/conn.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ func (c *connMultiaddrs) RemoteMultiaddr() ma.Multiaddr { return c.remote }
2727
type conn struct {
2828
*connSecurityMultiaddrs
2929

30-
transport tpt.Transport
30+
transport *transport
3131
session *webtransport.Session
3232

3333
scope network.ConnScope
3434
}
3535

3636
var _ tpt.CapableConn = &conn{}
3737

38-
func newConn(tr tpt.Transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnScope) *conn {
38+
func newConn(tr *transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnScope) *conn {
3939
return &conn{
4040
connSecurityMultiaddrs: sconn,
4141
transport: tr,
@@ -60,7 +60,18 @@ func (c *conn) AcceptStream() (network.MuxedStream, error) {
6060
return &stream{str}, nil
6161
}
6262

63-
func (c *conn) Close() error { return c.session.Close() }
63+
func (c *conn) allowWindowIncrease(size uint64) bool {
64+
return c.scope.ReserveMemory(int(size), network.ReservationPriorityMedium) == nil
65+
}
66+
67+
// Close closes the connection.
68+
// It must be called even if the peer closed the connection in order for
69+
// garbage collection to properly work in this package.
70+
func (c *conn) Close() error {
71+
c.transport.removeConn(c.session)
72+
return c.session.Close()
73+
}
74+
6475
func (c *conn) IsClosed() bool { return c.session.Context().Err() != nil }
6576
func (c *conn) Scope() network.ConnScope { return c.scope }
6677
func (c *conn) Transport() tpt.Transport { return c.transport }

p2p/transport/webtransport/listener.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,10 @@ func (l *listener) httpHandler(w http.ResponseWriter, r *http.Request) {
157157
return
158158
}
159159

160+
conn := newConn(l.transport, sess, sconn, connScope)
161+
l.transport.addConn(sess, conn)
160162
select {
161-
case l.queue <- newConn(l.transport, sess, sconn, connScope):
163+
case l.queue <- conn:
162164
default:
163165
log.Debugw("accept queue full, dropping incoming connection", "peer", sconn.RemotePeer(), "addr", r.RemoteAddr, "error", err)
164166
sess.Close()

p2p/transport/webtransport/transport.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ type transport struct {
8282
tlsClientConf *tls.Config
8383

8484
noise *noise.Transport
85+
86+
connMx sync.Mutex
87+
conns map[uint64]*conn // using quic-go's ConnectionTracingKey as map key
8588
}
8689

8790
var _ tpt.Transport = &transport{}
@@ -94,13 +97,14 @@ func New(key ic.PrivKey, gater connmgr.ConnectionGater, rcmgr network.ResourceMa
9497
return nil, err
9598
}
9699
t := &transport{
97-
pid: id,
98-
privKey: key,
99-
rcmgr: rcmgr,
100-
gater: gater,
101-
clock: clock.New(),
102-
quicConfig: &quic.Config{},
103-
}
100+
pid: id,
101+
privKey: key,
102+
rcmgr: rcmgr,
103+
gater: gater,
104+
clock: clock.New(),
105+
conns: map[uint64]*conn{},
106+
}
107+
t.quicConfig = &quic.Config{AllowConnectionWindowIncrease: t.allowWindowIncrease}
104108
for _, opt := range opts {
105109
if err := opt(t); err != nil {
106110
return nil, err
@@ -157,8 +161,9 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
157161
scope.Done()
158162
return nil, fmt.Errorf("secured connection gated")
159163
}
160-
161-
return newConn(t, sess, sconn, scope), nil
164+
conn := newConn(t, sess, sconn, scope)
165+
t.addConn(sess, conn)
166+
return conn, nil
162167
}
163168

164169
func (t *transport) dial(ctx context.Context, addr string, sni string, certHashes []multihash.DecodedMultihash) (*webtransport.Session, error) {
@@ -313,6 +318,29 @@ func (t *transport) Close() error {
313318
return nil
314319
}
315320

321+
func (t *transport) allowWindowIncrease(conn quic.Connection, size uint64) bool {
322+
t.connMx.Lock()
323+
defer t.connMx.Unlock()
324+
325+
c, ok := t.conns[conn.Context().Value(quic.ConnectionTracingKey).(uint64)]
326+
if !ok {
327+
return false
328+
}
329+
return c.allowWindowIncrease(size)
330+
}
331+
332+
func (t *transport) addConn(sess *webtransport.Session, c *conn) {
333+
t.connMx.Lock()
334+
t.conns[sess.Context().Value(quic.ConnectionTracingKey).(uint64)] = c
335+
t.connMx.Unlock()
336+
}
337+
338+
func (t *transport) removeConn(sess *webtransport.Session) {
339+
t.connMx.Lock()
340+
delete(t.conns, sess.Context().Value(quic.ConnectionTracingKey).(uint64))
341+
t.connMx.Unlock()
342+
}
343+
316344
// extractSNI returns what the SNI should be for the given maddr. If there is an
317345
// SNI component in the multiaddr, then it will be returned and
318346
// foundSniComponent will be true. If there's no SNI component, but there is a

p2p/transport/webtransport/transport_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package libp2pwebtransport_test
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/ecdsa"
67
"crypto/elliptic"
@@ -14,7 +15,10 @@ import (
1415
"io"
1516
"math/big"
1617
"net"
18+
"os"
19+
"runtime"
1720
"strings"
21+
"sync/atomic"
1822
"testing"
1923
"time"
2024

@@ -26,6 +30,7 @@ import (
2630
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
2731

2832
"github.com/golang/mock/gomock"
33+
quicproxy "github.com/lucas-clemente/quic-go/integrationtests/tools/proxy"
2934
ma "github.com/multiformats/go-multiaddr"
3035
manet "github.com/multiformats/go-multiaddr/net"
3136
"github.com/multiformats/go-multibase"
@@ -582,5 +587,130 @@ func TestSNIIsSent(t *testing.T) {
582587
case <-time.After(time.Minute):
583588
t.Fatalf("Expected to get server name")
584589
}
590+
}
591+
592+
type reportingRcmgr struct {
593+
network.NullResourceManager
594+
report chan<- int
595+
}
596+
597+
func (m *reportingRcmgr) OpenConnection(dir network.Direction, usefd bool, endpoint ma.Multiaddr) (network.ConnManagementScope, error) {
598+
return &reportingScope{report: m.report}, nil
599+
}
600+
601+
type reportingScope struct {
602+
network.NullScope
603+
report chan<- int
604+
}
605+
606+
func (s *reportingScope) ReserveMemory(size int, _ uint8) error {
607+
s.report <- size
608+
return nil
609+
}
610+
611+
func TestFlowControlWindowIncrease(t *testing.T) {
612+
if runtime.GOOS == "windows" {
613+
t.Skip("this test is flaky on Windows")
614+
}
615+
616+
rtt := 10 * time.Millisecond
617+
timeout := 5 * time.Second
618+
619+
if os.Getenv("CI") != "" {
620+
rtt = 40 * time.Millisecond
621+
timeout = 15 * time.Second
622+
}
623+
624+
serverID, serverKey := newIdentity(t)
625+
serverWindowIncreases := make(chan int, 100)
626+
serverRcmgr := &reportingRcmgr{report: serverWindowIncreases}
627+
tr, err := libp2pwebtransport.New(serverKey, nil, serverRcmgr)
628+
require.NoError(t, err)
629+
defer tr.(io.Closer).Close()
630+
ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic/webtransport"))
631+
require.NoError(t, err)
632+
defer ln.Close()
633+
634+
go func() {
635+
conn, err := ln.Accept()
636+
require.NoError(t, err)
637+
str, err := conn.AcceptStream()
638+
require.NoError(t, err)
639+
_, err = io.CopyBuffer(str, str, make([]byte, 2<<10))
640+
require.NoError(t, err)
641+
str.CloseWrite()
642+
}()
643+
644+
proxy, err := quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
645+
RemoteAddr: ln.Addr().String(),
646+
DelayPacket: func(quicproxy.Direction, []byte) time.Duration { return rtt / 2 },
647+
})
648+
require.NoError(t, err)
649+
defer proxy.Close()
585650

651+
_, clientKey := newIdentity(t)
652+
clientWindowIncreases := make(chan int, 100)
653+
clientRcmgr := &reportingRcmgr{report: clientWindowIncreases}
654+
tr2, err := libp2pwebtransport.New(clientKey, nil, clientRcmgr)
655+
require.NoError(t, err)
656+
defer tr2.(io.Closer).Close()
657+
658+
var addr ma.Multiaddr
659+
for _, comp := range ma.Split(ln.Multiaddr()) {
660+
if _, err := comp.ValueForProtocol(ma.P_UDP); err == nil {
661+
addr = addr.Encapsulate(ma.StringCast(fmt.Sprintf("/udp/%d", proxy.LocalPort())))
662+
continue
663+
}
664+
if addr == nil {
665+
addr = comp
666+
continue
667+
}
668+
addr = addr.Encapsulate(comp)
669+
}
670+
671+
conn, err := tr2.Dial(context.Background(), addr, serverID)
672+
require.NoError(t, err)
673+
str, err := conn.OpenStream(context.Background())
674+
require.NoError(t, err)
675+
var increasesDone uint32 // to be used atomically
676+
go func() {
677+
for {
678+
_, err := str.Write(bytes.Repeat([]byte{0x42}, 1<<10))
679+
require.NoError(t, err)
680+
if atomic.LoadUint32(&increasesDone) > 0 {
681+
str.CloseWrite()
682+
return
683+
}
684+
}
685+
}()
686+
done := make(chan struct{})
687+
go func() {
688+
defer close(done)
689+
_, err := io.ReadAll(str)
690+
require.NoError(t, err)
691+
}()
692+
693+
var numServerIncreases, numClientIncreases int
694+
timer := time.NewTimer(timeout)
695+
defer timer.Stop()
696+
for {
697+
select {
698+
case <-serverWindowIncreases:
699+
numServerIncreases++
700+
case <-clientWindowIncreases:
701+
numClientIncreases++
702+
case <-timer.C:
703+
t.Fatalf("didn't receive enough window increases (client: %d, server: %d)", numClientIncreases, numServerIncreases)
704+
}
705+
if numClientIncreases >= 1 && numServerIncreases >= 1 {
706+
atomic.AddUint32(&increasesDone, 1)
707+
break
708+
}
709+
}
710+
711+
select {
712+
case <-done:
713+
case <-time.After(timeout):
714+
t.Fatal("timeout")
715+
}
586716
}

0 commit comments

Comments
 (0)