Skip to content

Commit 2ac7122

Browse files
committed
Fix wrong offsets in mock Consumer
1 parent 556ddf7 commit 2ac7122

2 files changed

Lines changed: 72 additions & 1 deletion

File tree

mocks/consumer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,13 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
155155
}
156156

157157
if c.partitionConsumers[topic][partition] == nil {
158+
highWatermarkOffset := offset
159+
if offset == sarama.OffsetOldest {
160+
highWatermarkOffset = 0
161+
}
162+
158163
c.partitionConsumers[topic][partition] = &PartitionConsumer{
164+
highWaterMarkOffset: highWatermarkOffset,
159165
t: c.t,
160166
topic: topic,
161167
partition: partition,
@@ -282,7 +288,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
282288

283289
msg.Topic = pc.topic
284290
msg.Partition = pc.partition
285-
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
291+
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1
286292

287293
pc.messages <- msg
288294
}

mocks/consumer_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,68 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
247247
t.Errorf("Expected an expectation failure to be set on the error reporter.")
248248
}
249249
}
250+
251+
func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) {
252+
trm := newTestReporterMock()
253+
consumer := NewConsumer(trm, NewTestConfig())
254+
pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
255+
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
256+
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
257+
pcmock.ExpectMessagesDrainedOnClose()
258+
259+
pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
260+
if err != nil {
261+
t.Error(err)
262+
}
263+
264+
message1 := <-pc.Messages()
265+
if message1.Offset != 0 {
266+
t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset)
267+
}
268+
269+
message2 := <-pc.Messages()
270+
if message2.Offset != 1 {
271+
t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset)
272+
}
273+
274+
if err := consumer.Close(); err != nil {
275+
t.Error(err)
276+
}
277+
278+
if len(trm.errors) != 0 {
279+
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
280+
}
281+
}
282+
283+
func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) {
284+
startingOffset := int64(123)
285+
trm := newTestReporterMock()
286+
consumer := NewConsumer(trm, NewTestConfig())
287+
pcmock := consumer.ExpectConsumePartition("test", 0, startingOffset)
288+
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
289+
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
290+
pcmock.ExpectMessagesDrainedOnClose()
291+
292+
pc, err := consumer.ConsumePartition("test", 0, startingOffset)
293+
if err != nil {
294+
t.Error(err)
295+
}
296+
297+
message1 := <-pc.Messages()
298+
if message1.Offset != startingOffset {
299+
t.Errorf("Expected offset of first message to be %d, got %d", startingOffset, message1.Offset)
300+
}
301+
302+
message2 := <-pc.Messages()
303+
if message2.Offset != startingOffset+1 {
304+
t.Errorf("Expected offset of second message to be %d, got %d", startingOffset+1, message2.Offset)
305+
}
306+
307+
if err := consumer.Close(); err != nil {
308+
t.Error(err)
309+
}
310+
311+
if len(trm.errors) != 0 {
312+
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
313+
}
314+
}

0 commit comments

Comments
 (0)