diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c
index f3d3e0b08..c99348e60 100644
--- a/src/rdkafka_mock.c
+++ b/src/rdkafka_mock.c
@@ -2173,6 +2173,34 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
             rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
 }
 
+rd_kafka_resp_err_t
+rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
+                           const char *topic) {
+        rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
+
+        rko->rko_u.mock.name = rd_strdup(topic);
+        rko->rko_u.mock.cmd  = RD_KAFKA_MOCK_CMD_TOPIC_DELETE;
+
+        return rd_kafka_op_err_destroy(
+            rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
+}
+
+rd_kafka_resp_err_t
+rd_kafka_mock_partition_delete_records(rd_kafka_mock_cluster_t *mcluster,
+                                       const char *topic,
+                                       int32_t partition,
+                                       int64_t before_offset) {
+        rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
+
+        rko->rko_u.mock.name      = rd_strdup(topic);
+        rko->rko_u.mock.cmd       = RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS;
+        rko->rko_u.mock.partition = partition;
+        rko->rko_u.mock.lo        = before_offset;
+
+        return rd_kafka_op_err_destroy(
+            rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
+}
+
 rd_kafka_resp_err_t
 rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster,
                                    const char *topic,
@@ -2514,6 +2542,56 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
                         return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;
                 break;
 
+        case RD_KAFKA_MOCK_CMD_TOPIC_DELETE:
+                mtopic =
+                    rd_kafka_mock_topic_find(mcluster, rko->rko_u.mock.name);
+                if (!mtopic)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                rd_kafka_mock_topic_destroy(mtopic);
+                break;
+
+        case RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS: {
+                rd_kafka_mock_msgset_t *mset, *tmp;
+                int64_t before_offset = rko->rko_u.mock.lo;
+
+                /* The topic might not exist: check it before the partition
+                 * lookup to avoid a NULL dereference in partition_find(). */
+                mtopic =
+                    rd_kafka_mock_topic_find(mcluster, rko->rko_u.mock.name);
+                if (!mtopic)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                mpart = rd_kafka_mock_partition_find(mtopic,
+                                                     rko->rko_u.mock.partition);
+                if (!mpart)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                if (before_offset >
+                    mpart->end_offset)
+                        return RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE;
+
+                /* Remove all msgsets fully before before_offset */
+                TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp) {
+                        if (mset->last_offset >= before_offset)
+                                break;
+                        rd_kafka_mock_msgset_destroy(mpart, mset);
+                }
+
+                /* Advance start offset (never backwards) */
+                mpart->start_offset = RD_MAX(mpart->start_offset, before_offset);
+                if (mpart->update_follower_start_offset)
+                        mpart->follower_start_offset = mpart->start_offset;
+
+                rd_kafka_dbg(
+                    mcluster->rk, MOCK, "MOCK",
+                    "Delete records %s [%" PRId32 "] before offset %" PRId64
+                    " (log now %" PRId64 "..%" PRId64 ")",
+                    rko->rko_u.mock.name, rko->rko_u.mock.partition,
+                    before_offset, mpart->start_offset, mpart->end_offset);
+                break;
+        }
+
         case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR:
                 mtopic =
                     rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1);
diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h
index c0f1b1def..dba0fc37e 100644
--- a/src/rdkafka_mock.h
+++ b/src/rdkafka_mock.h
@@ -249,6 +249,47 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
                            int replication_factor);
 
 
