Open
Description
Calling pause() during graceful shutdown causes offset corruption when enable.auto.commit: true. The consumer seeks to stale cached offsets and commits them, overwriting correct auto-committed offsets. This results in already-processed messages being reprocessed after restart or rebalance.
Environment Information
- OS: macOS (reproducible on Linux)
- Node Version: 23.x
- NPM Version: 9.x / 10.x
- C++ Toolchain: clang/g++
- confluent-kafka-javascript version: 1.6.0
Steps to Reproduce
- Create a consumer group with multiple workers (enable.auto.commit: true)
- Consume messages actively (e.g., 1000 msg/sec)
- During consumption, trigger a rebalance and initiate a graceful shutdown that calls pause():
  await consumer.pause([{ topic: 'my-topic' }]);
  await waitForInflightMessages(); // Wait for handlers to complete
  await consumer.disconnect();
- Trigger sequential shutdowns (e.g., rolling deployment), causing further rebalances
- Remaining/restarted consumers reprocess already-consumed messages
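The waitForInflightMessages() helper used in the shutdown step is not shown in this report. As a minimal sketch of what such a helper could look like, the class below counts running handlers and resolves a promise once none are left. InflightTracker, wrap() and waitForIdle() are hypothetical names for illustration and are not part of confluent-kafka-javascript.

```javascript
// Hypothetical helper (not a library API): counts handlers that are still running.
class InflightTracker {
  #count = 0;
  #waiters = [];

  get count() {
    return this.#count;
  }

  // Wrap a message handler so that each invocation is counted while it runs.
  wrap(handler) {
    return async (...args) => {
      this.#count += 1;
      try {
        return await handler(...args);
      } finally {
        this.#count -= 1;
        if (this.#count === 0) {
          // Wake every caller that is waiting for the handlers to drain.
          for (const resolve of this.#waiters.splice(0)) resolve();
        }
      }
    };
  }

  // Resolves once no wrapped handler is running.
  waitForIdle() {
    if (this.#count === 0) return Promise.resolve();
    return new Promise((resolve) => this.#waiters.push(resolve));
  }
}
```

Under these assumptions, the message handler would be passed through tracker.wrap(...) before being handed to the consumer, and waitForInflightMessages() would simply return tracker.waitForIdle().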
Root Cause
In lib/kafkajs/_consumer.js:
- #pauseInternal() (line 1838-1862): Reads from the #lastConsumedOffsets cache
- Stale cache: #lastConsumedOffsets is never cleaned during rebalances (line 359-369), so it contains outdated offsets
- #seekInternal() commits (line 1821-1822): Seeks to the stale offset and commits it immediately when enable.auto.commit: true
- Race with auto-commit: Overwrites correct offsets stored via _offsetsStoreSingle() but not yet auto-committed
// Line 1848-1853: Uses stale cached offset
if (this.#lastConsumedOffsets.has(key)) {
  const seekOffset = this.#lastConsumedOffsets.get(key);
  // ...seeks to seekOffset.offset + 1
}
// Line 1821-1822: Commits the stale offset
if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit']) {
  await this.#commitOffsetsUntilNoStateErr(offsetsToCommit); // ← COMMITS STALE OFFSET
}
Expected Behavior
Graceful shutdown should not regress committed offsets. Auto-commit should handle offset management based on _offsetsStoreSingle() calls.
Actual Behavior
pause() immediately commits stale cached offsets, overwriting correct offsets, causing message reprocessing after restart.
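To make the regression concrete, here is a toy model of the sequence described above. It uses plain Maps in place of the group coordinator and the client cache, so it illustrates the ordering problem only and does not reproduce the library's actual code. The function name, the key format and the offset values are assumptions chosen for illustration.

```javascript
// Toy model only: plain Maps stand in for broker-side and client-side state.
function simulatePauseCommit({ cachedOffset, storedOffset }) {
  const key = 'my-topic:0'; // hypothetical key format
  const committed = new Map(); // what the group coordinator holds
  const lastConsumedOffsets = new Map(); // stand-in for the stale cache

  // A cache entry left over from before a rebalance.
  lastConsumedOffsets.set(key, { offset: cachedOffset });

  // The correct position (next offset to read) reaches the coordinator.
  committed.set(key, storedOffset);
  const before = committed.get(key);

  // pause() path as described in this report: seek to cached offset + 1,
  // then commit that position because auto-commit is enabled.
  if (lastConsumedOffsets.has(key)) {
    const seekOffset = lastConsumedOffsets.get(key).offset + 1;
    committed.set(key, seekOffset);
  }

  return { before, after: committed.get(key) };
}

const result = simulatePauseCommit({ cachedOffset: 99, storedOffset: 500 });
console.log(result); // { before: 500, after: 100 }
```

In this model the committed position moves back from 500 to 100, so a consumer that takes over the partition would re-read offsets 100 through 499.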
confluent-kafka-javascript Configuration Settings
{
  'group.id': 'my-consumer-group',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  // ... other settings
}
Additional context
- Issue occurs most frequently during rolling deployments with sequential shutdowns
- Affects random partitions unpredictably (depends on which cached offsets are stale)
- The #lastConsumedOffsets Map has no lifecycle management - entries never expire or get cleaned during rebalances
- The pause() method's seek-and-commit behavior conflicts with graceful shutdown scenarios
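One possible direction for the missing lifecycle management is to drop cache entries for partitions as they are revoked. The sketch below shows the idea on a plain Map. pruneRevoked() and keyOf() are hypothetical names, and the key format is an assumption, since the key scheme used by #lastConsumedOffsets is not shown in this report.

```javascript
// Hypothetical key format; the library's real key scheme may differ.
const keyOf = ({ topic, partition }) => `${topic}|${partition}`;

// Remove cached offsets for every revoked partition.
// Returns the number of entries that were actually dropped.
function pruneRevoked(cache, revokedPartitions) {
  let removed = 0;
  for (const tp of revokedPartitions) {
    if (cache.delete(keyOf(tp))) removed += 1;
  }
  return removed;
}

// Usage: two partitions are cached, partition 0 is revoked.
const cache = new Map([
  [keyOf({ topic: 'my-topic', partition: 0 }), { offset: 99 }],
  [keyOf({ topic: 'my-topic', partition: 1 }), { offset: 42 }],
]);
const dropped = pruneRevoked(cache, [{ topic: 'my-topic', partition: 0 }]);
console.log(dropped, cache.size); // 1 1
```

With the stale entry gone, a later pause() on a reassigned partition would find nothing in the cache to seek to, which is the behavior the report expects.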