Skip to content

Commit 669e9f8

Browse files
committed
cgrp: Fix NULL dereference in LeaveGroup error path
rd_kafka_cgrp_handle_LeaveGroup() would crash with SIGSEGV when logging errors because it dereferenced rkb->rkb_rk when rkb was NULL. This can occur when the coordinator becomes unavailable during consumer shutdown. Use the always-valid `rk` parameter instead of `rkb->rkb_rk` in the rd_kafka_dbg() calls in the error path.
1 parent f383af9 commit 669e9f8

4 files changed

Lines changed: 144 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ double-free errors (#5240).
1919

2020
### General fixes
2121

22+
* Fix crash (SIGSEGV) in `rd_kafka_cgrp_handle_LeaveGroup()` when coordinator
23+
is unavailable during consumer close. The error logging path dereferenced
24+
a potentially NULL broker pointer. Happening since 1.x.
2225
* Issues: #4348.
2326
Strip trailing dot of hostname to fix SSL certificate verification issue.
2427
Happening since 1.x (#5253).

src/rdkafka_cgrp.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -981,12 +981,12 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk,
981981

982982
err:
983983
if (ErrorCode)
984-
rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
984+
rd_kafka_dbg(rk, CGRP, "LEAVEGROUP",
985985
"LeaveGroup response error in state %s: %s",
986986
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
987987
rd_kafka_err2str(ErrorCode));
988988
else
989-
rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
989+
rd_kafka_dbg(rk, CGRP, "LEAVEGROUP",
990990
"LeaveGroup response received in state %s",
991991
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
992992

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* librdkafka - Apache Kafka C library
3+
*
4+
* Copyright (c) 2025, Confluent Inc.
5+
* All rights reserved.
6+
*
7+
* Redistribution and use in source and binary forms, with or without
8+
* modification, are permitted provided that the following conditions are met:
9+
*
10+
* 1. Redistributions of source code must retain the above copyright notice,
11+
* this list of conditions and the following disclaimer.
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26+
* POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
#include "test.h"
30+
31+
/**
32+
* @name Test LeaveGroup handling when coordinator becomes unavailable
33+
*
34+
* This test verifies that rd_kafka_cgrp_handle_LeaveGroup() correctly handles
35+
* the case where the coordinator broker (rkb) is NULL when the consumer group
36+
* initiates LeaveGroup during shutdown.
37+
*
38+
* Previously, rd_kafka_dbg() calls in the error path would dereference
39+
* rkb->rkb_rk when rkb was NULL, causing a SIGSEGV crash.
40+
*
41+
* The fix uses the always-valid `rk` parameter instead of `rkb->rkb_rk`.
42+
*/
43+
44+
static int allowed_error;
45+
46+
static int
47+
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
48+
if (err == allowed_error || err == RD_KAFKA_RESP_ERR__TRANSPORT ||
49+
err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN ||
50+
err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE ||
51+
err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP) {
52+
TEST_SAY("Ignoring allowed error: %s: %s\n",
53+
rd_kafka_err2name(err), reason);
54+
return 0;
55+
}
56+
return 1;
57+
}
58+
59+
/**
60+
* @brief Test that consumer close/destroy handles missing coordinator
61+
* gracefully.
62+
*
63+
* Scenario:
64+
* 1. Create a consumer and subscribe to a topic
65+
* 2. Wait for consumer to join group and establish coordinator connection
66+
* 3. Make coordinator unavailable (broker goes down)
67+
* 4. Close/destroy the consumer, which triggers LeaveGroup
68+
* 5. Verify no crash occurs (the bug would cause SIGSEGV here)
69+
*/
70+
static void do_test_leavegroup_no_coordinator(void) {
71+
rd_kafka_t *consumer;
72+
rd_kafka_conf_t *conf;
73+
rd_kafka_mock_cluster_t *mcluster;
74+
const char *bootstraps;
75+
const char *topic = test_mk_topic_name(__FUNCTION__, 0);
76+
rd_kafka_topic_partition_list_t *subscription;
77+
rd_kafka_message_t *rkm;
78+
79+
SUB_TEST();
80+
81+
test_curr->is_fatal_cb = error_is_fatal_cb;
82+
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
83+
84+
mcluster = test_mock_cluster_new(2, &bootstraps);
85+
rd_kafka_mock_topic_create(mcluster, topic, 1, 2);
86+
rd_kafka_mock_group_initial_rebalance_delay_ms(mcluster, 0);
87+
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
88+
rd_kafka_mock_coordinator_set(mcluster, "group", topic, 1);
89+
90+
test_conf_init(&conf, NULL, 60);
91+
test_conf_set(conf, "bootstrap.servers", bootstraps);
92+
test_conf_set(conf, "group.id", topic);
93+
test_conf_set(conf, "auto.offset.reset", "earliest");
94+
test_conf_set(conf, "session.timeout.ms", "6000");
95+
test_conf_set(conf, "heartbeat.interval.ms", "1000");
96+
97+
consumer = test_create_consumer(topic, NULL, conf, NULL);
98+
99+
subscription = rd_kafka_topic_partition_list_new(1);
100+
rd_kafka_topic_partition_list_add(subscription, topic,
101+
RD_KAFKA_PARTITION_UA);
102+
TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
103+
rd_kafka_topic_partition_list_destroy(subscription);
104+
105+
TEST_SAY("Waiting for consumer to join group and get assignment\n");
106+
rkm = rd_kafka_consumer_poll(consumer, 10000);
107+
if (rkm)
108+
rd_kafka_message_destroy(rkm);
109+
110+
TEST_SAY(
111+
"Simulating coordinator failure by making broker unavailable\n");
112+
rd_kafka_mock_broker_set_down(mcluster, 1);
113+
114+
rd_sleep(1);
115+
116+
TEST_SAY(
117+
"Destroying consumer - this triggers LeaveGroup "
118+
"with coordinator unavailable\n");
119+
rd_kafka_destroy(consumer);
120+
121+
TEST_SAY("Consumer destroyed successfully (no crash)\n");
122+
123+
test_mock_cluster_destroy(mcluster);
124+
125+
test_curr->is_fatal_cb = NULL;
126+
allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
127+
128+
SUB_TEST_PASS();
129+
}
130+
131+
int main_0154_leavegroup_no_coordinator(int argc, char **argv) {
132+
TEST_SKIP_MOCK_CLUSTER(0);
133+
134+
do_test_leavegroup_no_coordinator();
135+
136+
return 0;
137+
}

tests/test.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
272272
_TEST_DECL(0151_purge_brokers_mock);
273273
_TEST_DECL(0152_rebootstrap_local);
274274
_TEST_DECL(0153_memberid);
275+
_TEST_DECL(0154_leavegroup_no_coordinator);
275276

276277
/* Manual tests */
277278
_TEST_DECL(8000_idle);
@@ -540,6 +541,7 @@ struct test tests[] = {
540541
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
541542
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
542543
_TEST(0153_memberid, TEST_F_LOCAL),
544+
_TEST(0154_leavegroup_no_coordinator, TEST_F_LOCAL),
543545

544546
/* Manual tests */
545547
_TEST(8000_idle, TEST_F_MANUAL),

0 commit comments

Comments
 (0)