diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bbe63cff48..58a8236516 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -51,6 +51,7 @@ set( rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c + rdkafka_mock_sharegrp.c rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c diff --git a/src/Makefile b/src/Makefile index 0d0635ce30..87de298cbe 100644 --- a/src/Makefile +++ b/src/Makefile @@ -57,7 +57,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \ rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \ rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \ rdvarint.c rdbuf.c rdmap.c rdunittest.c \ - rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \ + rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \ rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \ rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \ nanopb/pb_encode.c nanopb/pb_decode.c nanopb/pb_common.c \ diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index cdc0445f28..f3d3e0b088 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -2708,6 +2708,7 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) { rd_kafka_mock_broker_t *mrkb; rd_kafka_mock_cgrp_classic_t *mcgrp_classic; rd_kafka_mock_cgrp_consumer_t *mcgrp_consumer; + rd_kafka_mock_sharegroup_t *mshgrp; rd_kafka_mock_coord_t *mcoord; rd_kafka_mock_error_stack_t *errstack; thrd_t dummy_rkb_thread; @@ -2726,6 +2727,9 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) { while ((mcgrp_consumer = TAILQ_FIRST(&mcluster->cgrps_consumer))) rd_kafka_mock_cgrp_consumer_destroy(mcgrp_consumer); + while ((mshgrp = TAILQ_FIRST(&mcluster->sharegrps))) + rd_kafka_mock_sharegroup_destroy(mshgrp); + while ((mcoord = TAILQ_FIRST(&mcluster->coords))) rd_kafka_mock_coord_destroy(mcluster, mcoord); @@ -2843,6 +2847,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk, TAILQ_INIT(&mcluster->cgrps_consumer); + rd_kafka_mock_sharegrps_init(mcluster); + TAILQ_INIT(&mcluster->coords); rd_list_init(&mcluster->pids, 16, rd_free); diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 0b81b312ef..1be895705a 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -601,6 +601,62 @@ RD_EXPORT void rd_kafka_mock_set_group_consumer_heartbeat_interval_ms( rd_kafka_mock_cluster_t *mcluster, int group_consumer_heartbeat_interval_ms); +/** + * @brief Set the sharegroup session timeout in milliseconds. + * + * @param mcluster Mock cluster instance. + * @param session_timeout_ms Session timeout in milliseconds. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_session_timeout( + rd_kafka_mock_cluster_t *mcluster, + int session_timeout_ms); + +/** + * @brief Set the sharegroup heartbeat interval in milliseconds. + * + * @param mcluster Mock cluster instance. + * @param heartbeat_interval_ms Heartbeat interval in milliseconds. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval( + rd_kafka_mock_cluster_t *mcluster, + int heartbeat_interval_ms); + +/** + * @brief Set a manual target assignment for a sharegroup. + * + * This allows tests to override the automatic partition assignment + * and manually specify which partitions each member should get. + * + * @param mcluster Mock cluster instance. + * @param group_id The sharegroup ID. + * @param member_ids Array of member IDs (strings). + * @param assignments Array of partition lists (one per member). + * @param member_cnt Number of members (length of both arrays). + */ +RD_EXPORT void rd_kafka_mock_sharegroup_target_assignment( + rd_kafka_mock_cluster_t *mcluster, + const char *group_id, + const char **member_ids, + rd_kafka_topic_partition_list_t **assignments, + size_t member_cnt); + +/** + * @brief Retrieve the member IDs from a sharegroup. + * + * @param mcluster Mock cluster instance. + * @param group_id The sharegroup ID. + * @param member_ids_out Output array of member IDs. Caller must free each + * string with rd_free() and the array itself. + * @param member_cnt_out Output count of members. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND if sharegroup not found. + */ +RD_EXPORT rd_kafka_resp_err_t rd_kafka_mock_sharegroup_get_member_ids( + rd_kafka_mock_cluster_t *mcluster, + const char *group_id, + char ***member_ids_out, + size_t *member_cnt_out); /**@}*/ diff --git a/src/rdkafka_mock_cgrp.c b/src/rdkafka_mock_cgrp.c index 0c75e003e5..5f36c0b55a 100644 --- a/src/rdkafka_mock_cgrp.c +++ b/src/rdkafka_mock_cgrp.c @@ -35,6 +35,7 @@ #include "rdkafka_int.h" #include "rdbuf.h" #include "rdkafka_mock_int.h" +#include "rdkafka_mock_group_common.h" static const char *rd_kafka_mock_cgrp_classic_state_names[] = { @@ -72,10 +73,9 @@ rd_kafka_mock_cgrp_classic_set_state(rd_kafka_mock_cgrp_classic_t *mcgrp, void rd_kafka_mock_cgrp_classic_member_active( rd_kafka_mock_cgrp_classic_t *mcgrp, rd_kafka_mock_cgrp_classic_member_t *member) { - rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK", - "Marking mock consumer group member %s as active", - member->id); - member->ts_last_activity = rd_clock(); + rd_kafka_mock_group_member_mark_active(mcgrp->cluster->rk, "classic", + member->id, + &member->ts_last_activity); } @@ -537,13 +537,8 @@ static void rd_kafka_mock_cgrp_classic_member_destroy( rd_kafka_mock_cgrp_classic_member_t *rd_kafka_mock_cgrp_classic_member_find( const rd_kafka_mock_cgrp_classic_t *mcgrp, const rd_kafkap_str_t *MemberId) { - const rd_kafka_mock_cgrp_classic_member_t *member; - TAILQ_FOREACH(member, &mcgrp->members, link) { - if (!rd_kafkap_str_cmp_str(MemberId, member->id)) - return (rd_kafka_mock_cgrp_classic_member_t *)member; - } - - return NULL; + return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId, + rd_kafka_mock_cgrp_classic_member_t); } @@ -659,13 +654,8 @@ void rd_kafka_mock_cgrp_classic_destroy(rd_kafka_mock_cgrp_classic_t *mcgrp) { rd_kafka_mock_cgrp_classic_t * rd_kafka_mock_cgrp_classic_find(rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *GroupId) { - rd_kafka_mock_cgrp_classic_t *mcgrp; - TAILQ_FOREACH(mcgrp, &mcluster->cgrps_classic, link) { - if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id)) - return mcgrp; - } - - return NULL; + return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_classic, GroupId, + rd_kafka_mock_cgrp_classic_t); } @@ -1335,10 +1325,9 @@ rd_kafka_mock_cgrp_consumer_member_next_assignment( void rd_kafka_mock_cgrp_consumer_member_active( rd_kafka_mock_cgrp_consumer_t *mcgrp, rd_kafka_mock_cgrp_consumer_member_t *member) { - rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK", - "Marking mock consumer group member %s as active", - member->id); - member->ts_last_activity = rd_clock(); + rd_kafka_mock_group_member_mark_active( + mcgrp->cluster->rk, "consumer", member->id, + &member->ts_last_activity); } /** @@ -1353,13 +1342,8 @@ void rd_kafka_mock_cgrp_consumer_member_active( rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_find( const rd_kafka_mock_cgrp_consumer_t *mcgrp, const rd_kafkap_str_t *MemberId) { - const rd_kafka_mock_cgrp_consumer_member_t *member; - TAILQ_FOREACH(member, &mcgrp->members, link) { - if (!rd_kafkap_str_cmp_str(MemberId, member->id)) - return (rd_kafka_mock_cgrp_consumer_member_t *)member; - } - - return NULL; + return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId, + rd_kafka_mock_cgrp_consumer_member_t); } /** @@ -1700,13 +1684,8 @@ void rd_kafka_mock_cgrp_consumer_member_fenced( rd_kafka_mock_cgrp_consumer_t * rd_kafka_mock_cgrp_consumer_find(const rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *GroupId) { - rd_kafka_mock_cgrp_consumer_t *mcgrp; - TAILQ_FOREACH(mcgrp, &mcluster->cgrps_consumer, link) { - if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id)) - return mcgrp; - } - - return NULL; + return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_consumer, GroupId, + rd_kafka_mock_cgrp_consumer_t); } /** @@ -1873,4 +1852,5 @@ void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster, rd_kafka_mock_connection_t *mconn) { rd_kafka_mock_cgrps_classic_connection_closed(mcluster, mconn); rd_kafka_mock_cgrps_consumer_connection_closed(mcluster, mconn); + rd_kafka_mock_sharegrps_connection_closed(mcluster, mconn); } diff --git a/src/rdkafka_mock_group_common.h b/src/rdkafka_mock_group_common.h new file mode 100644 index 0000000000..9667cdab96 --- /dev/null +++ b/src/rdkafka_mock_group_common.h @@ -0,0 +1,98 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2025, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * @file rdkafka_mock_group_common.h + * @brief Common macros and inline functions shared between + * ConsumerGroupHeartbeat (KIP-848) and ShareGroupHeartbeat (KIP-932) + * mock broker implementations. + */ + +#ifndef _RDKAFKA_MOCK_GROUP_COMMON_H_ +#define _RDKAFKA_MOCK_GROUP_COMMON_H_ + +#include "rdkafka_int.h" + +/** + * @brief Generic group find by GroupId in a TAILQ. + * + * @param groups_head Pointer to TAILQ_HEAD of groups. + * @param GroupId rd_kafkap_str_t pointer to search for. + * @param group_type The group struct type (e.g., rd_kafka_mock_sharegroup_t). + * + * @returns Pointer to found group, or NULL if not found. + */ +#define RD_KAFKA_MOCK_GROUP_FIND(groups_head, GroupId, group_type) \ + ({ \ + group_type *_grp = NULL; \ + TAILQ_FOREACH(_grp, groups_head, link) { \ + if (!rd_kafkap_str_cmp_str(GroupId, _grp->id)) \ + break; \ + } \ + _grp; \ + }) + +/** + * @brief Generic member find by MemberId in a TAILQ. + * + * @param members_head Pointer to TAILQ_HEAD of members. + * @param MemberId rd_kafkap_str_t pointer to search for. + * @param member_type The member struct type. + * + * @returns Pointer to found member, or NULL if not found. + */ +#define RD_KAFKA_MOCK_MEMBER_FIND(members_head, MemberId, member_type) \ + ({ \ + member_type *_member = NULL; \ + TAILQ_FOREACH(_member, members_head, link) { \ + if (!rd_kafkap_str_cmp_str(MemberId, _member->id)) \ + break; \ + } \ + _member; \ + }) + +/** + * @brief Mark a group member as active by updating its last activity timestamp. + * + * @param rk rd_kafka_t handle for logging. + * @param group_type String describing the group type ("consumer" or "share"). + * @param member_id Member ID string for logging. + * @param ts_last_activity Pointer to the member's last activity timestamp. + */ +static RD_INLINE RD_UNUSED void rd_kafka_mock_group_member_mark_active( + rd_kafka_t *rk, + const char *group_type, + const char *member_id, + rd_ts_t *ts_last_activity) { + rd_kafka_dbg(rk, MOCK, "MOCK", + "Marking mock %s group member %s as active", group_type, + member_id); + *ts_last_activity = rd_clock(); +} + +#endif /* _RDKAFKA_MOCK_GROUP_COMMON_H_ */ diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecceb..00fa89273a 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2993,6 +2993,233 @@ rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn, return -1; } +/** + * @brief Helper to write assignment TopicPartitions to ShareGroupHeartbeat + * response. + */ +static void rd_kafka_mock_handle_ShareGroupHeartbeat_write_TopicPartitions( + rd_kafka_buf_t *resp, + rd_kafka_topic_partition_list_t *assignment) { + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + + rd_kafka_topic_partition_list_sort_by_topic_id(assignment); + rd_kafka_buf_write_topic_partitions( + resp, assignment, rd_false /* don't skip invalid offsets */, + rd_false /* any offset */, rd_true /* use topic id */, + rd_false /* don't use topic name */, fields); +} + +/** + * @brief Handle ShareGroupHeartbeat request (API Key 76). + */ +static int +rd_kafka_mock_handle_ShareGroupHeartbeat(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *rkbuf) { + const rd_bool_t log_decode_errors = rd_true; + rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster; + rd_kafka_buf_t *resp; + rd_kafkap_str_t GroupId, MemberId, RackId; + rd_kafkap_str_t *SubscribedTopicNames = NULL; + int32_t MemberEpoch = 0, SubscribedTopicNamesCnt; + int32_t i; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_mock_sharegroup_t *mshgrp = NULL; + rd_kafka_mock_sharegroup_member_t *member = NULL; + rd_bool_t assignment_changed = rd_false; + + resp = rd_kafka_mock_buf_new_response(rkbuf); + + /* Inject Error */ + err = rd_kafka_mock_next_request_error(mconn, resp); + if (err) + goto build_response; + + /* GroupId */ + rd_kafka_buf_read_str(rkbuf, &GroupId); + + /* Coordinator check */ + { + rd_kafka_mock_broker_t *mrkb; + + mrkb = rd_kafka_mock_cluster_get_coord( + mcluster, RD_KAFKA_COORD_GROUP, &GroupId); + + if (!mrkb) + err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE; + else if (mrkb != mconn->broker) + err = RD_KAFKA_RESP_ERR_NOT_COORDINATOR; + } + + if (err) + goto build_response; + + /* MemberId */ + rd_kafka_buf_read_str(rkbuf, &MemberId); + + /* MemberEpoch */ + rd_kafka_buf_read_i32(rkbuf, &MemberEpoch); + + /* RackId (nullable) */ + rd_kafka_buf_read_str(rkbuf, &RackId); + + /* SubscribedTopicNames array (nullable) */ + rd_kafka_buf_read_arraycnt(rkbuf, &SubscribedTopicNamesCnt, + RD_KAFKAP_TOPICS_MAX); + if (SubscribedTopicNamesCnt >= 0) { + SubscribedTopicNames = rd_calloc( + SubscribedTopicNamesCnt > 0 ? SubscribedTopicNamesCnt : 1, + sizeof(rd_kafkap_str_t)); + for (i = 0; i < SubscribedTopicNamesCnt; i++) { + rd_kafka_buf_read_str(rkbuf, &SubscribedTopicNames[i]); + } + } + + { + mtx_lock(&mcluster->lock); + + mshgrp = rd_kafka_mock_sharegroup_get(mcluster, &GroupId); + + if (MemberEpoch == -1) { + /* LEAVE: Member wants to leave */ + member = rd_kafka_mock_sharegroup_member_find( + mshgrp, &MemberId); + if (member) { + rd_kafka_mock_sharegroup_member_destroy(mshgrp, + member); + member = NULL; + assignment_changed = rd_true; + } + + } else if (MemberEpoch == 0) { + /* JOIN: New member wants to join */ + member = rd_kafka_mock_sharegroup_member_get( + mshgrp, &MemberId, MemberEpoch, mconn); + + if (member) { + if (rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( + member, SubscribedTopicNames, + SubscribedTopicNamesCnt)) { + assignment_changed = rd_true; + } else { + /* New member always triggers + * recalculation */ + assignment_changed = rd_true; + } + MemberEpoch = member->member_epoch; + } else { + err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + } + + } else { + /* HEARTBEAT: Existing member heartbeat */ + member = rd_kafka_mock_sharegroup_member_find( + mshgrp, &MemberId); + if (!member) { + err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + } else if (MemberEpoch > member->member_epoch) { + /* Client epoch is ahead of server - indicates + * a bug or stale coordinator. */ + err = RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH; + } else if (MemberEpoch < member->member_epoch) { + /* Client epoch is behind. Allow if it matches + * the previous epoch (response with bumped + * epoch may have been lost). Otherwise fence. + */ + if (MemberEpoch != + member->previous_member_epoch) { + err = + RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH; + } else { + /* Accept previous epoch - client is + * catching up */ + member->conn = mconn; + MemberEpoch = member->member_epoch; + rd_kafka_mock_sharegroup_member_active( + mshgrp, member); + } + } else { + /* Epoch matches - normal heartbeat */ + /* Check for subscription changes */ + if (SubscribedTopicNamesCnt >= 0 && + rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( + member, SubscribedTopicNames, + SubscribedTopicNamesCnt)) { + assignment_changed = rd_true; + } + member->conn = mconn; + MemberEpoch = member->member_epoch; + rd_kafka_mock_sharegroup_member_active(mshgrp, + member); + } + } + + /* Recalculate assignments if needed */ + if (assignment_changed && mshgrp->member_cnt > 0) { + rd_kafka_mock_sharegroup_assignment_recalculate(mshgrp); + if (member) + MemberEpoch = member->member_epoch; + } + + mtx_unlock(&mcluster->lock); + } + +build_response: + + /* ThrottleTimeMs */ + rd_kafka_buf_write_i32(resp, 0); + + /* ErrorCode */ + rd_kafka_buf_write_i16(resp, err); + + /* ErrorMessage */ + if (err) + rd_kafka_buf_write_str(resp, rd_kafka_err2str(err), -1); + else + rd_kafka_buf_write_str(resp, NULL, -1); + + /* MemberId */ + if (!err && member) + rd_kafka_buf_write_str(resp, member->id, -1); + else + rd_kafka_buf_write_str(resp, NULL, -1); + + /* MemberEpoch */ + rd_kafka_buf_write_i32(resp, MemberEpoch); + + /* HeartbeatIntervalMs */ + if (mshgrp) + rd_kafka_buf_write_i32(resp, mshgrp->heartbeat_interval_ms); + else + rd_kafka_buf_write_i32(resp, 5000); + + /* Assignment */ + if (!err && member && member->assignment) { + /* Send assignment even if empty (cnt == 0). + * Null (-1) means "no change", while an empty assignment + * means "you have 0 partitions". */ + rd_kafka_buf_write_i8(resp, 1); + rd_kafka_mock_handle_ShareGroupHeartbeat_write_TopicPartitions( + resp, member->assignment); + rd_kafka_buf_write_tags_empty(resp); + } else { + rd_kafka_buf_write_i8(resp, -1); + } + + rd_kafka_buf_write_tags_empty(resp); + + rd_kafka_mock_connection_send_response(mconn, resp); + + RD_IF_FREE(SubscribedTopicNames, rd_free); + return 0; + +err_parse: + RD_IF_FREE(SubscribedTopicNames, rd_free); + rd_kafka_buf_destroy(resp); + return -1; +} + /** * @brief Default request handlers */ @@ -3025,6 +3252,8 @@ const struct rd_kafka_mock_api_handler {2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = {1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat}, + [RD_KAFKAP_ShareGroupHeartbeat] = + {1, 1, 1, rd_kafka_mock_handle_ShareGroupHeartbeat}, [RD_KAFKAP_GetTelemetrySubscriptions] = {0, 0, 0, rd_kafka_mock_handle_GetTelemetrySubscriptions}, [RD_KAFKAP_PushTelemetry] = {0, 0, 0, diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 2ef7a2a339..aa84c5f2c5 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -179,6 +179,39 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s { rd_kafka_mock_cgrp_consumer_t *mcgrp; /**< Consumer group */ } rd_kafka_mock_cgrp_consumer_member_t; +/** + * @struct Share group (KIP-932). + */ +typedef struct rd_kafka_mock_sharegroup_s { + TAILQ_ENTRY(rd_kafka_mock_sharegroup_s) link; + struct rd_kafka_mock_cluster_s *cluster; /**< Cluster */ + char *id; /**< Share group Id */ + int32_t group_epoch; /**< Group epoch */ + int session_timeout_ms; /**< Session timeout */ + rd_kafka_timer_t session_tmr; /**< Session timeout timer */ + int heartbeat_interval_ms; /**< Heartbeat interval */ + TAILQ_HEAD(, rd_kafka_mock_sharegroup_member_s) + members; /**< Share group members */ + int member_cnt; /**< Number of share group members */ + rd_bool_t manual_assignment; /**< Use manual assignment */ +} rd_kafka_mock_sharegroup_t; + +/** + * @struct Share group member (KIP-932). + */ +typedef struct rd_kafka_mock_sharegroup_member_s { + TAILQ_ENTRY(rd_kafka_mock_sharegroup_member_s) link; + char *id; /**< MemberId */ + rd_ts_t ts_last_activity; /**< Last heartbeat timestamp */ + int32_t member_epoch; /**< Current member epoch */ + int32_t previous_member_epoch; /**< Previous member epoch (allows + * client to catch up if response + * with bumped epoch was lost) */ + rd_list_t *subscribed_topic_names; /**< Subscribed topic names */ + rd_kafka_topic_partition_list_t *assignment; /**< Current assignment */ + struct rd_kafka_mock_connection_s *conn; /**< Connection */ + rd_kafka_mock_sharegroup_t *mshgrp; /**< Share group */ +} rd_kafka_mock_sharegroup_member_t; /** * @struct TransactionalId + PID (+ optional sequence state) @@ -429,6 +462,8 @@ struct rd_kafka_mock_cluster_s { TAILQ_HEAD(, rd_kafka_mock_cgrp_consumer_s) cgrps_consumer; + TAILQ_HEAD(, rd_kafka_mock_sharegroup_s) sharegrps; + /** Explicit coordinators (set with mock_set_coordinator()) */ TAILQ_HEAD(, rd_kafka_mock_coord_s) coords; @@ -467,6 +502,10 @@ struct rd_kafka_mock_cluster_s { int group_consumer_session_timeout_ms; /** Heartbeat interval (KIP 848) */ int group_consumer_heartbeat_interval_ms; + /** Session timeout (KIP 932) */ + int sharegroup_session_timeout_ms; + /** Heartbeat interval (KIP 932) */ + int sharegroup_heartbeat_interval_ms; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ @@ -700,6 +739,51 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_add( void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster, rd_kafka_mock_connection_t *mconn); + +/* Share group (KIP-932) */ + +void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster); + +rd_kafka_mock_sharegroup_t * +rd_kafka_mock_sharegroup_find(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupId); + +rd_kafka_mock_sharegroup_t * +rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupId); + +void rd_kafka_mock_sharegroup_destroy(rd_kafka_mock_sharegroup_t *mshgrp); + +rd_kafka_mock_sharegroup_member_t * +rd_kafka_mock_sharegroup_member_find(rd_kafka_mock_sharegroup_t *mshgrp, + const rd_kafkap_str_t *MemberId); + +void rd_kafka_mock_sharegroup_member_destroy( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_member_t *member); + +void rd_kafka_mock_sharegroup_member_active( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_member_t *member); + +rd_kafka_mock_sharegroup_member_t * +rd_kafka_mock_sharegroup_member_get(rd_kafka_mock_sharegroup_t *mshgrp, + const rd_kafkap_str_t *MemberID, + int32_t MemberEpoch, + rd_kafka_mock_connection_t *mconn); + +rd_bool_t rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( + rd_kafka_mock_sharegroup_member_t *member, + const rd_kafkap_str_t *SubscribedTopicNames, + int32_t SubscribedTopicNamesCnt); + +void rd_kafka_mock_sharegroup_assignment_recalculate( + rd_kafka_mock_sharegroup_t *mshgrp); + +void rd_kafka_mock_sharegrps_connection_closed( + rd_kafka_mock_cluster_t *mcluster, + rd_kafka_mock_connection_t *mconn); + /** *@} */ diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c new file mode 100644 index 0000000000..15718a9fcf --- /dev/null +++ b/src/rdkafka_mock_sharegrp.c @@ -0,0 +1,747 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2025, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Mocks + * + */ + + +#include "rdkafka_int.h" +#include "rdbuf.h" +#include "rdkafka_mock_int.h" +#include "rdkafka_mock_group_common.h" + +/** + * @brief Share group target assignment (manual) + */ +typedef struct rd_kafka_mock_sharegroup_target_assignments_s { + rd_list_t member_ids; /**< List of member ids (char *) */ + rd_list_t assignment; /**< List of rd_kafka_topic_partition_list_t */ +} rd_kafka_mock_sharegroup_target_assignment_t; + +/* Forward declarations */ +static void rd_kafka_mock_sharegroup_session_tmr_cb(rd_kafka_timers_t *rkts, + void *arg); + +/** + * @brief Initializes sharegroups in mock cluster + */ +void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { + TAILQ_INIT(&mcluster->sharegrps); + mcluster->defaults.sharegroup_session_timeout_ms = 45000; + mcluster->defaults.sharegroup_heartbeat_interval_ms = 5000; +} + +/** + * @brief Find a share group by GroupId. + */ +rd_kafka_mock_sharegroup_t * +rd_kafka_mock_sharegroup_find(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupId) { + return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->sharegrps, GroupId, + rd_kafka_mock_sharegroup_t); +} + +/** + * @brief Get or create a share group + */ +rd_kafka_mock_sharegroup_t * +rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupID) { + rd_kafka_mock_sharegroup_t *mshgrp; + + /* Check if the share group already exists */ + mshgrp = rd_kafka_mock_sharegroup_find(mcluster, GroupID); + if (mshgrp) + return mshgrp; + + /* Create new share group */ + mshgrp = rd_calloc(1, sizeof(*mshgrp)); + mshgrp->cluster = mcluster; + mshgrp->id = RD_KAFKAP_STR_DUP(GroupID); + mshgrp->group_epoch = 1; + mshgrp->session_timeout_ms = + mcluster->defaults.sharegroup_session_timeout_ms; + mshgrp->heartbeat_interval_ms = + mcluster->defaults.sharegroup_heartbeat_interval_ms; + + TAILQ_INIT(&mshgrp->members); + mshgrp->member_cnt = 0; + + rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, + 1000 * 1000 /* 1s */, + rd_kafka_mock_sharegroup_session_tmr_cb, mshgrp); + + TAILQ_INSERT_TAIL(&mcluster->sharegrps, mshgrp, link); + + return mshgrp; +} + +/** + * @brief Destroy a share group + */ +void rd_kafka_mock_sharegroup_destroy(rd_kafka_mock_sharegroup_t *mshgrp) { + rd_kafka_mock_sharegroup_member_t *member; + + TAILQ_REMOVE(&mshgrp->cluster->sharegrps, mshgrp, link); + rd_kafka_timer_stop(&mshgrp->cluster->timers, &mshgrp->session_tmr, + RD_DO_LOCK); + + /* Destroy all members */ + while ((member = TAILQ_FIRST(&mshgrp->members))) + rd_kafka_mock_sharegroup_member_destroy(mshgrp, member); + + rd_free(mshgrp->id); + rd_free(mshgrp); +} + +/** + * @brief Find a share group member by MemberId. + */ +rd_kafka_mock_sharegroup_member_t * +rd_kafka_mock_sharegroup_member_find(rd_kafka_mock_sharegroup_t *mshgrp, + const rd_kafkap_str_t *MemberId) { + return RD_KAFKA_MOCK_MEMBER_FIND(&mshgrp->members, MemberId, + rd_kafka_mock_sharegroup_member_t); +} + +/** + * @brief Destroy a share group member. + */ +void rd_kafka_mock_sharegroup_member_destroy( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_member_t *member) { + rd_assert(mshgrp->member_cnt > 0); + TAILQ_REMOVE(&mshgrp->members, member, link); + mshgrp->member_cnt--; + rd_free(member->id); + + RD_IF_FREE(member->subscribed_topic_names, rd_list_destroy_free); + RD_IF_FREE(member->assignment, rd_kafka_topic_partition_list_destroy); + rd_free(member); +} + +/** + * @brief Mark member as active. + */ +void rd_kafka_mock_sharegroup_member_active( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_member_t *member) { + rd_kafka_mock_group_member_mark_active(mshgrp->cluster->rk, "share", + member->id, + &member->ts_last_activity); +} + +/** + * @brief Fence a member. + */ +void rd_kafka_mock_sharegroup_member_fenced( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_member_t *member) { + rd_kafka_dbg(mshgrp->cluster->rk, MOCK, "MOCK", + "Member %s is fenced from sharegroup %s", member->id, + mshgrp->id); + + rd_kafka_mock_sharegroup_member_destroy(mshgrp, member); + + /* Recalculate assignments so remaining members get the + * freed partitions. */ + rd_kafka_mock_sharegroup_assignment_recalculate(mshgrp); +} + +/** + * @brief Check all members for inactivity and remove them if timed out. + */ +static void rd_kafka_mock_sharegroup_session_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_mock_sharegroup_t *mshgrp = arg; + rd_kafka_mock_sharegroup_member_t *member, *tmp; + rd_ts_t now = rd_clock(); + rd_kafka_mock_cluster_t *mcluster = mshgrp->cluster; + + mtx_lock(&mcluster->lock); + TAILQ_FOREACH_SAFE(member, &mshgrp->members, link, tmp) { + if (member->ts_last_activity + + (mshgrp->session_timeout_ms * 1000) > + now) + continue; + + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Member %s session timed out for sharegroup %s", + member->id, mshgrp->id); + + rd_kafka_mock_sharegroup_member_fenced(mshgrp, member); + } + mtx_unlock(&mcluster->lock); +} + +/** + * @brief Get or create a share group member. + */ +rd_kafka_mock_sharegroup_member_t * +rd_kafka_mock_sharegroup_member_get(rd_kafka_mock_sharegroup_t *mshgrp, + const rd_kafkap_str_t *MemberID, + int32_t MemberEpoch, + rd_kafka_mock_connection_t *mconn) { + rd_kafka_mock_sharegroup_member_t *member; + + /* Check if the member already exists */ + member = rd_kafka_mock_sharegroup_member_find(mshgrp, MemberID); + if (member) { + member->conn = mconn; + rd_kafka_mock_sharegroup_member_active(mshgrp, member); + return member; + } + + /* Only create if epoch is 0 */ + if (MemberEpoch != 0) + return NULL; + + /* Create new member */ + member = rd_calloc(1, sizeof(*member)); + member->mshgrp = mshgrp; + member->id = RD_KAFKAP_STR_DUP(MemberID); + member->member_epoch = mshgrp->group_epoch; + member->previous_member_epoch = + -1; /* No previous epoch for new members */ + member->conn = mconn; + + TAILQ_INSERT_TAIL(&mshgrp->members, member, link); + mshgrp->member_cnt++; + rd_kafka_mock_sharegroup_member_active(mshgrp, member); + + return member; +} + +/** + * @brief Update share group member's subscribed topic names. + * + * @param member The member to update. + * @param SubscribedTopicNames Array of topic names. + * @param SubscribedTopicNamesCnt Count of topic names: + * -1 = unchanged (no modification) + * 0 = clear all subscriptions + * >0 = set to provided topics + * + * @returns rd_true if subscriptions changed, rd_false otherwise. + */ +rd_bool_t rd_kafka_mock_sharegroup_member_subscribed_topic_names_set( + rd_kafka_mock_sharegroup_member_t *member, + const rd_kafkap_str_t *SubscribedTopicNames, + int32_t SubscribedTopicNamesCnt) { + int32_t i; + + if (SubscribedTopicNamesCnt < 0) { + /* -1 means unchanged */ + return rd_false; + } + + if (SubscribedTopicNamesCnt == 0) { + /* 0 means clear all subscriptions */ + if (!member->subscribed_topic_names || + rd_list_cnt(member->subscribed_topic_names) == 0) { + /* Already empty, no change */ + return rd_false; + } + rd_list_destroy(member->subscribed_topic_names); + member->subscribed_topic_names = NULL; + return rd_true; + } + + /* SubscribedTopicNamesCnt > 0: Check if subscription changed */ + if (member->subscribed_topic_names) { + if (rd_list_cnt(member->subscribed_topic_names) == + SubscribedTopicNamesCnt) { + rd_bool_t same = rd_true; + char *topic; + int j; + + RD_LIST_FOREACH(topic, member->subscribed_topic_names, + j) { + rd_bool_t found = rd_false; + for (i = 0; i < SubscribedTopicNamesCnt; i++) { + if (!rd_kafkap_str_cmp_str( + &SubscribedTopicNames[i], + topic)) { + found = rd_true; + break; + } + } + if (!found) { + same = rd_false; + break; + } + } + if (same) + return rd_false; + } + } + + /* Subscription changed, update the list */ + RD_IF_FREE(member->subscribed_topic_names, rd_list_destroy); + member->subscribed_topic_names = + rd_list_new(SubscribedTopicNamesCnt, rd_free); + + for (i = 0; i < SubscribedTopicNamesCnt; i++) { + rd_list_add(member->subscribed_topic_names, + RD_KAFKAP_STR_DUP(&SubscribedTopicNames[i])); + } + + return rd_true; +} + +/** + * @brief Collect all subscribed topic names from all members. + */ +static rd_list_t *rd_kafka_mock_sharegroup_collect_subscribed_topics( + rd_kafka_mock_sharegroup_t *mshgrp) { + rd_kafka_mock_sharegroup_member_t *member; + rd_list_t *all_topics; + + all_topics = rd_list_new(32, rd_free); + + TAILQ_FOREACH(member, &mshgrp->members, link) { + const char *topic; + int i; + + if (!member->subscribed_topic_names) + continue; + + RD_LIST_FOREACH(topic, member->subscribed_topic_names, i) { + const char *existing; + int j; + rd_bool_t found = rd_false; + + /* Check if topic already in all_topics */ + RD_LIST_FOREACH(existing, all_topics, j) { + if (!strcmp(topic, existing)) { + found = rd_true; + break; + } + } + + /* Add if not found */ + if (!found) { + rd_list_add(all_topics, rd_strdup(topic)); + } + } + } + + return all_topics; +} + +/** + * @brief Get list of member ID's subscribed to a topic. + */ +rd_list_t *rd_kafka_mock_sharegroup_get_members_for_topic( + rd_kafka_mock_sharegroup_t *mshgrp, + char *topic_name) { + rd_kafka_mock_sharegroup_member_t *member; + rd_list_t *subscribed_members; + int member_idx = 0; + + subscribed_members = rd_list_new(mshgrp->member_cnt, rd_free); + + TAILQ_FOREACH(member, &mshgrp->members, link) { + char *topic; + int i; + + if (member->subscribed_topic_names) { + RD_LIST_FOREACH(topic, member->subscribed_topic_names, + i) { + if (!strcmp(topic, topic_name)) { + int *idx = rd_malloc(sizeof(*idx)); + *idx = member_idx; + rd_list_add(subscribed_members, idx); + break; + } + } + } + member_idx++; + } + + return subscribed_members; +} + +/** + * @brief Assign partitions of a single topic to subscribed members. + */ +void rd_kafka_mock_sharegroup_assign_topic_partitions( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_topic_t *mtopic, + rd_list_t *subscribed_member_indices) { + int member_count; + int partition_cnt; + int partitions_per_member; + int extra_partitions; + int partition_idx; + int i; + + member_count = rd_list_cnt(subscribed_member_indices); + partition_cnt = mtopic->partition_cnt; + + if (member_count == 0 || partition_cnt == 0) + return; + + partitions_per_member = partition_cnt / member_count; + extra_partitions = partition_cnt % member_count; + partition_idx = 0; + + for (i = 0; i < member_count; i++) { + int *member_idx_ptr = + (int *)rd_list_elem(subscribed_member_indices, i); + rd_kafka_mock_sharegroup_member_t *member; + int j, cnt = 0; + int num_partitions; + + TAILQ_FOREACH(member, &mshgrp->members, link) { + if (cnt == *member_idx_ptr) + break; + cnt++; + } + + if (!member) + continue; + + num_partitions = + partitions_per_member + (i < extra_partitions ? 1 : 0); + + if (!member->assignment) + member->assignment = + rd_kafka_topic_partition_list_new(num_partitions); + + for (j = 0; j < num_partitions && partition_idx < partition_cnt; + j++, partition_idx++) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add( + member->assignment, mtopic->name, partition_idx); + /* Set topic ID so the response can include it */ + rd_kafka_topic_partition_set_topic_id(rktpar, + mtopic->id); + } + } +} + +/** + * @brief Recalculate assignments for all members in the share group. + */ +void rd_kafka_mock_sharegroup_assignment_recalculate( + rd_kafka_mock_sharegroup_t *mshgrp) { + rd_kafka_mock_sharegroup_member_t *member; + rd_list_t *all_topics; + char *topic_name; + int i; + + if (mshgrp->member_cnt == 0) + return; + + /* Skip automatic assignment if manual mode is enabled */ + if (mshgrp->manual_assignment) + return; + + TAILQ_FOREACH(member, &mshgrp->members, link) { + if (member->assignment) { + rd_kafka_topic_partition_list_destroy( + member->assignment); + member->assignment = NULL; + } + } + + all_topics = rd_kafka_mock_sharegroup_collect_subscribed_topics(mshgrp); + + RD_LIST_FOREACH(topic_name, all_topics, i) { + rd_kafka_mock_topic_t *mtopic; + rd_list_t *subscribed_members; + + mtopic = rd_kafka_mock_topic_find(mshgrp->cluster, topic_name); + if (!mtopic) + continue; + + subscribed_members = + rd_kafka_mock_sharegroup_get_members_for_topic(mshgrp, + topic_name); + + rd_kafka_mock_sharegroup_assign_topic_partitions( + mshgrp, mtopic, subscribed_members); + + rd_list_destroy(subscribed_members); + } + + mshgrp->group_epoch++; + + TAILQ_FOREACH(member, &mshgrp->members, link) { + /* Save the current epoch as previous before bumping. + * This allows the client to catch up if the response + * with the new epoch was lost. */ + member->previous_member_epoch = member->member_epoch; + member->member_epoch = mshgrp->group_epoch; + } + + rd_list_destroy(all_topics); +} + +/** + * @brief Create a new target assignment (manual) + */ +rd_kafka_mock_sharegroup_target_assignment_t * +rd_kafka_mock_sharegroup_target_assignment_new(void) { + rd_kafka_mock_sharegroup_target_assignment_t *target_assignment; + target_assignment = rd_calloc(1, sizeof(*target_assignment)); + rd_list_init(&target_assignment->member_ids, 0, rd_free); + rd_list_init(&target_assignment->assignment, 0, + (void *)rd_kafka_topic_partition_list_destroy); + + return target_assignment; +} + +/** + * @brief Destroy target assignment + */ +void rd_kafka_mock_sharegroup_target_assignment_destroy( + rd_kafka_mock_sharegroup_target_assignment_t *target_assignment) { + rd_list_destroy(&target_assignment->member_ids); + rd_list_destroy(&target_assignment->assignment); + rd_free(target_assignment); +} + +/** + * @brief Set the target assignment for the sharegroup. + * This applies the manual assignment to the members. + * + * @locks mcluster->lock MUST be held. + */ +static void rd_kafka_mock_sharegroup_target_assignment_set( + rd_kafka_mock_sharegroup_t *mshgrp, + rd_kafka_mock_sharegroup_target_assignment_t *target_assignment) { + rd_kafka_mock_sharegroup_member_t *member; + size_t i; + + for (i = 0; i < rd_list_cnt(&target_assignment->member_ids); i++) { + const char *member_id = + rd_list_elem(&target_assignment->member_ids, i); + const rd_kafka_topic_partition_list_t *partitions = + rd_list_elem(&target_assignment->assignment, i); + rd_kafkap_str_t *member_id_str; + + member_id_str = rd_kafkap_str_new(member_id, -1); + member = + rd_kafka_mock_sharegroup_member_find(mshgrp, member_id_str); + rd_kafkap_str_destroy(member_id_str); + + if (!member) { + rd_kafka_dbg(mshgrp->cluster->rk, MOCK, "MOCK", + "Cannot set target assignment for " + "non-existing member %s in sharegroup %s", + member_id, mshgrp->id); + continue; + } + + if (member->assignment) { + rd_kafka_topic_partition_list_destroy( + member->assignment); + } + + member->assignment = + rd_kafka_topic_partition_list_copy(partitions); + + /* Set topic IDs on each partition so the heartbeat response + * can include them (ShareGroupHeartbeat uses topic IDs) */ + { + int j; + for (j = 0; j < member->assignment->cnt; j++) { + rd_kafka_topic_partition_t *rktpar = + &member->assignment->elems[j]; + rd_kafkap_str_t topic_str = { + .str = rktpar->topic, + .len = strlen(rktpar->topic)}; + rd_kafka_mock_topic_t *mtopic = + rd_kafka_mock_topic_find_by_kstr( + mshgrp->cluster, &topic_str); + if (mtopic) { + rd_kafka_topic_partition_set_topic_id( + rktpar, mtopic->id); + } + } + } + + rd_kafka_dbg( + mshgrp->cluster->rk, MOCK, "MOCK", + "Target assignment set for member %s: %d partition(s)", + member_id, member->assignment->cnt); + } + + /* Bump the epochs */ + TAILQ_FOREACH(member, &mshgrp->members, link) { + member->previous_member_epoch = member->member_epoch; + member->member_epoch = ++mshgrp->group_epoch; + } +} + +/** + * @brief Manual target assignment interface for sharegroups. + */ +void rd_kafka_mock_sharegroup_target_assignment( + rd_kafka_mock_cluster_t *mcluster, + const char *group_id, + const char **member_ids, + rd_kafka_topic_partition_list_t **assignment, + size_t member_cnt) { + rd_kafka_mock_sharegroup_t *mshgrp; + rd_kafka_mock_sharegroup_target_assignment_t *target_assignment; + size_t i; + rd_kafkap_str_t *group_id_str; + + mtx_lock(&mcluster->lock); + group_id_str = rd_kafkap_str_new(group_id, -1); + mshgrp = rd_kafka_mock_sharegroup_find(mcluster, group_id_str); + rd_kafkap_str_destroy(group_id_str); + + if (!mshgrp) { + rd_kafka_log(mcluster->rk, LOG_ERR, "MOCK", + "Sharegroup %s not found for target assignment", + group_id); + mtx_unlock(&mcluster->lock); + return; + } + + mshgrp->manual_assignment = rd_true; + target_assignment = rd_kafka_mock_sharegroup_target_assignment_new(); + + for (i = 0; i < member_cnt; i++) { + rd_list_add(&target_assignment->member_ids, + rd_strdup(member_ids[i])); + rd_list_add(&target_assignment->assignment, + rd_kafka_topic_partition_list_copy(assignment[i])); + } + rd_kafka_mock_sharegroup_target_assignment_set(mshgrp, + target_assignment); + rd_kafka_mock_sharegroup_target_assignment_destroy(target_assignment); + mtx_unlock(&mcluster->lock); +} + +/** + * @brief Set the sharegroup session timeout for the sharegroup. + */ +void rd_kafka_mock_sharegroup_set_session_timeout( + rd_kafka_mock_cluster_t *mcluster, + int session_timeout_ms) { + mtx_lock(&mcluster->lock); + mcluster->defaults.sharegroup_session_timeout_ms = session_timeout_ms; + mtx_unlock(&mcluster->lock); +} + +/** + * @brief Set the sharegroup heartbeat interval for the sharegroup. + */ +void rd_kafka_mock_sharegroup_set_heartbeat_interval( + rd_kafka_mock_cluster_t *mcluster, + int heartbeat_interval_ms) { + mtx_lock(&mcluster->lock); + mcluster->defaults.sharegroup_heartbeat_interval_ms = + heartbeat_interval_ms; + mtx_unlock(&mcluster->lock); +} + +/** + * @brief A client connection closed, check if any sharegroup has any + * state for this connection that needs to be cleared. + * + * @param mcluster Cluster to search in. + * @param mconn Connection that was closed. + * + * @locks mcluster->lock MUST be held. + */ +void rd_kafka_mock_sharegrps_connection_closed( + rd_kafka_mock_cluster_t *mcluster, + rd_kafka_mock_connection_t *mconn) { + rd_kafka_mock_sharegroup_t *mshgrp; + + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) { + rd_kafka_mock_sharegroup_member_t *member, *tmp; + TAILQ_FOREACH_SAFE(member, &mshgrp->members, link, tmp) { + if (member->conn == mconn) { + rd_kafka_mock_sharegroup_member_fenced(mshgrp, + member); + } + } + } +} + +/** + * @brief Retrieve the member IDs from a sharegroup. + * + * @param mcluster Mock cluster instance. + * @param group_id The sharegroup ID. + * @param member_ids_out Output array of member IDs (caller must free each + * string and the array itself). + * @param member_cnt_out Output count of members. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND if sharegroup not found. + */ +rd_kafka_resp_err_t rd_kafka_mock_sharegroup_get_member_ids( + rd_kafka_mock_cluster_t *mcluster, + const char *group_id, + char ***member_ids_out, + size_t *member_cnt_out) { + rd_kafka_mock_sharegroup_t *mshgrp; + rd_kafka_mock_sharegroup_member_t *member; + rd_kafkap_str_t *group_id_str; + char **member_ids; + size_t i; + + mtx_lock(&mcluster->lock); + group_id_str = rd_kafkap_str_new(group_id, -1); + mshgrp = rd_kafka_mock_sharegroup_find(mcluster, group_id_str); + rd_kafkap_str_destroy(group_id_str); + + if (!mshgrp) { + mtx_unlock(&mcluster->lock); + *member_ids_out = NULL; + *member_cnt_out = 0; + return RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND; + } + + *member_cnt_out = mshgrp->member_cnt; + if (mshgrp->member_cnt == 0) { + mtx_unlock(&mcluster->lock); + *member_ids_out = NULL; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + member_ids = rd_malloc(sizeof(*member_ids) * mshgrp->member_cnt); + i = 0; + TAILQ_FOREACH(member, &mshgrp->members, link) { + member_ids[i++] = rd_strdup(member->id); + } + + mtx_unlock(&mcluster->lock); + *member_ids_out = member_ids; + return RD_KAFKA_RESP_ERR_NO_ERROR; +} \ No newline at end of file diff --git a/tests/0155-share_group_heartbeat_mock.c b/tests/0155-share_group_heartbeat_mock.c new file mode 100644 index 0000000000..985b6b886e --- /dev/null +++ b/tests/0155-share_group_heartbeat_mock.c @@ -0,0 +1,702 @@ +#include "test.h" + +#include "../src/rdkafka_proto.h" + +/** + * @name Mock tests for share consumer and ShareGroupHeartbeat + */ + +static rd_bool_t is_share_heartbeat_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_api_key(request) == + RD_KAFKAP_ShareGroupHeartbeat; +} + +/** + * @brief Wait for at least \p num ShareGroupHeartbeat requests + * to be received by the mock cluster. + * + * @return Number of heartbeats received. + */ +static int wait_share_heartbeats(rd_kafka_mock_cluster_t *mcluster, + int num, + int confidence_interval) { + return test_mock_wait_matching_requests( + mcluster, num, confidence_interval, is_share_heartbeat_request, + NULL); +} + +/** + * @brief Create a share consumer connected to mock cluster. + */ +static rd_kafka_t *create_share_consumer(const char *bootstraps, + const char *group_id) { + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + char errstr[512]; + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "group.id", group_id); + test_conf_set(conf, "share.consumer", "true"); + test_conf_set(conf, "group.protocol", "consumer"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + TEST_ASSERT(rk != NULL, "Failed to create share consumer: %s", errstr); + + return rk; +} + +/** + * @brief Test basic ShareGroupHeartbeat flow: + * join, receive assignment, heartbeats, leave. + */ +static void do_test_share_group_heartbeat_basic(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription; + rd_kafka_t *c; + int found_heartbeats; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 3, 1); + + c = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Wait for join heartbeat */ + found_heartbeats = wait_share_heartbeats(mcluster, 1, 500); + TEST_ASSERT(found_heartbeats >= 1, + "Expected at least 1 heartbeat, got %d", found_heartbeats); + + /* Poll to process response and trigger more heartbeats */ + rd_kafka_consumer_poll(c, 2000); + + /* Verify multiple heartbeats */ + found_heartbeats = wait_share_heartbeats(mcluster, 2, 200); + TEST_ASSERT(found_heartbeats >= 2, + "Expected at least 2 heartbeats, got %d", found_heartbeats); + + /* Close consumer (sends leave heartbeat) */ + rd_kafka_consumer_close(c); + rd_kafka_destroy(c); + + /* Verify leave heartbeat was sent */ + found_heartbeats = wait_share_heartbeats(mcluster, 3, 200); + + /* Verify no more heartbeats after leave */ + rd_kafka_mock_stop_request_tracking(mcluster); + rd_kafka_mock_start_request_tracking(mcluster); + rd_sleep(3); + found_heartbeats = wait_share_heartbeats(mcluster, 0, 100); + TEST_ASSERT(found_heartbeats == 0, + "Expected 0 heartbeats after leave, got %d", + found_heartbeats); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Test assignment redistribution when consumers join/leave. + */ +static void do_test_share_group_assignment_rebalance(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription; + rd_kafka_topic_partition_list_t *c1_assignment, *c2_assignment; + rd_kafka_t *c1, *c2; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-rebalance"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 3, 1); + + c1 = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); + + /* C1 joins - should get all 3 partitions */ + wait_share_heartbeats(mcluster, 1, 500); + rd_kafka_consumer_poll(c1, 2000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); + TEST_ASSERT(c1_assignment->cnt == 3, + "Expected C1 to have 3 partitions, got %d", + c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(c1_assignment); + + /* C2 joins - partitions should be redistributed */ + c2 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + wait_share_heartbeats(mcluster, 3, 500); + rd_kafka_consumer_poll(c1, 2000); + rd_kafka_consumer_poll(c2, 2000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assignment)); + TEST_ASSERT(c1_assignment->cnt + c2_assignment->cnt == 3, + "Expected total 3 partitions, got %d + %d = %d", + c1_assignment->cnt, c2_assignment->cnt, + c1_assignment->cnt + c2_assignment->cnt); + TEST_ASSERT(c1_assignment->cnt > 0 && c2_assignment->cnt > 0, + "Expected both consumers to have partitions, " + "got C1=%d, C2=%d", + c1_assignment->cnt, c2_assignment->cnt); + rd_kafka_topic_partition_list_destroy(c1_assignment); + rd_kafka_topic_partition_list_destroy(c2_assignment); + + /* C2 leaves - C1 should get all partitions back */ + rd_kafka_consumer_close(c2); + rd_kafka_destroy(c2); + + rd_kafka_consumer_poll(c1, 6000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); + TEST_ASSERT(c1_assignment->cnt == 3, + "Expected C1 to have 3 partitions after C2 left, got %d", + c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(c1_assignment); + + /* Cleanup */ + rd_kafka_consumer_close(c1); + rd_kafka_destroy(c1); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Helper to count partitions for a specific topic in an assignment. + */ +static int count_topic_partitions(rd_kafka_topic_partition_list_t *assignment, + const char *topic) { + int i, count = 0; + for (i = 0; i < assignment->cnt; i++) { + if (strcmp(assignment->elems[i].topic, topic) == 0) + count++; + } + return count; +} + +/** + * @brief Test multi-topic assignment with mixed subscriptions. + * C1: both topics, C2: orders only, C3: events only + */ +static void do_test_share_group_multi_topic_assignment(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *sub_both, *sub_orders, *sub_events; + rd_kafka_topic_partition_list_t *c1_assign, *c2_assign, *c3_assign; + rd_kafka_t *c1, *c2, *c3; + const char *topic_orders = "test-orders"; + const char *topic_events = "test-events"; + const char *group = "test-share-group-multi"; + int total_orders, total_events; + + SUB_TEST_QUICK(); + + /* Setup: orders (4 partitions), events (2 partitions) */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic_orders, 4, 1); + rd_kafka_mock_topic_create(mcluster, topic_events, 2, 1); + + sub_both = rd_kafka_topic_partition_list_new(2); + rd_kafka_topic_partition_list_add(sub_both, topic_orders, + RD_KAFKA_PARTITION_UA); + rd_kafka_topic_partition_list_add(sub_both, topic_events, + RD_KAFKA_PARTITION_UA); + + sub_orders = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(sub_orders, topic_orders, + RD_KAFKA_PARTITION_UA); + + sub_events = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(sub_events, topic_events, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + + /* C1 joins (both topics) - should get all 6 partitions */ + c1 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_subscribe(c1, sub_both)); + wait_share_heartbeats(mcluster, 1, 500); + rd_kafka_consumer_poll(c1, 2000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_ASSERT(c1_assign->cnt == 6, + "C1 should have all 6 partitions, got %d", c1_assign->cnt); + rd_kafka_topic_partition_list_destroy(c1_assign); + + /* C2 joins (orders only) - orders should split */ + c2 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_subscribe(c2, sub_orders)); + wait_share_heartbeats(mcluster, 3, 500); + rd_kafka_consumer_poll(c1, 2000); + rd_kafka_consumer_poll(c2, 2000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + + total_orders = count_topic_partitions(c1_assign, topic_orders) + + count_topic_partitions(c2_assign, topic_orders); + total_events = count_topic_partitions(c1_assign, topic_events) + + count_topic_partitions(c2_assign, topic_events); + + TEST_ASSERT(total_orders == 4, "Total orders should be 4, got %d", + total_orders); + TEST_ASSERT(total_events == 2, "Total events should be 2, got %d", + total_events); + TEST_ASSERT(count_topic_partitions(c2_assign, topic_orders) > 0, + "C2 should have at least 1 orders partition"); + + rd_kafka_topic_partition_list_destroy(c1_assign); + rd_kafka_topic_partition_list_destroy(c2_assign); + + /* C3 joins (events only) - events should split */ + c3 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_subscribe(c3, sub_events)); + wait_share_heartbeats(mcluster, 5, 500); + rd_kafka_consumer_poll(c1, 2000); + rd_kafka_consumer_poll(c2, 2000); + rd_kafka_consumer_poll(c3, 2000); + + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); + + total_orders = count_topic_partitions(c1_assign, topic_orders) + + count_topic_partitions(c2_assign, topic_orders) + + count_topic_partitions(c3_assign, topic_orders); + total_events = count_topic_partitions(c1_assign, topic_events) + + count_topic_partitions(c2_assign, topic_events) + + count_topic_partitions(c3_assign, topic_events); + + TEST_ASSERT(total_orders == 4, "Total orders should be 4, got %d", + total_orders); + TEST_ASSERT(total_events == 2, "Total events should be 2, got %d", + total_events); + TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) > 0, + "C3 should have at least 1 events partition"); + + rd_kafka_topic_partition_list_destroy(c1_assign); + rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(c3_assign); + + /* C1 leaves - C2 should get all orders, C3 all events */ + rd_kafka_consumer_close(c1); + rd_kafka_destroy(c1); + + rd_kafka_consumer_poll(c2, 6000); + rd_kafka_consumer_poll(c3, 6000); + + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); + + TEST_ASSERT(count_topic_partitions(c2_assign, topic_orders) == 4, + "C2 should have all 4 orders partitions, got %d", + count_topic_partitions(c2_assign, topic_orders)); + TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) == 2, + "C3 should have all 2 events partitions, got %d", + count_topic_partitions(c3_assign, topic_events)); + + rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(c3_assign); + + /* C2 leaves - C3 should still have events only */ + rd_kafka_consumer_close(c2); + rd_kafka_destroy(c2); + + rd_kafka_consumer_poll(c3, 6000); + + TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); + TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) == 2, + "C3 should still have 2 events partitions, got %d", + count_topic_partitions(c3_assign, topic_events)); + TEST_ASSERT(count_topic_partitions(c3_assign, topic_orders) == 0, + "C3 should have 0 orders partitions (not subscribed), " + "got %d", + count_topic_partitions(c3_assign, topic_orders)); + rd_kafka_topic_partition_list_destroy(c3_assign); + + /* Cleanup */ + rd_kafka_consumer_close(c3); + rd_kafka_destroy(c3); + + rd_kafka_topic_partition_list_destroy(sub_both); + rd_kafka_topic_partition_list_destroy(sub_orders); + rd_kafka_topic_partition_list_destroy(sub_events); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Test error injection for ShareGroupHeartbeat. + * + * Tests that the mock broker error injection API works for + * ShareGroupHeartbeat requests. + */ +static void do_test_share_group_error_injection(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription, *assignment; + rd_kafka_t *c; + int found_heartbeats; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-errors"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 3, 1); + + c = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Wait for initial join and assignment */ + wait_share_heartbeats(mcluster, 1, 500); + rd_kafka_consumer_poll(c, 2000); + + /* Verify initial assignment */ + TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_ASSERT(assignment->cnt == 3, + "Expected 3 partitions initially, got %d", assignment->cnt); + rd_kafka_topic_partition_list_destroy(assignment); + + /* Test that we can inject a transient error */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1, RD_KAFKAP_ShareGroupHeartbeat, 1, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 0); + + /* Poll briefly - consumer should handle the transient error */ + rd_kafka_consumer_poll(c, 1000); + + /* Verify heartbeats continue after transient error */ + found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); + TEST_ASSERT(found_heartbeats >= 1, + "Expected heartbeats to continue after error, got %d", + found_heartbeats); + + /* Cleanup */ + rd_kafka_consumer_close(c); + rd_kafka_destroy(c); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Test RTT injection for ShareGroupHeartbeat. + * + * Tests that the mock broker RTT injection API works for + * ShareGroupHeartbeat requests. + */ +static void do_test_share_group_rtt_injection(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription, *assignment; + rd_kafka_t *c; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-rtt"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 3, 1); + + c = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Poll to join and get assignment */ + rd_kafka_consumer_poll(c, 3000); + + /* Verify initial assignment */ + TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_ASSERT(assignment->cnt == 3, + "Expected 3 partitions initially, got %d", assignment->cnt); + rd_kafka_topic_partition_list_destroy(assignment); + + /* Test RTT injection API - inject 50ms latency */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1, RD_KAFKAP_ShareGroupHeartbeat, 1, + RD_KAFKA_RESP_ERR_NO_ERROR, 50); + + rd_kafka_consumer_poll(c, 1000); + + /* Verify consumer still has assignment */ + TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_ASSERT(assignment->cnt == 3, + "Expected 3 partitions after RTT injection, got %d", + assignment->cnt); + rd_kafka_topic_partition_list_destroy(assignment); + + /* Cleanup */ + rd_kafka_consumer_close(c); + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Test session timeout for ShareGroupHeartbeat. + * + * Tests that the mock broker correctly times out members that stop + * heartbeating. Uses a short session timeout (3000ms) and verifies: + * - Member is removed after timeout + * - Remaining members get reassigned partitions + */ +static void do_test_share_group_session_timeout(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription; + rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; + rd_kafka_t *c1, *c2; + int c1_initial, c2_initial; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-timeout"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 4, 1); + + /* Set short session timeout on mock broker */ + rd_kafka_mock_sharegroup_set_session_timeout(mcluster, 3000); + + c1 = create_share_consumer(bootstraps, group); + c2 = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + + TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); + TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Wait for both to join and rebalance to complete */ + wait_share_heartbeats(mcluster, 3, 500); + rd_kafka_consumer_poll(c1, 2000); + rd_kafka_consumer_poll(c2, 2000); + + /* Verify initial distribution */ + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + c1_initial = c1_assign->cnt; + c2_initial = c2_assign->cnt; + TEST_ASSERT(c1_initial + c2_initial == 4, + "Total should be 4 partitions, got %d", + c1_initial + c2_initial); + TEST_ASSERT(c1_initial > 0 && c2_initial > 0, + "Both consumers should have partitions"); + rd_kafka_topic_partition_list_destroy(c1_assign); + rd_kafka_topic_partition_list_destroy(c2_assign); + + /* Destroy C2 without close to simulate crash */ + rd_kafka_destroy(c2); + + /* Poll C1 for 5 seconds - enough for C2 to timeout */ + rd_kafka_consumer_poll(c1, 5000); + + /* Verify C1 got all partitions after C2 timed out */ + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_ASSERT(c1_assign->cnt == 4, + "C1 should have all 4 partitions after C2 timeout, got %d", + c1_assign->cnt); + rd_kafka_topic_partition_list_destroy(c1_assign); + + rd_kafka_consumer_close(c1); + rd_kafka_destroy(c1); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Test target assignment API for ShareGroupHeartbeat. + * + * Tests that the mock broker can apply manual target assignments: + * 1. Two consumers join and get automatic assignment (2 partitions each) + * 2. Retrieve member IDs using rd_kafka_mock_sharegroup_get_member_ids() + * 3. Set manual target assignment: C1 gets all 4 partitions, C2 gets none + * 4. Verify consumers receive the manual assignment + */ +static void do_test_share_group_target_assignment(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription; + rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; + rd_kafka_topic_partition_list_t *target_c1, *target_c2; + rd_kafka_topic_partition_list_t *assignments[2]; + rd_kafka_t *c1, *c2; + char **member_ids; + size_t member_cnt; + rd_kafka_resp_err_t err; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-target"; + + SUB_TEST_QUICK(); + + /* Setup */ + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 4, 1); + + c1 = create_share_consumer(bootstraps, group); + c2 = create_share_consumer(bootstraps, group); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + + TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); + TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Wait for both to join and rebalance to complete */ + wait_share_heartbeats(mcluster, 3, 500); + rd_kafka_consumer_poll(c1, 3000); + rd_kafka_consumer_poll(c2, 3000); + + /* Verify initial automatic assignment */ + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + TEST_ASSERT(c1_assign->cnt + c2_assign->cnt == 4, + "Total should be 4 partitions, got %d", + c1_assign->cnt + c2_assign->cnt); + TEST_ASSERT(c1_assign->cnt > 0 && c2_assign->cnt > 0, + "Both consumers should have partitions initially"); + rd_kafka_topic_partition_list_destroy(c1_assign); + rd_kafka_topic_partition_list_destroy(c2_assign); + + /* Retrieve member IDs */ + err = rd_kafka_mock_sharegroup_get_member_ids(mcluster, group, + &member_ids, &member_cnt); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected no error, got %s", rd_kafka_err2str(err)); + TEST_ASSERT(member_cnt == 2, "Expected 2 members, got %zu", member_cnt); + + /* Set manual target assignment: all to first member */ + target_c1 = rd_kafka_topic_partition_list_new(4); + rd_kafka_topic_partition_list_add(target_c1, topic, 0); + rd_kafka_topic_partition_list_add(target_c1, topic, 1); + rd_kafka_topic_partition_list_add(target_c1, topic, 2); + rd_kafka_topic_partition_list_add(target_c1, topic, 3); + + target_c2 = rd_kafka_topic_partition_list_new(0); + + assignments[0] = target_c1; + assignments[1] = target_c2; + + rd_kafka_mock_sharegroup_target_assignment( + mcluster, group, (const char **)member_ids, assignments, 2); + + rd_kafka_topic_partition_list_destroy(target_c1); + rd_kafka_topic_partition_list_destroy(target_c2); + + /* Poll to receive new assignment */ + rd_kafka_consumer_poll(c1, 6000); + rd_kafka_consumer_poll(c2, 6000); + + /* Verify manual assignment was applied */ + TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + + TEST_ASSERT(c1_assign->cnt + c2_assign->cnt == 4, + "Total should still be 4 partitions, got %d", + c1_assign->cnt + c2_assign->cnt); + TEST_ASSERT((c1_assign->cnt == 4 && c2_assign->cnt == 0) || + (c1_assign->cnt == 0 && c2_assign->cnt == 4), + "Expected one consumer to have all 4 partitions and the " + "other to have 0, got C1=%d, C2=%d", + c1_assign->cnt, c2_assign->cnt); + + rd_kafka_topic_partition_list_destroy(c1_assign); + rd_kafka_topic_partition_list_destroy(c2_assign); + + /* Free member IDs */ + rd_free(member_ids[0]); + rd_free(member_ids[1]); + rd_free(member_ids); + + /* Cleanup */ + rd_kafka_consumer_close(c1); + rd_kafka_consumer_close(c2); + rd_kafka_destroy(c1); + rd_kafka_destroy(c2); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +int main_0155_share_group_heartbeat_mock(int argc, char **argv) { + TEST_SKIP_MOCK_CLUSTER(0); + + /* Run all tests */ + do_test_share_group_heartbeat_basic(); + do_test_share_group_assignment_rebalance(); + do_test_share_group_multi_topic_assignment(); + do_test_share_group_error_injection(); + do_test_share_group_rtt_injection(); + do_test_share_group_session_timeout(); + do_test_share_group_target_assignment(); + + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fd90f7aefb..243309dfd6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -145,6 +145,7 @@ set( 0152-rebootstrap.c 0153-memberid.c 0154-share-consumer.c + 0155-share_group_heartbeat_mock.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 8874395aae..fda75fac32 100644 --- a/tests/test.c +++ b/tests/test.c @@ -273,6 +273,7 @@ _TEST_DECL(0151_purge_brokers_mock); _TEST_DECL(0152_rebootstrap_local); _TEST_DECL(0153_memberid); _TEST_DECL(0154_share_consumer); +_TEST_DECL(0155_share_group_heartbeat_mock); /* Manual tests */ _TEST_DECL(8000_idle); @@ -542,6 +543,7 @@ struct test tests[] = { _TEST(0152_rebootstrap_local, TEST_F_LOCAL), _TEST(0153_memberid, TEST_F_LOCAL), _TEST(0154_share_consumer, 0, TEST_BRKVER(0, 4, 0, 0)), + _TEST(0155_share_group_heartbeat_mock, TEST_F_LOCAL), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), diff --git a/win32/librdkafka.vcxproj b/win32/librdkafka.vcxproj index b31f895d62..9f30d0652a 100644 --- a/win32/librdkafka.vcxproj +++ b/win32/librdkafka.vcxproj @@ -237,6 +237,7 @@ +