Skip to content

Commit 3a3211e

Browse files
PratRanj07pranavrth
authored andcommitted
Call sharegroupheartbeat when leaving group (#5247)
1 parent 4776a3d commit 3a3211e

1 file changed

Lines changed: 84 additions & 0 deletions

File tree

src/rdkafka_cgrp.c

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,91 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk,
10801080
goto err;
10811081
}
10821082

1083+
static void rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
1084+
rd_kafka_t *rk,
1085+
rd_kafka_broker_t *rkb,
1086+
rd_kafka_resp_err_t err,
1087+
rd_kafka_buf_t *rkbuf,
1088+
rd_kafka_buf_t *request,
1089+
void *opaque) {
1090+
rd_kafka_cgrp_t *rkcg = opaque;
1091+
const int log_decode_errors = LOG_ERR;
1092+
int16_t ErrorCode = 0;
1093+
1094+
if (err) {
1095+
ErrorCode = err;
1096+
goto err;
1097+
}
1098+
1099+
rd_kafka_buf_read_throttle_time(rkbuf);
1100+
1101+
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1102+
err:
1103+
if (ErrorCode)
1104+
rd_kafka_dbg(
1105+
rkb->rkb_rk, CGRP, "LEAVEGROUP",
1106+
"ShareGroupHeartbeat response error in state %s: %s",
1107+
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1108+
rd_kafka_err2str(ErrorCode));
1109+
else
1110+
rd_kafka_dbg(
1111+
rkb->rkb_rk, CGRP, "LEAVEGROUP",
1112+
"ShareGroupHeartbeat response received in state %s",
1113+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1114+
rd_kafka_cgrp_consumer_reset(rkcg);
1115+
if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
1116+
rd_assert(thrd_is_current(rk->rk_thread));
1117+
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
1118+
rd_kafka_cgrp_try_terminate(rkcg);
1119+
}
1120+
return;
1121+
err_parse:
1122+
ErrorCode = rkbuf->rkbuf_err;
1123+
goto err;
1124+
}
1125+
1126+
static void rd_kafka_cgrp_share_consumer_leave(rd_kafka_cgrp_t *rkcg) {
1127+
int32_t member_epoch = -1;
1128+
1129+
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
1130+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
1131+
"Group \"%.*s\": leave (in state %s): "
1132+
"ShareGroupHeartbeat already in-transit",
1133+
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1134+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1135+
return;
1136+
}
1137+
1138+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
1139+
"Group \"%.*s\": leave (in state %s)",
1140+
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1141+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1142+
1143+
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
1144+
1145+
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
1146+
rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
1147+
"Share consumer: leaving group");
1148+
rd_kafka_ShareGroupHeartbeatRequest(
1149+
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
1150+
member_epoch,
1151+
NULL /* no rack */,
1152+
NULL /* no subscription topics */,
1153+
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1154+
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave, rkcg);
1155+
} else {
1156+
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
1157+
rkcg->rkcg_rk, rkcg->rkcg_coord,
1158+
RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rkcg);
1159+
}
1160+
}
1161+
10831162
static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
1163+
if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) {
1164+
rd_kafka_cgrp_share_consumer_leave(rkcg);
1165+
return;
1166+
}
1167+
10841168
int32_t member_epoch = -1;
10851169

10861170
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {

0 commit comments

Comments
 (0)