[KIP-932] Add commit async API#5353
Draft
Pranav Rathi (pranavrth) wants to merge 5 commits intodev_kip-932_acknowledge_public_apisfrom
Draft
[KIP-932] Add commit async API#5353Pranav Rathi (pranavrth) wants to merge 5 commits intodev_kip-932_acknowledge_public_apisfrom
Pranav Rathi (pranavrth) wants to merge 5 commits intodev_kip-932_acknowledge_public_apisfrom
Conversation
1. Add missing Kafka error codes 114-133 to rdkafka.h, including
SHARE_SESSION_NOT_FOUND, INVALID_SHARE_SESSION_EPOCH,
FENCED_STATE_EPOCH, and SHARE_SESSION_LIMIT_REACHED.
2. Implement rd_kafka_broker_session_reset() to recover from
session errors by resetting epoch to 0, removing toppars_to_forget
from toppars_in_session, moving remaining toppars back to
toppars_to_add, and destroying unprocessed ack_details from the
rko.
3. Rewrite error handling in rd_kafka_broker_share_fetch_reply():
- Reset session on SHARE_SESSION_NOT_FOUND,
INVALID_SHARE_SESSION_EPOCH, SHARE_SESSION_LIMIT_REACHED,
ERR__TRANSPORT, and REQUEST_TIMED_OUT.
- Refresh metadata on topic/leader errors.
- Destroy ack_details before reply in all code paths.
- Handle ERR__DESTROY with ack_details cleanup.
- Remove unnecessary rd_kafka_broker_fetch_backoff().
4. Add broker state check in SHARE_FETCH op handler: reject ops
with ERR__STATE when broker is not in UP state, preventing
requests before API version negotiation completes.
5. Add termination guards in rd_kafka_share_fetch_reply_op() and
rd_kafka_share_select_broker() to prevent new fetch/ack ops
during shutdown.
…owledge all offsets
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
1. Add rd_kafka_share_commit_async() API to send pending acknowledgements to brokers without waiting for the next poll. 2. Guard ack_all() with explicit_acks check in both consume_batch and commit_async so explicit acknowledgements are not overwritten with ACCEPT before extraction. 3. Decrement rkshare_unacked_cnt in inflight_ack_update() when an entry transitions from ACQUIRED to an explicit ack type, so the counter accurately tracks unacknowledged records. 4. Add unacked_cnt > 0 check in consume_batch for explicit mode, returning RD_KAFKA_RESP_ERR__STATE if records from the previous poll have not been acknowledged. 5. Add share_consumer_commit_async example with explicit ack mode, random RELEASE/ACCEPT, and mid-batch commit_async calls.
127a032 to
a2e2d59
Compare
c96ac5a to
6305b56
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.