Skip to content

Commit 5e1dfc7

Browse files
committed
fix(test): improve flakey async producer test
The TestAsyncProducerRecoveryWithRetriesDisabled periodically fails and then passes on a re-run. Stepping through it I believe this was because the test had assumed metadata requests would always go to the seed/bootstrap broker, but they can actually go to any broker. So there was a 1 in 3 chance that the test would fail because it would await timeout of metadata requests that were never going to be responded to. Rewriting the test to use the handler map capability rather than the expectations interface and configuring the mock metadata responses on all mock brokers allows the test to pass quickly and reliably.
1 parent ae85104 commit 5e1dfc7

1 file changed

Lines changed: 51 additions & 26 deletions

File tree

async_producer_test.go

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -314,18 +314,30 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
314314

315315
func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
316316
tt := func(t *testing.T, kErr KError) {
317-
seedBroker := NewMockBroker(t, 1)
318-
leader1 := NewMockBroker(t, 2)
319-
leader2 := NewMockBroker(t, 3)
317+
seedBroker := NewMockBroker(t, 0)
318+
broker1 := NewMockBroker(t, 1)
319+
broker2 := NewMockBroker(t, 2)
320+
321+
mockLeader := func(leaderID int32) *MockMetadataResponse {
322+
return NewMockMetadataResponse(t).
323+
SetController(seedBroker.BrokerID()).
324+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
325+
SetBroker(broker1.Addr(), broker1.BrokerID()).
326+
SetBroker(broker2.Addr(), broker2.BrokerID()).
327+
SetLeader("my_topic", 0, leaderID).
328+
SetLeader("my_topic", 1, leaderID)
329+
}
320330

321-
metadataLeader1 := new(MetadataResponse)
322-
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
323-
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
324-
metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
325-
seedBroker.Returns(metadataLeader1)
331+
seedBroker.SetHandlerByMap(
332+
map[string]MockResponse{
333+
"MetadataRequest": mockLeader(broker1.BrokerID()),
334+
},
335+
)
326336

327337
config := NewTestConfig()
338+
config.ClientID = "TestAsyncProducerRecoveryWithRetriesDisabled"
328339
config.Producer.Flush.Messages = 2
340+
config.Producer.Flush.Frequency = 100 * time.Millisecond
329341
config.Producer.Return.Successes = true
330342
config.Producer.Retry.Max = 0 // disable!
331343
config.Producer.Retry.Backoff = 0
@@ -335,33 +347,46 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
335347
t.Fatal(err)
336348
}
337349

350+
broker1.SetHandlerByMap(
351+
map[string]MockResponse{
352+
"MetadataRequest": mockLeader(broker1.BrokerID()),
353+
"ProduceRequest": NewMockProduceResponse(t).
354+
SetError("my_topic", 0, kErr).
355+
SetError("my_topic", 1, kErr),
356+
},
357+
)
358+
338359
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
339360
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
340-
prodNotLeader := new(ProduceResponse)
341-
prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
342-
prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
343-
leader1.Returns(prodNotLeader)
344361
expectResults(t, producer, 0, 2)
345362

346-
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
347-
metadataLeader2 := new(MetadataResponse)
348-
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
349-
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
350-
metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
351-
leader1.Returns(metadataLeader2)
352-
leader1.Returns(metadataLeader2)
363+
seedBroker.SetHandlerByMap(
364+
map[string]MockResponse{
365+
"MetadataRequest": mockLeader(broker2.BrokerID()),
366+
},
367+
)
368+
broker1.SetHandlerByMap(
369+
map[string]MockResponse{
370+
"MetadataRequest": mockLeader(broker2.BrokerID()),
371+
},
372+
)
373+
broker2.SetHandlerByMap(
374+
map[string]MockResponse{
375+
"MetadataRequest": mockLeader(broker2.BrokerID()),
376+
"ProduceRequest": NewMockProduceResponse(t).
377+
SetError("my_topic", 0, ErrNoError).
378+
SetError("my_topic", 1, ErrNoError),
379+
},
380+
)
353381

382+
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
354383
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
355-
prodSuccess := new(ProduceResponse)
356-
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
357-
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
358-
leader2.Returns(prodSuccess)
359384
expectResults(t, producer, 2, 0)
360385

361-
seedBroker.Close()
362-
leader1.Close()
363-
leader2.Close()
364386
closeProducer(t, producer)
387+
seedBroker.Close()
388+
broker1.Close()
389+
broker2.Close()
365390
}
366391

367392
t.Run("retriable error", func(t *testing.T) {

0 commit comments

Comments
 (0)