Skip to content

Commit cbcd734

Browse files
committed
fix: only update preferredReadReplica if valid
FetchResponse from a follower will _not_ contain a PreferredReadReplica. It seems like the partitionConsumer would overwrite the previously assigned value from the leader with -1 which would then trigger the "reconnect to the current leader" changes from #1936 causing a flip-flop effect. Contributes-to: #2071 Signed-off-by: Dominic Evans <[email protected]>
1 parent 556ddf7 commit cbcd734

2 files changed

Lines changed: 16 additions & 11 deletions

File tree

consumer.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) {
132132

133133
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
134134
child := &partitionConsumer{
135-
consumer: c,
136-
conf: c.conf,
137-
topic: topic,
138-
partition: partition,
139-
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
140-
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
141-
feeder: make(chan *FetchResponse, 1),
142-
trigger: make(chan none, 1),
143-
dying: make(chan none),
144-
fetchSize: c.conf.Consumer.Fetch.Default,
135+
consumer: c,
136+
conf: c.conf,
137+
topic: topic,
138+
partition: partition,
139+
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
140+
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
141+
feeder: make(chan *FetchResponse, 1),
142+
preferredReadReplica: invalidPreferredReplicaID,
143+
trigger: make(chan none, 1),
144+
dying: make(chan none),
145+
fetchSize: c.conf.Consumer.Fetch.Default,
145146
}
146147

147148
if err := child.chooseStartingOffset(offset); err != nil {
@@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
605606

606607
consumerBatchSizeMetric.Update(int64(nRecs))
607608

608-
child.preferredReadReplica = block.PreferredReadReplica
609+
if block.PreferredReadReplica != invalidPreferredReplicaID {
610+
child.preferredReadReplica = block.PreferredReadReplica
611+
}
609612

610613
if nRecs == 0 {
611614
partialTrailingMessage, err := block.isPartial()

fetch_response.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66
)
77

8+
const invalidPreferredReplicaID = -1
9+
810
type AbortedTransaction struct {
911
ProducerID int64
1012
FirstOffset int64

0 commit comments

Comments
 (0)