ShareGroupHeartBeat API #5299
Ankith L (Ankith-Confluent) merged 8 commits into dev_kip-932_queues-for-kafka
- Introduced functions to set session timeout and heartbeat interval for sharegroups.
- Implemented manual target assignment for sharegroup members.
- Enhanced connection handling to clear states for closed connections.
This commit introduces a new test file `0155-share_consumer_mock.c` that implements a series of mock tests for the Share Consumer functionality in Kafka. The tests cover various scenarios including basic heartbeat flow, assignment redistribution during rebalances, multi-topic assignments, error injection handling, RTT injection, session timeouts, and manual target assignments. Additionally, the test suite is updated to include the new mock tests in `tests/test.c`, ensuring they are executed as part of the test run.
| */ | ||
| typedef struct rd_kafka_mock_sharegroup_member_s { | ||
| TAILQ_ENTRY(rd_kafka_mock_sharegroup_member_s) link; | ||
| char *id; /**< MemberId */ |
Is the MemberId server-generated when the client doesn't send a memberId in the request?
Actually, the member ID is generated by the consumer itself. If it is not sent with the request, an exception (UnknownMemberIdException) is thrown, so we don't need to generate it on the broker side.
| /** | ||
| * @brief Share group target assignment (manual) | ||
| */ | ||
| typedef struct rd_kafka_mock_sharegroup_target_assignments_s { |
For the SGHB file, can we reuse the CGHB code?
E.g. these functions seem to be common: _write_TopicPartitions, session timer, find/get/destroy, member_find/active/fenced, set_timeout/interval, target_assignment struct, response builder.
Can we check how much code can be deduplicated, then plan and commit the changes?
Sure
| } else if (MemberEpoch > member->member_epoch) { | ||
| /* Client epoch is ahead of server - indicates | ||
| * a bug or stale coordinator. */ | ||
| err = RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH; |
Does this need to be RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID?
I checked the Java implementation: it returns FencedMemberEpochException in both cases, whether the client epoch is higher or lower, so RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH is returned here.
- Removed debug logging option from create_share_consumer function.
- Simplified test output by removing excessive logging statements in share consumer mock tests.
- Consolidated heartbeat wait logic and improved readability in test cases.
- Added a new header file `rdkafka_mock_group_common.h` containing common macros and inline functions for managing group and member operations in mock broker implementations.
- Introduced generic functions for finding groups and members in TAILQ, and a utility function to mark group members as active.
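As a rough illustration of the kind of generic TAILQ lookup described in this commit, the sketch below walks a list and returns the first element whose `id` matches. The macro name `MOCK_TAILQ_FIND_BY_ID` and the `demo_member` type are hypothetical and are not taken from `rdkafka_mock_group_common.h`; the real helpers may be shaped quite differently.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/queue.h>

/* Hypothetical member type; the real mock structs carry more fields. */
struct demo_member {
        TAILQ_ENTRY(demo_member) link;
        const char *id;
};

TAILQ_HEAD(demo_member_head, demo_member);

/* Hypothetical generic lookup: walks any TAILQ whose elements have a
 * string `id` field and leaves the first match (or NULL) in `result`. */
#define MOCK_TAILQ_FIND_BY_ID(result, head, field, wanted_id)                  \
        do {                                                                   \
                (result) = NULL;                                               \
                TAILQ_FOREACH((result), (head), field) {                       \
                        if (!strcmp((result)->id, (wanted_id)))                \
                                break;                                         \
                }                                                              \
        } while (0)

/* Thin typed wrapper so callers get a normal function signature. */
static struct demo_member *demo_member_find(struct demo_member_head *head,
                                            const char *id) {
        struct demo_member *m;
        MOCK_TAILQ_FIND_BY_ID(m, head, link, id);
        return m;
}
```

Because the macro only relies on the link field name and an `id` member, the same lookup could in principle serve both consumer-group and sharegroup lists, which is the deduplication the review asked for.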
- Renamed test file from 0155-share_consumer_mock.c to 0155-share_group_heartbeat_mock.c to reflect the new focus on ShareGroupHeartbeat.
- Implemented a series of tests to validate the ShareGroupHeartbeat flow, including:
  - Basic heartbeat operations (join, assignment, heartbeats, leave).
  - Assignment redistribution during consumer joins and leaves.
  - Multi-topic assignment handling with mixed subscriptions.
  - Error injection tests for UNKNOWN_MEMBER_ID and RTT scenarios.
  - Session timeout handling to ensure proper rebalancing.
  - Manual target assignment for consumers in a share group.
Kaushik Raina (k-raina)
left a comment
Ankith L (@Ankith-Confluent) Seems like clang formatting is breaking CI.
src/rdkafka_fetcher.c:921:31: error: code should be clang-formatted [-Wclang-format-violations]
rd_kafka_buf_read_i16(rkbuf, &AcknowledgementErrorCode); // AcknowledgementError
^
src/rdkafka_fetcher.c:921:65: error: code should be clang-formatted [-Wclang-format-violations]
rd_kafka_buf_read_i16(rkbuf, &AcknowledgementErrorCode); // AcknowledgementError
^
src/rdkafka_fetcher.c:922:31: error: code should be clang-formatted [-Wclang-format-violations]
rd_kafka_buf_read_str(rkbuf, &AcknowledgementErrorStr); // AcknowledgementErrorString
^
src/rdkafka_fetcher.c:922:64: error: code should be clang-formatted [-Wclang-format-violations]
rd_kafka_buf_read_str(rkbuf, &AcknowledgementErrorStr); // AcknowledgementErrorString
Pull request overview
Adds mock-broker support and test coverage for the KIP-932 ShareGroupHeartbeat flow used by share consumers, enabling deterministic local/mock testing of join/heartbeat/assignment behaviors in librdkafka’s mock cluster.
Changes:
- Implement mock sharegroup state, assignment logic, session timeout handling, and manual target-assignment APIs.
- Add a ShareGroupHeartbeat request handler to the mock broker.
- Add a new mock-based test suite for sharegroup heartbeats and assignments, and wire it into build/test systems.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| win32/librdkafka.vcxproj | Builds the new mock sharegroup source on Windows. |
| tests/test.c | Registers the new 0155 mock test. |
| tests/CMakeLists.txt | Adds the new 0155 test source to the test build. |
| tests/0155-share_group_heartbeat_mock.c | New mock-based tests for ShareGroupHeartbeat flows. |
| src/rdkafka_mock_sharegrp.c | New mock sharegroup implementation (members, assignment, timeouts, manual assignments). |
| src/rdkafka_mock_int.h | Adds mock sharegroup structs, defaults, and internal APIs to the mock cluster. |
| src/rdkafka_mock_handlers.c | Adds mock broker handler for ShareGroupHeartbeat (API key RD_KAFKAP_ShareGroupHeartbeat). |
| src/rdkafka_mock_group_common.h | New shared helpers/macros for mock group/member lookup and activity marking. |
| src/rdkafka_mock_cgrp.c | Refactors consumer-group mock to use new common helpers and fences sharegroup members on connection close. |
| src/rdkafka_mock.h | Exposes new public mock APIs for sharegroup timeout/heartbeat interval, target assignment, and member-id retrieval. |
| src/rdkafka_mock.c | Initializes/destroys sharegroups within mock cluster lifecycle. |
| src/Makefile | Adds the new mock sharegroup source to the build. |
| src/CMakeLists.txt | Adds the new mock sharegroup source to the CMake build. |
| free(member_ids[0]); | ||
| free(member_ids[1]); | ||
| free(member_ids); |
member_ids returned by rd_kafka_mock_sharegroup_get_member_ids() are allocated with rd_malloc/rd_strdup and the header comment says to free them with rd_free(). Using free() here can be wrong on some platforms/builds; use rd_free() for each string and for the array.
| free(member_ids[0]); | |
| free(member_ids[1]); | |
| free(member_ids); | |
| rd_free(member_ids[0]); | |
| rd_free(member_ids[1]); | |
| rd_free(member_ids); |
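The reason for pairing each allocation with the library's own free routine can be sketched with a toy allocator that keeps bookkeeping. Everything here (`demo_malloc`, `demo_free`, `demo_get_member_ids`, `demo_member_ids_destroy`) is hypothetical and only stands in for the `rd_malloc`/`rd_free` pairing mentioned in the comment; it does not reproduce the signature of `rd_kafka_mock_sharegroup_get_member_ids()`.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical allocator pair standing in for a library's own
 * malloc/free wrappers. The counter models allocator-side bookkeeping
 * that a plain free() would silently bypass. */
static size_t demo_live_allocs = 0;

static void *demo_malloc(size_t size) {
        void *p = malloc(size);
        if (!p)
                abort(); /* Keep the sketch short: no error recovery. */
        demo_live_allocs++;
        return p;
}

static void demo_free(void *p) {
        if (!p)
                return;
        demo_live_allocs--;
        free(p);
}

/* Hypothetical getter: returns `cnt` newly allocated id strings. */
static char **demo_get_member_ids(size_t cnt) {
        char **ids;
        assert(cnt > 0);
        ids = demo_malloc(cnt * sizeof(*ids));
        for (size_t i = 0; i < cnt; i++) {
                ids[i] = demo_malloc(32);
                snprintf(ids[i], 32, "member-%zu", i);
        }
        return ids;
}

/* Releases every string and then the array with the matching free. */
static void demo_member_ids_destroy(char **ids, size_t cnt) {
        for (size_t i = 0; i < cnt; i++)
                demo_free(ids[i]);
        demo_free(ids);
}
```

If the caller released these pointers with plain `free()`, the counter would never return to zero; with a real library the mismatch can be worse, for example when the library and the application link against different C runtimes.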
| /* HEARTBEAT: Existing member heartbeat */ | ||
| member = rd_kafka_mock_sharegroup_member_find( | ||
| mshgrp, &MemberId); | ||
| if (!member) { | ||
| err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; | ||
| } else if (MemberEpoch > member->member_epoch) { | ||
| /* Client epoch is ahead of server - indicates | ||
| * a bug or stale coordinator. */ | ||
| err = RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH; | ||
| } else if (MemberEpoch < member->member_epoch) { | ||
| /* Client epoch is behind. Allow if it matches | ||
| * the previous epoch (response with bumped | ||
| * epoch may have been lost). Otherwise fence. | ||
| */ | ||
| if (MemberEpoch != | ||
| member->previous_member_epoch) { | ||
| err = | ||
| RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH; | ||
| } else { | ||
| /* Accept previous epoch - client is | ||
| * catching up */ | ||
| member->conn = mconn; | ||
| MemberEpoch = member->member_epoch; | ||
| } | ||
| } else { | ||
| /* Epoch matches - normal heartbeat */ | ||
| /* Check for subscription changes */ | ||
| if (SubscribedTopicNamesCnt >= 0 && | ||
| rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( | ||
| member, SubscribedTopicNames, | ||
| SubscribedTopicNamesCnt)) { | ||
| assignment_changed = rd_true; | ||
| } | ||
| member->conn = mconn; | ||
| MemberEpoch = member->member_epoch; | ||
| } |
ShareGroupHeartbeat processing does not mark the member as active on successful heartbeats (it updates member->conn/MemberEpoch but never updates ts_last_activity). This will cause members to time out even while heartbeating (breaking session-timeout semantics and likely making tests flaky). Call rd_kafka_mock_sharegroup_member_active() whenever a heartbeat is accepted (including the “catching up” previous-epoch path).
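A minimal sketch of the behaviour this comment asks for: every accepted heartbeat, including the previous-epoch "catching up" case, refreshes the last-activity timestamp, and the session timer only expires members whose timestamp is stale. The types and function names below are hypothetical simplifications, not the mock broker's actual code, and time is passed in explicitly to keep the example deterministic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified member state. */
struct demo_member {
        int32_t member_epoch;
        int32_t previous_member_epoch;
        int64_t ts_last_activity_ms;
};

enum demo_hb_result { DEMO_HB_OK, DEMO_HB_FENCED };

/* Validates the client epoch and, if the heartbeat is accepted,
 * marks the member as active at `now_ms`. */
static enum demo_hb_result demo_heartbeat(struct demo_member *m,
                                          int32_t client_epoch,
                                          int64_t now_ms) {
        if (client_epoch > m->member_epoch)
                return DEMO_HB_FENCED; /* Client is ahead of the server. */
        if (client_epoch < m->member_epoch &&
            client_epoch != m->previous_member_epoch)
                return DEMO_HB_FENCED; /* Too far behind. */

        /* Accepted (current or previous epoch): refresh activity. */
        m->ts_last_activity_ms = now_ms;
        return DEMO_HB_OK;
}

/* Session-timeout check as a timer callback might perform it. */
static bool demo_member_timed_out(const struct demo_member *m,
                                  int64_t now_ms,
                                  int64_t session_timeout_ms) {
        return now_ms - m->ts_last_activity_ms > session_timeout_ms;
}
```

Fenced heartbeats deliberately leave the timestamp untouched, so a member that keeps sending a stale epoch still times out.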
| mshgrp, &MemberId, MemberEpoch, mconn); | ||
|
|
||
| if (member) { | ||
| if (rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( |
On JOIN (MemberEpoch == 0) this calls rd_kafka_mock_sharegroup_member_subscribed_topic_names_set() even when SubscribedTopicNamesCnt is -1 (nullable array). Passing -1 through leads to rd_list_new(SubscribedTopicNamesCnt, ...) with a negative size. Only call the setter when SubscribedTopicNamesCnt >= 0, and treat -1 as “no change / not provided”.
| if (rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( | |
| if (SubscribedTopicNamesCnt >= 0 && | |
| rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( |
src/rdkafka_mock_sharegrp.c
Outdated
| if (!SubscribedTopicNamesCnt) { | ||
| /* No change */ | ||
| return rd_false; |
rd_kafka_mock_sharegroup_member_subscribed_topic_names_set() treats SubscribedTopicNamesCnt==0 as “no change”, but for nullable arrays 0 typically means “empty subscription” (clear topics) while -1 means “null/no change”. This currently prevents clearing subscriptions and can mis-handle null counts. Handle SubscribedTopicNamesCnt < 0 explicitly, and when cnt==0 clear/destroy the existing list and return whether it changed.
| if (!SubscribedTopicNamesCnt) { | |
| /* No change */ | |
| return rd_false; | |
| if (SubscribedTopicNamesCnt < 0) { | |
| /* Null / no change */ | |
| return rd_false; | |
| } else if (SubscribedTopicNamesCnt == 0) { | |
| /* Empty subscription: clear existing list and report | |
| * whether there was any previous subscription. */ | |
| rd_bool_t had_topics = | |
| member->subscribed_topic_names && | |
| rd_list_cnt(member->subscribed_topic_names) > 0; | |
| RD_IF_FREE(member->subscribed_topic_names, rd_list_destroy); | |
| return had_topics; |
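The three cases for a nullable array count (negative for null, zero for empty, positive for a new list) can be illustrated with a small standalone sketch. The `demo_subscription` type and its functions are hypothetical; the sketch uses plain `malloc`/`free` rather than librdkafka's `rd_list_t`, and it treats a list with the same topics in the same order as unchanged, which is a simplification.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical subscription state: an owned array of topic names. */
struct demo_subscription {
        char **topics;
        int cnt;
};

static void demo_subscription_clear(struct demo_subscription *s) {
        for (int i = 0; i < s->cnt; i++)
                free(s->topics[i]);
        free(s->topics);
        s->topics = NULL;
        s->cnt    = 0;
}

/* Returns true if the stored subscription changed.
 *  cnt < 0  : null array, nothing provided, no change.
 *  cnt == 0 : empty subscription, clear whatever was stored.
 *  cnt > 0  : replace the stored list if it differs. */
static bool demo_subscription_set(struct demo_subscription *s,
                                  const char *const *topics,
                                  int cnt) {
        if (cnt < 0)
                return false;

        if (cnt == 0) {
                bool had_topics = s->cnt > 0;
                demo_subscription_clear(s);
                return had_topics;
        }

        if (cnt == s->cnt) {
                bool same = true;
                for (int i = 0; i < cnt && same; i++)
                        same = !strcmp(s->topics[i], topics[i]);
                if (same)
                        return false;
        }

        demo_subscription_clear(s);
        s->topics = malloc((size_t)cnt * sizeof(*s->topics));
        if (!s->topics)
                abort(); /* Keep the sketch short: no error recovery. */
        for (int i = 0; i < cnt; i++) {
                size_t len   = strlen(topics[i]) + 1;
                s->topics[i] = malloc(len);
                if (!s->topics[i])
                        abort();
                memcpy(s->topics[i], topics[i], len);
        }
        s->cnt = cnt;
        return true;
}
```

Handling the negative count inside the setter also covers the JOIN path raised in the earlier comment, since a count of -1 never reaches the allocation.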
| /* Bump the epochs */ | ||
| TAILQ_FOREACH(member, &mshgrp->members, link) { | ||
| member->previous_member_epoch = member->member_epoch; | ||
| member->member_epoch = ++mshgrp->group_epoch; |
Epoch bumping in manual target assignment increments mshgrp->group_epoch once per member (member_epoch = ++group_epoch), which will give different epochs to different members for the same assignment change. Bump the group epoch once, then set each member’s epoch to that same new value (and update previous_member_epoch accordingly).
| /* Bump the epochs */ | |
| TAILQ_FOREACH(member, &mshgrp->members, link) { | |
| member->previous_member_epoch = member->member_epoch; | |
| member->member_epoch = ++mshgrp->group_epoch; | |
| /* Bump the epochs: increment group_epoch once and apply to all members */ | |
| { | |
| int32_t new_epoch = ++mshgrp->group_epoch; | |
| TAILQ_FOREACH(member, &mshgrp->members, link) { | |
| member->previous_member_epoch = member->member_epoch; | |
| member->member_epoch = new_epoch; | |
| } |
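The difference between the two loops is easy to check in isolation. The sketch below uses a plain array instead of a TAILQ and hypothetical `demo_group`/`demo_member` types; it shows the "bump once, apply to all" variant from the suggestion.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified group and member state. */
struct demo_member {
        int32_t member_epoch;
        int32_t previous_member_epoch;
};

struct demo_group {
        int32_t group_epoch;
        struct demo_member *members;
        size_t member_cnt;
};

/* Increments the group epoch exactly once and gives every member the
 * same new epoch, remembering each member's old epoch. */
static void demo_group_bump_epoch(struct demo_group *g) {
        int32_t new_epoch = ++g->group_epoch;

        for (size_t i = 0; i < g->member_cnt; i++) {
                g->members[i].previous_member_epoch =
                    g->members[i].member_epoch;
                g->members[i].member_epoch = new_epoch;
        }
}
```

With three members at epoch 5, this leaves the group and every member at epoch 6, whereas incrementing inside the loop would hand out 6, 7 and 8 and leave the group at 8.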
| int hb_after_error = wait_share_heartbeats(mcluster, 1, 500); | ||
| TEST_ASSERT(hb_after_error >= 2, | ||
| "Expected at least 2 heartbeats after error " | ||
| "(error + rejoin), got %d", | ||
| hb_after_error); |
This wait uses expected_cnt=1 but then asserts hb_after_error >= 2. test_mock_wait_matching_requests() waits until it has seen at least expected_cnt and then returns the final count, so this can return 1 and fail. Pass expected_cnt=2 (or adjust the assertion to match what you’re waiting for).
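The mismatch can be reproduced with a toy wait loop that follows the semantics described in this comment: stop as soon as the observed count reaches `expected_cnt`, then return that count. `demo_wait_for_count` and `demo_counter_next` are hypothetical stand-ins; the real test helper is time-based and is not reproduced here.

```c
#include <assert.h>

/* Hypothetical callback returning the number of matching requests
 * observed so far. */
typedef int (*demo_count_fn)(void *opaque);

/* Polls up to `max_polls` times and returns the last observed count.
 * Returns early once the count reaches `expected_cnt`. */
static int demo_wait_for_count(demo_count_fn count,
                               void *opaque,
                               int expected_cnt,
                               int max_polls) {
        int seen = 0;

        for (int i = 0; i < max_polls; i++) {
                seen = count(opaque);
                if (seen >= expected_cnt)
                        break;
        }
        return seen;
}

/* Demo counter: each poll observes exactly one more request. */
static int demo_counter_next(void *opaque) {
        int *n = opaque;
        return ++*n;
}
```

Waiting for one request can return 1, which would fail an assertion of `>= 2`; waiting for two guarantees the assertion holds whenever the wait succeeds.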
Kaushik Raina (k-raina)
left a comment
Ack, pipeline is broken in base branch.
PR LGTM! Thanks
9151d2b into dev_kip-932_queues-for-kafka
* ShareGroupHeartBeat MVP
* Add sharegroup session management and manual assignment features
  - Introduced functions to set session timeout and heartbeat interval for sharegroups.
  - Implemented manual target assignment for sharegroup members.
  - Enhanced connection handling to clear states for closed connections.
* Add function to retrieve member IDs from a sharegroup and update assignment handling
* Add mock tests for Share Consumer and ShareGroupHeartbeat
  This commit introduces a new test file `0155-share_consumer_mock.c` that implements a series of mock tests for the Share Consumer functionality in Kafka. The tests cover various scenarios including basic heartbeat flow, assignment redistribution during rebalances, multi-topic assignments, error injection handling, RTT injection, session timeouts, and manual target assignments. Additionally, the test suite is updated to include the new mock tests in `tests/test.c`, ensuring they are executed as part of the test run.
* Refactor share consumer mock tests and add common group utilities
  - Removed debug logging option from create_share_consumer function.
  - Simplified test output by removing excessive logging statements in share consumer mock tests.
  - Consolidated heartbeat wait logic and improved readability in test cases.
  - Added a new header file `rdkafka_mock_group_common.h` containing common macros and inline functions for managing group and member operations in mock broker implementations.
  - Introduced generic functions for finding groups and members in TAILQ, and a utility function to mark group members as active.
* Add mock tests for ShareGroupHeartbeat functionality
  - Renamed test file from 0155-share_consumer_mock.c to 0155-share_group_heartbeat_mock.c to reflect the new focus on ShareGroupHeartbeat.
  - Implemented a series of tests to validate the ShareGroupHeartbeat flow, including:
    - Basic heartbeat operations (join, assignment, heartbeats, leave).
    - Assignment redistribution during consumer joins and leaves.
    - Multi-topic assignment handling with mixed subscriptions.
    - Error injection tests for UNKNOWN_MEMBER_ID and RTT scenarios.
    - Session timeout handling to ensure proper rebalancing.
    - Manual target assignment for consumers in a share group.
* Enhance RTT injection test with latency and adjusted polling intervals
* Implement sharegroup member activation and enhance error injection tests for ShareGroupHeartbeat
Design: https://confluentinc.atlassian.net/wiki/x/-QDSMwE