Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(
rdkafka_mock.c
rdkafka_mock_handlers.c
rdkafka_mock_cgrp.c
rdkafka_mock_sharegrp.c
rdkafka_error.c
rdkafka_fetcher.c
rdkafka_telemetry.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
nanopb/pb_encode.c nanopb/pb_decode.c nanopb/pb_common.c \
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,7 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
rd_kafka_mock_broker_t *mrkb;
rd_kafka_mock_cgrp_classic_t *mcgrp_classic;
rd_kafka_mock_cgrp_consumer_t *mcgrp_consumer;
rd_kafka_mock_sharegroup_t *mshgrp;
rd_kafka_mock_coord_t *mcoord;
rd_kafka_mock_error_stack_t *errstack;
thrd_t dummy_rkb_thread;
Expand All @@ -2726,6 +2727,9 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
while ((mcgrp_consumer = TAILQ_FIRST(&mcluster->cgrps_consumer)))
rd_kafka_mock_cgrp_consumer_destroy(mcgrp_consumer);

while ((mshgrp = TAILQ_FIRST(&mcluster->sharegrps)))
rd_kafka_mock_sharegroup_destroy(mshgrp);

while ((mcoord = TAILQ_FIRST(&mcluster->coords)))
rd_kafka_mock_coord_destroy(mcluster, mcoord);

Expand Down Expand Up @@ -2843,6 +2847,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,

TAILQ_INIT(&mcluster->cgrps_consumer);

rd_kafka_mock_sharegrps_init(mcluster);

TAILQ_INIT(&mcluster->coords);

rd_list_init(&mcluster->pids, 16, rd_free);
Expand Down
56 changes: 56 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,62 @@ RD_EXPORT void rd_kafka_mock_set_group_consumer_heartbeat_interval_ms(
rd_kafka_mock_cluster_t *mcluster,
int group_consumer_heartbeat_interval_ms);

/**
* @brief Set the sharegroup session timeout in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param session_timeout_ms Session timeout in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_session_timeout(
rd_kafka_mock_cluster_t *mcluster,
int session_timeout_ms);

/**
* @brief Set the sharegroup heartbeat interval in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param heartbeat_interval_ms Heartbeat interval in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval(
rd_kafka_mock_cluster_t *mcluster,
int heartbeat_interval_ms);

/**
* @brief Set a manual target assignment for a sharegroup.
*
* This allows tests to override the automatic partition assignment
* and manually specify which partitions each member should get.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids Array of member IDs (strings).
* @param assignments Array of partition lists (one per member).
* @param member_cnt Number of members (length of both arrays).
*/
RD_EXPORT void rd_kafka_mock_sharegroup_target_assignment(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
const char **member_ids,
rd_kafka_topic_partition_list_t **assignments,
size_t member_cnt);

/**
* @brief Retrieve the member IDs from a sharegroup.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids_out Output array of member IDs. Caller must free each
* string with rd_free() and the array itself.
* @param member_cnt_out Output count of members.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
* RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND if sharegroup not found.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_mock_sharegroup_get_member_ids(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
char ***member_ids_out,
size_t *member_cnt_out);

/**@}*/

Expand Down
52 changes: 16 additions & 36 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "rdkafka_int.h"
#include "rdbuf.h"
#include "rdkafka_mock_int.h"
#include "rdkafka_mock_group_common.h"


static const char *rd_kafka_mock_cgrp_classic_state_names[] = {
Expand Down Expand Up @@ -72,10 +73,9 @@ rd_kafka_mock_cgrp_classic_set_state(rd_kafka_mock_cgrp_classic_t *mcgrp,
void rd_kafka_mock_cgrp_classic_member_active(
rd_kafka_mock_cgrp_classic_t *mcgrp,
rd_kafka_mock_cgrp_classic_member_t *member) {
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
"Marking mock consumer group member %s as active",
member->id);
member->ts_last_activity = rd_clock();
rd_kafka_mock_group_member_mark_active(mcgrp->cluster->rk, "classic",
member->id,
&member->ts_last_activity);
}


Expand Down Expand Up @@ -537,13 +537,8 @@ static void rd_kafka_mock_cgrp_classic_member_destroy(
rd_kafka_mock_cgrp_classic_member_t *rd_kafka_mock_cgrp_classic_member_find(
const rd_kafka_mock_cgrp_classic_t *mcgrp,
const rd_kafkap_str_t *MemberId) {
const rd_kafka_mock_cgrp_classic_member_t *member;
TAILQ_FOREACH(member, &mcgrp->members, link) {
if (!rd_kafkap_str_cmp_str(MemberId, member->id))
return (rd_kafka_mock_cgrp_classic_member_t *)member;
}

return NULL;
return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId,
rd_kafka_mock_cgrp_classic_member_t);
}


Expand Down Expand Up @@ -659,13 +654,8 @@ void rd_kafka_mock_cgrp_classic_destroy(rd_kafka_mock_cgrp_classic_t *mcgrp) {
rd_kafka_mock_cgrp_classic_t *
rd_kafka_mock_cgrp_classic_find(rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *GroupId) {
rd_kafka_mock_cgrp_classic_t *mcgrp;
TAILQ_FOREACH(mcgrp, &mcluster->cgrps_classic, link) {
if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id))
return mcgrp;
}

return NULL;
return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_classic, GroupId,
rd_kafka_mock_cgrp_classic_t);
}


