Skip to content

Commit 5541298

Browse files
committed
quic: add packet pacer
The pacer rate-limits the transmission of packets to avoid creating bursts that may cause short-term congestion or loss. See RFC 9002, Section 7.7. For golang/go#58547 Change-Id: I75285c194a1048f988e4d5a829602d199829669d Reviewed-on: https://go-review.googlesource.com/c/net/+/499287 Run-TryBot: Damien Neil <dneil@google.com> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Jonathan Amsterdam <jba@google.com>
1 parent 88a50b6 commit 5541298

2 files changed

Lines changed: 355 additions & 0 deletions

File tree

internal/quic/pacer.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import (
10+
"time"
11+
)
12+
13+
// A pacerState controls the rate at which packets are sent using a leaky-bucket rate limiter.
14+
//
15+
// The pacer limits the maximum size of a burst of packets.
16+
// When a burst exceeds this limit, it spreads subsequent packets
17+
// over time.
18+
//
19+
// The bucket is initialized to the maximum burst size (ten packets by default),
20+
// and fills at the rate:
21+
//
22+
// 1.25 * congestion_window / smoothed_rtt
23+
//
24+
// A sender can send one congestion window of packets per RTT,
25+
// since the congestion window consumed by each packet is returned
26+
// one round-trip later by the responding ack.
27+
// The pacer permits sending at slightly faster than this rate to
28+
// avoid underutilizing the congestion window.
29+
//
30+
// The pacer permits the bucket to become negative, and permits
31+
// sending when non-negative. This biases slightly in favor of
32+
// sending packets over limiting them, and permits bursts one
33+
// packet greater than the configured maximum, but permits the pacer
34+
// to be ignorant of the maximum packet size.
35+
//
36+
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.7
37+
type pacerState struct {
38+
bucket int // measured in bytes
39+
maxBucket int
40+
timerGranularity time.Duration
41+
lastUpdate time.Time
42+
nextSend time.Time
43+
}
44+
45+
func (p *pacerState) init(now time.Time, maxBurst int, timerGranularity time.Duration) {
46+
// Bucket is limited to maximum burst size, which is the initial congestion window.
47+
// https://www.rfc-editor.org/rfc/rfc9002#section-7.7-2
48+
p.maxBucket = maxBurst
49+
p.bucket = p.maxBucket
50+
p.timerGranularity = timerGranularity
51+
p.lastUpdate = now
52+
p.nextSend = now
53+
}
54+
55+
// pacerBytesForInterval returns the number of bytes permitted over an interval.
56+
//
57+
// rate = 1.25 * congestion_window / smoothed_rtt
58+
// bytes = interval * rate
59+
//
60+
// https://www.rfc-editor.org/rfc/rfc9002#section-7.7-6
61+
func pacerBytesForInterval(interval time.Duration, congestionWindow int, rtt time.Duration) int {
62+
bytes := (int64(interval) * int64(congestionWindow)) / int64(rtt)
63+
bytes = (bytes * 5) / 4 // bytes *= 1.25
64+
return int(bytes)
65+
}
66+
67+
// pacerIntervalForBytes returns the amount of time required for a number of bytes.
68+
//
69+
// time_per_byte = (smoothed_rtt / congestion_window) / 1.25
70+
// interval = time_per_byte * bytes
71+
//
72+
// https://www.rfc-editor.org/rfc/rfc9002#section-7.7-8
73+
func pacerIntervalForBytes(bytes int, congestionWindow int, rtt time.Duration) time.Duration {
74+
interval := (int64(rtt) * int64(bytes)) / int64(congestionWindow)
75+
interval = (interval * 4) / 5 // interval /= 1.25
76+
return time.Duration(interval)
77+
}
78+
79+
// advance is called when time passes.
80+
func (p *pacerState) advance(now time.Time, congestionWindow int, rtt time.Duration) {
81+
elapsed := now.Sub(p.lastUpdate)
82+
if elapsed < 0 {
83+
// Time has gone backward?
84+
elapsed = 0
85+
p.nextSend = now // allow a packet through to get back on track
86+
if p.bucket < 0 {
87+
p.bucket = 0
88+
}
89+
}
90+
p.lastUpdate = now
91+
if rtt == 0 {
92+
// Avoid divide by zero in the implausible case that we measure no RTT.
93+
p.bucket = p.maxBucket
94+
return
95+
}
96+
// Refill the bucket.
97+
delta := pacerBytesForInterval(elapsed, congestionWindow, rtt)
98+
p.bucket = min(p.bucket+delta, p.maxBucket)
99+
}
100+
101+
// packetSent is called to record transmission of a packet.
102+
func (p *pacerState) packetSent(now time.Time, size, congestionWindow int, rtt time.Duration) {
103+
p.bucket -= size
104+
if p.bucket < -congestionWindow {
105+
// Never allow the bucket to fall more than one congestion window in arrears.
106+
// We can only fall this far behind if the sender is sending unpaced packets,
107+
// the congestion window has been exceeded, or the RTT is less than the
108+
// timer granularity.
109+
//
110+
// Limiting the minimum bucket size limits the maximum pacer delay
111+
// to RTT/1.25.
112+
p.bucket = -congestionWindow
113+
}
114+
if p.bucket >= 0 {
115+
p.nextSend = now
116+
return
117+
}
118+
// Next send occurs when the bucket has refilled to 0.
119+
delay := pacerIntervalForBytes(-p.bucket, congestionWindow, rtt)
120+
p.nextSend = now.Add(delay)
121+
}
122+
123+
// canSend reports whether a packet can be sent now.
124+
// If it returns false, next is the time when the next packet can be sent.
125+
func (p *pacerState) canSend(now time.Time) (canSend bool, next time.Time) {
126+
// If the next send time is within the timer granularity, send immediately.
127+
if p.nextSend.After(now.Add(p.timerGranularity)) {
128+
return false, p.nextSend
129+
}
130+
return true, time.Time{}
131+
}

