Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(
rdkafka_mock.c
rdkafka_mock_handlers.c
rdkafka_mock_cgrp.c
rdkafka_mock_sharegrp.c
rdkafka_error.c
rdkafka_fetcher.c
rdkafka_telemetry.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
nanopb/pb_encode.c nanopb/pb_decode.c nanopb/pb_common.c \
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,7 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
rd_kafka_mock_broker_t *mrkb;
rd_kafka_mock_cgrp_classic_t *mcgrp_classic;
rd_kafka_mock_cgrp_consumer_t *mcgrp_consumer;
rd_kafka_mock_sharegroup_t *mshgrp;
rd_kafka_mock_coord_t *mcoord;
rd_kafka_mock_error_stack_t *errstack;
thrd_t dummy_rkb_thread;
Expand All @@ -2726,6 +2727,9 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
while ((mcgrp_consumer = TAILQ_FIRST(&mcluster->cgrps_consumer)))
rd_kafka_mock_cgrp_consumer_destroy(mcgrp_consumer);

while ((mshgrp = TAILQ_FIRST(&mcluster->sharegrps)))
rd_kafka_mock_sharegroup_destroy(mshgrp);

while ((mcoord = TAILQ_FIRST(&mcluster->coords)))
rd_kafka_mock_coord_destroy(mcluster, mcoord);

Expand Down Expand Up @@ -2843,6 +2847,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,

TAILQ_INIT(&mcluster->cgrps_consumer);

rd_kafka_mock_sharegrps_init(mcluster);

TAILQ_INIT(&mcluster->coords);

rd_list_init(&mcluster->pids, 16, rd_free);
Expand Down
56 changes: 56 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,62 @@ RD_EXPORT void rd_kafka_mock_set_group_consumer_heartbeat_interval_ms(
rd_kafka_mock_cluster_t *mcluster,
int group_consumer_heartbeat_interval_ms);

/**
* @brief Set the sharegroup session timeout in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param session_timeout_ms Session timeout in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_session_timeout(
rd_kafka_mock_cluster_t *mcluster,
int session_timeout_ms);

/**
* @brief Set the sharegroup heartbeat interval in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param heartbeat_interval_ms Heartbeat interval in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval(
rd_kafka_mock_cluster_t *mcluster,
int heartbeat_interval_ms);

/**
* @brief Set a manual target assignment for a sharegroup.
*
* This allows tests to override the automatic partition assignment
* and manually specify which partitions each member should get.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids Array of member IDs (strings).
* @param assignments Array of partition lists (one per member).
* @param member_cnt Number of members (length of both arrays).
*/
RD_EXPORT void rd_kafka_mock_sharegroup_target_assignment(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
const char **member_ids,
rd_kafka_topic_partition_list_t **assignments,
size_t member_cnt);

/**
* @brief Retrieve the member IDs from a sharegroup.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids_out Output array of member IDs. Caller must free each
* string with rd_free() and the array itself.
* @param member_cnt_out Output count of members.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
* RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND if sharegroup not found.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_mock_sharegroup_get_member_ids(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
char ***member_ids_out,
size_t *member_cnt_out);

/**@}*/

Expand Down
52 changes: 16 additions & 36 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "rdkafka_int.h"
#include "rdbuf.h"
#include "rdkafka_mock_int.h"
#include "rdkafka_mock_group_common.h"


static const char *rd_kafka_mock_cgrp_classic_state_names[] = {
Expand Down Expand Up @@ -72,10 +73,9 @@ rd_kafka_mock_cgrp_classic_set_state(rd_kafka_mock_cgrp_classic_t *mcgrp,
void rd_kafka_mock_cgrp_classic_member_active(
rd_kafka_mock_cgrp_classic_t *mcgrp,
rd_kafka_mock_cgrp_classic_member_t *member) {
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
"Marking mock consumer group member %s as active",
member->id);
member->ts_last_activity = rd_clock();
rd_kafka_mock_group_member_mark_active(mcgrp->cluster->rk, "classic",
member->id,
&member->ts_last_activity);
}


Expand Down Expand Up @@ -537,13 +537,8 @@ static void rd_kafka_mock_cgrp_classic_member_destroy(
rd_kafka_mock_cgrp_classic_member_t *rd_kafka_mock_cgrp_classic_member_find(
const rd_kafka_mock_cgrp_classic_t *mcgrp,
const rd_kafkap_str_t *MemberId) {
const rd_kafka_mock_cgrp_classic_member_t *member;
TAILQ_FOREACH(member, &mcgrp->members, link) {
if (!rd_kafkap_str_cmp_str(MemberId, member->id))
return (rd_kafka_mock_cgrp_classic_member_t *)member;
}

return NULL;
return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId,
rd_kafka_mock_cgrp_classic_member_t);
}


Expand Down Expand Up @@ -659,13 +654,8 @@ void rd_kafka_mock_cgrp_classic_destroy(rd_kafka_mock_cgrp_classic_t *mcgrp) {
rd_kafka_mock_cgrp_classic_t *
rd_kafka_mock_cgrp_classic_find(rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *GroupId) {
rd_kafka_mock_cgrp_classic_t *mcgrp;
TAILQ_FOREACH(mcgrp, &mcluster->cgrps_classic, link) {
if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id))
return mcgrp;
}

return NULL;
return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_classic, GroupId,
rd_kafka_mock_cgrp_classic_t);
}


