Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(
rdkafka_mock.c
rdkafka_mock_handlers.c
rdkafka_mock_cgrp.c
rdkafka_mock_sharegrp.c
rdkafka_error.c
rdkafka_fetcher.c
rdkafka_telemetry.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
nanopb/pb_encode.c nanopb/pb_decode.c nanopb/pb_common.c \
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,7 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
rd_kafka_mock_broker_t *mrkb;
rd_kafka_mock_cgrp_classic_t *mcgrp_classic;
rd_kafka_mock_cgrp_consumer_t *mcgrp_consumer;
rd_kafka_mock_sharegroup_t *mshgrp;
rd_kafka_mock_coord_t *mcoord;
rd_kafka_mock_error_stack_t *errstack;
thrd_t dummy_rkb_thread;
Expand All @@ -2726,6 +2727,9 @@ static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
while ((mcgrp_consumer = TAILQ_FIRST(&mcluster->cgrps_consumer)))
rd_kafka_mock_cgrp_consumer_destroy(mcgrp_consumer);

while ((mshgrp = TAILQ_FIRST(&mcluster->sharegrps)))
rd_kafka_mock_sharegroup_destroy(mshgrp);

while ((mcoord = TAILQ_FIRST(&mcluster->coords)))
rd_kafka_mock_coord_destroy(mcluster, mcoord);

Expand Down Expand Up @@ -2843,6 +2847,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,

TAILQ_INIT(&mcluster->cgrps_consumer);

rd_kafka_mock_sharegrps_init(mcluster);

TAILQ_INIT(&mcluster->coords);

rd_list_init(&mcluster->pids, 16, rd_free);
Expand Down
56 changes: 56 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,62 @@ RD_EXPORT void rd_kafka_mock_set_group_consumer_heartbeat_interval_ms(
rd_kafka_mock_cluster_t *mcluster,
int group_consumer_heartbeat_interval_ms);

/**
* @brief Set the sharegroup session timeout in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param session_timeout_ms Session timeout in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_session_timeout(
rd_kafka_mock_cluster_t *mcluster,
int session_timeout_ms);

/**
* @brief Set the sharegroup heartbeat interval in milliseconds.
*
* @param mcluster Mock cluster instance.
* @param heartbeat_interval_ms Heartbeat interval in milliseconds.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval(
rd_kafka_mock_cluster_t *mcluster,
int heartbeat_interval_ms);

/**
* @brief Set a manual target assignment for a sharegroup.
*
* This allows tests to override the automatic partition assignment
* and manually specify which partitions each member should get.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids Array of member IDs (strings).
* @param assignments Array of partition lists (one per member).
* @param member_cnt Number of members (length of both arrays).
*/
RD_EXPORT void rd_kafka_mock_sharegroup_target_assignment(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
const char **member_ids,
rd_kafka_topic_partition_list_t **assignments,
size_t member_cnt);

/**
* @brief Retrieve the member IDs from a sharegroup.
*
* @param mcluster Mock cluster instance.
* @param group_id The sharegroup ID.
* @param member_ids_out Output array of member IDs. Caller must free each
* string with rd_free() and the array itself.
* @param member_cnt_out Output count of members.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
* RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND if sharegroup not found.
*/
RD_EXPORT rd_kafka_resp_err_t rd_kafka_mock_sharegroup_get_member_ids(
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
char ***member_ids_out,
size_t *member_cnt_out);

/**@}*/

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1873,4 +1873,5 @@ void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_connection_t *mconn) {
rd_kafka_mock_cgrps_classic_connection_closed(mcluster, mconn);
rd_kafka_mock_cgrps_consumer_connection_closed(mcluster, mconn);
rd_kafka_mock_sharegrps_connection_closed(mcluster, mconn);
}
225 changes: 225 additions & 0 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2993,6 +2993,229 @@ rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn,
return -1;
}

/**
 * @brief Serialize a share-group member's assignment into \p resp as a
 *        TopicPartitions array, identified by topic id (no topic name)
 *        and carrying only the partition field.
 */
static void rd_kafka_mock_handle_ShareGroupHeartbeat_write_TopicPartitions(
    rd_kafka_buf_t *resp,
    rd_kafka_topic_partition_list_t *assignment) {
        static const rd_kafka_topic_partition_field_t assignment_fields[] = {
            RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
            RD_KAFKA_TOPIC_PARTITION_FIELD_END};

        /* Entries are emitted ordered by topic id. */
        rd_kafka_topic_partition_list_sort_by_topic_id(assignment);

        rd_kafka_buf_write_topic_partitions(
            resp, assignment, rd_false /* don't skip invalid offsets */,
            rd_false /* any offset */, rd_true /* use topic id */,
            rd_false /* don't use topic name */, assignment_fields);
}

/**
 * @brief Handle ShareGroupHeartbeat request (API Key 76).
 *
 * Implements the mock share-group coordinator state machine:
 *  - MemberEpoch == -1: member leaves the group.
 *  - MemberEpoch ==  0: member joins (or rejoins) the group.
 *  - MemberEpoch  >  0: periodic heartbeat for an existing member.
 *
 * Fences members whose epoch disagrees with the coordinator's, except
 * when the client is one epoch behind (the response carrying the bumped
 * epoch may have been lost), in which case the heartbeat is accepted.
 *
 * @returns 0 on success, -1 on protocol parse error.
 */
static int
rd_kafka_mock_handle_ShareGroupHeartbeat(rd_kafka_mock_connection_t *mconn,
                                         rd_kafka_buf_t *rkbuf) {
        const rd_bool_t log_decode_errors = rd_true;
        rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster;
        rd_kafka_buf_t *resp;
        rd_kafkap_str_t GroupId, MemberId, RackId;
        rd_kafkap_str_t *SubscribedTopicNames = NULL;
        int32_t MemberEpoch = 0, SubscribedTopicNamesCnt;
        int32_t i;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_kafka_mock_sharegroup_t *mshgrp        = NULL;
        rd_kafka_mock_sharegroup_member_t *member = NULL;
        rd_bool_t assignment_changed              = rd_false;

        resp = rd_kafka_mock_buf_new_response(rkbuf);

        /* Inject error, if any test-configured error is pending. */
        err = rd_kafka_mock_next_request_error(mconn, resp);
        if (err)
                goto build_response;

        /* GroupId */
        rd_kafka_buf_read_str(rkbuf, &GroupId);

        /* Coordinator check: this broker must be the group's coordinator. */
        {
                rd_kafka_mock_broker_t *mrkb;

                mrkb = rd_kafka_mock_cluster_get_coord(
                    mcluster, RD_KAFKA_COORD_GROUP, &GroupId);

                if (!mrkb)
                        err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE;
                else if (mrkb != mconn->broker)
                        err = RD_KAFKA_RESP_ERR_NOT_COORDINATOR;
        }

        if (err)
                goto build_response;

        /* MemberId */
        rd_kafka_buf_read_str(rkbuf, &MemberId);

        /* MemberEpoch */
        rd_kafka_buf_read_i32(rkbuf, &MemberEpoch);

        /* RackId (nullable) */
        rd_kafka_buf_read_str(rkbuf, &RackId);

        /* SubscribedTopicNames array (nullable: -1 means "not provided",
         * i.e. no subscription change). */
        rd_kafka_buf_read_arraycnt(rkbuf, &SubscribedTopicNamesCnt,
                                   RD_KAFKAP_TOPICS_MAX);
        if (SubscribedTopicNamesCnt >= 0) {
                SubscribedTopicNames = rd_calloc(
                    SubscribedTopicNamesCnt > 0 ? SubscribedTopicNamesCnt : 1,
                    sizeof(rd_kafkap_str_t));
                for (i = 0; i < SubscribedTopicNamesCnt; i++)
                        rd_kafka_buf_read_str(rkbuf, &SubscribedTopicNames[i]);
        }

        {
                mtx_lock(&mcluster->lock);

                mshgrp = rd_kafka_mock_sharegroup_get(mcluster, &GroupId);

                if (MemberEpoch == -1) {
                        /* LEAVE: Member wants to leave the group. */
                        member = rd_kafka_mock_sharegroup_member_find(
                            mshgrp, &MemberId);
                        if (member) {
                                rd_kafka_mock_sharegroup_member_destroy(mshgrp,
                                                                        member);
                                member             = NULL;
                                assignment_changed = rd_true;
                        }

                } else if (MemberEpoch == 0) {
                        /* JOIN: New member wants to join. */
                        member = rd_kafka_mock_sharegroup_member_get(
                            mshgrp, &MemberId, MemberEpoch, mconn);

                        if (member) {
                                /* A nullable (-1) SubscribedTopicNames means
                                 * "not provided": don't pass a negative count
                                 * to the subscription setter. */
                                if (SubscribedTopicNamesCnt >= 0)
                                        rd_kafka_mock_sharegroup_member_subscribed_topic_names_set(
                                            member, SubscribedTopicNames,
                                            SubscribedTopicNamesCnt);
                                /* A new member always triggers
                                 * reassignment, regardless of whether the
                                 * subscription changed. */
                                assignment_changed = rd_true;
                                MemberEpoch        = member->member_epoch;
                        } else {
                                err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
                        }

                } else {
                        /* HEARTBEAT: Existing member heartbeat. */
                        member = rd_kafka_mock_sharegroup_member_find(
                            mshgrp, &MemberId);
                        if (!member) {
                                err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
                        } else if (MemberEpoch > member->member_epoch) {
                                /* Client epoch is ahead of server - indicates
                                 * a bug or stale coordinator. Matches the
                                 * Java broker which fences in both
                                 * directions. */
                                err = RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH;

                        } else if (MemberEpoch < member->member_epoch) {
                                /* Client epoch is behind. Allow if it matches
                                 * the previous epoch (response with bumped
                                 * epoch may have been lost). Otherwise fence.
                                 */
                                if (MemberEpoch !=
                                    member->previous_member_epoch) {
                                        err =
                                            RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH;
                                } else {
                                        /* Accept previous epoch - client is
                                         * catching up. */
                                        member->conn = mconn;
                                        MemberEpoch  = member->member_epoch;
                                        /* Refresh session activity so the
                                         * member isn't expired while
                                         * heartbeating. */
                                        rd_kafka_mock_sharegroup_member_active(
                                            mshgrp, member);
                                }
                        } else {
                                /* Epoch matches - normal heartbeat.
                                 * Check for subscription changes. */
                                if (SubscribedTopicNamesCnt >= 0 &&
                                    rd_kafka_mock_sharegroup_member_subscribed_topic_names_set(
                                        member, SubscribedTopicNames,
                                        SubscribedTopicNamesCnt))
                                        assignment_changed = rd_true;
                                member->conn = mconn;
                                MemberEpoch  = member->member_epoch;
                                /* Refresh session activity so the member
                                 * isn't expired while heartbeating. */
                                rd_kafka_mock_sharegroup_member_active(mshgrp,
                                                                       member);
                        }
                }

                /* Recalculate assignments if membership or subscriptions
                 * changed. This may bump member epochs. */
                if (assignment_changed && mshgrp->member_cnt > 0) {
                        rd_kafka_mock_sharegroup_assignment_recalculate(mshgrp);
                        if (member)
                                MemberEpoch = member->member_epoch;
                }

                mtx_unlock(&mcluster->lock);
        }

build_response:

        /* ThrottleTimeMs */
        rd_kafka_buf_write_i32(resp, 0);

        /* ErrorCode */
        rd_kafka_buf_write_i16(resp, err);

        /* ErrorMessage (nullable) */
        if (err)
                rd_kafka_buf_write_str(resp, rd_kafka_err2str(err), -1);
        else
                rd_kafka_buf_write_str(resp, NULL, -1);

        /* MemberId (nullable) */
        if (!err && member)
                rd_kafka_buf_write_str(resp, member->id, -1);
        else
                rd_kafka_buf_write_str(resp, NULL, -1);

        /* MemberEpoch */
        rd_kafka_buf_write_i32(resp, MemberEpoch);

        /* HeartbeatIntervalMs: group-configured interval, or a default
         * when the group isn't available (e.g. injected error path). */
        if (mshgrp)
                rd_kafka_buf_write_i32(resp, mshgrp->heartbeat_interval_ms);
        else
                rd_kafka_buf_write_i32(resp, 5000);

        /* Assignment */
        if (!err && member && member->assignment) {
                /* Send assignment even if empty (cnt == 0).
                 * Null (-1) means "no change", while an empty assignment
                 * means "you have 0 partitions". */
                rd_kafka_buf_write_i8(resp, 1);
                rd_kafka_mock_handle_ShareGroupHeartbeat_write_TopicPartitions(
                    resp, member->assignment);
                rd_kafka_buf_write_tags_empty(resp);
        } else {
                rd_kafka_buf_write_i8(resp, -1);
        }

        rd_kafka_buf_write_tags_empty(resp);

        rd_kafka_mock_connection_send_response(mconn, resp);

        RD_IF_FREE(SubscribedTopicNames, rd_free);
        return 0;

err_parse:
        RD_IF_FREE(SubscribedTopicNames, rd_free);
        rd_kafka_buf_destroy(resp);
        return -1;
}

/**
* @brief Default request handlers
*/
Expand Down Expand Up @@ -3025,6 +3248,8 @@ const struct rd_kafka_mock_api_handler
{2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch},
[RD_KAFKAP_ConsumerGroupHeartbeat] =
{1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat},
[RD_KAFKAP_ShareGroupHeartbeat] =
{1, 1, 1, rd_kafka_mock_handle_ShareGroupHeartbeat},
[RD_KAFKAP_GetTelemetrySubscriptions] =
{0, 0, 0, rd_kafka_mock_handle_GetTelemetrySubscriptions},
[RD_KAFKAP_PushTelemetry] = {0, 0, 0,
Expand Down
Loading