internal/quic/pacer_test.go

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import (
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestPacerStartup(t *testing.T) {
15+
p := &pacerTest{
16+
cwnd: 10000,
17+
rtt: 100 * time.Millisecond,
18+
timerGranularity: 1 * time.Millisecond,
19+
}
20+
p.init(t)
21+
t.Logf("# initial burst permits sending ten packets")
22+
for i := 0; i < 10; i++ {
23+
p.sendPacket(1000)
24+
}
25+
26+
t.Logf("# empty bucket allows for one more packet")
27+
p.sendPacket(1000)
28+
29+
t.Logf("# sending 1000 byte packets with 8ms interval:")
30+
t.Logf("# (smoothed_rtt * packet_size / congestion_window) / 1.25")
31+
t.Logf("# (100ms * 1000 / 10000) / 1.25 = 8ms")
32+
p.wantSendDelay(8 * time.Millisecond)
33+
p.advance(8 * time.Millisecond)
34+
p.sendPacket(1000)
35+
p.wantSendDelay(8 * time.Millisecond)
36+
37+
t.Logf("# accumulate enough window for two packets")
38+
p.advance(16 * time.Millisecond)
39+
p.sendPacket(1000)
40+
p.sendPacket(1000)
41+
p.wantSendDelay(8 * time.Millisecond)
42+
43+
t.Logf("# window does not grow to more than burst limit")
44+
p.advance(1 * time.Second)
45+
for i := 0; i < 11; i++ {
46+
p.sendPacket(1000)
47+
}
48+
p.wantSendDelay(8 * time.Millisecond)
49+
}
50+
51+
func TestPacerTimerGranularity(t *testing.T) {
52+
p := &pacerTest{
53+
cwnd: 10000,
54+
rtt: 100 * time.Millisecond,
55+
timerGranularity: 1 * time.Millisecond,
56+
}
57+
p.init(t)
58+
t.Logf("# consume initial burst")
59+
for i := 0; i < 11; i++ {
60+
p.sendPacket(1000)
61+
}
62+
p.wantSendDelay(8 * time.Millisecond)
63+
64+
t.Logf("# small advance in time does not permit sending")
65+
p.advance(4 * time.Millisecond)
66+
p.wantSendDelay(4 * time.Millisecond)
67+
68+
t.Logf("# advancing to within timerGranularity of next send permits send")
69+
p.advance(3 * time.Millisecond)
70+
p.wantSendDelay(0)
71+
72+
t.Logf("# early send adds skipped delay (1ms) to next send (8ms)")
73+
p.sendPacket(1000)
74+
p.wantSendDelay(9 * time.Millisecond)
75+
}
76+
77+
func TestPacerChangingRate(t *testing.T) {
78+
p := &pacerTest{
79+
cwnd: 10000,
80+
rtt: 100 * time.Millisecond,
81+
timerGranularity: 0,
82+
}
83+
p.init(t)
84+
t.Logf("# consume initial burst")
85+
for i := 0; i < 11; i++ {
86+
p.sendPacket(1000)
87+
}
88+
p.wantSendDelay(8 * time.Millisecond)
89+
p.advance(8 * time.Millisecond)
90+
91+
t.Logf("# set congestion window to 20000, 1000 byte interval is 4ms")
92+
p.cwnd = 20000
93+
p.sendPacket(1000)
94+
p.wantSendDelay(4 * time.Millisecond)
95+
p.advance(4 * time.Millisecond)
96+
97+
t.Logf("# set rtt to 200ms, 1000 byte interval is 8ms")
98+
p.rtt = 200 * time.Millisecond
99+
p.sendPacket(1000)
100+
p.wantSendDelay(8 * time.Millisecond)
101+
p.advance(8 * time.Millisecond)
102+
103+
t.Logf("# set congestion window to 40000, 1000 byte interval is 4ms")
104+
p.cwnd = 40000
105+
p.advance(8 * time.Millisecond)
106+
p.sendPacket(1000)
107+
p.sendPacket(1000)
108+
p.sendPacket(1000)
109+
p.wantSendDelay(4 * time.Millisecond)
110+
}
111+
112+
func TestPacerTimeReverses(t *testing.T) {
113+
p := &pacerTest{
114+
cwnd: 10000,
115+
rtt: 100 * time.Millisecond,
116+
timerGranularity: 0,
117+
}
118+
p.init(t)
119+
t.Logf("# consume initial burst")
120+
for i := 0; i < 11; i++ {
121+
p.sendPacket(1000)
122+
}
123+
p.wantSendDelay(8 * time.Millisecond)
124+
t.Logf("# reverse time")
125+
p.advance(-4 * time.Millisecond)
126+
p.sendPacket(1000)
127+
p.wantSendDelay(8 * time.Millisecond)
128+
p.advance(8 * time.Millisecond)
129+
p.sendPacket(1000)
130+
p.wantSendDelay(8 * time.Millisecond)
131+
}
132+
133+
func TestPacerZeroRTT(t *testing.T) {
134+
p := &pacerTest{
135+
cwnd: 10000,
136+
rtt: 0,
137+
timerGranularity: 0,
138+
}
139+
p.init(t)
140+
t.Logf("# with rtt 0, the pacer does not limit sending")
141+
for i := 0; i < 20; i++ {
142+
p.sendPacket(1000)
143+
}
144+
p.advance(1 * time.Second)
145+
for i := 0; i < 20; i++ {
146+
p.sendPacket(1000)
147+
}
148+
}
149+
150+
func TestPacerZeroCongestionWindow(t *testing.T) {
151+
p := &pacerTest{
152+
cwnd: 10000,
153+
rtt: 100 * time.Millisecond,
154+
timerGranularity: 0,
155+
}
156+
p.init(t)
157+
p.cwnd = 0
158+
t.Logf("# with cwnd 0, the pacer does not limit sending")
159+
for i := 0; i < 20; i++ {
160+
p.sendPacket(1000)
161+
}
162+
}
163+
164+
type pacerTest struct {
165+
t *testing.T
166+
p pacerState
167+
timerGranularity time.Duration
168+
cwnd int
169+
rtt time.Duration
170+
now time.Time
171+
}
172+
173+
func newPacerTest(t *testing.T, congestionWindow int, rtt time.Duration) *pacerTest {
174+
p := &pacerTest{
175+
now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
176+
cwnd: congestionWindow,
177+
rtt: rtt,
178+
}
179+
p.p.init(p.now, congestionWindow, p.timerGranularity)
180+
return p
181+
}
182+
183+
func (p *pacerTest) init(t *testing.T) {
184+
p.t = t
185+
p.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
186+
p.p.init(p.now, p.cwnd, p.timerGranularity)
187+
t.Logf("# initial congestion window: %v", p.cwnd)
188+
t.Logf("# timer granularity: %v", p.timerGranularity)
189+
}
190+
191+
func (p *pacerTest) advance(d time.Duration) {
192+
p.t.Logf("advance time %v", d)
193+
p.now = p.now.Add(d)
194+
p.p.advance(p.now, p.cwnd, p.rtt)
195+
}
196+
197+
func (p *pacerTest) sendPacket(size int) {
198+
if canSend, next := p.p.canSend(p.now); !canSend {
199+
p.t.Fatalf("ERROR: pacer unexpectedly blocked send, delay=%v", next.Sub(p.now))
200+
}
201+
p.t.Logf("send packet of size %v", size)
202+
p.p.packetSent(p.now, size, p.cwnd, p.rtt)
203+
}
204+
205+
func (p *pacerTest) wantSendDelay(want time.Duration) {
206+
wantCanSend := want == 0
207+
gotCanSend, next := p.p.canSend(p.now)
208+
var got time.Duration
209+
if !gotCanSend {
210+
got = next.Sub(p.now)
211+
}
212+
p.t.Logf("# pacer send delay: %v", got)
213+
if got != want || gotCanSend != wantCanSend {
214+
p.t.Fatalf("ERROR: pacer send delay = %v (can send: %v); want %v, %v", got, gotCanSend, want, wantCanSend)
215+
}
216+
}
217+
218+
func (p *pacerTest) sendDelay() time.Duration {
219+
canSend, next := p.p.canSend(p.now)
220+
if canSend {
221+
return 0
222+
}
223+
return next.Sub(p.now)
224+
}

0 commit comments

Comments
 (0)