Expand Down Expand Up @@ -1335,10 +1325,9 @@ rd_kafka_mock_cgrp_consumer_member_next_assignment(
void rd_kafka_mock_cgrp_consumer_member_active(
rd_kafka_mock_cgrp_consumer_t *mcgrp,
rd_kafka_mock_cgrp_consumer_member_t *member) {
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
"Marking mock consumer group member %s as active",
member->id);
member->ts_last_activity = rd_clock();
rd_kafka_mock_group_member_mark_active(
mcgrp->cluster->rk, "consumer", member->id,
&member->ts_last_activity);
}

/**
Expand All @@ -1353,13 +1342,8 @@ void rd_kafka_mock_cgrp_consumer_member_active(
rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_find(
const rd_kafka_mock_cgrp_consumer_t *mcgrp,
const rd_kafkap_str_t *MemberId) {
const rd_kafka_mock_cgrp_consumer_member_t *member;
TAILQ_FOREACH(member, &mcgrp->members, link) {
if (!rd_kafkap_str_cmp_str(MemberId, member->id))
return (rd_kafka_mock_cgrp_consumer_member_t *)member;
}

return NULL;
return RD_KAFKA_MOCK_MEMBER_FIND(&mcgrp->members, MemberId,
rd_kafka_mock_cgrp_consumer_member_t);
}

/**
Expand Down Expand Up @@ -1700,13 +1684,8 @@ void rd_kafka_mock_cgrp_consumer_member_fenced(
rd_kafka_mock_cgrp_consumer_t *
rd_kafka_mock_cgrp_consumer_find(const rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *GroupId) {
rd_kafka_mock_cgrp_consumer_t *mcgrp;
TAILQ_FOREACH(mcgrp, &mcluster->cgrps_consumer, link) {
if (!rd_kafkap_str_cmp_str(GroupId, mcgrp->id))
return mcgrp;
}

return NULL;
return RD_KAFKA_MOCK_GROUP_FIND(&mcluster->cgrps_consumer, GroupId,
rd_kafka_mock_cgrp_consumer_t);
}

/**
Expand Down Expand Up @@ -1873,4 +1852,5 @@ void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_connection_t *mconn) {
rd_kafka_mock_cgrps_classic_connection_closed(mcluster, mconn);
rd_kafka_mock_cgrps_consumer_connection_closed(mcluster, mconn);
rd_kafka_mock_sharegrps_connection_closed(mcluster, mconn);
}
98 changes: 98 additions & 0 deletions src/rdkafka_mock_group_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2025, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/**
* @file rdkafka_mock_group_common.h
* @brief Common macros and inline functions shared between
* ConsumerGroupHeartbeat (KIP-848) and ShareGroupHeartbeat (KIP-932)
* mock broker implementations.
*/

#ifndef _RDKAFKA_MOCK_GROUP_COMMON_H_
#define _RDKAFKA_MOCK_GROUP_COMMON_H_

#include "rdkafka_int.h"

/**
* @brief Generic group find by GroupId in a TAILQ.
*
* @param groups_head Pointer to TAILQ_HEAD of groups.
* @param GroupId rd_kafkap_str_t pointer to search for.
* @param group_type The group struct type (e.g., rd_kafka_mock_sharegroup_t).
*
* @returns Pointer to found group, or NULL if not found.
*/
#define RD_KAFKA_MOCK_GROUP_FIND(groups_head, GroupId, group_type) \
({ \
group_type *_grp = NULL; \
TAILQ_FOREACH(_grp, groups_head, link) { \
if (!rd_kafkap_str_cmp_str(GroupId, _grp->id)) \
break; \
} \
_grp; \
})

/**
* @brief Generic member find by MemberId in a TAILQ.
*
* @param members_head Pointer to TAILQ_HEAD of members.
* @param MemberId rd_kafkap_str_t pointer to search for.
* @param member_type The member struct type.
*
* @returns Pointer to found member, or NULL if not found.
*/
#define RD_KAFKA_MOCK_MEMBER_FIND(members_head, MemberId, member_type) \
({ \
member_type *_member = NULL; \
TAILQ_FOREACH(_member, members_head, link) { \
if (!rd_kafkap_str_cmp_str(MemberId, _member->id)) \
break; \
} \
_member; \
})

/**
* @brief Mark a group member as active by updating its last activity timestamp.
*
* @param rk rd_kafka_t handle for logging.
* @param group_type String describing the group type ("consumer" or "share").
* @param member_id Member ID string for logging.
* @param ts_last_activity Pointer to the member's last activity timestamp.
*/
static RD_INLINE RD_UNUSED void rd_kafka_mock_group_member_mark_active(
rd_kafka_t *rk,
const char *group_type,
const char *member_id,
rd_ts_t *ts_last_activity) {
rd_kafka_dbg(rk, MOCK, "MOCK",
"Marking mock %s group member %s as active", group_type,
member_id);
*ts_last_activity = rd_clock();
}

#endif /* _RDKAFKA_MOCK_GROUP_COMMON_H_ */
Loading