Skip to content

Commit dc3908a

Browse files
twmb and claude
committed
kgo: unlinger partitions in ProduceSync to avoid linger delay
ProduceSync now stops lingering and triggers an immediate drain on all partitions that records were produced to. This avoids the caller waiting unnecessarily for linger timers (default 10ms since v1.20.0) when synchronously waiting for results. After each Produce call, the record's Partition field is already set (see bufferRecord), so we collect unique recBufs inline and unlinger them after all records are enqueued. A [16]*recBuf stack array with linear dedup avoids heap allocation in the common case. Closes #1195 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 677ddba commit dc3908a

1 file changed

Lines changed: 71 additions & 2 deletions

File tree

pkg/kgo/producer.go

Lines changed: 71 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -311,8 +311,13 @@ func (rs ProduceResults) First() (*Record, error) {
311311
// ProduceSync is a synchronous produce. See the [Produce] documentation for an
312312
// in depth description of how producing works.
313313
//
314-
// This function simply produces all records in one range loop and waits for
315-
// them all to be produced before returning.
314+
// This function produces all records and waits for them all to be produced
315+
// before returning. If the client has a non-zero linger configured, after all
316+
// records are enqueued, this function stops lingering and triggers an immediate
317+
// drain on all partitions that records were produced to. This avoids
318+
// unnecessarily waiting for linger timers when the caller is synchronously
319+
// waiting for results. Partitions that are lingering due to concurrent
320+
// [Produce] calls are not affected.
316321
func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
317322
var (
318323
wg sync.WaitGroup
@@ -324,9 +329,73 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
324329
)
325330

326331
wg.Add(len(rs))
332+
333+
// After each Produce call for a known topic, the record's Partition
334+
// field is already set (see bufferRecord), allowing us to collect
335+
// which recBufs to unlinger without a second pass over the records.
336+
// We use a [16] base array to avoid heap allocation in the common
337+
// case, and linear dedup since the number of unique partitions is
338+
// typically small.
339+
//
340+
// We load partition data BEFORE calling Produce to avoid a data
341+
// race on r.Partition. If partitions exist before Produce,
342+
// partitionsForTopicProduce will also see them (partition counts
343+
// are monotonically increasing) and will partition the record
344+
// synchronously in bufferRecord, making r.Partition safe to read
345+
// after Produce returns. If pd is nil, we never read r.Partition,
346+
// avoiding a race with the metadata goroutine which partitions
347+
// unknownTopics records asynchronously.
348+
var (
349+
buf [16]*recBuf
350+
unlinger = buf[:0]
351+
topics topicsPartitionsData
352+
353+
lastTopic string
354+
lastPD *topicPartitionsData
355+
)
356+
if cl.cfg.linger > 0 {
357+
topics = cl.producer.topics.load()
358+
}
359+
327360
for _, r := range rs {
361+
var pd *topicPartitionsData
362+
if topics != nil {
363+
if r.Topic == lastTopic {
364+
pd = lastPD
365+
} else if parts, ok := topics[r.Topic]; ok {
366+
if v := parts.load(); len(v.partitions) > 0 {
367+
pd = v
368+
}
369+
lastTopic = r.Topic
370+
lastPD = pd
371+
}
372+
}
373+
328374
cl.Produce(ctx, r, promise)
375+
376+
if pd == nil {
377+
continue
378+
}
379+
if int(r.Partition) >= len(pd.partitions) {
380+
continue
381+
}
382+
rb := pd.partitions[r.Partition].records
383+
var seen bool
384+
for _, have := range unlinger {
385+
if have == rb {
386+
seen = true
387+
break
388+
}
389+
}
390+
if !seen {
391+
unlinger = append(unlinger, rb)
392+
}
329393
}
394+
395+
for _, rb := range unlinger {
396+
rb.unlingerAndManuallyDrain()
397+
}
398+
330399
wg.Wait()
331400

332401
return results

0 commit comments

Comments (0)