Expand Down Expand Up @@ -1335,10 +1325,9 @@ rd_kafka_mock_cgrp_consumer_member_next_assignment(
void rd_kafka_mock_cgrp_consumer_member_active(
rd_kafka_mock_cgrp_consumer_t *mcgrp,
rd_kafka_mock_cgrp_consumer_member_t *member) {
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
"Marking mock consumer group member %s as active",
member->id);
member->ts_last_activity = rd_clock();
rd_kafka_mock_group_member_mark_active(
mcgrp->cluster->rk, "consumer", member->id,
&member->ts_last_activity);
}

/**
Expand All @@ -1353,13 +1342,8 @@ void rd_kafka_mock_cgrp_consumer_member_active(
rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_find(
const rd_kafka_mock_cgrp_consumer_t *mcgrp,
const rd_kafkap_str_t *MemberId) {
const rd_kafka_mock_cgrp_consumer_member_t *member;
TAILQ_FOREACH(member, &mcgrp->members, link) {
if (!rd_kafkap_str_cmp_str(MemberId, member->id))
return (rd_kafka_mock_cgrp_consumer_member_t *)member;
}

return NULL;
return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId,
rd_kafka_mock_cgrp_consumer_member_t);
}

/**
Expand Down Expand Up @@ -1700,13 +1684,8 @@ void rd_kafka_mock_cgrp_consumer_member_fenced(
rd_kafka_mock_cgrp_consumer_t *
rd_kafka_mock_cgrp_consumer_find(const rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *GroupId) {
rd_kafka_mock_cgrp_consumer_t *mcgrp;
TAILQ_FOREACH(mcgrp, &mcluster->cgrps_consumer, link) {
if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id))
return mcgrp;
}

return NULL;
return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_consumer, GroupId,
rd_kafka_mock_cgrp_consumer_t);
}

/**
Expand Down Expand Up @@ -1873,4 +1852,5 @@ void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_connection_t *mconn) {
rd_kafka_mock_cgrps_classic_connection_closed(mcluster, mconn);
rd_kafka_mock_cgrps_consumer_connection_closed(mcluster, mconn);
rd_kafka_mock_sharegrps_connection_closed(mcluster, mconn);
}
98 changes: 98 additions & 0 deletions src/rdkafka_mock_group_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2025, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/**
* @file rdkafka_mock_group_common.h
* @brief Common macros and inline functions shared between
* ConsumerGroupHeartbeat (KIP-848) and ShareGroupHeartbeat (KIP-932)
* mock broker implementations.
*/

#ifndef _RDKAFKA_MOCK_GROUP_COMMON_H_
#define _RDKAFKA_MOCK_GROUP_COMMON_H_

#include "rdkafka_int.h"

/**
* @brief Generic group find by GroupId in a TAILQ.
*
* @param groups_head Pointer to TAILQ_HEAD of groups.
* @param GroupId rd_kafkap_str_t pointer to search for.
* @param group_type The group struct type (e.g., rd_kafka_mock_sharegroup_t).
*
* @returns Pointer to found group, or NULL if not found.
*/
#define RD_KAFKA_MOCK_GROUP_FIND(groups_head, GroupId, group_type) \
({ \
group_type *_grp = NULL; \
TAILQ_FOREACH(_grp, groups_head, link) { \
if (!rd_kafkap_str_cmp_str(GroupId, _grp->id)) \
break; \
} \
_grp; \
})

/**
* @brief Generic member find by MemberId in a TAILQ.
*
* @param members_head Pointer to TAILQ_HEAD of members.
* @param MemberId rd_kafkap_str_t pointer to search for.
* @param member_type The member struct type.
*
* @returns Pointer to found member, or NULL if not found.
*/
#define RD_KAFKA_MOCK_MEMBER_FIND(members_head, MemberId, member_type) \
({ \
member_type *_member = NULL; \
TAILQ_FOREACH(_member, members_head, link) { \
if (!rd_kafkap_str_cmp_str(MemberId, _member->id)) \
break; \
} \
_member; \
})

/**
* @brief Mark a group member as active by updating its last activity timestamp.
*
* @param rk rd_kafka_t handle for logging.
* @param group_type String describing the group type ("consumer" or "share").
* @param member_id Member ID string for logging.
* @param ts_last_activity Pointer to the member's last activity timestamp.
*/
static RD_INLINE RD_UNUSED void rd_kafka_mock_group_member_mark_active(
rd_kafka_t *rk,
const char *group_type,
const char *member_id,
rd_ts_t *ts_last_activity) {
rd_kafka_dbg(rk, MOCK, "MOCK",
"Marking mock %s group member %s as active", group_type,
member_id);
*ts_last_activity = rd_clock();
}

#endif /* _RDKAFKA_MOCK_GROUP_COMMON_H_ */
Loading