16 changes: 15 additions & 1 deletion pkg/kfake/main.go
@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"strconv"
"strings"

"github.com/twmb/franz-go/pkg/kfake"
@@ -17,9 +18,12 @@ func main() {
// Configure log level from flag
var logLevelStr string
var versionStr string
var portsStr string
flag.StringVar(&logLevelStr, "log-level", "none", "Log level: none, error, warn, info, debug")
flag.StringVar(&logLevelStr, "l", "none", "Log level (shorthand)")
flag.StringVar(&versionStr, "as-version", "", "Kafka version to emulate (e.g., 2.8, 3.5)")
flag.StringVar(&portsStr, "ports", "9092,9093,9094", "Comma-separated broker ports")
flag.StringVar(&portsStr, "p", "9092,9093,9094", "Comma-separated broker ports (shorthand)")
flag.Parse()

logLevel := kfake.LogLevelNone
@@ -36,8 +40,18 @@ func main() {
logLevel = kfake.LogLevelNone
}

var ports []int
for _, s := range strings.Split(portsStr, ",") {
p, err := strconv.Atoi(strings.TrimSpace(s))
if err != nil {
fmt.Fprintf(os.Stderr, "invalid port %q: %v\n", s, err)
os.Exit(1)
}
ports = append(ports, p)
}

opts := []kfake.Opt{
kfake.Ports(9092, 9093, 9094),
kfake.Ports(ports...),
kfake.SeedTopics(-1, "foo"),
kfake.WithLogger(kfake.BasicLogger(os.Stderr, logLevel)),
}
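
For reference, a minimal sketch of pointing a franz-go client at the fake cluster once it is started with custom ports. The addresses below are illustrative and must match whatever was passed via -ports/-p; this is not part of the change itself.

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Illustrative addresses: these must match the -ports value given to
	// the kfake binary (default 9092,9093,9094).
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9095", "127.0.0.1:9096", "127.0.0.1:9097"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Ping checks that the seed brokers are reachable.
	if err := cl.Ping(context.Background()); err != nil {
		panic(err)
	}
}
```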
91 changes: 71 additions & 20 deletions pkg/kfake/run_tests.sh
@@ -4,27 +4,46 @@
# Options:
# -t, --test PATTERN Test to run (default: all tests)
# Examples: Txn, Group, Txn/range, Group/sticky
# -b, --bench PATTERN Benchmark to run (e.g., ProduceSyncLinger)
# -c, --count NUM Benchmark count (default: 3, only with --bench)
# -n, --iterations NUM Max iterations (default: 50)
# -r, --records NUM Number of records (default: 500000)
# -p, --ports PORTS Comma-separated broker ports (default: 9092,9093,9094)
# --race Enable race detector (default: off)
# -l, --log-level LEVEL Set KGO_LOG_LEVEL (e.g., debug, info)
# -v, --version VERSION Kafka version to emulate (e.g., 2.8, 3.5)
# -k, --kill Kill processes on ports 9092-9094 and exit
# -k, --kill Kill processes on configured ports and exit
# -h, --help Show this help

# Derive paths from script location.
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
KFAKE_DIR="$SCRIPT_DIR"
KGO_DIR="$SCRIPT_DIR/../kgo"

MAX_ITERATIONS=50
RECORDS=500000
TEST_TYPE=""
BENCH_TYPE=""
BENCH_COUNT=3
RACE=""
LOG_LEVEL=""
KFAKE_VERSION="${KFAKE_VERSION:-}"
PORTS="9092,9093,9094"

while [[ $# -gt 0 ]]; do
case $1 in
-t|--test)
TEST_TYPE="$2"
shift 2
;;
-b|--bench)
BENCH_TYPE="$2"
shift 2
;;
-c|--count)
BENCH_COUNT="$2"
shift 2
;;
-n|--iterations)
MAX_ITERATIONS="$2"
shift 2
@@ -33,6 +52,10 @@ while [[ $# -gt 0 ]]; do
RECORDS="$2"
shift 2
;;
-p|--ports)
PORTS="$2"
shift 2
;;
--race)
RACE="-race"
shift
@@ -46,8 +69,9 @@ while [[ $# -gt 0 ]]; do
shift 2
;;
-k|--kill)
echo "Killing processes on ports 9092, 9093, 9094..."
for port in 9092 9093 9094; do
IFS=',' read -ra KILL_PORTS <<< "$PORTS"
echo "Killing processes on ports ${KILL_PORTS[*]}..."
for port in "${KILL_PORTS[@]}"; do
pid=$(lsof -ti:$port 2>/dev/null)
if [ -n "$pid" ]; then
echo " Killing PID $pid on port $port"
@@ -62,12 +86,15 @@ while [[ $# -gt 0 ]]; do
echo "Options:"
echo " -t, --test PATTERN Test to run (default: all tests)"
echo " Examples: Txn, Group, Txn/range, Group/sticky"
echo " -b, --bench PATTERN Benchmark to run (e.g., ProduceSyncLinger)"
echo " -c, --count NUM Benchmark count (default: 3, only with --bench)"
echo " -n, --iterations NUM Max iterations (default: 50)"
echo " -r, --records NUM Number of records (default: 100000)"
echo " -p, --ports PORTS Comma-separated broker ports (default: 9092,9093,9094)"
echo " --race Enable race detector (default: off)"
echo " -l, --log-level LEVEL Set KGO_LOG_LEVEL (e.g., debug, info)"
echo " -v, --version VERSION Kafka version to emulate (e.g., 2.8, 3.5)"
echo " -k, --kill Kill processes on ports 9092-9094 and exit"
echo " -k, --kill Kill processes on configured ports and exit"
echo " -h, --help Show this help"
exit 0
;;
@@ -78,15 +105,32 @@ while [[ $# -gt 0 ]]; do
esac
done

# Build test pattern
if [ -z "$TEST_TYPE" ]; then
# Parse ports into array and build seeds string for KGO_SEEDS.
IFS=',' read -ra PORT_ARRAY <<< "$PORTS"
FIRST_PORT="${PORT_ARRAY[0]}"
SEEDS=""
for port in "${PORT_ARRAY[@]}"; do
if [ -n "$SEEDS" ]; then
SEEDS="$SEEDS,"
fi
SEEDS="${SEEDS}127.0.0.1:${port}"
done

# Build test/bench arguments
if [ -n "$BENCH_TYPE" ]; then
# Benchmark mode: run only the bench, skip tests, single iteration
BENCH_PATTERN="Benchmark${BENCH_TYPE}"
GO_TEST_ARGS="-bench=$BENCH_PATTERN -benchmem -count=$BENCH_COUNT -run=^$"
MAX_ITERATIONS=1
TEST_PATTERN="$BENCH_PATTERN"
elif [ -z "$TEST_TYPE" ]; then
# Run all tests by default
TEST_PATTERN=""
RUN_ARG=""
GO_TEST_ARGS=""
TEST_PATTERN="all"
else
# User specified a pattern like "Txn", "Group", "Txn/range"
TEST_PATTERN="Test${TEST_TYPE}"
RUN_ARG="-run $TEST_PATTERN"
GO_TEST_ARGS="-run $TEST_PATTERN"
fi
LOG_DIR="/tmp/kfake_test_logs"
mkdir -p "$LOG_DIR"
@@ -116,18 +160,25 @@ trap cleanup_exit EXIT
echo "Configuration:"
echo " Iterations: $MAX_ITERATIONS"
echo " Records: $RECORDS"
echo " Test pattern: ${TEST_PATTERN:-all}"
echo " Test pattern: $TEST_PATTERN"
if [ -n "$BENCH_TYPE" ]; then
echo " Bench count: $BENCH_COUNT"
fi
echo " Ports: $PORTS"
echo " Seeds: $SEEDS"
echo " Race detector: ${RACE:-disabled}"
echo " Log level: ${LOG_LEVEL:-default}"
echo " Kafka version: ${KFAKE_VERSION:-latest}"
echo " kfake dir: $KFAKE_DIR"
echo " kgo dir: $KGO_DIR"
echo " Logs: $LOG_DIR"
echo ""

for i in $(seq 1 $MAX_ITERATIONS); do
echo "=== Run $i of $MAX_ITERATIONS ==="

# Wait for ports to be free (max 20 seconds)
for port in 9092 9093 9094; do
for port in "${PORT_ARRAY[@]}"; do
for _ in $(seq 1 40); do
if ! lsof -ti:$port >/dev/null 2>&1; then
break
@@ -137,17 +188,17 @@ for i in $(seq 1 $MAX_ITERATIONS); do
done

# Start server and capture its output
cd /Users/travisbischel/src/twmb/franz-go/pkg/kfake
cd "$KFAKE_DIR"
VERSION_ARG=""
if [ -n "$KFAKE_VERSION" ]; then
VERSION_ARG="--as-version $KFAKE_VERSION"
fi
KFAKE_LOG_LEVEL=$LOG_LEVEL go run main.go $VERSION_ARG > "$SERVER_LOG" 2>&1 &
KFAKE_LOG_LEVEL=$LOG_LEVEL go run main.go $VERSION_ARG --ports "$PORTS" > "$SERVER_LOG" 2>&1 &
SERVER_PID=$!

# Wait for server to be listening (max 10 seconds)
for _ in $(seq 1 20); do
if lsof -ti:9092 >/dev/null 2>&1; then
if lsof -ti:$FIRST_PORT >/dev/null 2>&1; then
break
fi
sleep 0.5
@@ -161,14 +212,14 @@ for i in $(seq 1 $MAX_ITERATIONS); do
exit 1
fi

# Run the test (use longer timeout with race detector)
cd /Users/travisbischel/src/twmb/franz-go/pkg/kgo
if [ -n "$RACE" ]; then
TIMEOUT="100s"
# Run the test (use longer timeout with race detector or benchmarks)
cd "$KGO_DIR"
if [ -n "$RACE" ] || [ -n "$BENCH_TYPE" ]; then
TIMEOUT="300s"
else
TIMEOUT="60s"
fi
KGO_TEST_RECORDS=$RECORDS KGO_LOG_LEVEL=$LOG_LEVEL go test $RACE $RUN_ARG -timeout $TIMEOUT > "$TEST_LOG" 2>&1
KGO_SEEDS=$SEEDS KGO_TEST_RECORDS=$RECORDS KGO_LOG_LEVEL=$LOG_LEVEL go test $RACE $GO_TEST_ARGS -timeout $TIMEOUT > "$TEST_LOG" 2>&1
TEST_EXIT=$?

if [ $TEST_EXIT -ne 0 ]; then
@@ -180,7 +231,7 @@ for i in $(seq 1 $MAX_ITERATIONS); do
tail -50 "$TEST_LOG"
echo ""
echo "Server (pid $SERVER_PID) left running for debugging."
echo "Connect to localhost:9092 to inspect state."
echo "Connect to localhost:$FIRST_PORT to inspect state."
echo "Kill manually when done: kill $SERVER_PID"
SERVER_PID="" # Clear so EXIT trap doesn't kill it
exit 1
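
run_tests.sh now exports the seed list to the tests via KGO_SEEDS. The exact helper inside the kgo test suite may differ; as a sketch, consuming that variable could look like the following, where the 127.0.0.1:9092 fallback is an assumption for illustration only.

```go
package main

import (
	"os"
	"strings"

	"github.com/twmb/franz-go/pkg/kgo"
)

// seedsFromEnv splits the comma-separated KGO_SEEDS value that
// run_tests.sh exports (e.g. "127.0.0.1:9095,127.0.0.1:9096").
// The fallback below is an assumption, not necessarily what the
// kgo tests actually use.
func seedsFromEnv() []string {
	if v := os.Getenv("KGO_SEEDS"); v != "" {
		return strings.Split(v, ",")
	}
	return []string{"127.0.0.1:9092"}
}

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers(seedsFromEnv()...))
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```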
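Because the script prefixes the -b value with "Benchmark", `-b ProduceSyncLinger` translates to `-bench=BenchmarkProduceSyncLinger -benchmem -run=^$`. An illustrative shape of such a benchmark is sketched below; the real benchmark in the kgo tests, if any, may look different, and the broker address, topic, and linger are assumptions.

```go
package kgo_test

import (
	"context"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Illustrative only: a benchmark with this name is what `-b ProduceSyncLinger`
// would select. It assumes a broker is already listening on the seed address.
func BenchmarkProduceSyncLinger(b *testing.B) {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"),
		kgo.DefaultProduceTopic("foo"),
		kgo.ProducerLinger(10*time.Millisecond),
	)
	if err != nil {
		b.Fatal(err)
	}
	defer cl.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if err := cl.ProduceSync(context.Background(), kgo.StringRecord("x")).FirstErr(); err != nil {
			b.Fatal(err)
		}
	}
}
```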
73 changes: 71 additions & 2 deletions pkg/kgo/producer.go
@@ -311,8 +311,13 @@ func (rs ProduceResults) First() (*Record, error) {
// ProduceSync is a synchronous produce. See the [Produce] documentation for an
// in depth description of how producing works.
//
// This function simply produces all records in one range loop and waits for
// them all to be produced before returning.
// This function produces all records and waits for them all to be produced
// before returning. If the client has a non-zero linger configured, after all
// records are enqueued, this function stops lingering and triggers an immediate
// drain on all partitions that records were produced to. This avoids
// unnecessarily waiting for linger timers when the caller is synchronously
// waiting for results. Partitions that are lingering due to concurrent
// [Produce] calls are not affected.
func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
var (
wg sync.WaitGroup
@@ -324,9 +329,73 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
)

wg.Add(len(rs))

// After each Produce call for a known topic, the record's Partition
// field is already set (see bufferRecord), allowing us to collect
// which recBufs to unlinger without a second pass over the records.
// We use a [16] base array to avoid heap allocation in the common
// case, and linear dedup since the number of unique partitions is
// typically small.
//
// We load partition data BEFORE calling Produce to avoid a data
// race on r.Partition. If partitions exist before Produce,
// partitionsForTopicProduce will also see them (partition counts
// are monotonically increasing) and will partition the record
// synchronously in bufferRecord, making r.Partition safe to read
// after Produce returns. If pd is nil, we never read r.Partition,
// avoiding a race with the metadata goroutine which partitions
// unknownTopics records asynchronously.
var (
buf [16]*recBuf
unlinger = buf[:0]
topics topicsPartitionsData

lastTopic string
lastPD *topicPartitionsData
)
if cl.cfg.linger > 0 {
topics = cl.producer.topics.load()
}

for _, r := range rs {
var pd *topicPartitionsData
if topics != nil {
if r.Topic == lastTopic {
pd = lastPD
} else if parts, ok := topics[r.Topic]; ok {
if v := parts.load(); len(v.partitions) > 0 {
pd = v
}
lastTopic = r.Topic
lastPD = pd
}
}

cl.Produce(ctx, r, promise)

if pd == nil {
continue
}
if int(r.Partition) >= len(pd.partitions) {
continue
}
rb := pd.partitions[r.Partition].records
var seen bool
for _, have := range unlinger {
if have == rb {
seen = true
break
}
}
if !seen {
unlinger = append(unlinger, rb)
}
}

for _, rb := range unlinger {
rb.unlingerAndManuallyDrain()
}

wg.Wait()

return results
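
To make the documented unlinger behavior concrete, here is a minimal usage sketch. The broker address, topic, and 50ms linger are illustrative; the point is that ProduceSync should return once its records are flushed rather than waiting out the linger timer.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"),       // illustrative address
		kgo.DefaultProduceTopic("foo"),          // illustrative topic
		kgo.ProducerLinger(50*time.Millisecond), // non-zero linger
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	start := time.Now()
	// With the change above, ProduceSync unlingers the partitions it
	// produced to, so this call should not pay the full 50ms linger.
	res := cl.ProduceSync(context.Background(),
		kgo.StringRecord("a"),
		kgo.StringRecord("b"),
	)
	if err := res.FirstErr(); err != nil {
		panic(err)
	}
	fmt.Println("ProduceSync took", time.Since(start))
}
```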