+
+/**
+ * @brief Deletes a topic and all its partitions.
+ *
+ * Subsequent requests referencing the topic will receive
+ * UNKNOWN_TOPIC_OR_PART.
+ *
+ * @param mcluster The mock cluster instance.
+ * @param topic The topic to delete.
+ *
+ * @return RD_KAFKA_RESP_ERR_NO_ERROR on success,
+ *         RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if the topic does not
+ *         exist.
+ */
+RD_EXPORT rd_kafka_resp_err_t
+rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
+                           const char *topic);
+
+
+/**
+ * @brief Delete records before \p before_offset in \p topic [\p partition].
+ *
+ * Advances the partition's start offset (log start offset) to
+ * \p before_offset (never backwards) and removes msgsets fully before it.
+ *
+ * @param mcluster The mock cluster.
+ * @param topic Topic name.
+ * @param partition Partition id.
+ * @param before_offset Delete all records before this offset.
+ * The new start offset will be set to this value. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if partition doesn't exist, + * RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE if before_offset > end_offset. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_partition_delete_records(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t partition, + int64_t before_offset); + + /** * @brief Sets the partition leader. * @@ -647,6 +688,21 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_record_lock_duration( rd_kafka_mock_cluster_t *mcluster, int lock_duration_ms); +/** + * @brief Set the maximum number of members allowed in a share group. + * + * New members attempting to join via ShareGroupHeartbeat when the group + * is at capacity will receive GROUP_MAX_SIZE_REACHED. + * + * Default is 0 (unlimited). + * + * @param mcluster Mock cluster instance. + * @param max_size Maximum members allowed. 0 = unlimited. + */ +RD_EXPORT void +rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster, + int max_size); + /** * @brief Set a manual target assignment for a sharegroup. 
* diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 328a8f9e1..90e9969a0 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3094,6 +3094,17 @@ rd_kafka_mock_handle_ShareGroupHeartbeat(rd_kafka_mock_connection_t *mconn, } else if (MemberEpoch == 0) { /* JOIN: New member wants to join */ + + /* Check max group size before allowing join */ + if (mshgrp->max_size > 0 && + !rd_kafka_mock_sharegroup_member_find(mshgrp, + &MemberId) && + mshgrp->member_cnt >= mshgrp->max_size) { + err = RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED; + mtx_unlock(&mcluster->lock); + goto build_response; + } + member = rd_kafka_mock_sharegroup_member_get( mshgrp, &MemberId, MemberEpoch, mconn); diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 42a70f58f..5a0369040 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -254,6 +254,8 @@ typedef struct rd_kafka_mock_sharegroup_s { int record_lock_duration_ms; /**< Per-record lock duration in ms. * 0 = use session_timeout_ms * as fallback (default 0). */ + int max_size; /**< Max members allowed. + * 0 = unlimited (default). */ } rd_kafka_mock_sharegroup_t; /** @@ -573,6 +575,9 @@ struct rd_kafka_mock_cluster_s { /** Per-record lock duration in ms (KIP 932). * 0 = use session_timeout_ms. */ int sharegroup_record_lock_duration_ms; + /** Max members allowed in share group (KIP 932). + * 0 = unlimited. 
*/ + int sharegroup_max_size; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c index 7420eb143..f55ba6b78 100644 --- a/src/rdkafka_mock_sharegrp.c +++ b/src/rdkafka_mock_sharegrp.c @@ -58,6 +58,7 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { mcluster->defaults.sharegroup_heartbeat_interval_ms = 5000; mcluster->defaults.sharegroup_max_delivery_attempts = 5; mcluster->defaults.sharegroup_record_lock_duration_ms = 0; + mcluster->defaults.sharegroup_max_size = 0; } /** @@ -107,6 +108,7 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, mcluster->defaults.sharegroup_max_delivery_attempts; mshgrp->record_lock_duration_ms = mcluster->defaults.sharegroup_record_lock_duration_ms; + mshgrp->max_size = mcluster->defaults.sharegroup_max_size; rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, 1000 * 1000 /* 1s */, @@ -739,6 +741,19 @@ void rd_kafka_mock_sharegroup_set_record_lock_duration( mtx_unlock(&mcluster->lock); } +/** + * @brief Set the maximum number of members allowed in a share group. + */ +void rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster, + int max_size) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->max_size = max_size; + mcluster->defaults.sharegroup_max_size = max_size; + mtx_unlock(&mcluster->lock); +} + /** * @brief Destroy share fetch session. 
*/ diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 993f5483e..0b4ef2490 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -587,6 +587,8 @@ struct rd_kafka_op_s { enum { RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR, RD_KAFKA_MOCK_CMD_TOPIC_CREATE, + RD_KAFKA_MOCK_CMD_TOPIC_DELETE, + RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS, RD_KAFKA_MOCK_CMD_PART_SET_LEADER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS, @@ -637,6 +639,8 @@ struct rd_kafka_op_s { * BROKER_SET_UPDOWN * APIVERSION_SET (minver) * BROKER_SET_RTT + * PART_DELETE_RECORDS + * (before_offset) */ int64_t hi; /**< High offset, for: * TOPIC_CREATE (repl fact) diff --git a/tests/0009-mock_cluster.c b/tests/0009-mock_cluster.c index a40fde2e2..ff6103e6a 100644 --- a/tests/0009-mock_cluster.c +++ b/tests/0009-mock_cluster.c @@ -36,6 +36,179 @@ +/** + * @brief Test rd_kafka_mock_topic_delete(). + * + * Create a topic, produce to it, delete it, then verify that + * a subsequent fetch receives UNKNOWN_TOPIC_OR_PARTITION. + */ +static void do_test_topic_delete(void) { + const char *topic = test_mk_topic_name("0009_topic_delete", 1); + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_t *p, *c; + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + rd_kafka_resp_err_t err; + const char *bootstraps; + rd_kafka_topic_partition_list_t *parts; + rd_kafka_message_t *rkm; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Create topic explicitly so auto-create doesn't interfere + * after deletion. 
*/ + TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic, 1, 1)); + + test_conf_init(&conf, NULL, 30); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "allow.auto.create.topics", "false"); + + /* Producer */ + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); + + rkt = test_create_producer_topic(p, topic, NULL); + + /* Produce */ + test_produce_msgs(p, rkt, 0, 0, 0, 10, NULL, 0); + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(p); + + /* Delete the topic */ + TEST_CALL_ERR__(rd_kafka_mock_topic_delete(mcluster, topic)); + + /* Verify deleting a non-existent topic returns error */ + err = rd_kafka_mock_topic_delete(mcluster, topic); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + "Expected UNKNOWN_TOPIC_OR_PART, got %s", + rd_kafka_err2str(err)); + + /* Consumer */ + test_conf_set(conf, "auto.offset.reset", "earliest"); + c = test_create_consumer(topic, NULL, conf, NULL); + + /* Assign */ + parts = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(parts, topic, 0); + test_consumer_assign("CONSUME_DELETED", c, parts); + rd_kafka_topic_partition_list_destroy(parts); + + /* Consume - expect no data messages since topic is deleted */ + rkm = rd_kafka_consumer_poll(c, 5000); + if (rkm) { + TEST_ASSERT(rkm->err != RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected error or no message from deleted topic, " + "got message at offset %" PRId64, + rkm->offset); + rd_kafka_message_destroy(rkm); + } + + rd_kafka_destroy(c); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + +/** + * @brief Test rd_kafka_mock_partition_delete_records(). + * + * Produce messages, delete records before an offset, then verify that: + * 1. Consuming from the beginning starts at the new start offset. + * 2. The API returns errors for invalid inputs. 
+ */ +static void do_test_partition_delete_records(void) { + const char *topic = test_mk_topic_name("0009_delete_records", 1); + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_t *p, *c; + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + rd_kafka_resp_err_t err; + const char *bootstraps; + const int msgcnt = 100; + const int64_t delete_before = 50; + rd_kafka_topic_partition_list_t *parts; + rd_kafka_message_t *rkm; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Create topic with 1 partition */ + TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic, 1, 1)); + + test_conf_init(&conf, NULL, 30); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + + /* Producer */ + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); + + rkt = test_create_producer_topic(p, topic, NULL); + + /* Produce */ + test_produce_msgs(p, rkt, 0, 0, 0, msgcnt, NULL, 0); + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(p); + + /* Delete records before offset 50 */ + TEST_CALL_ERR__(rd_kafka_mock_partition_delete_records( + mcluster, topic, 0, delete_before)); + + /* Verify error for non-existent topic */ + err = rd_kafka_mock_partition_delete_records(mcluster, "no_such_topic", + 0, 10); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + "Expected UNKNOWN_TOPIC_OR_PART for bad topic, got %s", + rd_kafka_err2str(err)); + + /* Verify error for non-existent partition */ + err = rd_kafka_mock_partition_delete_records(mcluster, topic, 99, 10); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + "Expected UNKNOWN_TOPIC_OR_PART for bad partition, got %s", + rd_kafka_err2str(err)); + + /* Verify error for offset beyond end */ + err = rd_kafka_mock_partition_delete_records(mcluster, topic, 0, + msgcnt + 100); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE, + "Expected OFFSET_OUT_OF_RANGE, got %s", + rd_kafka_err2str(err)); + + /* Consumer */ + 
test_conf_set(conf, "auto.offset.reset", "earliest"); + c = test_create_consumer(topic, NULL, conf, NULL); + + /* Assign */ + parts = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(parts, topic, 0); + test_consumer_assign("CONSUME_AFTER_DELETE", c, parts); + rd_kafka_topic_partition_list_destroy(parts); + + /* Consume - first message should be at offset >= delete_before */ + rkm = rd_kafka_consumer_poll(c, 10000); + TEST_ASSERT(rkm != NULL, "Expected message, got NULL"); + TEST_ASSERT(rkm->err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected success, got %s", rd_kafka_err2str(rkm->err)); + TEST_ASSERT(rkm->offset >= delete_before, + "Expected first message offset >= %" PRId64 + ", got %" PRId64, + delete_before, rkm->offset); + TEST_SAY("First message after delete_records: offset %" PRId64 "\n", + rkm->offset); + rd_kafka_message_destroy(rkm); + + rd_kafka_destroy(c); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + int main_0009_mock_cluster(int argc, char **argv) { const char *topic = test_mk_topic_name("0009_mock_cluster", 1); rd_kafka_mock_cluster_t *mcluster; @@ -93,5 +266,8 @@ int main_0009_mock_cluster(int argc, char **argv) { test_mock_cluster_destroy(mcluster); + do_test_topic_delete(); + do_test_partition_delete_records(); + return 0; } diff --git a/tests/0155-share_group_heartbeat_mock.c b/tests/0155-share_group_heartbeat_mock.c index 616613d79..140b5be38 100644 --- a/tests/0155-share_group_heartbeat_mock.c +++ b/tests/0155-share_group_heartbeat_mock.c @@ -48,6 +48,28 @@ static rd_kafka_t *create_share_consumer(const char *bootstraps, return rk; } +/** + * @brief Create a share consumer connected to mock cluster using the + * public share consumer API. 
+ */ +static rd_kafka_share_t *create_mock_share_consumer(const char *bootstraps, + const char *group_id) { + rd_kafka_conf_t *conf; + rd_kafka_share_t *rkshare; + char errstr[512]; + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "group.id", group_id); + + rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + TEST_ASSERT(rkshare != NULL, "Failed to create share consumer: %s", + errstr); + rd_kafka_share_poll_set_consumer(rkshare); + + return rkshare; +} + /** * @brief Test basic ShareGroupHeartbeat flow: * join, receive assignment, heartbeats, leave. @@ -806,6 +828,86 @@ static void do_test_share_group_no_spurious_fencing(void) { SUB_TEST_PASS(); } +/** + * @brief Test rd_kafka_mock_sharegroup_set_max_size(). + * + * Set max_size=1, join one consumer successfully, then verify that a + * second consumer fails to join (gets no assignment / fenced). + */ +static void do_test_share_group_max_size(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + rd_kafka_topic_partition_list_t *subscription; + rd_kafka_topic_partition_list_t *share_c1_assignment, + *share_c2_assignment; + rd_kafka_share_t *share_c1, *share_c2; + const char *topic = test_mk_topic_name(__FUNCTION__, 0); + const char *group = "test-share-group-max-size"; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 3, 1); + + /* Limit share group to 1 member */ + rd_kafka_mock_sharegroup_set_max_size(mcluster, 1); + + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + + rd_kafka_mock_start_request_tracking(mcluster); + + /* C1 joins - should succeed and get all 3 partitions */ + share_c1 = create_mock_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c1, subscription)); + + wait_share_heartbeats(mcluster, 1, 500); + 
test_share_consume_msgs(share_c1, 1, 4, 500, NULL, 0); + + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assignment)); + TEST_ASSERT(share_c1_assignment->cnt == 3, + "Expected C1 to have 3 partitions, got %d", + share_c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assignment); + + /* C2 joins - should be rejected (GROUP_MAX_SIZE_REACHED) */ + share_c2 = create_mock_share_consumer(bootstraps, group); + TEST_CALL_ERR__(rd_kafka_share_subscribe(share_c2, subscription)); + rd_kafka_topic_partition_list_destroy(subscription); + + /* Give C2 time to attempt join and get rejected */ + test_share_consume_msgs(share_c2, 1, 6, 500, NULL, 0); + + /* C2 should have no assignment since it was rejected */ + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c2), &share_c2_assignment)); + TEST_ASSERT(share_c2_assignment->cnt == 0, + "Expected C2 to have 0 partitions (rejected), got %d", + share_c2_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c2_assignment); + + /* C1 should still have its assignment */ + TEST_CALL_ERR__(rd_kafka_assignment( + test_share_consumer_get_rk(share_c1), &share_c1_assignment)); + TEST_ASSERT(share_c1_assignment->cnt == 3, + "Expected C1 to still have 3 partitions, got %d", + share_c1_assignment->cnt); + rd_kafka_topic_partition_list_destroy(share_c1_assignment); + + /* Cleanup */ + rd_kafka_share_consumer_close(share_c1); + rd_kafka_share_consumer_close(share_c2); + rd_kafka_share_destroy(share_c1); + rd_kafka_share_destroy(share_c2); + + rd_kafka_mock_stop_request_tracking(mcluster); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + /** * @brief UNKNOWN_MEMBER_ID error handling. 
* @@ -2527,6 +2629,7 @@ int main_0155_share_group_heartbeat_mock(int argc, char **argv) { do_test_share_group_session_timeout(); do_test_share_group_target_assignment(); do_test_share_group_no_spurious_fencing(); + do_test_share_group_max_size(); do_test_unknown_member_id_error();