Skip to content

Commit ec6a0ba

Browse files
committed
Call sharegroupheartbeat when leaving group
1 parent 089d6ed commit ec6a0ba

1 file changed

Lines changed: 84 additions & 0 deletions

File tree

src/rdkafka_cgrp.c

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,91 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk,
10861086
goto err;
10871087
}
10881088

1089+
static void rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
1090+
rd_kafka_t *rk,
1091+
rd_kafka_broker_t *rkb,
1092+
rd_kafka_resp_err_t err,
1093+
rd_kafka_buf_t *rkbuf,
1094+
rd_kafka_buf_t *request,
1095+
void *opaque) {
1096+
rd_kafka_cgrp_t *rkcg = opaque;
1097+
const int log_decode_errors = LOG_ERR;
1098+
int16_t ErrorCode = 0;
1099+
1100+
if (err) {
1101+
ErrorCode = err;
1102+
goto err;
1103+
}
1104+
1105+
rd_kafka_buf_read_throttle_time(rkbuf);
1106+
1107+
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1108+
err:
1109+
if (ErrorCode)
1110+
rd_kafka_dbg(
1111+
rkb->rkb_rk, CGRP, "LEAVEGROUP",
1112+
"ShareGroupHeartbeat response error in state %s: %s",
1113+
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1114+
rd_kafka_err2str(ErrorCode));
1115+
else
1116+
rd_kafka_dbg(
1117+
rkb->rkb_rk, CGRP, "LEAVEGROUP",
1118+
"ShareGroupHeartbeat response received in state %s",
1119+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1120+
rd_kafka_cgrp_consumer_reset(rkcg);
1121+
if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
1122+
rd_assert(thrd_is_current(rk->rk_thread));
1123+
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
1124+
rd_kafka_cgrp_try_terminate(rkcg);
1125+
}
1126+
return;
1127+
err_parse:
1128+
ErrorCode = rkbuf->rkbuf_err;
1129+
goto err;
1130+
}
1131+
1132+
static void rd_kafka_cgrp_share_consumer_leave(rd_kafka_cgrp_t *rkcg) {
1133+
int32_t member_epoch = -1;
1134+
1135+
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
1136+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
1137+
"Group \"%.*s\": leave (in state %s): "
1138+
"ShareGroupHeartbeat already in-transit",
1139+
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1140+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1141+
return;
1142+
}
1143+
1144+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
1145+
"Group \"%.*s\": leave (in state %s)",
1146+
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1147+
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
1148+
1149+
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
1150+
1151+
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
1152+
rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
1153+
"Share consumer: leaving group");
1154+
rd_kafka_ShareGroupHeartbeatRequest(
1155+
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
1156+
member_epoch,
1157+
NULL /* no rack */,
1158+
NULL /* no subscription topics */,
1159+
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1160+
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave, rkcg);
1161+
} else {
1162+
rd_kafka_cgrp_handle_ShareGroupHeartbeat_leave(
1163+
rkcg->rkcg_rk, rkcg->rkcg_coord,
1164+
RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rkcg);
1165+
}
1166+
}
1167+
10891168
static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
1169+
if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) {
1170+
rd_kafka_cgrp_share_consumer_leave(rkcg);
1171+
return;
1172+
}
1173+
10901174
int32_t member_epoch = -1;
10911175

10921176
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {

0 commit comments

Comments
 (0)