feat(protocol): negotiate API versions#3209
Conversation
3d8a4d2 to
bd79dfa
Compare
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
cf8d039 to
6ca2800
Compare
Signed-off-by: Giorgio Pellero <giorgio.pellero@typeform.com>
|
@puellanivis I've addressed your review comments - would love it if you could have a second look when you've got some time! Thanks 🙂 |
puellanivis
left a comment
There was a problem hiding this comment.
Looks pretty good from my side. (note: I do not have permissions to approve PRs.)
dnwe
left a comment
There was a problem hiding this comment.
Thanks for working on this PR. Changes look good to me too so I’ll go ahead and merge into main. I’ll roll the main branch client out to a few places and leave it running for a bit before I cut a release
I’m also wondering about enabling usingApiVersions by default too and making DefaultVersion max too
|
Great to see this, thanks! |
|
After some deeper testing with Redpanda (I've found its small differences to be helpful in testing protocol compliance), I've noticed something that I think could use some further improvement in a new PR. The good:
The bad:
In my testing I could reproduce this issue as follows:
Replacing the response promise's Let me know what you think! |
Have we changed the default for the user-defined |
|
Makes sense, thanks. |
This PR introduces basic Kafka API version negotiation in Sarama by enabling the client to restrict the API version for each request based on the version ranges advertised by brokers in the initial
ApiVersionsrequest/response - which is now executed synchronously as part of the broker connection process, rather than as a fire-and-forget deferred goroutine, and before SASL authentication takes place (but after SSL/TLS, as indicated by the Kafka protocol docs).The selected version is the highest supported by both the broker and the user-defined
Config.Versionupper bound.To support this dynamic selection, a new
setVersion(int16)method has been added to theprotocolBodyinterface, exposing the request/response structs'Versionfield to be set externally.Also, it defines constants for message keys, and replaces some (read: those I found) usages of numeric literals to use the constants instead.
I experimented with extracting more logic from
protocolBodyimplementations, too, such as centralizingrequiredVersion(),isValidVersion(), and the "max protocol version for Kafka version" selection in most Request constructors as suggested by @dnwe in my previous PR; however, while they can certainly simplify the code, I found these were significantly larger-scale changes (and kinda out of scope for the purpose of this PR), and decided to leave them out to keep this PR short and easy to review.Motivation behind the change: Sarama incompatibility with Redpanda
The latest
github.com/IBM/saramaversion fails to connect to Redpanda brokers (latest version too):Unsupported version 10 for metadata APIkafka: client has run out of available brokers to talk to: EOFSarama logs
``` 2025/05/13 18:26:29 Initializing new client 2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:29 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:29 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:29 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:29 Closed connection to broker localhost:9092 2025/05/13 18:26:29 client/metadata no available broker to send metadata request to 2025/05/13 18:26:29 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:29 Error while sending ApiVersionsRequest to broker localhost:9092: kafka: broker not connected 2025/05/13 18:26:30 client/metadata retrying after 250ms... (2 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 client/metadata retrying after 250ms... (1 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 client/metadata retrying after 250ms... (0 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 Closing Client ```The culprit being simply that:
config.Version, even though it sends an ApiVersionsRequest