
Commit c16c8ec

twmb and claude committed
kgo: add backpressure to batchPromises ring buffer
The batchPromises ring buffer in the producer could grow unboundedly. When records fail faster than finishPromises() can process them (e.g., a spin-loop producing failing records), the ring would grow without bound, potentially causing OOM within the client.

This commit rewrites the ring as a single dynamically-sized circular buffer with optional max length blocking:

- Buffer starts at capacity 8, grows by doubling when full
- Shrinks back to capacity 8 when at 4 or fewer elements
- Adds initMaxLen() to block pushes when at capacity
- Initializes batchPromises with a max(maxBufferedRecords, 8192) limit

The blocking pushes back-pressure to the application rather than OOMing within the client.

Closes #1194

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 5f30524 commit c16c8ec
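For context, here is a minimal sketch (not from this commit) of the scenario the commit message describes, written against the usual franz-go producer API. The broker address, topic, and option values are illustrative assumptions: a tight produce loop whose records all fail queues promises into batchPromises faster than finishPromises() drains them. With this change the promise ring is capped at max(maxBufferedRecords, 8192), and a full ring blocks the producing goroutine instead of growing.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),         // illustrative address
		kgo.DefaultProduceTopic("example-topic"),  // assume produces to this topic fail
		kgo.MaxBufferedRecords(10_000),            // promise ring capped at max(10_000, 8192)
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		// Spin-loop producing records that fail: before this commit the
		// failed promises could accumulate without bound; now a full
		// batchPromises ring blocks this loop until promises are finished.
		cl.Produce(context.Background(), kgo.StringRecord("v"), func(_ *kgo.Record, err error) {
			_ = err // handle the failure in the promise; do not block here
		})
	}
}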


3 files changed: +212 -89 lines changed


pkg/kgo/producer.go

Lines changed: 1 addition & 0 deletions
@@ -180,6 +180,7 @@ func (p *producer) init(cl *Client) {
 		err: errReloadProducerID,
 	})
 	p.c = sync.NewCond(&p.mu)
+	p.batchPromises.initMaxLen(max(int(cl.cfg.maxBufferedRecords), 8192))
 
 	inithooks := func() {
 		if p.hooks == nil {

pkg/kgo/ring.go

Lines changed: 67 additions & 59 deletions
@@ -4,15 +4,14 @@ import (
 	"sync"
 )
 
-// The ring type below in based on fixed sized blocking MPSC ringbuffer
-// if the number of elements is less than or equal to 8, and fallback to a slice if
-// the number of elements in greater than 8. The maximum number of elements
-// in the ring is unlimited.
+// The ring type is a dynamically-sized circular buffer with optional blocking
+// when full. The buffer starts at capacity 8 and grows as needed. When the
+// buffer empties, it shrinks back to capacity 8 to release memory.
 //
-// This ring replace channels in a few places in this client. The *main* advantage it
-// provide is to allow loops that terminate.
+// This ring replaces channels in a few places in this client. The *main*
+// advantage it provides is to allow loops that terminate.
 //
-// With channels, we always have to have a goroutine draining the channel.  We
+// With channels, we always have to have a goroutine draining the channel. We
 // cannot start the goroutine when we add the first element, because the
 // goroutine will immediately drain the first and if something produces right
 // away, it will start a second concurrent draining goroutine.
@@ -23,10 +22,6 @@ import (
 // which would block the worker from grabbing the lock. Any other lock ordering
 // has TOCTOU problems as well.
 //
-// We could exclusively use a slice that we always push to and pop the front of.
-// This is a bit easier to reason about, but constantly reallocates and has no bounded
-// capacity, so we use it only if the number of elements is greater than 8.
-//
 // The key insight is that we only pop the front *after* we are done with it.
 // If there are still more elements, the worker goroutine can continue working.
 // If there are no more elements, it can quit. When pushing, if the pusher
@@ -36,95 +31,108 @@ import (
 // If a die happens while a worker is running, all future pops will see the
 // ring is dead and can fail promises immediately. If a worker is not running,
 // then there are no promises that need to be called.
-//
-// We use size 8 buffers because eh why not. This gives us a small optimization
-// of masking to increment and decrement, rather than modulo arithmetic.
 
-const (
-	mask7 = 0b0000_0111
-	eight = mask7 + 1
-)
+const minRingCap = 8
 
 type ring[T any] struct {
 	mu sync.Mutex
 
-	elems [eight]T
+	elems []T // circular buffer, min capacity minRingCap
+	head  int // index of first element
+	l     int // number of elements
 
-	head uint8
-	tail uint8
-	l    uint8
-	dead bool
+	maxLen int        // if >0, push blocks when l >= maxLen
+	cond   *sync.Cond // used for blocking when at maxLen
+	dead   bool
+}
 
-	overflow []T
+// initMaxLen sets the maximum number of elements before push blocks.
+// This must be called before any concurrent access.
+func (r *ring[T]) initMaxLen(max int) {
+	r.maxLen = max
+	r.cond = sync.NewCond(&r.mu)
 }
 
 func (r *ring[T]) die() {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
 	r.dead = true
+	if r.cond != nil {
+		r.cond.Broadcast()
+	}
 }
 
 func (r *ring[T]) push(elem T) (first, dead bool) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
+	// If a max length is set, block until there's space.
+	for r.maxLen > 0 && r.l >= r.maxLen && !r.dead {
+		r.cond.Wait()
+	}
+
 	if r.dead {
 		return false, true
 	}
 
-	// If the ring is full, we go into overflow; if overflow is non-empty,
-	// for ordering purposes, we add to the end of overflow. We only go
-	// back to using the ring once overflow is finally empty.
-	if r.l == eight || len(r.overflow) > 0 {
-		r.overflow = append(r.overflow, elem)
-		return false, false
+	// Grow: double capacity when full (or initialize to minRingCap).
+	if r.l == cap(r.elems) {
+		r.resize(max(cap(r.elems)*2, minRingCap))
 	}
 
-	r.elems[r.tail] = elem
-	r.tail = (r.tail + 1) & mask7
+	// Write at tail position (head + l, wrapped).
+	writePos := (r.head + r.l) % cap(r.elems)
+	r.elems[writePos] = elem
 	r.l++
 
 	return r.l == 1, false
 }
 
+// resize changes the buffer capacity, copying elements in linear order.
+// Must be called with r.mu held.
+func (r *ring[T]) resize(newCap int) {
+	newElems := make([]T, newCap)
+	if r.l > 0 {
+		// Copy elements in order: from head to end, then from start to head.
+		if r.head+r.l <= len(r.elems) {
+			copy(newElems, r.elems[r.head:r.head+r.l])
+		} else {
+			n := copy(newElems, r.elems[r.head:])
+			copy(newElems[n:], r.elems[:r.l-n])
+		}
+	}
+	r.elems = newElems
+	r.head = 0
+}
+
 func (r *ring[T]) dropPeek() (next T, more, dead bool) {
 	var zero T
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	// We always drain the ring first. If the ring is ever empty, there
-	// must be overflow: we would not be here if the ring is not-empty.
-	if r.l > 1 {
-		r.elems[r.head] = zero
-		r.head = (r.head + 1) & mask7
-		r.l--
-		return r.elems[r.head], true, r.dead
-	} else if r.l == 1 {
-		r.elems[r.head] = zero
-		r.head = (r.head + 1) & mask7
-		r.l--
-		if len(r.overflow) == 0 {
-			return next, false, r.dead
-		}
-		return r.overflow[0], true, r.dead
+	if r.l == 0 {
+		return zero, false, r.dead
 	}
 
-	r.overflow[0] = zero
+	// Clear current head element.
+	r.elems[r.head] = zero
+	r.head = (r.head + 1) % cap(r.elems)
+	r.l--
 
-	// In case of continuous push and pulls to the overflow slice, the overflow
-	// slice's underlying memory array is not expected to grow indefinitely because
-	// append() will eventually re-allocate the memory and, when will do it, it will
-	// only copy the "live" elements (the part of the slide pointed by the slice header).
-	r.overflow = r.overflow[1:]
-
-	if len(r.overflow) > 0 {
-		return r.overflow[0], true, r.dead
+	// Signal any blocked pushers that space is available.
+	if r.cond != nil {
+		r.cond.Signal()
 	}
 
-	// We have no more overflow elements. We reset the slice to nil to release memory.
-	r.overflow = nil
+	// Shrink: reduce to minRingCap when mostly empty to release memory.
+	if r.l <= minRingCap/2 && cap(r.elems) > minRingCap {
+		r.resize(minRingCap)
+	}
 
-	return next, false, r.dead
+	if r.l > 0 {
+		return r.elems[r.head], true, r.dead
+	}
+	return zero, false, r.dead
 }
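To make the new blocking behavior concrete, here is a rough in-package test sketch, not part of this commit (the ring type is unexported, so this assumes it runs inside package kgo): fill the ring to its max length, observe that the next push blocks, and confirm that a dropPeek frees a slot and wakes the blocked pusher.

package kgo

import (
	"testing"
	"time"
)

func TestRingPushBlocksAtMaxLen(t *testing.T) {
	var r ring[int]
	r.initMaxLen(2)

	r.push(1)
	r.push(2)

	unblocked := make(chan struct{})
	go func() {
		r.push(3) // blocks: ring is at its max length
		close(unblocked)
	}()

	select {
	case <-unblocked:
		t.Fatal("push returned while ring was full")
	case <-time.After(50 * time.Millisecond):
	}

	r.dropPeek() // frees a slot and signals the blocked pusher

	select {
	case <-unblocked:
	case <-time.After(time.Second):
		t.Fatal("push did not unblock after dropPeek")
	}
}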
