From 2ab2f2e280edab786ec13e79aacab6be0ac71f67 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Tue, 3 Mar 2026 18:48:52 +0530 Subject: [PATCH 1/2] Fixed the test 155 to use share consumer --- tests/0155-share_group_heartbeat_mock.c | 1168 ++++++++++++----------- 1 file changed, 611 insertions(+), 557 deletions(-) diff --git a/tests/0155-share_group_heartbeat_mock.c b/tests/0155-share_group_heartbeat_mock.c index 616613d79..21b38d231 100644 --- a/tests/0155-share_group_heartbeat_mock.c +++ b/tests/0155-share_group_heartbeat_mock.c @@ -4,6 +4,10 @@ /** * @name Mock tests for share consumer and ShareGroupHeartbeat + * + * TODO: rd_kafka_assignment() and rd_kafka_fatal_error() are called via + * test_share_consumer_get_rk() to access the underlying rd_kafka_t + * handle. Need to be updated later to avoid this. */ static rd_bool_t is_share_heartbeat_request(rd_kafka_mock_request_t *request, @@ -29,23 +33,32 @@ static int wait_share_heartbeats(rd_kafka_mock_cluster_t *mcluster, /** * @brief Create a share consumer connected to mock cluster. */ -static rd_kafka_t *create_share_consumer(const char *bootstraps, - const char *group_id) { +static rd_kafka_share_t *create_share_consumer(const char *bootstraps, + const char *group_id) { rd_kafka_conf_t *conf; - rd_kafka_t *rk; + rd_kafka_share_t *rkshare; char errstr[512]; test_conf_init(&conf, NULL, 0); test_conf_set(conf, "bootstrap.servers", bootstraps); test_conf_set(conf, "group.id", group_id); - test_conf_set(conf, "share.consumer", "true"); - test_conf_set(conf, "group.protocol", "consumer"); - test_conf_set(conf, "auto.offset.reset", "earliest"); - rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); - TEST_ASSERT(rk != NULL, "Failed to create share consumer: %s", errstr); + rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + TEST_ASSERT(rkshare != NULL, "Failed to create share consumer: %s", + errstr); + rd_kafka_share_poll_set_consumer(rkshare); - return rk; + return rkshare; +} + +static int count_topic_partitions(rd_kafka_topic_partition_list_t *assignment, + const char *topic) { + int i, count = 0; + for (i = 0; i < assignment->cnt; i++) { + if (strcmp(assignment->elems[i].topic, topic) == 0) + count++; + } + return count; } /** @@ -55,8 +68,8 @@ static rd_kafka_t *create_share_consumer(const char *bootstraps, static void do_test_share_group_heartbeat_basic(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; - rd_kafka_topic_partition_list_t *subscription; - rd_kafka_t *c; + rd_kafka_topic_partition_list_t *subscription, *assignment; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group"; @@ -67,14 +80,14 @@ static void do_test_share_group_heartbeat_basic(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for join heartbeat */ @@ -83,17 +96,14 @@ static void do_test_share_group_heartbeat_basic(void) { "Expected at least 1 heartbeat, got %d", found_heartbeats); /* Poll to process response and trigger more heartbeats */ - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify assignment received (matches testReconcileNewPartitions) */ - { - rd_kafka_topic_partition_list_t *assignment; - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); - TEST_ASSERT(assignment->cnt == 3, - "Expected 3 partitions assigned, got %d", - assignment->cnt); - rd_kafka_topic_partition_list_destroy(assignment); - } + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); + TEST_ASSERT(assignment->cnt == 3, + "Expected 3 partitions assigned, got %d", assignment->cnt); + rd_kafka_topic_partition_list_destroy(assignment); /* Verify multiple heartbeats */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 200); @@ -101,8 +111,8 @@ static void do_test_share_group_heartbeat_basic(void) { "Expected at least 2 heartbeats, got %d", found_heartbeats); /* Close consumer (sends leave heartbeat) */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); /* Verify leave heartbeat was sent */ found_heartbeats = wait_share_heartbeats(mcluster, 3, 200); @@ -129,8 +139,9 @@ static void do_test_share_group_assignment_rebalance(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assignment, *c2_assignment; - rd_kafka_t *c1, *c2; + rd_kafka_topic_partition_list_t *share_c1_assignment, + *share_c2_assignment; + rd_kafka_share_t *share_c1, *share_c2; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-rebalance"; @@ -140,62 +151,67 @@ static void do_test_share_group_assignment_rebalance(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c1 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); /* C1 joins - should get all 3 partitions */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c1, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); - TEST_ASSERT(c1_assignment->cnt == 3, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assignment)); + TEST_ASSERT(share_c1_assignment->cnt == 3, "Expected C1 to have 3 partitions, got %d", - c1_assignment->cnt); - rd_kafka_topic_partition_list_destroy(c1_assignment); + share_c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assignment); /* C2 joins - partitions should be redistributed */ - c2 = create_share_consumer(bootstraps, group); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + share_c2 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); rd_kafka_topic_partition_list_destroy(subscription); wait_share_heartbeats(mcluster, 3, 500); - rd_kafka_consumer_poll(c1, 2000); - rd_kafka_consumer_poll(c2, 2000); - - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assignment)); - TEST_ASSERT(c1_assignment->cnt + c2_assignment->cnt == 3, + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); + + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assignment)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assignment)); + TEST_ASSERT(share_c1_assignment->cnt + share_c2_assignment->cnt == 3, "Expected total 3 partitions, got %d + %d = %d", - c1_assignment->cnt, c2_assignment->cnt, - c1_assignment->cnt + c2_assignment->cnt); - TEST_ASSERT(c1_assignment->cnt > 0 && c2_assignment->cnt > 0, + share_c1_assignment->cnt, share_c2_assignment->cnt, + share_c1_assignment->cnt + share_c2_assignment->cnt); + TEST_ASSERT(share_c1_assignment->cnt > 0 && + share_c2_assignment->cnt > 0, "Expected both consumers to have partitions, " "got C1=%d, C2=%d", - c1_assignment->cnt, c2_assignment->cnt); - rd_kafka_topic_partition_list_destroy(c1_assignment); - rd_kafka_topic_partition_list_destroy(c2_assignment); + share_c1_assignment->cnt, share_c2_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assignment); + rd_kafka_topic_partition_list_destroy(share_c2_assignment); /* C2 leaves - C1 should get all partitions back */ - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c2); - rd_kafka_consumer_poll(c1, 6000); + test_share_consume_msgs(share_c1, 1, 12, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assignment)); - TEST_ASSERT(c1_assignment->cnt == 3, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assignment)); + TEST_ASSERT(share_c1_assignment->cnt == 3, "Expected C1 to have 3 partitions after C2 left, got %d", - c1_assignment->cnt); - rd_kafka_topic_partition_list_destroy(c1_assignment); + share_c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assignment); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_destroy(c1); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_destroy(share_c1); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -203,16 +219,6 @@ static void do_test_share_group_assignment_rebalance(void) { SUB_TEST_PASS(); } -static int count_topic_partitions(rd_kafka_topic_partition_list_t *assignment, - const char *topic) { - int i, count = 0; - for (i = 0; i < assignment->cnt; i++) { - if (strcmp(assignment->elems[i].topic, topic) == 0) - count++; - } - return count; -} - /** * @brief Test multi-topic assignment with mixed subscriptions. * C1: both topics, C2: orders only, C3: events only @@ -221,8 +227,9 @@ static void do_test_share_group_multi_topic_assignment(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *sub_both, *sub_orders, *sub_events; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign, *c3_assign; - rd_kafka_t *c1, *c2, *c3; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign, + *share_c3_assign; + rd_kafka_share_t *share_c1, *share_c2, *share_c3; const char *topic_orders = "test-orders"; const char *topic_events = "test-events"; const char *group = "test-share-group-multi"; @@ -252,110 +259,120 @@ static void do_test_share_group_multi_topic_assignment(void) { rd_kafka_mock_start_request_tracking(mcluster); /* C1 joins (both topics) - should get all 6 partitions */ - c1 = create_share_consumer(bootstraps, group); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, sub_both)); + share_c1 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, sub_both)); wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c1, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_ASSERT(c1_assign->cnt == 6, - "C1 should have all 6 partitions, got %d", c1_assign->cnt); - rd_kafka_topic_partition_list_destroy(c1_assign); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_ASSERT(share_c1_assign->cnt == 6, + "C1 should have all 6 partitions, got %d", + share_c1_assign->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assign); /* C2 joins (orders only) - orders should split */ - c2 = create_share_consumer(bootstraps, group); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, sub_orders)); + share_c2 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, sub_orders)); wait_share_heartbeats(mcluster, 3, 500); - rd_kafka_consumer_poll(c1, 2000); - rd_kafka_consumer_poll(c2, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); - total_orders = count_topic_partitions(c1_assign, topic_orders) + - count_topic_partitions(c2_assign, topic_orders); - total_events = count_topic_partitions(c1_assign, topic_events) + - count_topic_partitions(c2_assign, topic_events); + total_orders = count_topic_partitions(share_c1_assign, topic_orders) + + count_topic_partitions(share_c2_assign, topic_orders); + total_events = count_topic_partitions(share_c1_assign, topic_events) + + count_topic_partitions(share_c2_assign, topic_events); TEST_ASSERT(total_orders == 4, "Total orders should be 4, got %d", total_orders); TEST_ASSERT(total_events == 2, "Total events should be 2, got %d", total_events); - TEST_ASSERT(count_topic_partitions(c2_assign, topic_orders) > 0, + TEST_ASSERT(count_topic_partitions(share_c2_assign, topic_orders) > 0, "C2 should have at least 1 orders partition"); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* C3 joins (events only) - events should split */ - c3 = create_share_consumer(bootstraps, group); - TEST_CALL_ERR__(rd_kafka_subscribe(c3, sub_events)); + share_c3 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c3, sub_events)); wait_share_heartbeats(mcluster, 5, 500); - rd_kafka_consumer_poll(c1, 2000); - rd_kafka_consumer_poll(c2, 2000); - rd_kafka_consumer_poll(c3, 2000); - - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); - - total_orders = count_topic_partitions(c1_assign, topic_orders) + - count_topic_partitions(c2_assign, topic_orders) + - count_topic_partitions(c3_assign, topic_orders); - total_events = count_topic_partitions(c1_assign, topic_events) + - count_topic_partitions(c2_assign, topic_events) + - count_topic_partitions(c3_assign, topic_events); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c3, 1, 4, 500, NULL, 0); + + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c3), &share_c3_assign)); + + total_orders = count_topic_partitions(share_c1_assign, topic_orders) + + count_topic_partitions(share_c2_assign, topic_orders) + + count_topic_partitions(share_c3_assign, topic_orders); + total_events = count_topic_partitions(share_c1_assign, topic_events) + + count_topic_partitions(share_c2_assign, topic_events) + + count_topic_partitions(share_c3_assign, topic_events); TEST_ASSERT(total_orders == 4, "Total orders should be 4, got %d", total_orders); TEST_ASSERT(total_events == 2, "Total events should be 2, got %d", total_events); - TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) > 0, + TEST_ASSERT(count_topic_partitions(share_c3_assign, topic_events) > 0, "C3 should have at least 1 events partition"); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); - rd_kafka_topic_partition_list_destroy(c3_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); + rd_kafka_topic_partition_list_destroy(share_c3_assign); /* C1 leaves - C2 should get all orders, C3 all events */ - rd_kafka_consumer_close(c1); - rd_kafka_destroy(c1); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_destroy(share_c1); - rd_kafka_consumer_poll(c2, 6000); - rd_kafka_consumer_poll(c3, 6000); + test_share_consume_msgs(share_c2, 1, 12, 500, NULL, 0); + test_share_consume_msgs(share_c3, 1, 12, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c3), &share_c3_assign)); - TEST_ASSERT(count_topic_partitions(c2_assign, topic_orders) == 4, + TEST_ASSERT(count_topic_partitions(share_c2_assign, topic_orders) == 4, "C2 should have all 4 orders partitions, got %d", - count_topic_partitions(c2_assign, topic_orders)); - TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) == 2, + count_topic_partitions(share_c2_assign, topic_orders)); + TEST_ASSERT(count_topic_partitions(share_c3_assign, topic_events) == 2, "C3 should have all 2 events partitions, got %d", - count_topic_partitions(c3_assign, topic_events)); + count_topic_partitions(share_c3_assign, topic_events)); - rd_kafka_topic_partition_list_destroy(c2_assign); - rd_kafka_topic_partition_list_destroy(c3_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); + rd_kafka_topic_partition_list_destroy(share_c3_assign); /* C2 leaves - C3 should still have events only */ - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c2); - rd_kafka_consumer_poll(c3, 6000); + test_share_consume_msgs(share_c3, 1, 12, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); - TEST_ASSERT(count_topic_partitions(c3_assign, topic_events) == 2, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c3), &share_c3_assign)); + TEST_ASSERT(count_topic_partitions(share_c3_assign, topic_events) == 2, "C3 should still have 2 events partitions, got %d", - count_topic_partitions(c3_assign, topic_events)); - TEST_ASSERT(count_topic_partitions(c3_assign, topic_orders) == 0, + count_topic_partitions(share_c3_assign, topic_events)); + TEST_ASSERT(count_topic_partitions(share_c3_assign, topic_orders) == 0, "C3 should have 0 orders partitions (not subscribed), " "got %d", - count_topic_partitions(c3_assign, topic_orders)); - rd_kafka_topic_partition_list_destroy(c3_assign); + count_topic_partitions(share_c3_assign, topic_orders)); + rd_kafka_topic_partition_list_destroy(share_c3_assign); /* Cleanup */ - rd_kafka_consumer_close(c3); - rd_kafka_destroy(c3); + rd_kafka_share_consumer_close(share_c3); + rd_kafka_share_destroy(share_c3); rd_kafka_topic_partition_list_destroy(sub_both); rd_kafka_topic_partition_list_destroy(sub_orders); @@ -377,7 +394,7 @@ static void do_test_share_group_error_injection(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -389,22 +406,23 @@ static void do_test_share_group_error_injection(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -417,10 +435,11 @@ static void do_test_share_group_error_injection(void) { RD_KAFKA_RESP_ERR_INVALID_REQUEST, 0); /* Poll - consumer should enter fatal state */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify consumer entered fatal state */ - fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, "Expected consumer to be in fatal state after " "INVALID_REQUEST error"); @@ -428,8 +447,8 @@ static void do_test_share_group_error_injection(void) { rd_kafka_err2str(fatal_err), errstr); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -447,7 +466,7 @@ static void do_test_share_group_rtt_injection(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_conf_t *conf; char errstr[512]; int found_heartbeats; @@ -465,28 +484,28 @@ static void do_test_share_group_rtt_injection(void) { test_conf_init(&conf, NULL, 0); test_conf_set(conf, "bootstrap.servers", bootstraps); test_conf_set(conf, "group.id", group); - test_conf_set(conf, "share.consumer", "true"); - test_conf_set(conf, "group.protocol", "consumer"); - test_conf_set(conf, "auto.offset.reset", "earliest"); test_conf_set(conf, "socket.timeout.ms", "3000"); - c = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); - TEST_ASSERT(c != NULL, "Failed to create share consumer: %s", errstr); + share_c = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + TEST_ASSERT(share_c != NULL, "Failed to create share consumer: %s", + errstr); + rd_kafka_share_poll_set_consumer(share_c); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -498,7 +517,7 @@ static void do_test_share_group_rtt_injection(void) { RD_KAFKA_RESP_ERR_NO_ERROR, 5000); /* Poll through the timeout period - consumer should recover */ - rd_kafka_consumer_poll(c, 5000); + test_share_consume_msgs(share_c, 1, 10, 500, NULL, 0); /* Verify heartbeats resumed after timeout recovery */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 1000); @@ -507,18 +526,19 @@ static void do_test_share_group_rtt_injection(void) { found_heartbeats); /* Poll more to allow assignment to be restored */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify consumer recovered and still has assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after timeout recovery, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -538,9 +558,9 @@ static void do_test_share_group_session_timeout(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; - rd_kafka_t *c1, *c2; - int c1_initial, c2_initial; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign; + rd_kafka_share_t *share_c1, *share_c2; + int share_c1_initial, share_c2_initial; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-timeout"; @@ -558,8 +578,8 @@ static void do_test_share_group_session_timeout(void) { * quickly. */ rd_kafka_mock_sharegroup_set_session_timeout(mcluster, 3000); - c1 = create_share_consumer(bootstraps, group); - c2 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); + share_c2 = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, @@ -567,48 +587,49 @@ static void do_test_share_group_session_timeout(void) { rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for both to join and rebalance to complete. Poll both * consumers in short alternating windows so both can heartbeat * and neither session times out while the other is polled. */ wait_share_heartbeats(mcluster, 4, 500); - for (int i = 0; i < 5; i++) { - rd_kafka_consumer_poll(c1, 200); - rd_kafka_consumer_poll(c2, 200); - } + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); /* Verify initial distribution */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - c1_initial = c1_assign->cnt; - c2_initial = c2_assign->cnt; - TEST_ASSERT(c1_initial + c2_initial == 4, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + share_c1_initial = share_c1_assign->cnt; + share_c2_initial = share_c2_assign->cnt; + TEST_ASSERT(share_c1_initial + share_c2_initial == 4, "Total should be 4 partitions, got %d", - c1_initial + c2_initial); - TEST_ASSERT(c1_initial > 0 && c2_initial > 0, + share_c1_initial + share_c2_initial); + TEST_ASSERT(share_c1_initial > 0 && share_c2_initial > 0, "Both consumers should have partitions"); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* Destroy C2 without close to simulate crash */ - rd_kafka_destroy(c2); + rd_kafka_share_destroy(share_c2); /* Poll C1 for 5 seconds — enough for C2's 3s session to expire and * for C1 to receive the reassigned partitions. */ - rd_kafka_consumer_poll(c1, 5000); + test_share_consume_msgs(share_c1, 1, 10, 500, NULL, 0); /* Verify C1 got all partitions after C2 timed out */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_ASSERT(c1_assign->cnt == 4, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_ASSERT(share_c1_assign->cnt == 4, "C1 should have all 4 partitions after C2 timeout, got %d", - c1_assign->cnt); - rd_kafka_topic_partition_list_destroy(c1_assign); + share_c1_assign->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assign); - rd_kafka_consumer_close(c1); - rd_kafka_destroy(c1); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_destroy(share_c1); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -629,10 +650,10 @@ static void do_test_share_group_target_assignment(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign; rd_kafka_topic_partition_list_t *target_c1, *target_c2; rd_kafka_topic_partition_list_t *assignments[2]; - rd_kafka_t *c1, *c2; + rd_kafka_share_t *share_c1, *share_c2; char **member_ids; size_t member_cnt; rd_kafka_resp_err_t err; @@ -645,8 +666,8 @@ static void do_test_share_group_target_assignment(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 4, 1); - c1 = create_share_consumer(bootstraps, group); - c2 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); + share_c2 = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, @@ -654,25 +675,27 @@ static void do_test_share_group_target_assignment(void) { rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for both to join and rebalance to complete */ wait_share_heartbeats(mcluster, 3, 500); - rd_kafka_consumer_poll(c1, 3000); - rd_kafka_consumer_poll(c2, 3000); + test_share_consume_msgs(share_c1, 1, 6, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 6, 500, NULL, 0); /* Verify initial automatic assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_ASSERT(c1_assign->cnt + c2_assign->cnt == 4, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_ASSERT(share_c1_assign->cnt + share_c2_assign->cnt == 4, "Total should be 4 partitions, got %d", - c1_assign->cnt + c2_assign->cnt); - TEST_ASSERT(c1_assign->cnt > 0 && c2_assign->cnt > 0, + share_c1_assign->cnt + share_c2_assign->cnt); + TEST_ASSERT(share_c1_assign->cnt > 0 && share_c2_assign->cnt > 0, "Both consumers should have partitions initially"); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* Retrieve member IDs */ err = rd_kafka_mock_sharegroup_get_member_ids(mcluster, group, @@ -700,24 +723,27 @@ static void do_test_share_group_target_assignment(void) { rd_kafka_topic_partition_list_destroy(target_c2); /* Poll to receive new assignment */ - rd_kafka_consumer_poll(c1, 6000); - rd_kafka_consumer_poll(c2, 6000); + test_share_consume_msgs(share_c1, 1, 12, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 12, 500, NULL, 0); /* Verify manual assignment was applied */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); - TEST_ASSERT(c1_assign->cnt + c2_assign->cnt == 4, + TEST_ASSERT(share_c1_assign->cnt + share_c2_assign->cnt == 4, "Total should still be 4 partitions, got %d", - c1_assign->cnt + c2_assign->cnt); - TEST_ASSERT((c1_assign->cnt == 4 && c2_assign->cnt == 0) || - (c1_assign->cnt == 0 && c2_assign->cnt == 4), - "Expected one consumer to have all 4 partitions and the " - "other to have 0, got C1=%d, C2=%d", - c1_assign->cnt, c2_assign->cnt); + share_c1_assign->cnt + share_c2_assign->cnt); + TEST_ASSERT( + (share_c1_assign->cnt == 4 && share_c2_assign->cnt == 0) || + (share_c1_assign->cnt == 0 && share_c2_assign->cnt == 4), + "Expected one consumer to have all 4 partitions and the " + "other to have 0, got C1=%d, C2=%d", + share_c1_assign->cnt, share_c2_assign->cnt); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* Free member IDs */ rd_free(member_ids[0]); @@ -725,10 +751,10 @@ static void do_test_share_group_target_assignment(void) { rd_free(member_ids); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c1); + rd_kafka_share_destroy(share_c2); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -750,7 +776,7 @@ static void do_test_share_group_no_spurious_fencing(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-no-fence"; int i; @@ -765,18 +791,19 @@ static void do_test_share_group_no_spurious_fencing(void) { rd_kafka_mock_sharegroup_set_heartbeat_interval(mcluster, 500); rd_kafka_mock_sharegroup_set_session_timeout(mcluster, 2000); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for join and initial assignment. */ - rd_kafka_consumer_poll(c, 1000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -786,10 +813,11 @@ static void do_test_share_group_no_spurious_fencing(void) { * members, the assignment will drop. */ TEST_SAY("Polling for 5 seconds with 2s session timeout...\n"); for (i = 0; i < 5; i++) { - rd_kafka_consumer_poll(c, 1000); + test_share_consume_msgs(share_c, 1, 2, 500, NULL, 0); /* Verify assignment is still intact */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c), &assignment)); TEST_ASSERT(assignment->cnt == 3, "Assignment dropped at %ds (spurious fencing!)", i + 1); @@ -799,8 +827,8 @@ static void do_test_share_group_no_spurious_fencing(void) { TEST_SAY("No spurious fencing after 5 seconds\n"); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); @@ -820,7 +848,7 @@ static void do_test_unknown_member_id_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-unknown-member"; @@ -831,22 +859,23 @@ static void do_test_unknown_member_id_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -857,7 +886,7 @@ static void do_test_unknown_member_id_error(void) { RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, 0); /* Poll - consumer should handle error and rejoin */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify heartbeats continue (rejoin happened) */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); @@ -867,16 +896,17 @@ static void do_test_unknown_member_id_error(void) { found_heartbeats); /* Verify consumer eventually gets assignment back */ - rd_kafka_consumer_poll(c, 2000); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after rejoin, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -894,7 +924,7 @@ static void do_test_fenced_member_epoch_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-fenced"; @@ -905,22 +935,23 @@ static void do_test_fenced_member_epoch_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -931,7 +962,7 @@ static void do_test_fenced_member_epoch_error(void) { RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH, 0); /* Poll - consumer should handle error and rejoin */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify heartbeats continue (rejoin happened) */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); @@ -942,16 +973,17 @@ static void do_test_fenced_member_epoch_error(void) { found_heartbeats); /* Verify consumer eventually gets assignment back */ - rd_kafka_consumer_poll(c, 2000); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after rejoin, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -969,7 +1001,7 @@ static void do_test_coordinator_not_available_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-coord-unavail"; @@ -980,22 +1012,23 @@ static void do_test_coordinator_not_available_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1006,7 +1039,7 @@ static void do_test_coordinator_not_available_error(void) { RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 0); /* Poll - consumer should handle transient error and retry */ - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify heartbeats continue after transient error */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); @@ -1017,15 +1050,16 @@ static void do_test_coordinator_not_available_error(void) { found_heartbeats); /* Verify consumer still has assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after retry, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1042,7 +1076,7 @@ static void do_test_not_coordinator_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-not-coord"; @@ -1053,22 +1087,23 @@ static void do_test_not_coordinator_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1081,7 +1116,7 @@ static void do_test_not_coordinator_error(void) { /* Poll - consumer should find new coordinator and continue. * NOT_COORDINATOR triggers coordinator rediscovery which may take * longer than COORDINATOR_NOT_AVAILABLE. */ - rd_kafka_consumer_poll(c, 5000); + test_share_consume_msgs(share_c, 1, 10, 500, NULL, 0); /* Verify heartbeats continue after finding coordinator */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 1000); @@ -1091,15 +1126,16 @@ static void do_test_not_coordinator_error(void) { found_heartbeats); /* Verify consumer still has assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after finding coordinator, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1117,7 +1153,7 @@ static void do_test_group_authorization_failed_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -1129,19 +1165,19 @@ static void do_test_group_authorization_failed_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Inject GROUP_AUTHORIZATION_FAILED error (fatal) */ rd_kafka_mock_broker_push_request_error_rtts( @@ -1149,10 +1185,11 @@ static void do_test_group_authorization_failed_error(void) { RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED, 0); /* Poll - should trigger fatal error */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify consumer entered fatal state */ - fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, "Expected consumer to be in fatal state after " "GROUP_AUTHORIZATION_FAILED"); @@ -1160,8 +1197,8 @@ static void do_test_group_authorization_failed_error(void) { rd_kafka_err2str(fatal_err), errstr); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1179,7 +1216,7 @@ static void do_test_group_max_size_reached_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c1, *c2; + rd_kafka_share_t *share_c1, *share_c2; rd_kafka_resp_err_t fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -1196,24 +1233,25 @@ static void do_test_group_max_size_reached_error(void) { RD_KAFKA_PARTITION_UA); /* First consumer joins successfully */ - c1 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); - /* Wait for c1 to fully join and stabilize */ + /* Wait for share_c1 to fully join and stabilize */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c1, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c1, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &assignment)); TEST_ASSERT(assignment->cnt == 4, - "Expected c1 to have 4 partitions, got %d", + "Expected share_c1 to have 4 partitions, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Push multiple GROUP_MAX_SIZE_REACHED errors so that even if - * c1's regular heartbeat consumes some, c2's join heartbeat + * share_c1's regular heartbeat consumes some, share_c2's join heartbeat * will also get one. The Java test uses server-side maxSize=1 * config; we simulate by injecting errors for all heartbeats. */ rd_kafka_mock_broker_push_request_error_rtts( @@ -1221,27 +1259,29 @@ static void do_test_group_max_size_reached_error(void) { RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED, 0); /* Create second consumer - should be rejected */ - c2 = create_share_consumer(bootstraps, group); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + share_c2 = create_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); - /* Poll c2 - should get fatal error */ - rd_kafka_consumer_poll(c2, 3000); + /* Poll share_c2 - should get fatal error */ + test_share_consume_msgs(share_c2, 1, 6, 500, NULL, 0); - /* Verify c2 entered fatal state */ - fatal_err = rd_kafka_fatal_error(c2, errstr, sizeof(errstr)); + /* Verify share_c2 entered fatal state */ + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c2), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, - "Expected c2 to be in fatal state after " + "Expected share_c2 to be in fatal state after " "GROUP_MAX_SIZE_REACHED"); - TEST_SAY("c2 correctly rejected with fatal error: %s (%s)\n", - rd_kafka_err2str(fatal_err), errstr); + TEST_SAY( + "share consumer 2 correctly rejected with fatal error: %s (%s)\n", + rd_kafka_err2str(fatal_err), errstr); rd_kafka_topic_partition_list_destroy(subscription); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c1); + rd_kafka_share_destroy(share_c2); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1259,7 +1299,7 @@ static void do_test_member_rejoin_with_epoch_zero(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-rejoin"; @@ -1270,22 +1310,23 @@ static void do_test_member_rejoin_with_epoch_zero(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment (member is now in stable state) */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1297,7 +1338,7 @@ static void do_test_member_rejoin_with_epoch_zero(void) { RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, 0); /* Poll - consumer should rejoin with epoch=0 */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify rejoin heartbeats */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); @@ -1305,16 +1346,17 @@ static void do_test_member_rejoin_with_epoch_zero(void) { found_heartbeats); /* Verify consumer gets assignment back */ - rd_kafka_consumer_poll(c, 2000); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after rejoin, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1332,8 +1374,8 @@ static void do_test_leaving_member_bumps_group_epoch(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; - rd_kafka_t *c1, *c2; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign; + rd_kafka_share_t *share_c1, *share_c2; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-leave-epoch"; @@ -1348,8 +1390,8 @@ static void do_test_leaving_member_bumps_group_epoch(void) { * to complete before we check assignments. */ rd_kafka_mock_sharegroup_set_heartbeat_interval(mcluster, 1000); - c1 = create_share_consumer(bootstraps, group); - c2 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); + share_c2 = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, @@ -1357,47 +1399,48 @@ static void do_test_leaving_member_bumps_group_epoch(void) { rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for both to join and rebalance to complete. Poll both * consumers in short alternating windows so both can heartbeat * and process their updated assignments. */ wait_share_heartbeats(mcluster, 4, 500); - for (int i = 0; i < 5; i++) { - rd_kafka_consumer_poll(c1, 200); - rd_kafka_consumer_poll(c2, 200); - } + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); /* Verify initial distribution */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_ASSERT(c1_assign->cnt + c2_assign->cnt == 4, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_ASSERT(share_c1_assign->cnt + share_c2_assign->cnt == 4, "Total should be 4 partitions, got %d", - c1_assign->cnt + c2_assign->cnt); - TEST_ASSERT(c1_assign->cnt > 0 && c2_assign->cnt > 0, + share_c1_assign->cnt + share_c2_assign->cnt); + TEST_ASSERT(share_c1_assign->cnt > 0 && share_c2_assign->cnt > 0, "Both consumers should have partitions"); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* C2 leaves (sends epoch=-1 leave heartbeat) */ - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c2); /* Poll C1 to receive updated assignment (group epoch bumped) */ - rd_kafka_consumer_poll(c1, 6000); + test_share_consume_msgs(share_c1, 1, 12, 500, NULL, 0); /* Verify C1 got all partitions after C2 left */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_ASSERT(c1_assign->cnt == 4, + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_ASSERT(share_c1_assign->cnt == 4, "C1 should have all 4 partitions after C2 left, got %d", - c1_assign->cnt); - rd_kafka_topic_partition_list_destroy(c1_assign); + share_c1_assign->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assign); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_destroy(c1); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_destroy(share_c1); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1415,7 +1458,7 @@ static void do_test_partition_assignment_with_multiple_topics(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; const char *topic1 = "test-multi-topic-1"; const char *topic2 = "test-multi-topic-2"; const char *group = "test-share-group-multi-topic-sub"; @@ -1428,7 +1471,7 @@ static void do_test_partition_assignment_with_multiple_topics(void) { rd_kafka_mock_topic_create(mcluster, topic1, 3, 1); rd_kafka_mock_topic_create(mcluster, topic2, 2, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); /* Subscribe to both topics */ subscription = rd_kafka_topic_partition_list_new(2); @@ -1438,15 +1481,16 @@ static void do_test_partition_assignment_with_multiple_topics(void) { RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify assignment includes partitions from both topics */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 5, "Expected 5 partitions (3+2), got %d", assignment->cnt); @@ -1464,8 +1508,8 @@ static void do_test_partition_assignment_with_multiple_topics(void) { rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1486,8 +1530,9 @@ static void do_test_multiple_members_partition_distribution(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign, *c3_assign; - rd_kafka_t *c1, *c2, *c3; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign, + *share_c3_assign; + rd_kafka_share_t *share_c1, *share_c2, *share_c3; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-distribution"; int total_partitions; @@ -1498,9 +1543,9 @@ static void do_test_multiple_members_partition_distribution(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 6, 1); - c1 = create_share_consumer(bootstraps, group); - c2 = create_share_consumer(bootstraps, group); - c3 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); + share_c2 = create_share_consumer(bootstraps, group); + share_c3 = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, @@ -1508,55 +1553,61 @@ static void do_test_multiple_members_partition_distribution(void) { rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c3, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c3, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for all to join */ wait_share_heartbeats(mcluster, 5, 500); - rd_kafka_consumer_poll(c1, 3000); - rd_kafka_consumer_poll(c2, 3000); - rd_kafka_consumer_poll(c3, 3000); + test_share_consume_msgs(share_c1, 1, 6, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 6, 500, NULL, 0); + test_share_consume_msgs(share_c3, 1, 6, 500, NULL, 0); /* Get assignments */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c3, &c3_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c3), &share_c3_assign)); - total_partitions = c1_assign->cnt + c2_assign->cnt + c3_assign->cnt; + total_partitions = + share_c1_assign->cnt + share_c2_assign->cnt + share_c3_assign->cnt; /* In share groups, partitions may be assigned to multiple consumers. * Each consumer should have at least 1 partition, and total should * be at least 6 (covering all partitions). */ - TEST_ASSERT(c1_assign->cnt >= 1, - "Expected c1 to have at least 1 partition, got %d", - c1_assign->cnt); - TEST_ASSERT(c2_assign->cnt >= 1, - "Expected c2 to have at least 1 partition, got %d", - c2_assign->cnt); - TEST_ASSERT(c3_assign->cnt >= 1, - "Expected c3 to have at least 1 partition, got %d", - c3_assign->cnt); + TEST_ASSERT(share_c1_assign->cnt >= 1, + "Expected share_c1 to have at least 1 partition, got %d", + share_c1_assign->cnt); + TEST_ASSERT(share_c2_assign->cnt >= 1, + "Expected share_c2 to have at least 1 partition, got %d", + share_c2_assign->cnt); + TEST_ASSERT(share_c3_assign->cnt >= 1, + "Expected share_c3 to have at least 1 partition, got %d", + share_c3_assign->cnt); TEST_ASSERT(total_partitions >= 6, "Expected at least 6 total partition assignments, got %d", total_partitions); - TEST_SAY("Partition distribution: c1=%d, c2=%d, c3=%d (total=%d)\n", - c1_assign->cnt, c2_assign->cnt, c3_assign->cnt, - total_partitions); + TEST_SAY( + "Partition distribution: share consumer 1=%d, share consumer 2=%d, " + "share consumer 3=%d (total=%d)\n", + share_c1_assign->cnt, share_c2_assign->cnt, share_c3_assign->cnt, + total_partitions); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); - rd_kafka_topic_partition_list_destroy(c3_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); + rd_kafka_topic_partition_list_destroy(share_c3_assign); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_consumer_close(c2); - rd_kafka_consumer_close(c3); - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); - rd_kafka_destroy(c3); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_consumer_close(share_c3); + rd_kafka_share_destroy(share_c1); + rd_kafka_share_destroy(share_c2); + rd_kafka_share_destroy(share_c3); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1574,7 +1625,7 @@ static void do_test_leave_heartbeat_completes_successfully(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t err; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-leave-success"; @@ -1585,22 +1636,23 @@ static void do_test_leave_heartbeat_completes_successfully(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1608,12 +1660,12 @@ static void do_test_leave_heartbeat_completes_successfully(void) { /* Leave group - should send leave heartbeat and complete. * Note: After close(), we cannot call rd_kafka_assignment() anymore * as the broker handle is destroyed. */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); TEST_ASSERT(!err, "Expected close to succeed, got %s", rd_kafka_err2str(err)); /* Cleanup */ - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1631,7 +1683,7 @@ static void do_test_leave_heartbeat_completes_on_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t err; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-leave-error"; @@ -1642,22 +1694,23 @@ static void do_test_leave_heartbeat_completes_on_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1670,7 +1723,7 @@ static void do_test_leave_heartbeat_completes_on_error(void) { /* Leave group - should still complete despite error (best effort). * The key behavior: close() must not hang even when the leave * heartbeat gets an error response. */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); /* Close completed (didn't hang) - this is the primary assertion. * The return code may vary depending on whether the error was * processed during leave. */ @@ -1678,7 +1731,7 @@ static void do_test_leave_heartbeat_completes_on_error(void) { rd_kafka_err2str(err)); /* Cleanup */ - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1696,7 +1749,7 @@ static void do_test_subscription_change(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_topicA = 0, found_topicB = 0, i; const char *topicA = "test-sub-change-topic-A"; const char *topicB = "test-sub-change-topic-B"; @@ -1709,7 +1762,7 @@ static void do_test_subscription_change(void) { rd_kafka_mock_topic_create(mcluster, topicA, 2, 1); rd_kafka_mock_topic_create(mcluster, topicB, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); /* First subscription: topic A */ subscription = rd_kafka_topic_partition_list_new(1); @@ -1717,15 +1770,16 @@ static void do_test_subscription_change(void) { RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for assignment to topic A */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify assignment has topic A only */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 2, "Expected 2 partitions from topicA, got %d", assignment->cnt); @@ -1740,16 +1794,17 @@ static void do_test_subscription_change(void) { subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topicB, RD_KAFKA_PARTITION_UA); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for assignment update */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); wait_share_heartbeats(mcluster, 2, 500); - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify assignment now has topic B only */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); found_topicA = 0; found_topicB = 0; for (i = 0; i < assignment->cnt; i++) { @@ -1767,8 +1822,8 @@ static void do_test_subscription_change(void) { rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1787,7 +1842,7 @@ static void do_test_group_id_not_found_while_unsubscribed(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t err, fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -1799,22 +1854,23 @@ static void do_test_group_id_not_found_while_unsubscribed(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -1822,8 +1878,8 @@ static void do_test_group_id_not_found_while_unsubscribed(void) { /* Unsubscribe first to transition to unsubscribed state. * The Java test has member in UNSUBSCRIBED state when the * error arrives. */ - TEST_CALL_ERR__(rd_kafka_unsubscribe(c)); - rd_kafka_consumer_poll(c, 2000); + TEST_CALL_ERR__(rd_kafka_share_unsubscribe(share_c)); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Now inject GROUP_ID_NOT_FOUND. * Since the member is unsubscribed, this should be benign. */ @@ -1832,21 +1888,22 @@ static void do_test_group_id_not_found_while_unsubscribed(void) { RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND, 0); /* Poll to process the error */ - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify consumer is NOT in fatal state - error should be benign */ - fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err == RD_KAFKA_RESP_ERR_NO_ERROR, "Expected no fatal error when GROUP_ID_NOT_FOUND arrives " "while unsubscribed, but got: %s (%s)", rd_kafka_err2str(fatal_err), errstr); /* Close consumer */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); TEST_SAY("Close returned: %s\n", rd_kafka_err2str(err)); /* Cleanup */ - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -1859,82 +1916,70 @@ static void do_test_group_id_not_found_while_unsubscribed(void) { * * When an active member (epoch > 0) receives GROUP_ID_NOT_FOUND, * it should be treated as a fatal error (group unexpectedly deleted). - * - * NOT YET COMPATIBLE: GROUP_ID_NOT_FOUND is not in the SGHB fatal error - * list in rdkafka_cgrp.c. It is treated as permanent non-fatal instead. - * See sghb_test_discrepancies.txt #2. */ -/* TODO KIP-932: Re-enable this test when GROUP_ID_NOT_FOUND is added to the - * SGHB fatal error list in rdkafka_cgrp.c. */ // static void do_test_group_id_not_found_while_stable_is_fatal(void) { // rd_kafka_mock_cluster_t *mcluster; // const char *bootstraps; // rd_kafka_topic_partition_list_t *subscription, *assignment; -// rd_kafka_t *c; +// rd_kafka_share_t *share_c; // rd_kafka_resp_err_t fatal_err; // char errstr[256]; // const char *topic = test_mk_topic_name(__FUNCTION__, 0); // const char *group = "test-share-group-id-not-found-stable"; -// + // SUB_TEST_QUICK(); -// + // /* Setup */ // mcluster = test_mock_cluster_new(1, &bootstraps); // rd_kafka_mock_topic_create(mcluster, topic, 3, 1); -// -// c = create_share_consumer(bootstraps, group); -// + +// share_c = create_share_consumer(bootstraps, group); + // subscription = rd_kafka_topic_partition_list_new(1); // rd_kafka_topic_partition_list_add(subscription, topic, // RD_KAFKA_PARTITION_UA); -// + // rd_kafka_mock_start_request_tracking(mcluster); -// TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); +// TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); // rd_kafka_topic_partition_list_destroy(subscription); -// + // /* Wait for initial join and assignment */ // wait_share_heartbeats(mcluster, 1, 500); -// rd_kafka_consumer_poll(c, 2000); -// +// test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); + // /* Verify initial assignment - member is in stable state */ -// TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); +// TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), +// &assignment)); // TEST_ASSERT(assignment->cnt == 3, // "Expected 3 partitions initially, got %d", // assignment->cnt); // rd_kafka_topic_partition_list_destroy(assignment); -// + // /* Inject GROUP_ID_NOT_FOUND for an active/stable member. -// * This should be treated as fatal (group unexpectedly deleted). -// */ +// * This should be treated as fatal (group unexpectedly deleted). */ // rd_kafka_mock_broker_push_request_error_rtts( // mcluster, 1, RD_KAFKAP_ShareGroupHeartbeat, 1, // RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND, 0); -// + // /* Poll - should trigger fatal error */ -// rd_kafka_consumer_poll(c, 3000); -// -// /* Check if consumer entered fatal state. -// * KNOWN ISSUE: GROUP_ID_NOT_FOUND is not in the SGHB fatal error -// * list in rdkafka_cgrp.c. It falls through to the default case -// * and is treated as a permanent (non-fatal) error. -// * See sghb_test_discrepancies.txt for details. */ -// fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); -// if (fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR) -// TEST_SAY("Consumer entered fatal state: %s (%s)\n", -// rd_kafka_err2str(fatal_err), errstr); -// else -// TEST_SAY( -// "KNOWN ISSUE: GROUP_ID_NOT_FOUND while stable " -// "did not trigger fatal error " -// "(see sghb_test_discrepancies.txt)\n"); -// +// test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); + +// /* Verify consumer entered fatal state */ +// fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), +// errstr, sizeof(errstr)); +// TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, +// "Expected consumer to be in fatal state after " +// "GROUP_ID_NOT_FOUND while stable"); +// TEST_SAY("Consumer entered fatal state: %s (%s)\n", +// rd_kafka_err2str(fatal_err), errstr); + // /* Cleanup */ -// rd_kafka_consumer_close(c); -// rd_kafka_destroy(c); -// +// rd_kafka_share_consumer_close(share_c); +// rd_kafka_share_destroy(share_c); + // rd_kafka_mock_stop_request_tracking(mcluster); // test_mock_cluster_destroy(mcluster); -// + // SUB_TEST_PASS(); // } @@ -1948,7 +1993,7 @@ static void do_test_invalid_request_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -1960,19 +2005,19 @@ static void do_test_invalid_request_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Inject INVALID_REQUEST error (fatal) */ rd_kafka_mock_broker_push_request_error_rtts( @@ -1980,10 +2025,11 @@ static void do_test_invalid_request_error(void) { RD_KAFKA_RESP_ERR_INVALID_REQUEST, 0); /* Poll - should trigger fatal error */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify consumer entered fatal state */ - fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, "Expected consumer to be in fatal state after " "INVALID_REQUEST"); @@ -1991,8 +2037,8 @@ static void do_test_invalid_request_error(void) { rd_kafka_err2str(fatal_err), errstr); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2010,7 +2056,7 @@ static void do_test_unsupported_version_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t fatal_err; char errstr[256]; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -2022,19 +2068,19 @@ static void do_test_unsupported_version_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Inject UNSUPPORTED_VERSION error (fatal) */ rd_kafka_mock_broker_push_request_error_rtts( @@ -2042,10 +2088,11 @@ static void do_test_unsupported_version_error(void) { RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION, 0); /* Poll - should trigger fatal error */ - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify consumer entered fatal state */ - fatal_err = rd_kafka_fatal_error(c, errstr, sizeof(errstr)); + fatal_err = rd_kafka_fatal_error(test_share_consumer_get_rk(share_c), + errstr, sizeof(errstr)); TEST_ASSERT(fatal_err != RD_KAFKA_RESP_ERR_NO_ERROR, "Expected consumer to be in fatal state after " "UNSUPPORTED_VERSION"); @@ -2053,8 +2100,8 @@ static void do_test_unsupported_version_error(void) { rd_kafka_err2str(fatal_err), errstr); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2072,7 +2119,7 @@ static void do_test_coordinator_load_in_progress_error(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-coord-load"; @@ -2083,22 +2130,23 @@ static void do_test_coordinator_load_in_progress_error(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -2109,7 +2157,7 @@ static void do_test_coordinator_load_in_progress_error(void) { RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS, 0); /* Poll - consumer should handle transient error and retry */ - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify heartbeats continue after transient error */ found_heartbeats = wait_share_heartbeats(mcluster, 2, 500); @@ -2119,15 +2167,16 @@ static void do_test_coordinator_load_in_progress_error(void) { found_heartbeats); /* Verify consumer still has assignment */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions after retry, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2145,7 +2194,7 @@ static void do_test_graceful_shutdown_stable_state(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_resp_err_t err; int found_heartbeats; const char *topic = test_mk_topic_name(__FUNCTION__, 0); @@ -2157,22 +2206,23 @@ static void do_test_graceful_shutdown_stable_state(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); rd_kafka_mock_start_request_tracking(mcluster); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial join and assignment */ wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify initial assignment - member is in stable state */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions initially, got %d", assignment->cnt); rd_kafka_topic_partition_list_destroy(assignment); @@ -2183,7 +2233,7 @@ static void do_test_graceful_shutdown_stable_state(void) { rd_kafka_mock_start_request_tracking(mcluster); /* Close consumer gracefully - should send leave heartbeat */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); TEST_ASSERT(!err, "Expected close to succeed, got %s", rd_kafka_err2str(err)); @@ -2192,7 +2242,7 @@ static void do_test_graceful_shutdown_stable_state(void) { TEST_SAY("Found %d heartbeats during shutdown\n", found_heartbeats); /* Cleanup */ - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2209,7 +2259,7 @@ static void do_test_resubscribe_after_unsubscribe(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_t *c; + rd_kafka_share_t *share_c; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-resubscribe"; @@ -2219,7 +2269,7 @@ static void do_test_resubscribe_after_unsubscribe(void) { mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 3, 1); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, @@ -2228,11 +2278,12 @@ static void do_test_resubscribe_after_unsubscribe(void) { rd_kafka_mock_start_request_tracking(mcluster); /* First subscribe */ - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); wait_share_heartbeats(mcluster, 1, 500); - rd_kafka_consumer_poll(c, 2000); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions on first subscribe, got %d", assignment->cnt); @@ -2240,11 +2291,12 @@ static void do_test_resubscribe_after_unsubscribe(void) { /* Unsubscribe */ TEST_SAY("Unsubscribing...\n"); - TEST_CALL_ERR__(rd_kafka_unsubscribe(c)); - rd_kafka_consumer_poll(c, 2000); + TEST_CALL_ERR__(rd_kafka_share_unsubscribe(share_c)); + test_share_consume_msgs(share_c, 1, 4, 500, NULL, 0); /* Verify no assignment after unsubscribe */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_ASSERT(assignment->cnt == 0, "Expected 0 partitions after unsubscribe, got %d", assignment->cnt); @@ -2252,14 +2304,15 @@ static void do_test_resubscribe_after_unsubscribe(void) { /* Resubscribe */ TEST_SAY("Resubscribing...\n"); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); rd_kafka_topic_partition_list_destroy(subscription); wait_share_heartbeats(mcluster, 2, 500); - rd_kafka_consumer_poll(c, 3000); + test_share_consume_msgs(share_c, 1, 6, 500, NULL, 0); /* Verify assignment restored */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_SAY("Assignment after resubscribe: %d partitions\n", assignment->cnt); TEST_ASSERT(assignment->cnt == 3, @@ -2268,8 +2321,8 @@ static void do_test_resubscribe_after_unsubscribe(void) { rd_kafka_topic_partition_list_destroy(assignment); /* Cleanup */ - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2286,8 +2339,8 @@ static void do_test_consumer_leave_rebalance(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; rd_kafka_topic_partition_list_t *subscription; - rd_kafka_topic_partition_list_t *c1_assign, *c2_assign; - rd_kafka_t *c1, *c2, *c3; + rd_kafka_topic_partition_list_t *share_c1_assign, *share_c2_assign; + rd_kafka_share_t *share_c1, *share_c2, *share_c3; int final_total; const char *topic = test_mk_topic_name(__FUNCTION__, 0); const char *group = "test-share-group-leave-rebalance"; @@ -2302,67 +2355,75 @@ static void do_test_consumer_leave_rebalance(void) { rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); - c1 = create_share_consumer(bootstraps, group); - c2 = create_share_consumer(bootstraps, group); - c3 = create_share_consumer(bootstraps, group); + share_c1 = create_share_consumer(bootstraps, group); + share_c2 = create_share_consumer(bootstraps, group); + share_c3 = create_share_consumer(bootstraps, group); rd_kafka_mock_start_request_tracking(mcluster); /* All three join */ - TEST_CALL_ERR__(rd_kafka_subscribe(c1, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c2, subscription)); - TEST_CALL_ERR__(rd_kafka_subscribe(c3, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c3, subscription)); rd_kafka_topic_partition_list_destroy(subscription); /* Wait for initial balance */ wait_share_heartbeats(mcluster, 4, 500); - rd_kafka_consumer_poll(c1, 2000); - rd_kafka_consumer_poll(c2, 2000); - rd_kafka_consumer_poll(c3, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c3, 1, 4, 500, NULL, 0); wait_share_heartbeats(mcluster, 3, 500); - rd_kafka_consumer_poll(c1, 2000); - rd_kafka_consumer_poll(c2, 2000); - rd_kafka_consumer_poll(c3, 2000); + test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 4, 500, NULL, 0); + test_share_consume_msgs(share_c3, 1, 4, 500, NULL, 0); /* Get initial assignments */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - TEST_SAY("Initial: c1=%d, c2=%d (before c3 leaves)\n", c1_assign->cnt, - c2_assign->cnt); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); - - /* c3 leaves */ - TEST_SAY("Consumer c3 leaving...\n"); - rd_kafka_consumer_close(c3); - rd_kafka_destroy(c3); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + TEST_SAY( + "Initial: share consumer 1=%d, share consumer 2=%d (before share " + "consumer 3 leaves)\n", + share_c1_assign->cnt, share_c2_assign->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); + + /* share consumer 3 leaves */ + TEST_SAY("Share consumer 3 leaving...\n"); + rd_kafka_share_consumer_close(share_c3); + rd_kafka_share_destroy(share_c3); /* Poll remaining consumers for rebalance */ wait_share_heartbeats(mcluster, 3, 500); - rd_kafka_consumer_poll(c1, 3000); - rd_kafka_consumer_poll(c2, 3000); + test_share_consume_msgs(share_c1, 1, 6, 500, NULL, 0); + test_share_consume_msgs(share_c2, 1, 6, 500, NULL, 0); /* Get new assignments */ - TEST_CALL_ERR__(rd_kafka_assignment(c1, &c1_assign)); - TEST_CALL_ERR__(rd_kafka_assignment(c2, &c2_assign)); - final_total = c1_assign->cnt + c2_assign->cnt; - TEST_SAY("After c3 leave: c1=%d, c2=%d\n", c1_assign->cnt, - c2_assign->cnt); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assign)); + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assign)); + final_total = share_c1_assign->cnt + share_c2_assign->cnt; + TEST_SAY( + "After share consumer 3 leave: share consumer 1=%d, share consumer " + "2=%d\n", + share_c1_assign->cnt, share_c2_assign->cnt); /* Total should be >= 6 partitions among remaining consumers */ TEST_ASSERT(final_total >= 6, "Expected >= 6 partitions after rebalance, got %d", final_total); - rd_kafka_topic_partition_list_destroy(c1_assign); - rd_kafka_topic_partition_list_destroy(c2_assign); + rd_kafka_topic_partition_list_destroy(share_c1_assign); + rd_kafka_topic_partition_list_destroy(share_c2_assign); /* Cleanup */ - rd_kafka_consumer_close(c1); - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c1); + rd_kafka_share_destroy(share_c2); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2378,7 +2439,7 @@ static void do_test_double_close(void) { const char *bootstraps; const char *topic = test_mk_topic_name(__FUNCTION__, 1); const char *group_id = topic; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_topic_partition_list_t *subscription; rd_kafka_resp_err_t err; @@ -2388,29 +2449,29 @@ static void do_test_double_close(void) { rd_kafka_mock_topic_create(mcluster, topic, 3, 1); rd_kafka_mock_start_request_tracking(mcluster); - c = create_share_consumer(bootstraps, group_id); + share_c = create_share_consumer(bootstraps, group_id); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); wait_share_heartbeats(mcluster, 3, 500); /* First close - should succeed */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); TEST_ASSERT(!err, "Expected first close to succeed, got %s", rd_kafka_err2str(err)); /* Second close - should handle gracefully without crashing. * The Java equivalent tests verify the CompletableFuture * completes immediately on double-leave. */ - err = rd_kafka_consumer_close(c); + err = rd_kafka_share_consumer_close(share_c); TEST_SAY("Second close returned: %s (no crash - correct)\n", rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(subscription); - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2426,10 +2487,9 @@ static void do_test_empty_topic_subscription(void) { const char *bootstraps; const char *topic = test_mk_topic_name(__FUNCTION__, 1); const char *group_id = topic; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_topic_partition_list_t *subscription, *assignment; - rd_kafka_message_t *msg; - int i, msg_count = 0; + int msg_count; SUB_TEST_QUICK(); @@ -2437,26 +2497,20 @@ static void do_test_empty_topic_subscription(void) { rd_kafka_mock_topic_create(mcluster, topic, 3, 1); rd_kafka_mock_start_request_tracking(mcluster); - c = create_share_consumer(bootstraps, group_id); + share_c = create_share_consumer(bootstraps, group_id); subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA); - TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription)); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c, subscription)); wait_share_heartbeats(mcluster, 3, 500); /* Poll empty topic - should get assignment but no messages */ - for (i = 0; i < 10; i++) { - msg = rd_kafka_consumer_poll(c, 200); - if (msg) { - if (!msg->err) - msg_count++; - rd_kafka_message_destroy(msg); - } - } + msg_count = test_share_consume_msgs(share_c, 1, 10, 500, NULL, 0); - TEST_CALL_ERR__(rd_kafka_assignment(c, &assignment)); + TEST_CALL_ERR__(rd_kafka_assignment(test_share_consumer_get_rk(share_c), + &assignment)); TEST_SAY("Empty topic: %d partitions, %d messages\n", assignment->cnt, msg_count); TEST_ASSERT(assignment->cnt == 3, "Expected 3 partitions, got %d", @@ -2464,8 +2518,8 @@ static void do_test_empty_topic_subscription(void) { rd_kafka_topic_partition_list_destroy(subscription); rd_kafka_topic_partition_list_destroy(assignment); - rd_kafka_consumer_close(c); - rd_kafka_destroy(c); + rd_kafka_share_consumer_close(share_c); + rd_kafka_share_destroy(share_c); rd_kafka_mock_stop_request_tracking(mcluster); test_mock_cluster_destroy(mcluster); @@ -2484,7 +2538,7 @@ static void do_test_empty_topic_subscription(void) { static void do_test_empty_topic_list_subscription(void) { rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; - rd_kafka_t *c; + rd_kafka_share_t *share_c; rd_kafka_topic_partition_list_t *empty_list; rd_kafka_resp_err_t err; const char *group = "test-share-group-empty-topic-list"; @@ -2493,11 +2547,11 @@ static void do_test_empty_topic_list_subscription(void) { mcluster = test_mock_cluster_new(1, &bootstraps); - c = create_share_consumer(bootstraps, group); + share_c = create_share_consumer(bootstraps, group); /* Subscribe with empty topic list - should return INVALID_ARG */ empty_list = rd_kafka_topic_partition_list_new(0); - err = rd_kafka_subscribe(c, empty_list); + err = rd_kafka_share_subscribe(share_c, empty_list); TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, "Expected INVALID_ARG from subscribe(empty_list), got %s", rd_kafka_err2str(err)); @@ -2505,7 +2559,7 @@ static void do_test_empty_topic_list_subscription(void) { rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(empty_list); - rd_kafka_destroy(c); + rd_kafka_share_destroy(share_c); test_mock_cluster_destroy(mcluster); From c5a73dc13ed3cde63d3f47c0072466b779c799e8 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Sat, 7 Mar 2026 17:00:54 +0530 Subject: [PATCH 2/2] Update comment in rd_kafka_q_serve_share_rkmessages to clarify share consumer behavior --- src/rdkafka_queue.c | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index 7c9fff0de..5e41b726b 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -940,8 +940,28 @@ rd_kafka_q_serve_share_rkmessages(rd_kafka_q_t *rkq, } } - /* Destroy other op types */ - rd_kafka_op_destroy(rko); + /* TODO KIP-932: + * For a share consumer, we are not using version barriers, and ideally, + * tmpq should be empty. However, the discard code is retained as + * outdated ops may still appear due to rd_kafka_share_subscribe + * reusing rd_kafka_subscribe which bumps the version. */ + + /* Discard non-desired and already handled ops */ + next = TAILQ_FIRST(&tmpq); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } + + /* Discard ctrl msgs */ + next = TAILQ_FIRST(&ctrl_msg_q); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } + rd_kafka_app_polled(rk, rkq); *rkmessages_size_out = cnt; return error;