Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1878,7 +1878,8 @@ type EpochOffset struct {
// than the other if this one's epoch is less, or the epoch's are equal and
// this one's offset is less.
func (e EpochOffset) Less(o EpochOffset) bool {
return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset
ee, oe := max(e.Epoch, -1), max(o.Epoch, -1)
return ee < oe || ee == oe && e.Offset < o.Offset
}

type uncommitted map[string]map[int32]uncommit
Expand Down Expand Up @@ -1925,13 +1926,18 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
final.LeaderEpoch, // -1 if old message / unknown
final.Offset + 1,
}
prior := topicOffsets[partition.Partition]
prior, ok := topicOffsets[partition.Partition]
if !ok {
uninit := EpochOffset{-1, 0}
uncommit := uncommit{uninit, uninit, uninit}
prior, topicOffsets[partition.Partition] = uncommit, uncommit
}

if debug {
if setHead {
fmt.Fprintf(&b, "%d{%d=>%d r%d}, ", partition.Partition, prior.head.Offset, set.Offset, len(partition.Records))
fmt.Fprintf(&b, "%d{%d=>%d r%d e%d}, ", partition.Partition, prior.head.Offset, set.Offset, len(partition.Records), set.Epoch)
} else {
fmt.Fprintf(&b, "%d{%d=>%d=>%d r%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset, len(partition.Records))
fmt.Fprintf(&b, "%d{%d=>%d=>%d r%d e%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset, len(partition.Records), set.Epoch)
}
}

Expand Down Expand Up @@ -2400,15 +2406,18 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
offsets[r.Topic] = toffsets
}

set := EpochOffset{
r.LeaderEpoch,
r.Offset + 1, // need to advice to next offset to move forward
}

if at, exists := toffsets[r.Partition]; exists {
if at.Epoch > r.LeaderEpoch || at.Epoch == r.LeaderEpoch && at.Offset > r.Offset {
if set.Less(at) {
continue
}
}
toffsets[r.Partition] = EpochOffset{
r.LeaderEpoch,
r.Offset + 1, // need to advice to next offset to move forward
}

toffsets[r.Partition] = set
}

var rerr error // return error
Expand Down Expand Up @@ -2469,11 +2478,11 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
curTopic = r.Topic
}

current := curPartitions[r.Partition]
current, ok := curPartitions[r.Partition]
if newHead := (EpochOffset{
r.LeaderEpoch,
r.Offset + 1,
}); current.head.Less(newHead) {
}); !ok || current.head.Less(newHead) {
curPartitions[r.Partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
Expand Down Expand Up @@ -2509,8 +2518,8 @@ func (cl *Client) MarkCommitOffsets(unmarked map[string]map[int32]EpochOffset) {
}

for partition, newHead := range partitions {
current := curPartitions[partition]
if current.head.Less(newHead) {
current, ok := curPartitions[partition]
if !ok || current.head.Less(newHead) {
curPartitions[partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
Expand Down
12 changes: 12 additions & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,20 @@ func (cl *Client) mergeTopicPartitions(
for _, newTP := range newPartitions {
if isProduce && newTP.records.recBufsIdx == -1 {
newTP.records.sink.addRecBuf(newTP.records)
cl.cfg.logger.Log(LogLevelDebug, "metadata refresh new produce partition",
"topic", topic,
"partition", newTP.partition(),
"leader", newTP.leader,
"leader_epoch", newTP.leaderEpoch,
)
} else if !isProduce && newTP.cursor.cursorsIdx == -1 {
newTP.cursor.source.addCursor(newTP.cursor)
cl.cfg.logger.Log(LogLevelDebug, "metadata refresh new consume partition",
"topic", topic,
"partition", newTP.partition(),
"leader", newTP.leader,
"leader_epoch", newTP.leaderEpoch,
)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (p *producer) finishPromises(b batchPromise) {
}()
start:
for i, pr := range b.recs {
pr.LeaderEpoch = 0
pr.LeaderEpoch = -1
if b.baseOffset == -1 {
// if the base offset is invalid/unknown (-1), all record offsets should
// be treated as unknown
Expand Down
4 changes: 3 additions & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ type Record struct {
ProducerID int64

// LeaderEpoch is the leader epoch of the broker at the time this
// record was written, or -1 if on message sets.
// record was written, or -1 if on message sets. When producing,
// this is always set to -1 (producers do not use this field and the
// broker does not reply with the epoch).
//
// For committing records, it is not recommended to modify the
// LeaderEpoch. Clients use the LeaderEpoch for data loss detection.
Expand Down