Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,34 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
const char *topic) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

rko->rko_u.mock.name = rd_strdup(topic);
rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_DELETE;

return rd_kafka_op_err_destroy(
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_partition_delete_records(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
int32_t partition,
int64_t before_offset) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

rko->rko_u.mock.name = rd_strdup(topic);
rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS;
rko->rko_u.mock.partition = partition;
rko->rko_u.mock.lo = before_offset;

return rd_kafka_op_err_destroy(
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
Expand Down Expand Up @@ -2514,6 +2542,50 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;
break;

case RD_KAFKA_MOCK_CMD_TOPIC_DELETE:
mtopic =
rd_kafka_mock_topic_find(mcluster, rko->rko_u.mock.name);
if (!mtopic)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

rd_kafka_mock_topic_destroy(mtopic);
break;

case RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS: {
rd_kafka_mock_msgset_t *mset, *tmp;
int64_t before_offset = rko->rko_u.mock.lo;

mtopic =
rd_kafka_mock_topic_find(mcluster, rko->rko_u.mock.name);
mpart = rd_kafka_mock_partition_find(mtopic,
rko->rko_u.mock.partition);
if (!mpart)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

if (before_offset > mpart->end_offset)
return RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE;

/* Remove all msgsets fully before before_offset */
TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp) {
if (mset->last_offset >= before_offset)
break;
rd_kafka_mock_msgset_destroy(mpart, mset);
}

/* Advance start offset */
mpart->start_offset = before_offset;
if (mpart->update_follower_start_offset)
mpart->follower_start_offset = mpart->start_offset;

rd_kafka_dbg(
mcluster->rk, MOCK, "MOCK",
"Delete records %s [%" PRId32 "] before offset %" PRId64
" (log now %" PRId64 "..%" PRId64 ")",
rko->rko_u.mock.name, rko->rko_u.mock.partition,
before_offset, mpart->start_offset, mpart->end_offset);
break;
}

case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR:
mtopic =
rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1);
Expand Down
56 changes: 56 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,47 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
int replication_factor);


/**
* @brief Deletes a topic and all its partitions.
*
* Subsequent requests referencing the topic will recieve

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: receive

* UNKNOWN_TOPIC_OR_PARTITION.
*
* @param mcluster The mock cluster instance.
* @param topic The topic to delete.
*
* @return RD_KAFKA_RESP_ERR_NO_ERROR on success
* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PARTITION if the topic does not
Comment on lines +261 to +262

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return RD_KAFKA_RESP_ERR_NO_ERROR on success
* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PARTITION if the topic does not
* @return RD_KAFKA_RESP_ERR_NO_ERROR on success
* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if the topic does not

* exist.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
const char *topic);


/**
* @brief Delete records before \p before_offset in \p topic [\p partition].
*
* Advances the partition's start offset (log start offset) to
* \p before_offset and removes all message sets fully before that offset.
*
* @param mcluster The mock cluster.
* @param topic Topic name.
* @param partition Partition id.
* @param before_offset Delete all records before this offset.
* The new start offset will be set to this value.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
* RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if partition doesn't exist,
* RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE if before_offset > end_offset.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_partition_delete_records(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
int32_t partition,
int64_t before_offset);


/**
* @brief Sets the partition leader.
*
Expand Down Expand Up @@ -647,6 +688,21 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_record_lock_duration(
rd_kafka_mock_cluster_t *mcluster,
int lock_duration_ms);

/**
* @brief Set the maximum number of members allowed in a share group.
*
* New members attempting to join via ShareGroupHeartbeat when the group
* is at capacity will receive GROUP_MAX_SIZE_REACHED.
*
* Default is 0 (unlimited).
*
* @param mcluster Mock cluster instance.
* @param max_size Maximum members allowed. 0 = unlimited.
*/
RD_EXPORT void
rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster,
int max_size);

/**
* @brief Set a manual target assignment for a sharegroup.
*
Expand Down
11 changes: 11 additions & 0 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -3094,6 +3094,17 @@ rd_kafka_mock_handle_ShareGroupHeartbeat(rd_kafka_mock_connection_t *mconn,

} else if (MemberEpoch == 0) {
/* JOIN: New member wants to join */

/* Check max group size before allowing join */
if (mshgrp->max_size > 0 &&
!rd_kafka_mock_sharegroup_member_find(mshgrp,
&MemberId) &&
mshgrp->member_cnt >= mshgrp->max_size) {
err = RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED;
mtx_unlock(&mcluster->lock);
goto build_response;
}

member = rd_kafka_mock_sharegroup_member_get(
mshgrp, &MemberId, MemberEpoch, mconn);

Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ typedef struct rd_kafka_mock_sharegroup_s {
int record_lock_duration_ms; /**< Per-record lock duration in ms.
* 0 = use session_timeout_ms
* as fallback (default 0). */
int max_size; /**< Max members allowed.
* 0 = unlimited (default). */
} rd_kafka_mock_sharegroup_t;

/**
Expand Down Expand Up @@ -573,6 +575,9 @@ struct rd_kafka_mock_cluster_s {
/** Per-record lock duration in ms (KIP 932).
* 0 = use session_timeout_ms. */
int sharegroup_record_lock_duration_ms;
/** Max members allowed in share group (KIP 932).
* 0 = unlimited. */
int sharegroup_max_size;
} defaults;

/**< Dynamic array of IO handlers for corresponding fd in .fds */
Expand Down
15 changes: 15 additions & 0 deletions src/rdkafka_mock_sharegrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) {
mcluster->defaults.sharegroup_heartbeat_interval_ms = 5000;
mcluster->defaults.sharegroup_max_delivery_attempts = 5;
mcluster->defaults.sharegroup_record_lock_duration_ms = 0;
mcluster->defaults.sharegroup_max_size = 0;
}

/**
Expand Down Expand Up @@ -107,6 +108,7 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster,
mcluster->defaults.sharegroup_max_delivery_attempts;
mshgrp->record_lock_duration_ms =
mcluster->defaults.sharegroup_record_lock_duration_ms;
mshgrp->max_size = mcluster->defaults.sharegroup_max_size;

rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr,
1000 * 1000 /* 1s */,
Expand Down Expand Up @@ -739,6 +741,19 @@ void rd_kafka_mock_sharegroup_set_record_lock_duration(
mtx_unlock(&mcluster->lock);
}

/**
* @brief Set the maximum number of members allowed in a share group.
*/
void rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster,
int max_size) {
rd_kafka_mock_sharegroup_t *mshgrp;
mtx_lock(&mcluster->lock);
TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link)
mshgrp->max_size = max_size;
mcluster->defaults.sharegroup_max_size = max_size;
mtx_unlock(&mcluster->lock);
}

/**
* @brief Destroy share fetch session.
*/
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ struct rd_kafka_op_s {
enum {
RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
RD_KAFKA_MOCK_CMD_TOPIC_DELETE,
RD_KAFKA_MOCK_CMD_PART_DELETE_RECORDS,
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
Expand Down Expand Up @@ -637,6 +639,8 @@ struct rd_kafka_op_s {
* BROKER_SET_UPDOWN
* APIVERSION_SET (minver)
* BROKER_SET_RTT
* PART_DELETE_RECORDS
* (before_offset)
*/
int64_t hi; /**< High offset, for:
* TOPIC_CREATE (repl fact)
Expand Down
Loading