Closed
Description
The ProduceSync method in the kafka client can block indefinitely even when the context is cancelled. This occurs because ProduceSync uses a sync.WaitGroup to wait for all records to be processed, but there's no mechanism to break out of wg.Wait() when the context is cancelled.
Steps to Reproduce
- start a broker locally with default settings
- run env KAFKA_HOST=127.0.0.1:9092 KAFKA_TOPIC=test go run produce.go
- the client starts producing records every 5s
- shut down the broker
- the client hangs at ProduceSync(), even if the deadline (1min) has already been exceeded.
produce.go - client code that produces records repeatedly within deadline
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produce sends one record every 5s until ctx is done or a produce call fails.
func produce(ctx context.Context) error {
	client, err := kgo.NewClient(
		kgo.SeedBrokers(os.Getenv("KAFKA_HOST")),
		kgo.RecordDeliveryTimeout(10*time.Second),
		kgo.AllowAutoTopicCreation(),
	)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	defer client.Close()

	var sequence int64
	topic := os.Getenv("KAFKA_TOPIC")
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			sequence++
			record := &kgo.Record{
				Topic: topic,
				Value: []byte(fmt.Sprintf("%d", sequence)),
			}
			deadline, _ := ctx.Deadline()
			timeLeft := time.Until(deadline).Round(time.Second)
			log.Printf("Producing message with sequence %d to topic %s (%s remaining)", sequence, topic, timeLeft)
			// This is the call that hangs once the broker is down.
			if err := client.ProduceSync(ctx, record).FirstErr(); err != nil {
				return fmt.Errorf("failed to produce message: %w", err)
			}
			log.Printf("Message produced to partition %d at offset %d", record.Partition, record.Offset)
		}
	}
}

func main() {
	// The whole run is bounded by a 1-minute deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	if err := produce(ctx); err != nil {
		log.Fatalf("Failed to produce: %v", err)
	}
}
}
Possible Root Cause(s)
So, the client retries sending the same batch over and over again (which will be deduplicated due to the sequence numbers) until the client receives a response -- and the client ignores any end-user context cancellation or retry limit.
Proposed Fix
If the goal is to unblock ProduceSync, either of the following might work:
- replace sync.WaitGroup with golang.org/x/sync/errgroup.Group
- call wg.Wait() in another goroutine and signal completion with close(waitCh); ProduceSync can then wait on both waitCh and ctx.Done() in a select block.
However, the comment mentioned data loss and idempotent write, which I don't quite understand.