Skip to content

AsyncProducer produces messages in out-of-order when retries happen #2619

@dethi

Description

@dethi
Versions
Sarama Kafka Go
>= v1.31.0 v3.5.0 1.20.7
Configuration

What configuration values are you using for Sarama and Kafka?

config.Version = sarama.V2_7_0_0
// Read success and errors
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// Wait for Ack from all
config.Producer.RequiredAcks = sarama.WaitForAll
// Reduce internal buffer size to simplify the reproduction
config.ChannelBufferSize = 10
// Allow only one open requests at a time
config.Net.MaxOpenRequests = 1
// Retry almost indefinitely
config.Producer.Retry.Max = 1_000_000_000
// Reduce the max size, to flush more often and reproduce easily
config.Producer.MaxMessageBytes = 100
// We are only gonna produce to one partition to simplify the reproduction
config.Producer.Partitioner = sarama.NewManualPartitioner
Logs & Reproduction Code

I have created a Gist with all the information required to reproduce (code, configuration, etc), including some example logs:
https://gist.github.com/dethi/406fb5562030d8d0fa76db18d95bbbbe

Problem Description
Expectation

When producing message to the same partition, the AsyncProducer guarantee the ordering of the produced messages even when retries are required, as long as config.Net.MaxOpenRequests = 1.

Idempotency shouldn't matter here.

Reality

Up until 1.31.0, the expectation hols. But the behaviour changed when request pipelining was introduced to the AsyncProducer. Now, retries cause message to be published out of order.

An easy way to see this is by enabling config.Producer.Idempotent. It will result in the AsyncProducer returning an error to the caller when retries happen (like the partition leader disappear, i.e. broker disconnect):

kafka: Failed to produce message to topic sarama-outoforder-bug: kafka server: The broker received an out of order sequence number

When idempotency is not enabled, Sarama will publish successfully the messages, but in out-of-order.

Code Analysis / Flow of produced messages

(follow the links as you read each paragraph)

  1. Code link: We think the issue is coming here. Here it use to be that we would not send another message/batch to the Kafka broker before we got back the answer from that broker and we sent the answer to the goroutine that processes that answer. One of the key point here as well is that the goroutine that writes into the bridge channel is also the goroutine that reads from the responses channel as we can see in func (bp brokerProducer) waitForSpace or func (bp brokerProducer) run, which means that wouldn't send another message to Kafka before we received AND processed the answer for the previous message.

  2. Code link: Now we use the new AsyncProduce function to send messages to Kafka brokers. The key point here is that it use to be that we would not be able to call AsyncProduce (or Produce to be exact) before the previous call to AsyncProduce/Produce returned (which would also give us the response of the request). Now the response are processed asynchronously and sent back to us via the sendResponse callback. We will see in part 3 that once a message is sent to the broker and the goroutine that processes the response is scheduled then AsyncProduce will return and another call will be made even though we potentially did not received the answer from the previous request yet.

  3. Code link: broker.AsyncProduce() uses sendInternal to send a batch/message to a broker. b.responses is a buffered channel that is used to control how many "in flight" requests there is currently to that broker so that a goroutine can't call AsyncProduce before we were able to schedule a run of the goroutine that will processes the response for that request (see func (b *Broker) responseReceiver()). One of the issue here is that if we set b.conf.Net.MaxOpenRequests = 1 so that we force the b.response to have a buffer of size 0 then it seems to me that we can still have b.conf.Net.MaxOpenRequests + 1 in flight requests to the Kafka broker. Assuming MaxOpenRequests is equal to one, then sure the 2nd call to AsyncProduce will block on b.responses <- promise but there will still be 2 inflight requests.

  4. Code link: Once a response was received by the broker from Kafka, it gets forwarded to the pending channel, which we read from the same goroutine. The responses are then forwarded back to the func (bp *brokerProducer) run() goroutine, which is also the one that is sending messages to the bridge channel.

  5. Code link: Once a response is received in the func (bp brokerProducer) run() goroutine, we will end up calling this retryMessages function if for instance the broker that we tried to send a the batch/message to crashed or was not available for other reasons. In that case we will retry to send the message that trigger that error by calling retryMessages on line 1146 for pSet.msgs which will send back all the element of the pSet.msgs batch into the input channel of the asyncProducer manager. We then also send back all messages that were buffered in the brokerProducer (these messages should have been batched and sent to Kafka at some point via AsyncProduce) so that we can send them after we tried to send back the elements of pSet.msgs because they were produced after the messages in pSet.msgs. The problem here is that this retry sequence does not take into account the messages that are currently in flight and for which we did not receive a response yet. Because of that, the ordering of the messages will change in case we have to retry to send the messages that were in flight when we had to reschedule a retry when we received the first error.

Surprisingly, part of the issue the issue was already noted in the latest comment of the same PR:

This means setting Net.MaxOpenRequests to 1 and using the AsyncProducer might result in up to 2 in-flight requests, leading to possible ordering issues or maybe an error with idempotency turn on against Kafka 0.11.0.x.

Thanks to @T30rix for helping me with the debugging and writing the detailed flow.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugneeds-investigationIssues that require followup from maintainersstale/exemptIssues and pull requests that should never be closed as stale

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions