From ec6a0ba1549b166a89acf8e96eaac502b7675590 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 19 Nov 2025 00:45:05 +0530 Subject: [PATCH] Call sharegroupheartbeat when leaving group --- src/rdkafka_cgrp.c | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index e462e6843a..91711d7eac 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1086,7 +1086,91 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk, goto err; } +static void rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave( + rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = opaque; + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + + if (err) { + ErrorCode = err; + goto err; + } + + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); +err: + if (ErrorCode) + rd_kafka_dbg( + rkb->rkb_rk, CGRP, "LEAVEGROUP", + "ShareGroupHeartbeat response error in state %s: %s", + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_err2str(ErrorCode)); + else + rd_kafka_dbg( + rkb->rkb_rk, CGRP, "LEAVEGROUP", + "ShareGroupHeartbeat response received in state %s", + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + rd_kafka_cgrp_consumer_reset(rkcg); + if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) { + rd_assert(thrd_is_current(rk->rk_thread)); + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; + rd_kafka_cgrp_try_terminate(rkcg); + } + return; +err_parse: + ErrorCode = rkbuf->rkbuf_err; + goto err; +} + +static void rd_kafka_cgrp_share_consumer_leave(rd_kafka_cgrp_t *rkcg) { + int32_t member_epoch = -1; + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", + "Group \"%.*s\": leave (in state %s): " + "ShareGroupHeartbeat already in-transit", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + return; + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", + "Group \"%.*s\": leave (in state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; + + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { + rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", + "Share consumer: leaving group"); + rd_kafka_ShareGroupHeartbeatRequest( + rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id, + member_epoch, + NULL /* no rack */, + NULL /* no subscription topics */, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave, rkcg); + } else { + rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave( + rkcg->rkcg_rk, rkcg->rkcg_coord, + RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rkcg); + } +} + static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) { + if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) { + rd_kafka_cgrp_share_consumer_leave(rkcg); + return; + } + int32_t member_epoch = -1; if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {