Poll flow using acks sent received from the app thread#5339
Draft
Pranav Rathi (pranavrth) wants to merge 43 commits intodev_kip-932_queues-for-kafkafrom
Draft
Poll flow using acks sent received from the app thread#5339Pranav Rathi (pranavrth) wants to merge 43 commits intodev_kip-932_queues-for-kafkafrom
Pranav Rathi (pranavrth) wants to merge 43 commits intodev_kip-932_queues-for-kafkafrom
Conversation
* Add SHARE_FETCH_FANOUT op to send fetch request to all the brokers * Add SHARE_FETCH op and related fields which is sent to each broker * select broker to fetch from as only 1 broker is allowed to fetch at a time * retry skelton for share fetch replies from brokers * add max.poll.records config * start returning messages to user from poll from normal fetch * Update consumer example to work as share consumer
* Get SHARE_FETCH op * Add Share Fetch Request and Response parsing * Add messages to the consumer queue * Use broker toppars list to fetch messages * Handle fetch and should not fetch cases
* Add session partition representation * Add ops to add and remove session partitions * Manipulate partitions in session based on received response from the Share Fetch request * Update consumer to use session partitions for fetch requests instead of broker toppars * Add session epoch management for positive cases * Added acknowledgement handling only for the first batch in the response
Note: Not working properly. Improve this once consumer close is designed and implemented.
…ich was done just to make initial basic share fetch work.
* Add new APIs specific to share consumer which takes share consumer type instead of rdkafka_t handle * Add new example for share consumer * Move checking of need of share fetch request to the caller from the request implementation itself * Fix incorrectly parsing ShareFetch Response if MessageSetSize was 0 * Fix not reading buffer properly when rktp is not locally present in Share Fetch Response. Missed reading acknowledgement part if rktp was not present locally. * Fix max.poll.records not being honored when sending Share Fetch Request * Fix calling share_fetch even if there was some error due to which we could directly return to the main thread * Add more TODOs wherever necessary * Remove some unnecessary asserts * Improve some naming * Address some more minor PR comments
…g Uuid without url encoing. Fix by explicitly doing url encoding for the generated Uuid. Should be fixed on the broker side and then revert this change.
* ShareGroupHeartBeat MVP * Add sharegroup session management and manual assignment features - Introduced functions to set session timeout and heartbeat interval for sharegroups. - Implemented manual target assignment for sharegroup members. - Enhanced connection handling to clear states for closed connections. * Add function to retrieve member IDs from a sharegroup and update assignment handling * Add mock tests for Share Consumer and ShareGroupHeartbeat This commit introduces a new test file `0155-share_consumer_mock.c` that implements a series of mock tests for the Share Consumer functionality in Kafka. The tests cover various scenarios including basic heartbeat flow, assignment redistribution during rebalances, multi-topic assignments, error injection handling, RTT injection, session timeouts, and manual target assignments. Additionally, the test suite is updated to include the new mock tests in `tests/test.c`, ensuring they are executed as part of the test run. * Refactor share consumer mock tests and add common group utilities - Removed debug logging option from create_share_consumer function. - Simplified test output by removing excessive logging statements in share consumer mock tests. - Consolidated heartbeat wait logic and improved readability in test cases. - Added a new header file `rdkafka_mock_group_common.h` containing common macros and inline functions for managing group and member operations in mock broker implementations. - Introduced generic functions for finding groups and members in TAILQ, and a utility function to mark group members as active. * Add mock tests for ShareGroupHeartbeat functionality - Renamed test file from 0155-share_consumer_mock.c to 0155-share_group_heartbeat_mock.c to reflect the new focus on ShareGroupHeartbeat. - Implemented a series of tests to validate the ShareGroupHeartbeat flow, including: - Basic heartbeat operations (join, assignment, heartbeats, leave). - Assignment redistribution during consumer joins and leaves. - Multi-topic assignment handling with mixed subscriptions. - Error injection tests for UNKNOWN_MEMBER_ID and RTT scenarios. - Session timeout handling to ensure proper rebalancing. - Manual target assignment for consumers in a share group. * Enhance RTT injection test with latency and adjusted polling intervals * Implement sharegroup member activation and enhance error injection tests for ShareGroupHeartbeat
* Mock Broker : Implement share fetch * ShareGroupHeartBeat MVP * Add sharegroup session management and manual assignment features - Introduced functions to set session timeout and heartbeat interval for sharegroups. - Implemented manual target assignment for sharegroup members. - Enhanced connection handling to clear states for closed connections. * Add function to retrieve member IDs from a sharegroup and update assignment handling * Mock Broker : Implement share fetch * Fix SGHB return and fetcher api version * Add test for share fetch API * Fix session workflow * Update API version * Add handing for ForgottenTopicsData * Add lock expiry logic * Remove duplicates with sghb * Address feedbacks * Implement max delivery attempts and record lock * Add member validation for share fetch * Remove tmp produce code * Implement feedback * Minor * Add test to CMakeList * Fix minor test failure --------- Co-authored-by: Ankith-Confluent <anl@confluent.io>
* Added more tests for SGHB * Enable test for unknown member ID error in share group heartbeat mock * Remove source references from share group heartbeat test cases
* Add tests for ShareFetch error handling and implicit close * Remove Java references from ShareFetch error handling test descriptions
Rewrite rd_kafka_ShareFetchRequest() to use ack_details from the SHARE_FETCH op instead of reading rktp->rktp_share_acknowledge. Separate toppars_to_add (no acks) and ack_details (with ack batches) into two independent sections of the Topics array. Move session copy logic (adding_toppars/forgetting_toppars) into the request function. Remove rd_kafka_broker_share_fetch_get_toppars_to_send() entirely. Set records_fetched on the reply op in broker thread.
Remove the legacy rktp_share_acknowledge struct and count fields that stored acquired record ranges on the toppar for implicit ack piggybacking. This flow is fully replaced by ack_details on the SHARE_FETCH op built from the inflight map. Remove dead writes in response parsing, cleanup in session clear, helper functions, and destructor assertions.
Rewrite rd_kafka_share_consume_batch() to drain rk_rep callbacks independently before and after serving records from the consumer queue. Use a 4-case decision matrix based on has_records/has_pending_acks to determine whether to send FANOUT ops and with what fetch_more_records flag. Remove rd_kafka_share_poll_set_consumer() calls from example and test since rk_rep forwarding is no longer needed.
- Free ack_details list and batches in SHARE_FETCH op destroy - Clean up rkb_share_async_ack_details in broker decommission with assert in destroy_final to catch leaks - Refactor segregate_acks_by_leader to use rd_list_pop() preventing double-free when source list is destroyed after segregation - Properly destroy dropped batches (no leader) with rktpar cleanup
Fix two memory safety bugs found via AddressSanitizer: 1. rdkafka_op.c: Remove redundant rd_free() after rd_list_destroy() on SHARE_FETCH op's ack_details. The list was created with rd_list_new() which sets RD_LIST_F_ALLOCATED, so rd_list_destroy() already frees the struct. 2. rdkafka_fetcher.c: Null local pointer after rd_list_destroy() in rd_kafka_broker_session_update_toppars_list(). When the toppars_to_remove_from list was destroyed mid-loop, only the caller's pointer was nulled via the double-pointer, but the local copy still held the dangling address, causing use-after-free on the next loop iteration.
…ffer 1. rdkafka_fetcher.c: Add NULL check for rd_kafka_share_find_entry_for_offset() return value in rd_kafka_share_build_response_rko(). When a message offset falls outside all ack batch entry ranges, the function returns NULL, causing a segfault at entry->types dereference (address 0x20). Skip the message with a debug log instead. 2. examples/share_consumer.c: Increase rkmessages buffer from 500 to 10001 to avoid stack buffer overflow when large batches are returned.
…g printing Fix deadlock in rd_kafka_share_select_broker() where a break statement inside the broker lock caused the lock to never be released, deadlocking the broker thread when it tried to acquire the same lock in rd_kafka_ShareFetchRequest(). Also add temporary printf-based debug printing of SHARE_FETCH op details (should_fetch flag and ack offset ranges) for integration testing visibility.
Introduces rkshare_fetch_more_records_requested flag to suppress redundant fetch FANOUTs while one is already in flight. Also adds early exit for ack-only FANOUTs with no acks and improves ack error logging in ShareFetch response handling.
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
db788b7 to
6ac3c9c
Compare
Base automatically changed from
dev_kip-932_segregation_and_ack_mapping
to
dev_kip-932_queues-for-kafka
March 9, 2026 14:42
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.