Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,91 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk,
goto err;
}

static void rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_cgrp_t *rkcg = opaque;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;

if (err) {
ErrorCode = err;
goto err;
}

rd_kafka_buf_read_throttle_time(rkbuf);

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
err:
if (ErrorCode)
rd_kafka_dbg(
rkb->rkb_rk, CGRP, "LEAVEGROUP",
"ShareGroupHeartbeat response error in state %s: %s",
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_err2str(ErrorCode));
else
rd_kafka_dbg(
rkb->rkb_rk, CGRP, "LEAVEGROUP",
"ShareGroupHeartbeat response received in state %s",
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
rd_kafka_cgrp_consumer_reset(rkcg);
if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
rd_assert(thrd_is_current(rk->rk_thread));
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
rd_kafka_cgrp_try_terminate(rkcg);
}
return;
err_parse:
ErrorCode = rkbuf->rkbuf_err;
goto err;
}

static void rd_kafka_cgrp_share_consumer_leave(rd_kafka_cgrp_t *rkcg) {
int32_t member_epoch = -1;

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
"Group \"%.*s\": leave (in state %s): "
"ShareGroupHeartbeat already in-transit",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
return;
}

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
"Group \"%.*s\": leave (in state %s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;

if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
"Share consumer: leaving group");
rd_kafka_ShareGroupHeartbeatRequest(
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
member_epoch,
NULL /* no rack */,
NULL /* no subscription topics */,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave, rkcg);
} else {
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
rkcg->rkcg_rk, rkcg->rkcg_coord,
RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rkcg);
}
}

static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) {
rd_kafka_cgrp_share_consumer_leave(rkcg);
return;
}

int32_t member_epoch = -1;

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
Expand Down