diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 1be895705a..f42fb4b528 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -621,6 +621,32 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval( rd_kafka_mock_cluster_t *mcluster, int heartbeat_interval_ms); +/** + * @brief Set the maximum number of times a record can be acquired + * before it is automatically archived (dead-lettered). + * + * Default is 5. Set to 0 for unlimited deliveries. + * + * @param mcluster Mock cluster instance. + * @param max_attempts Maximum delivery attempts per record. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_max_delivery_attempts( + rd_kafka_mock_cluster_t *mcluster, + int max_attempts); + +/** + * @brief Set the per-record lock duration in milliseconds. + * + * When a record is acquired, its lock expires after this duration. + * Default is 0, which falls back to the session timeout value. + * + * @param mcluster Mock cluster instance. + * @param lock_duration_ms Lock duration in milliseconds. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_record_lock_duration( + rd_kafka_mock_cluster_t *mcluster, + int lock_duration_ms); + /** * @brief Set a manual target assignment for a sharegroup. * diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 00fa89273a..a87106cfa2 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1461,7 +1461,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i32(resp, INT32_MIN); } - rd_kafka_buf_skip_tags(rkbuf); + if (rd_kafka_buf_read_remain(rkbuf) > 0) + rd_kafka_buf_skip_tags(rkbuf); rd_kafka_buf_write_tags_empty(resp); if (requested_topics) @@ -3220,6 +3221,802 @@ rd_kafka_mock_handle_ShareGroupHeartbeat(rd_kafka_mock_connection_t *mconn, return -1; } +/** + * @brief Handle ShareFetch (KIP-932) - request parsing only. 
+ */ +static rd_bool_t +rd_kafka_mock_tplist_equal_by_id(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + int i; + + if (!a && !b) + return rd_true; + if (!a || !b) + return rd_false; + if (a->cnt != b->cnt) + return rd_false; + + rd_kafka_topic_partition_list_sort_by_topic_id(a); + rd_kafka_topic_partition_list_sort_by_topic_id(b); + + for (i = 0; i < a->cnt; i++) { + const rd_kafka_topic_partition_t *pa = &a->elems[i]; + const rd_kafka_topic_partition_t *pb = &b->elems[i]; + rd_kafka_Uuid_t ta = + rd_kafka_topic_partition_get_topic_id(pa); + rd_kafka_Uuid_t tb = + rd_kafka_topic_partition_get_topic_id(pb); + + if (pa->partition != pb->partition) + return rd_false; + if (rd_kafka_Uuid_cmp(ta, tb) != 0) + return rd_false; + } + + return rd_true; +} + +static rd_kafka_mock_sgrp_partmeta_t * +rd_kafka_mock_sgrp_partmeta_find(rd_kafka_mock_sharegroup_t *sgrp, + rd_kafka_Uuid_t topic_id, + int32_t partition) { + rd_kafka_mock_sgrp_partmeta_t *pmeta; + + TAILQ_FOREACH(pmeta, &sgrp->partitions, link) { + if (pmeta->partition != partition) + continue; + if (!rd_kafka_Uuid_cmp(pmeta->topic_id, topic_id)) + return pmeta; + } + + return NULL; +} + +static rd_kafka_mock_sgrp_partmeta_t * +rd_kafka_mock_sgrp_partmeta_get(rd_kafka_mock_sharegroup_t *sgrp, + rd_kafka_Uuid_t topic_id, + int32_t partition, + const rd_kafka_mock_partition_t *mpart) { + rd_kafka_mock_sgrp_partmeta_t *pmeta; + int64_t log_start; + int64_t log_end; + + pmeta = rd_kafka_mock_sgrp_partmeta_find(sgrp, topic_id, partition); + if (pmeta) { + log_start = mpart->start_offset; + log_end = mpart->end_offset; + if (log_start > pmeta->spso) + pmeta->spso = log_start; + if (log_end > log_start) { + int64_t new_speo = log_end - 1; + if (new_speo > pmeta->speo) + pmeta->speo = new_speo; + } + return pmeta; + } + + pmeta = rd_calloc(1, sizeof(*pmeta)); + pmeta->topic_id = topic_id; + pmeta->partition = partition; + pmeta->spso = mpart->start_offset; + if (mpart->end_offset > 
mpart->start_offset) + pmeta->speo = mpart->end_offset - 1; + else + pmeta->speo = mpart->start_offset - 1; + TAILQ_INIT(&pmeta->inflight); + + TAILQ_INSERT_TAIL(&sgrp->partitions, pmeta, link); + sgrp->partition_cnt++; + + return pmeta; +} + +static rd_kafka_mock_sgrp_record_state_t * +rd_kafka_mock_sgrp_record_state_find(rd_kafka_mock_sgrp_partmeta_t *pmeta, + int64_t offset) { + rd_kafka_mock_sgrp_record_state_t *state; + + TAILQ_FOREACH(state, &pmeta->inflight, link) { + if (state->offset == offset) + return state; + } + + return NULL; +} + +static rd_kafka_mock_sgrp_record_state_t * +rd_kafka_mock_sgrp_record_state_get(rd_kafka_mock_sgrp_partmeta_t *pmeta, + int64_t offset) { + rd_kafka_mock_sgrp_record_state_t *state; + + state = rd_kafka_mock_sgrp_record_state_find(pmeta, offset); + if (state) + return state; + + state = rd_calloc(1, sizeof(*state)); + state->offset = offset; + state->state = RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE; + TAILQ_INSERT_TAIL(&pmeta->inflight, state, link); + pmeta->inflight_cnt++; + + return state; +} + +static int32_t +rd_kafka_mock_msgset_est_record_size(const rd_kafka_mock_msgset_t *mset) { + int64_t record_cnt = mset->last_offset - mset->first_offset + 1; + int32_t size; + + if (record_cnt <= 0) + return 1; + + size = (int32_t)(RD_KAFKAP_BYTES_LEN(&mset->bytes) / record_cnt); + if (size <= 0) + size = 1; + + return size; +} + +static void +rd_kafka_mock_sgrp_acquire_available_offsets( + rd_kafka_mock_sgrp_partmeta_t *pmeta, + const rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *member_id, + rd_ts_t lock_expiry_ts, + int max_delivery_attempts, + int64_t *remaining_records, + int64_t *remaining_bytes, + int *acquired_cnt, + int64_t *acquired_bytes) { + int64_t offset; + + for (offset = pmeta->spso; offset <= pmeta->speo; offset++) { + const rd_kafka_mock_msgset_t *mset; + rd_kafka_mock_sgrp_record_state_t *state; + int32_t est_size; + + if (remaining_records && *remaining_records == 0) + break; + if (remaining_bytes && 
*remaining_bytes == 0) + break; + + state = rd_kafka_mock_sgrp_record_state_find(pmeta, offset); + if (state && + state->state != RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE) + continue; + + /* Check max delivery attempts: if the record has already + * been acquired (and released/expired) too many times, + * archive it instead of re-acquiring. */ + if (max_delivery_attempts > 0 && state && + state->delivery_count >= max_delivery_attempts) { + state->state = RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED; + RD_IF_FREE(state->owner_member_id, rd_free); + state->owner_member_id = NULL; + state->lock_expiry_ts = 0; + continue; + } + + mset = rd_kafka_mock_msgset_find(mpart, offset, rd_false); + if (!mset) + continue; + + est_size = rd_kafka_mock_msgset_est_record_size(mset); + if (remaining_bytes && *remaining_bytes > 0 && + est_size > *remaining_bytes) + break; + + state = rd_kafka_mock_sgrp_record_state_get(pmeta, offset); + state->state = RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED; + state->lock_expiry_ts = lock_expiry_ts; + state->delivery_count++; + RD_IF_FREE(state->owner_member_id, rd_free); + state->owner_member_id = RD_KAFKAP_STR_DUP(member_id); + + (*acquired_cnt)++; + *acquired_bytes += est_size; + if (remaining_records && *remaining_records > 0) + (*remaining_records)--; + if (remaining_bytes && *remaining_bytes > 0) + (*remaining_bytes) -= est_size; + } +} + +static void +rd_kafka_mock_sgrp_partmeta_prune_archived(rd_kafka_mock_sgrp_partmeta_t *pmeta) { + rd_kafka_mock_sgrp_record_state_t *state, *tmp; + + TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, tmp) { + if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED) + continue; + if (state->offset >= pmeta->spso) + continue; + + TAILQ_REMOVE(&pmeta->inflight, state, link); + pmeta->inflight_cnt--; + rd_free(state->owner_member_id); + rd_free(state); + } +} + +/** + * @brief Write all acquired record batches for the given partition and member + * into the response buffer as a single Records field (compact bytes + * containing 
concatenated RecordBatches). + * + * @returns the total number of record data bytes written (0 if none). + */ +static size_t +rd_kafka_mock_sgrp_write_acquired_records( + rd_kafka_buf_t *resp, + rd_kafka_mock_sgrp_partmeta_t *pmeta, + const rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *member_id, + rd_ts_t now) { + rd_list_t msgsets; + int64_t offset; + size_t total_len = 0; + int i; + + rd_list_init(&msgsets, 16, NULL); /* no free_cb: borrowed ptrs */ + + /* Collect unique msgsets containing acquired records for this member */ + for (offset = pmeta->spso; offset <= pmeta->speo; offset++) { + rd_kafka_mock_sgrp_record_state_t *state = + rd_kafka_mock_sgrp_record_state_find(pmeta, offset); + const rd_kafka_mock_msgset_t *mset; + + if (!state || + state->state != RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED) + continue; + + if (state->lock_expiry_ts && state->lock_expiry_ts <= now) + continue; + + if (rd_kafkap_str_cmp_str(member_id, state->owner_member_id)) + continue; + + mset = rd_kafka_mock_msgset_find(mpart, offset, rd_false); + if (!mset) + continue; + + /* Deduplicate: multiple offsets may fall in the same batch */ + if (rd_list_find(&msgsets, mset, rd_list_cmp_ptr)) + continue; + + rd_list_add(&msgsets, (void *)mset); + total_len += RD_KAFKAP_BYTES_LEN(&mset->bytes); + } + + if (rd_list_cnt(&msgsets) == 0) { + /* No acquired records: write NULL compact bytes */ + rd_list_destroy(&msgsets); + rd_kafka_buf_write_kbytes(resp, NULL); + return 0; + } + + /* Write compact bytes length prefix (N+1 encoding) */ + rd_kafka_buf_write_uvarint(resp, (uint64_t)(total_len + 1)); + + /* Write each msgset's raw bytes back-to-back */ + for (i = 0; i < rd_list_cnt(&msgsets); i++) { + const rd_kafka_mock_msgset_t *mset = + rd_list_elem(&msgsets, i); + rd_kafka_buf_write(resp, mset->bytes.data, + RD_KAFKAP_BYTES_LEN(&mset->bytes)); + } + + rd_list_destroy(&msgsets); + return total_len; +} + +static int +rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, + 
rd_kafka_buf_t *rkbuf) { + const rd_bool_t log_decode_errors = rd_true; + rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster; + rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf); + rd_kafkap_str_t GroupId, MemberId; + int32_t SessionEpoch = -1, MaxWaitMs = 0, MinBytes = 0, + MaxBytes = 0, MaxRecords = 0, BatchSize = 0; + int32_t TopicsCnt; + rd_kafka_topic_partition_list_t *requested_partitions = NULL; + rd_kafka_topic_partition_list_t *forgotten_partitions = NULL; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_mock_sharegroup_t *sgrp = NULL; + rd_kafka_mock_sgrp_fetch_session_t *session = NULL; + + (void)log_decode_errors; + + rd_kafka_buf_read_str(rkbuf, &GroupId); + rd_kafka_buf_read_str(rkbuf, &MemberId); + /* KIP-932: ShareFetch has ShareSessionEpoch only, no SessionId. + * Sessions are keyed by (GroupId, MemberId). */ + rd_kafka_buf_read_i32(rkbuf, &SessionEpoch); + rd_kafka_buf_read_i32(rkbuf, &MaxWaitMs); + rd_kafka_buf_read_i32(rkbuf, &MinBytes); + rd_kafka_buf_read_i32(rkbuf, &MaxBytes); + rd_kafka_buf_read_i32(rkbuf, &MaxRecords); + rd_kafka_buf_read_i32(rkbuf, &BatchSize); + + requested_partitions = rd_kafka_topic_partition_list_new(0); + + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); + while (TopicsCnt-- > 0) { + rd_kafka_Uuid_t TopicId = RD_KAFKA_UUID_ZERO; + int32_t PartitionCnt; + + rd_kafka_buf_read_uuid(rkbuf, &TopicId); + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, + RD_KAFKAP_PARTITIONS_MAX); + + while (PartitionCnt-- > 0) { + int32_t Partition; + int32_t AckBatchCnt; + rd_kafka_topic_partition_t *rktpar; + + rd_kafka_buf_read_i32(rkbuf, &Partition); + rd_kafka_buf_read_arraycnt(rkbuf, &AckBatchCnt, -1); + while (AckBatchCnt-- > 0) { + int32_t AckTypeCnt; + int8_t AckType; + rd_kafka_buf_read_i64(rkbuf, NULL); /* FirstOffset */ + rd_kafka_buf_read_i64(rkbuf, NULL); /* LastOffset */ + rd_kafka_buf_read_arraycnt(rkbuf, &AckTypeCnt, + -1); + while (AckTypeCnt-- > 0) { + 
rd_kafka_buf_read_i8(rkbuf, &AckType); + (void)AckType; + } + rd_kafka_buf_skip_tags(rkbuf); + } + + rktpar = rd_kafka_topic_partition_list_add( + requested_partitions, "", Partition); + rd_kafka_topic_partition_set_topic_id(rktpar, TopicId); + + rd_kafka_buf_skip_tags(rkbuf); + } + + rd_kafka_buf_skip_tags(rkbuf); + } + + /* ForgottenTopicsData */ + { + int32_t ForgottenTopicsCnt; + rd_kafka_buf_read_arraycnt(rkbuf, &ForgottenTopicsCnt, + RD_KAFKAP_TOPICS_MAX); + if (ForgottenTopicsCnt > 0) + forgotten_partitions = + rd_kafka_topic_partition_list_new( + ForgottenTopicsCnt); + while (ForgottenTopicsCnt-- > 0) { + rd_kafka_Uuid_t ForgTopicId = RD_KAFKA_UUID_ZERO; + int32_t ForgPartCnt; + rd_kafka_buf_read_uuid(rkbuf, &ForgTopicId); + rd_kafka_buf_read_arraycnt(rkbuf, &ForgPartCnt, + RD_KAFKAP_PARTITIONS_MAX); + while (ForgPartCnt-- > 0) { + int32_t ForgPartition; + rd_kafka_topic_partition_t *ftp; + rd_kafka_buf_read_i32(rkbuf, &ForgPartition); + + /* Record in forgotten list for session + * removal and inflight release. */ + ftp = rd_kafka_topic_partition_list_add( + forgotten_partitions, "", ForgPartition); + rd_kafka_topic_partition_set_topic_id( + ftp, ForgTopicId); + + /* Remove from requested_partitions so they + * are not fetched in this request. 
*/ + if (requested_partitions) { + int idx = + rd_kafka_topic_partition_list_find_idx_by_id( + requested_partitions, + ForgTopicId, ForgPartition); + if (idx >= 0) + rd_kafka_topic_partition_list_del_by_idx( + requested_partitions, idx); + } + } + /* ForgottenTopic tags */ + rd_kafka_buf_skip_tags(rkbuf); + } + } + + /* Top-level tags */ + rd_kafka_buf_skip_tags(rkbuf); + + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "ShareFetch parsed: group %.*s member %.*s " + "session_epoch %" PRId32 + " max_wait %" PRId32 " min_bytes %" PRId32 + " max_bytes %" PRId32 " max_records %" PRId32 + " batch_size %" PRId32, + RD_KAFKAP_STR_PR(&GroupId), RD_KAFKAP_STR_PR(&MemberId), + SessionEpoch, MaxWaitMs, MinBytes, MaxBytes, + MaxRecords, BatchSize); + + err = rd_kafka_mock_next_request_error(mconn, resp); + + if (!err) { + int64_t remaining_records = + MaxRecords > 0 ? (int64_t)MaxRecords : -1; + int64_t remaining_bytes = + MaxBytes > 0 ? (int64_t)MaxBytes : -1; + int acquired_cnt = 0; + int64_t acquired_bytes = 0; + rd_ts_t now = rd_clock(); + + /* KIP-932 session management. + * Sessions are keyed by (GroupId, MemberId). + * SessionEpoch: 0 = open new session, + * -1 = close session, + * >0 = continue (must match expected epoch). */ + mtx_lock(&mcluster->lock); + sgrp = rd_kafka_mock_sharegroup_get(mcluster, &GroupId); + + /* Member validation: verify the MemberId is a registered + * member of this share group (joined via SGHB). 
*/ + if (!rd_kafka_mock_sharegroup_member_find(sgrp, &MemberId)) { + err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + rd_kafka_dbg( + mconn->broker->cluster->rk, MOCK, "MOCK", + "ShareFetch: unknown member %.*s in group %.*s", + RD_KAFKAP_STR_PR(&MemberId), + RD_KAFKAP_STR_PR(&GroupId)); + } + + /* Look up existing session by MemberId */ + if (!err) { + TAILQ_FOREACH(session, &sgrp->fetch_sessions, link) { + if (!rd_kafkap_str_cmp_str(&MemberId, + session->member_id)) + break; + } + } + + if (!err && SessionEpoch == 0) { + /* Open a new session (or reuse if one already exists + * for this member). */ + if (!session) { + session = rd_calloc(1, sizeof(*session)); + session->member_id = + RD_KAFKAP_STR_DUP(&MemberId); + session->session_epoch = 0; + session->partitions = + rd_kafka_topic_partition_list_copy( + requested_partitions); + TAILQ_INSERT_TAIL(&sgrp->fetch_sessions, + session, link); + sgrp->fetch_session_cnt++; + } else { + /* Session already exists for this member; + * reset it. */ + session->session_epoch = 0; + RD_IF_FREE( + session->partitions, + rd_kafka_topic_partition_list_destroy); + session->partitions = + rd_kafka_topic_partition_list_copy( + requested_partitions); + } + } else if (!err && SessionEpoch == -1) { + /* Close the session and release all locks held + * by this member. */ + if (session) { + rd_kafka_mock_sgrp_release_member_locks( + sgrp, session->member_id); + TAILQ_REMOVE(&sgrp->fetch_sessions, session, + link); + sgrp->fetch_session_cnt--; + rd_kafka_mock_sgrp_fetch_session_destroy( + session); + session = NULL; + } + /* Closing a non-existent session is not an error; + * proceed with an empty response. */ + } else if (!err) { + /* Continue existing session: validate epoch. 
*/ + if (!session) { + err = + RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH; + } else if (SessionEpoch != + session->session_epoch) { + err = + RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH; + } else { + /* Update partition list if changed */ + if (!rd_kafka_mock_tplist_equal_by_id( + requested_partitions, + session->partitions)) { + RD_IF_FREE( + session->partitions, + rd_kafka_topic_partition_list_destroy); + session->partitions = + rd_kafka_topic_partition_list_copy( + requested_partitions); + } + } + } + + /* For all successful, non-close requests: update activity + * timestamp and increment epoch for next request. */ + if (!err && session) { + session->ts_last_activity = rd_clock(); + session->session_epoch++; + } + + /* Remove forgotten partitions from session and release + * any in-flight ACQUIRED records owned by this member. */ + if (!err && session && forgotten_partitions && + forgotten_partitions->cnt > 0) { + rd_kafka_topic_partition_t *ftp; + RD_KAFKA_TPLIST_FOREACH(ftp, forgotten_partitions) { + rd_kafka_Uuid_t ftid = + rd_kafka_topic_partition_get_topic_id(ftp); + rd_kafka_mock_sgrp_partmeta_t *pmeta; + + /* Remove from session partition list */ + if (session->partitions) { + int idx = + rd_kafka_topic_partition_list_find_idx_by_id( + session->partitions, ftid, + ftp->partition); + if (idx >= 0) + rd_kafka_topic_partition_list_del_by_idx( + session->partitions, idx); + } + + /* Release ACQUIRED records for this + * member on this partition. 
*/ + pmeta = rd_kafka_mock_sgrp_partmeta_find( + sgrp, ftid, ftp->partition); + if (pmeta) { + rd_kafka_mock_sgrp_record_state_t + *state, *tmp; + TAILQ_FOREACH_SAFE( + state, &pmeta->inflight, link, + tmp) { + if (state->state != + RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED) + continue; + if (!state->owner_member_id) + continue; + if (rd_kafkap_str_cmp_str( + &MemberId, + state + ->owner_member_id)) + continue; + /* Release: mark AVAILABLE */ + state->state = + RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE; + rd_free( + state->owner_member_id); + state->owner_member_id = NULL; + state->lock_expiry_ts = 0; + } + } + } + } + + if (!err && sgrp) { + rd_kafka_topic_partition_t *rktpar; + RD_KAFKA_TPLIST_FOREACH(rktpar, requested_partitions) { + rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id( + rktpar); + rd_kafka_mock_topic_t *mtopic = + rd_kafka_mock_topic_find_by_id(mcluster, + topic_id); + rd_kafka_mock_partition_t *mpart; + + if (!mtopic) { + err = + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + break; + } + + mpart = rd_kafka_mock_partition_find( + mtopic, rktpar->partition); + if (!mpart) { + err = + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + break; + } + + rd_kafka_mock_sgrp_partmeta_t *pmeta = + rd_kafka_mock_sgrp_partmeta_get( + sgrp, topic_id, rktpar->partition, + mpart); + rd_kafka_mock_sgrp_partmeta_prune_archived( + pmeta); + rd_kafka_mock_sgrp_acquire_available_offsets( + pmeta, mpart, + &MemberId, + now + + ((sgrp->record_lock_duration_ms > 0 + ? sgrp->record_lock_duration_ms + : sgrp->session_timeout_ms) * + 1000), + sgrp->max_delivery_attempts, + MaxRecords > 0 ? &remaining_records : NULL, + MaxBytes > 0 ? 
&remaining_bytes : NULL, + &acquired_cnt, &acquired_bytes); + } + } + + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "ShareFetch acquired: %d records, %" PRId64 + " bytes", + acquired_cnt, acquired_bytes); + + /* Response: ThrottleTimeMs */ + rd_kafka_buf_write_i32(resp, 0); + /* Response: ErrorCode */ + rd_kafka_buf_write_i16(resp, err); + /* Response: ErrorMessage */ + if (err) + rd_kafka_buf_write_str(resp, rd_kafka_err2str(err), -1); + else + rd_kafka_buf_write_str(resp, NULL, -1); + /* Response: AcquisitionLockTimeoutMs */ + rd_kafka_buf_write_i32(resp, sgrp ? sgrp->session_timeout_ms : 0); + + rd_kafka_topic_partition_list_sort_by_topic_id( + requested_partitions); + + { + int i = 0; + int topic_cnt = 0; + rd_kafka_Uuid_t current_topic = RD_KAFKA_UUID_ZERO; + + for (i = 0; i < requested_partitions->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = + &requested_partitions->elems[i]; + rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id( + rktpar); + if (i == 0 || + rd_kafka_Uuid_cmp(topic_id, + current_topic) != 0) { + topic_cnt++; + current_topic = topic_id; + } + } + + /* Response: #Topics */ + rd_kafka_buf_write_arraycnt(resp, topic_cnt); + + i = 0; + while (i < requested_partitions->cnt) { + int j; + rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id( + &requested_partitions->elems[i]); + int part_cnt = 0; + + for (j = i; j < requested_partitions->cnt; + j++) { + rd_kafka_Uuid_t next_topic_id = + rd_kafka_topic_partition_get_topic_id( + &requested_partitions->elems[j]); + if (rd_kafka_Uuid_cmp(topic_id, + next_topic_id) != + 0) + break; + part_cnt++; + } + + /* Response: TopicId */ + rd_kafka_buf_write_uuid(resp, &topic_id); + /* Response: #Partitions */ + rd_kafka_buf_write_arraycnt(resp, part_cnt); + + for (j = i; j < i + part_cnt; j++) { + rd_kafka_topic_partition_t *rktpar = + &requested_partitions->elems[j]; + rd_kafka_mock_topic_t *mtopic = + rd_kafka_mock_topic_find_by_id( + mcluster, topic_id); + 
rd_kafka_mock_partition_t *mpart = + mtopic ? rd_kafka_mock_partition_find( + mtopic, + rktpar->partition) + : NULL; + rd_kafka_mock_sgrp_partmeta_t *pmeta = + mpart + ? rd_kafka_mock_sgrp_partmeta_find( + sgrp, topic_id, + rktpar->partition) + : NULL; + rd_kafka_resp_err_t part_err = + mpart ? RD_KAFKA_RESP_ERR_NO_ERROR + : RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + + /* Response: Partition */ + rd_kafka_buf_write_i32( + resp, rktpar->partition); + /* Response: PartitionFetchErrorCode */ + rd_kafka_buf_write_i16(resp, part_err); + /* Response: PartitionFetchErrorString */ + if (part_err) + rd_kafka_buf_write_str( + resp, + rd_kafka_err2str(part_err), + -1); + else + rd_kafka_buf_write_str( + resp, NULL, -1); + /* Response: AcknowledgementErrorCode */ + rd_kafka_buf_write_i16(resp, 0); + /* Response: AcknowledgementErrorString */ + rd_kafka_buf_write_str(resp, NULL, -1); + /* Response: CurrentLeader */ + rd_kafka_buf_write_i32(resp, -1); + rd_kafka_buf_write_i32(resp, -1); + rd_kafka_buf_write_tags_empty(resp); + /* Response: Records (all acquired + * batches concatenated) */ + if (mpart && pmeta) + rd_kafka_mock_sgrp_write_acquired_records( + resp, pmeta, mpart, + &MemberId, now); + else + rd_kafka_buf_write_kbytes( + resp, NULL); + /* Response: AcquiredRecords */ + rd_kafka_buf_write_arraycnt(resp, 0); + /* Response: Partition tags */ + rd_kafka_buf_write_tags_empty(resp); + } + + /* Response: Topic tags */ + rd_kafka_buf_write_tags_empty(resp); + + i += part_cnt; + } + } + + /* Response: NodeEndpoints */ + rd_kafka_buf_write_arraycnt(resp, 0); + /* Response: Top-level tags */ + rd_kafka_buf_write_tags_empty(resp); + + mtx_unlock(&mcluster->lock); + + rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); + + rd_kafka_topic_partition_list_destroy(requested_partitions); + RD_IF_FREE(forgotten_partitions, + rd_kafka_topic_partition_list_destroy); + return 0; + } + + /* Error response */ + rd_kafka_buf_write_i32(resp, 0); + rd_kafka_buf_write_i16(resp, err); 
+ rd_kafka_buf_write_str(resp, rd_kafka_err2str(err), -1); + rd_kafka_buf_write_i32(resp, 0); + rd_kafka_buf_write_arraycnt(resp, 0); + rd_kafka_buf_write_arraycnt(resp, 0); + rd_kafka_buf_write_tags_empty(resp); + + rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); + rd_kafka_topic_partition_list_destroy(requested_partitions); + RD_IF_FREE(forgotten_partitions, + rd_kafka_topic_partition_list_destroy); + return 0; + +err_parse: + RD_IF_FREE(requested_partitions, rd_kafka_topic_partition_list_destroy); + RD_IF_FREE(forgotten_partitions, + rd_kafka_topic_partition_list_destroy); + rd_kafka_buf_destroy(resp); + return -1; +} + /** * @brief Default request handlers */ @@ -3258,6 +4055,7 @@ const struct rd_kafka_mock_api_handler {0, 0, 0, rd_kafka_mock_handle_GetTelemetrySubscriptions}, [RD_KAFKAP_PushTelemetry] = {0, 0, 0, rd_kafka_mock_handle_PushTelemetry}, + [RD_KAFKAP_ShareFetch] = {1, 1, 1, rd_kafka_mock_handle_ShareFetch}, }; diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index aa84c5f2c5..2bf8c57c3b 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -179,6 +179,49 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s { rd_kafka_mock_cgrp_consumer_t *mcgrp; /**< Consumer group */ } rd_kafka_mock_cgrp_consumer_member_t; +/** + * @brief Share record state. + */ +enum rd_kafka_mock_sgrp_record_state_e { + RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE = 0, + RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED = 1, + RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED = 2 +}; + +typedef struct rd_kafka_mock_sgrp_record_state_s { + TAILQ_ENTRY(rd_kafka_mock_sgrp_record_state_s) link; + int64_t offset; + char *owner_member_id; + rd_ts_t lock_expiry_ts; + int32_t delivery_count; + enum rd_kafka_mock_sgrp_record_state_e state; +} rd_kafka_mock_sgrp_record_state_t; + +/** + * @brief Share partition metadata. 
+ */ +typedef struct rd_kafka_mock_sgrp_partmeta_s { + TAILQ_ENTRY(rd_kafka_mock_sgrp_partmeta_s) link; + rd_kafka_Uuid_t topic_id; + int32_t partition; + int64_t spso; + int64_t speo; + TAILQ_HEAD(, rd_kafka_mock_sgrp_record_state_s) inflight; + int inflight_cnt; +} rd_kafka_mock_sgrp_partmeta_t; + +/** + * @brief Share fetch session. + */ +typedef struct rd_kafka_mock_sgrp_fetch_session_s { + TAILQ_ENTRY(rd_kafka_mock_sgrp_fetch_session_s) link; + char *member_id; + int32_t session_id; + int32_t session_epoch; + rd_ts_t ts_last_activity; + rd_kafka_topic_partition_list_t *partitions; +} rd_kafka_mock_sgrp_fetch_session_t; + /** * @struct Share group (KIP-932). */ @@ -194,6 +237,23 @@ typedef struct rd_kafka_mock_sharegroup_s { members; /**< Share group members */ int member_cnt; /**< Number of share group members */ rd_bool_t manual_assignment; /**< Use manual assignment */ + + /* ShareFetch state (KIP-932) */ + TAILQ_HEAD(, rd_kafka_mock_sgrp_partmeta_s) + partitions; /**< Share partition metadata */ + int partition_cnt; /**< Number of partitions */ + TAILQ_HEAD(, rd_kafka_mock_sgrp_fetch_session_s) + fetch_sessions; /**< Active fetch sessions */ + int fetch_session_cnt; /**< Number of fetch sessions */ + rd_kafka_timer_t fetch_session_tmr; /**< Fetch session expiry timer */ + + /* Per-record limits */ + int max_delivery_attempts; /**< Max times a record can be + * acquired before being archived. + * 0 = unlimited (default 5). */ + int record_lock_duration_ms; /**< Per-record lock duration in ms. + * 0 = use session_timeout_ms + * as fallback (default 0). 
*/ } rd_kafka_mock_sharegroup_t; /** @@ -213,6 +273,7 @@ typedef struct rd_kafka_mock_sharegroup_member_s { rd_kafka_mock_sharegroup_t *mshgrp; /**< Share group */ } rd_kafka_mock_sharegroup_member_t; + /** * @struct TransactionalId + PID (+ optional sequence state) */ @@ -506,6 +567,12 @@ struct rd_kafka_mock_cluster_s { int sharegroup_session_timeout_ms; /** Heartbeat interval (KIP 932) */ int sharegroup_heartbeat_interval_ms; + /** Max delivery attempts per record (KIP 932). + * 0 = unlimited. */ + int sharegroup_max_delivery_attempts; + /** Per-record lock duration in ms (KIP 932). + * 0 = use session_timeout_ms. */ + int sharegroup_record_lock_duration_ms; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ @@ -580,6 +647,14 @@ rd_kafka_mock_topic_t * rd_kafka_mock_topic_find_by_id(const rd_kafka_mock_cluster_t *mcluster, rd_kafka_Uuid_t id); +void rd_kafka_mock_sgrp_fetch_session_destroy( + rd_kafka_mock_sgrp_fetch_session_t *session); +void rd_kafka_mock_sgrp_release_member_locks( + rd_kafka_mock_sharegroup_t *mshgrp, + const char *member_id); +void rd_kafka_mock_sgrp_fetch_session_tmr_cb(rd_kafka_timers_t *rkts, + void *arg); + rd_kafka_mock_broker_t * rd_kafka_mock_cluster_get_coord(rd_kafka_mock_cluster_t *mcluster, rd_kafka_coordtype_t KeyType, diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c index 15718a9fcf..064b0bb402 100644 --- a/src/rdkafka_mock_sharegrp.c +++ b/src/rdkafka_mock_sharegrp.c @@ -54,8 +54,10 @@ static void rd_kafka_mock_sharegroup_session_tmr_cb(rd_kafka_timers_t *rkts, */ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { TAILQ_INIT(&mcluster->sharegrps); - mcluster->defaults.sharegroup_session_timeout_ms = 45000; - mcluster->defaults.sharegroup_heartbeat_interval_ms = 5000; + mcluster->defaults.sharegroup_session_timeout_ms = 45000; + mcluster->defaults.sharegroup_heartbeat_interval_ms = 5000; + mcluster->defaults.sharegroup_max_delivery_attempts = 5; + 
mcluster->defaults.sharegroup_record_lock_duration_ms = 0; } /** @@ -94,10 +96,27 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, TAILQ_INIT(&mshgrp->members); mshgrp->member_cnt = 0; + /* ShareFetch state */ + TAILQ_INIT(&mshgrp->partitions); + TAILQ_INIT(&mshgrp->fetch_sessions); + mshgrp->partition_cnt = 0; + mshgrp->fetch_session_cnt = 0; + + /* Per-record limits */ + mshgrp->max_delivery_attempts = + mcluster->defaults.sharegroup_max_delivery_attempts; + mshgrp->record_lock_duration_ms = + mcluster->defaults.sharegroup_record_lock_duration_ms; + rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, 1000 * 1000 /* 1s */, rd_kafka_mock_sharegroup_session_tmr_cb, mshgrp); + /* Fetch session expiry timer */ + rd_kafka_timer_start(&mcluster->timers, &mshgrp->fetch_session_tmr, + 1000 * 1000 /* 1s */, + rd_kafka_mock_sgrp_fetch_session_tmr_cb, mshgrp); + TAILQ_INSERT_TAIL(&mcluster->sharegrps, mshgrp, link); return mshgrp; @@ -108,15 +127,38 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, */ void rd_kafka_mock_sharegroup_destroy(rd_kafka_mock_sharegroup_t *mshgrp) { rd_kafka_mock_sharegroup_member_t *member; + rd_kafka_mock_sgrp_partmeta_t *pmeta; + rd_kafka_mock_sgrp_fetch_session_t *session; TAILQ_REMOVE(&mshgrp->cluster->sharegrps, mshgrp, link); rd_kafka_timer_stop(&mshgrp->cluster->timers, &mshgrp->session_tmr, RD_DO_LOCK); + rd_kafka_timer_stop(&mshgrp->cluster->timers, + &mshgrp->fetch_session_tmr, RD_DO_LOCK); /* Destroy all members */ while ((member = TAILQ_FIRST(&mshgrp->members))) rd_kafka_mock_sharegroup_member_destroy(mshgrp, member); + /* Destroy ShareFetch partition metadata */ + while ((pmeta = TAILQ_FIRST(&mshgrp->partitions))) { + rd_kafka_mock_sgrp_record_state_t *state; + TAILQ_REMOVE(&mshgrp->partitions, pmeta, link); + while ((state = TAILQ_FIRST(&pmeta->inflight))) { + TAILQ_REMOVE(&pmeta->inflight, state, link); + rd_free(state->owner_member_id); + rd_free(state); + } + rd_free(pmeta); + } + + /* 
Destroy ShareFetch sessions */ + while ((session = TAILQ_FIRST(&mshgrp->fetch_sessions))) { + TAILQ_REMOVE(&mshgrp->fetch_sessions, session, link); + mshgrp->fetch_session_cnt--; + rd_kafka_mock_sgrp_fetch_session_destroy(session); + } + rd_free(mshgrp->id); rd_free(mshgrp); } @@ -668,6 +710,152 @@ void rd_kafka_mock_sharegroup_set_heartbeat_interval( mtx_unlock(&mcluster->lock); } +/** + * @brief Set the maximum delivery attempts per record for the sharegroup. + */ +void rd_kafka_mock_sharegroup_set_max_delivery_attempts( + rd_kafka_mock_cluster_t *mcluster, + int max_attempts) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->max_delivery_attempts = max_attempts; + mcluster->defaults.sharegroup_max_delivery_attempts = max_attempts; + mtx_unlock(&mcluster->lock); +} + +/** + * @brief Set the per-record lock duration in milliseconds for the sharegroup. + */ +void rd_kafka_mock_sharegroup_set_record_lock_duration( + rd_kafka_mock_cluster_t *mcluster, + int lock_duration_ms) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->record_lock_duration_ms = lock_duration_ms; + mcluster->defaults.sharegroup_record_lock_duration_ms = + lock_duration_ms; + mtx_unlock(&mcluster->lock); +} + +/** + * @brief Destroy share fetch session. + */ +void rd_kafka_mock_sgrp_fetch_session_destroy( + rd_kafka_mock_sgrp_fetch_session_t *session) { + rd_free(session->member_id); + RD_IF_FREE(session->partitions, rd_kafka_topic_partition_list_destroy); + rd_free(session); +} + +/** + * @brief Release all ACQUIRED records owned by \p member_id across all + * share-partition metadata in the share group. + * + * This is called when a session is closed (epoch=-1), when a session + * times out, or as part of periodic lock-expiry reclamation. + * + * @locks mcluster->lock MUST be held. 
+ */
+void rd_kafka_mock_sgrp_release_member_locks(
+    rd_kafka_mock_sharegroup_t *mshgrp,
+    const char *member_id) {
+        rd_kafka_mock_sgrp_partmeta_t *pmeta;
+
+        TAILQ_FOREACH(pmeta, &mshgrp->partitions, link) {
+                rd_kafka_mock_sgrp_record_state_t *state, *tmp;
+                TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, tmp) {
+                        if (state->state !=
+                            RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED)
+                                continue;
+                        if (!state->owner_member_id)
+                                continue;
+                        if (strcmp(state->owner_member_id, member_id) != 0)
+                                continue;
+
+                        state->state = RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE;
+                        rd_free(state->owner_member_id);
+                        state->owner_member_id = NULL;
+                        state->lock_expiry_ts  = 0;
+                }
+        }
+}
+
+/**
+ * @brief Proactively release any expired acquisition locks.
+ *
+ * Flips every ACQUIRED record whose non-zero lock_expiry_ts is <= \p now
+ * back to AVAILABLE and clears its owner; a zero expiry never expires here.
+ *
+ * @locks mcluster->lock MUST be held.
+ */
+static void
+rd_kafka_mock_sgrp_expire_locks(rd_kafka_mock_sharegroup_t *mshgrp,
+                                rd_ts_t now) {
+        rd_kafka_mock_sgrp_partmeta_t *pmeta;
+
+        TAILQ_FOREACH(pmeta, &mshgrp->partitions, link) {
+                rd_kafka_mock_sgrp_record_state_t *state, *tmp;
+                TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, tmp) {
+                        if (state->state !=
+                            RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED)
+                                continue;
+                        if (!state->lock_expiry_ts ||
+                            state->lock_expiry_ts > now)
+                                continue;
+
+                        /* Lock has expired - release back to AVAILABLE */
+                        state->state = RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE;
+                        RD_IF_FREE(state->owner_member_id, rd_free);
+                        state->owner_member_id = NULL;
+                        state->lock_expiry_ts  = 0;
+                }
+        }
+}
+
+/**
+ * @brief Periodic timer: expire stale share-fetch sessions and
+ *        proactively reclaim expired acquisition locks.
+ *
+ * @locks mcluster->lock is acquired and released.
+ */
+void rd_kafka_mock_sgrp_fetch_session_tmr_cb(rd_kafka_timers_t *rkts,
+                                             void *arg) {
+        rd_kafka_mock_sharegroup_t *mshgrp = arg;
+        rd_kafka_mock_sgrp_fetch_session_t *session, *tmp;
+        rd_ts_t now                       = rd_clock();
+        rd_kafka_mock_cluster_t *mcluster = mshgrp->cluster;
+
+        (void)rkts;
+
+        mtx_lock(&mcluster->lock);
+
+        /* 1. Expire stale sessions and release their member locks. */
+        TAILQ_FOREACH_SAFE(session, &mshgrp->fetch_sessions, link, tmp) {
+                /* Widen before scaling: int ms * 1000 overflows at ~35 min. */
+                if (session->ts_last_activity +
+                        (rd_ts_t)mshgrp->session_timeout_ms * 1000 > now)
+                        continue;
+
+                /* Release all locks held by this member before
+                 * destroying the session. */
+                rd_kafka_mock_sgrp_release_member_locks(mshgrp,
+                                                        session->member_id);
+
+                TAILQ_REMOVE(&mshgrp->fetch_sessions, session, link);
+                mshgrp->fetch_session_cnt--;
+                rd_kafka_mock_sgrp_fetch_session_destroy(session);
+        }
+
+        /* 2. Proactively reclaim any expired acquisition locks.
+         *    This catches records whose owning consumer crashed
+         *    without closing its session cleanly. */
+        rd_kafka_mock_sgrp_expire_locks(mshgrp, now);
+
+        mtx_unlock(&mcluster->lock);
+}
+
 /**
  * @brief A client connection closed, check if any sharegroup has any
  *        state for this connection that needs to be cleared.
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h
index ef0e4c4d73..6d584609f6 100644
--- a/src/rdkafka_op.h
+++ b/src/rdkafka_op.h
@@ -614,7 +614,8 @@ struct rd_kafka_op_s {
                                             * PART_PUSH_LEADER_RESPONSE */
                         char *str;         /**< For:
-                                            * COORD_SET (key) */
+                                            * COORD_SET (key)
+                                            */
                         int32_t partition; /**< For:
                                             * PART_SET_FOLLOWER
                                             * PART_SET_FOLLOWER_WMARKS
diff --git a/tests/0156-kip932-sharefetch_mockbroker.c b/tests/0156-kip932-sharefetch_mockbroker.c
new file mode 100644
index 0000000000..0b375d762e
--- /dev/null
+++ b/tests/0156-kip932-sharefetch_mockbroker.c
@@ -0,0 +1,893 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2025, Confluent Inc.
+ * All rights reserved.
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +#include "../src/rdkafka_proto.h" + +/** + * @name KIP-932 ShareFetch mock broker tests using the share consumer API. + * + * Exercises the ShareFetch path via mock broker. There is no coordinator + * or ShareAcknowledge support in the mock broker, so group management and + * ack-based state transitions are not validated here. 
+ */
+
+typedef struct test_ctx_s {
+        rd_kafka_t *producer;
+        rd_kafka_mock_cluster_t *mcluster;
+        const char *bootstraps;
+} test_ctx_t;
+
+static test_ctx_t test_ctx_new(void) {
+        test_ctx_t ctx;
+        rd_kafka_conf_t *conf;
+        char errstr[512];
+
+        memset(&ctx, 0, sizeof(ctx));
+
+        ctx.mcluster = test_mock_cluster_new(3, &ctx.bootstraps);
+
+        TEST_ASSERT(rd_kafka_mock_set_apiversion(ctx.mcluster,
+                                                 RD_KAFKAP_ShareGroupHeartbeat,
+                                                 1, 1) ==
+                        RD_KAFKA_RESP_ERR_NO_ERROR,
+                    "Failed to enable ShareGroupHeartbeat");
+        TEST_ASSERT(
+            rd_kafka_mock_set_apiversion(ctx.mcluster, RD_KAFKAP_ShareFetch, 1,
+                                         1) == RD_KAFKA_RESP_ERR_NO_ERROR,
+            "Failed to enable ShareFetch");
+
+        /* Create a producer targeting the mock cluster */
+        test_conf_init(&conf, NULL, 0);
+        test_conf_set(conf, "bootstrap.servers", ctx.bootstraps);
+
+        ctx.producer =
+            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+        TEST_ASSERT(ctx.producer != NULL, "Failed to create producer: %s",
+                    errstr);
+
+        return ctx;
+}
+
+static void test_ctx_destroy(test_ctx_t *ctx) {
+        if (ctx->producer)
+                rd_kafka_destroy(ctx->producer);
+        if (ctx->mcluster)
+                test_mock_cluster_destroy(ctx->mcluster);
+        memset(ctx, 0, sizeof(*ctx));
+}
+
+static void produce_messages(rd_kafka_t *producer,
+                             const char *topic,
+                             int msgcnt) {
+        for (int i = 0; i < msgcnt; i++) {
+                char payload[64];
+                snprintf(payload, sizeof(payload), "%s-%d", topic, i);
+                TEST_ASSERT(
+                    rd_kafka_producev(
+                        producer, RD_KAFKA_V_TOPIC(topic),
+                        RD_KAFKA_V_VALUE(payload, strlen(payload)),
+                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+                        RD_KAFKA_V_END) == RD_KAFKA_RESP_ERR_NO_ERROR,
+                    "Produce failed");
+        }
+        TEST_ASSERT(!rd_kafka_flush(producer, 5000), "Flush timed out");
+}
+
+static rd_kafka_share_t *new_share_consumer(const char *bootstraps,
+                                            const char *group_id) {
+        rd_kafka_conf_t *conf;
+        rd_kafka_share_t *consumer;
+
+        test_conf_init(&conf, NULL, 0);
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        test_conf_set(conf, "group.id", group_id);
+
+        consumer =
rd_kafka_share_consumer_new(conf, NULL, 0); + TEST_ASSERT(consumer != NULL, "Failed to create share consumer"); + rd_kafka_share_poll_set_consumer(consumer); + return consumer; +} + +static void subscribe_topics(rd_kafka_share_t *consumer, + const char **topics, + int topic_cnt) { + rd_kafka_topic_partition_list_t *tpl = + rd_kafka_topic_partition_list_new(topic_cnt); + for (int i = 0; i < topic_cnt; i++) { + rd_kafka_topic_partition_list_add(tpl, topics[i], + RD_KAFKA_PARTITION_UA); + } + TEST_ASSERT(!rd_kafka_share_subscribe(consumer, tpl), + "Subscribe failed"); + rd_kafka_topic_partition_list_destroy(tpl); +} + +static int +consume_n(rd_kafka_share_t *consumer, int expected, int max_attempts) { + int consumed = 0; + int attempts = 0; + + while (consumed < expected && attempts < max_attempts) { + rd_kafka_message_t *rkmessages[100]; + size_t rcvd_msgs = 0; + rd_kafka_error_t *error; + + error = rd_kafka_share_consume_batch(consumer, 500, rkmessages, + &rcvd_msgs); + attempts++; + + if (error) { + TEST_SAY("consume error: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + continue; + } + + for (size_t i = 0; i < rcvd_msgs; i++) { + rd_kafka_message_t *rkmsg = rkmessages[i]; + if (rkmsg->err) { + TEST_SAY("consume error: %s\n", + rd_kafka_message_errstr(rkmsg)); + rd_kafka_message_destroy(rkmsg); + continue; + } + TEST_SAY("Consumed: %.*s\n", (int)rkmsg->len, + (const char *)rkmsg->payload); + consumed++; + rd_kafka_message_destroy(rkmsg); + } + } + + return consumed; +} + + +static void do_test_basic_consume(void) { + const char *topic = "kip932_pos_basic"; + const int msgcnt = 5; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + consumer = new_share_consumer(ctx.bootstraps, "sg-pos-basic"); + 
subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, msgcnt, 50); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == msgcnt, + "Expected %d consumed, got %d", msgcnt, consumed); + SUB_TEST_PASS(); +} + +static void do_test_followup_fetch(void) { + const char *topic = "kip932_pos_followup"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, 5); + + consumer = new_share_consumer(ctx.bootstraps, "sg-pos-followup"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 3, 30); + consumed += consume_n(consumer, 2, 30); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 5, + "Expected 5 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +static void do_test_multi_partition(void) { + const char *topic = "kip932_pos_multi_part"; + const int msgcnt = 6; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 2, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + consumer = new_share_consumer(ctx.bootstraps, "sg-pos-multipart"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, msgcnt, 60); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == msgcnt, + "Expected %d consumed, got %d", msgcnt, consumed); + SUB_TEST_PASS(); +} + +static void do_test_multi_topic(void) { + const char *topic_a = "kip932_pos_topic_a"; + const char *topic_b = "kip932_pos_topic_b"; + 
const char *topics[] = {topic_a, topic_b}; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic_a, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic A"); + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic_b, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic B"); + + produce_messages(ctx.producer, topic_a, 2); + produce_messages(ctx.producer, topic_b, 2); + consumer = new_share_consumer(ctx.bootstraps, "sg-pos-multitopic"); + subscribe_topics(consumer, topics, 2); + consumed = consume_n(consumer, 4, 40); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 4, + "Expected 4 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +static void do_test_empty_topic_no_records(void) { + const char *topic = "kip932_pos_empty"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + + consumer = new_share_consumer(ctx.bootstraps, "sg-pos-empty"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +static void do_test_negative_sharefetch_error(rd_kafka_resp_err_t err) { + const char *topic = "kip932_neg_sharefetch_error"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, 1); + + 
rd_kafka_mock_push_request_errors(ctx.mcluster, RD_KAFKAP_ShareFetch, 1, + err); + + consumer = new_share_consumer(ctx.bootstraps, "sg-neg-sharefetch"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); +} + +static void do_test_sharefetch_invalid_session_epoch(void) { + SUB_TEST_QUICK(); + do_test_negative_sharefetch_error( + RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH); + SUB_TEST_PASS(); +} + +static void do_test_sharefetch_unknown_topic_or_part(void) { + SUB_TEST_QUICK(); + do_test_negative_sharefetch_error( + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART); + SUB_TEST_PASS(); +} + +static void do_test_sghb_error(rd_kafka_resp_err_t err, int count) { + const char *topic = "kip932_neg_sghb"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + rd_kafka_resp_err_t *errs; + int i; + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, 1); + + /* Build an array of 'count' identical errors and push them all. + * Using the array variant avoids UB from mismatched varargs count. 
*/ + errs = malloc(sizeof(*errs) * count); + TEST_ASSERT(errs != NULL, "malloc failed"); + for (i = 0; i < count; i++) + errs[i] = err; + rd_kafka_mock_push_request_errors_array( + ctx.mcluster, RD_KAFKAP_ShareGroupHeartbeat, + (size_t)count, errs); + free(errs); + + consumer = new_share_consumer(ctx.bootstraps, "sg-neg-sghb"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); +} + +static void do_test_sghb_coord_unavailable(void) { + SUB_TEST_QUICK(); + do_test_sghb_error(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 50); + SUB_TEST_PASS(); +} + +static void do_test_topic_error(rd_kafka_resp_err_t err) { + const char *topic = "kip932_neg_topic_error"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, 1); + rd_kafka_mock_topic_set_error(ctx.mcluster, topic, err); + + consumer = new_share_consumer(ctx.bootstraps, "sg-neg-topicerr"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); +} + +static void do_test_topic_error_unknown_topic_or_part(void) { + SUB_TEST_QUICK(); + do_test_topic_error(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART); + SUB_TEST_PASS(); +} + +static void do_test_unknown_topic_subscription(void) { + const char *topic = "kip932_neg_unknown_topic"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + consumer = new_share_consumer(ctx.bootstraps, "sg-neg-unknown-topic"); + 
subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +static void do_test_empty_fetch_no_records(void) { + const char *topic = "kip932_neg_empty_fetch"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + + consumer = new_share_consumer(ctx.bootstraps, "sg-neg-empty"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 0, + "Expected 0 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +/** + * @brief Verify that ShareFetch rejects requests from an unregistered member + * (UNKNOWN_MEMBER_ID), and that after the member re-joins it can + * consume again. + * + * Phase 1: Consumer joins normally via SGHB -> consumes messages OK. + * Phase 2: Push SGHB errors -> heartbeats fail -> member expires -> broker + * rejects ShareFetch with UNKNOWN_MEMBER_ID. + * Phase 3: SGHB errors drain -> member re-joins -> consumes again. + */ +static void do_test_member_validation(void) { + const char *topic = "kip932_member_validation"; + const int msgcnt = 4; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed_p1, consumed_p3; + + SUB_TEST(); + + /* Short session timeout so the member is evicted quickly once + * heartbeats stop succeeding. 
*/ + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 500); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + consumer = new_share_consumer(ctx.bootstraps, "sg-member-val"); + subscribe_topics(consumer, &topic, 1); + + /* Phase 1: Consume normally -- member is registered via SGHB. */ + consumed_p1 = consume_n(consumer, 2, 30); + TEST_SAY("member_validation: phase1 consumed %d/2\n", consumed_p1); + + /* Phase 2: Block SGHB so heartbeats fail. + * Push enough errors to cover the window while we wait for the + * member to be evicted. */ + rd_kafka_mock_push_request_errors( + ctx.mcluster, RD_KAFKAP_ShareGroupHeartbeat, 20, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE); + + /* Wait for the member to be evicted (500ms session timeout + margin). */ + usleep(1500 * 1000); + + /* Phase 3: SGHB errors will eventually drain. Once a SGHB + * succeeds, the member re-joins and the remaining records + * become fetchable again. 
*/ + consumed_p3 = consume_n(consumer, 2, 50); + TEST_SAY("member_validation: phase3 consumed %d/2\n", consumed_p3); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_p1 >= 2 && (consumed_p1 + consumed_p3) >= msgcnt, + "Expected at least 2+2, got %d+%d", + consumed_p1, consumed_p3); + SUB_TEST_PASS(); +} + +static void do_test_sharefetch_session_expiry_rtt(void) { + const char *topic = "kip932_rtt_expiry"; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST(); + + /* Session timeout must be long enough for normal requests + * to complete, but short enough to expire during high RTT. */ + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 1000); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, 2); + + consumer = new_share_consumer(ctx.bootstraps, "sg-rtt-expiry"); + subscribe_topics(consumer, &topic, 1); + + /* Phase 1: consume one message with normal RTT (no injection). */ + consumed = consume_n(consumer, 1, 20); + TEST_SAY("rtt_expiry: phase1 consumed %d/1\n", consumed); + + /* Phase 2: inject RTT >> session timeout to force session expiry. + * All requests to broker 1 now take 3s, but the session + * expires after 1s of inactivity. */ + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 1, 3000); + usleep(2000 * 1000); /* wait for session to expire */ + + /* Phase 3: clear RTT and let the consumer recover. 
*/ + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 1, 0); + consumed += consume_n(consumer, 1, 30); + TEST_SAY("rtt_expiry: phase3 consumed %d/2 total\n", consumed); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == 2, + "Expected 2 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +static void do_test_forgotten_topics(void) { + const char *topic_a = "kip932_forgotten_a"; + const char *topic_b = "kip932_forgotten_b"; + const char *both[] = {topic_a, topic_b}; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic_a, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic A"); + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic_b, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic B"); + + /* Produce 2 messages to each topic */ + produce_messages(ctx.producer, topic_a, 2); + produce_messages(ctx.producer, topic_b, 2); + + /* Subscribe to both topics and consume all 4 messages */ + consumer = new_share_consumer(ctx.bootstraps, "sg-forgotten"); + subscribe_topics(consumer, both, 2); + consumed = consume_n(consumer, 4, 40); + TEST_SAY("forgotten_topics: consumed %d/4 from both topics\n", + consumed); + + /* Re-subscribe to only topic_a (topic_b becomes forgotten) */ + subscribe_topics(consumer, &topic_a, 1); + + /* Produce 2 more messages to topic_a */ + produce_messages(ctx.producer, topic_a, 2); + + /* Consume the 2 new messages -- only topic_a should deliver */ + consumed += consume_n(consumer, 2, 30); + TEST_SAY("forgotten_topics: consumed %d/6 total after forget\n", + consumed); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + /* We expect at least the 4 initial + 2 from topic_a = 6. 
+ * Depending on timing the consumer may or may not have already + * received all messages from the first round, so we accept >= 4. */ + TEST_ASSERT(consumed >= 4, + "Expected at least 4 consumed, got %d", consumed); + SUB_TEST_PASS(); +} + +/** + * @brief Produce messages one-at-a-time (each flush creates a separate + * msgset on the mock partition), then consume and verify all are + * received. This validates that the ShareFetch response includes + * records from *all* acquired msgsets, not just the first one. + */ +static void do_test_multi_batch_consume(void) { + const char *topic = "kip932_multi_batch"; + const int msgcnt = 5; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + + /* Produce each message individually with a flush in between, + * guaranteeing separate msgsets on the mock partition. */ + for (int i = 0; i < msgcnt; i++) { + char payload[64]; + snprintf(payload, sizeof(payload), "batch-%d", i); + TEST_ASSERT( + rd_kafka_producev( + ctx.producer, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_VALUE(payload, strlen(payload)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_END) == RD_KAFKA_RESP_ERR_NO_ERROR, + "Produce failed"); + rd_kafka_flush(ctx.producer, 5000); + } + + consumer = new_share_consumer(ctx.bootstraps, "sg-multi-batch"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, msgcnt, 50); + TEST_SAY("multi_batch: consumed %d/%d\n", consumed, msgcnt); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == msgcnt, + "Expected %d consumed, got %d", msgcnt, consumed); + SUB_TEST_PASS(); +} + +/** + * @brief Verify that max_delivery_attempts causes records to be archived + * after the limit is exceeded. 
Consumer A acquires all records, then + * its session times out (releasing locks). Consumer B acquires them + * again, and its session also times out. After the delivery limit is + * exhausted, Consumer C should see 0 available records. + */ +static void do_test_max_delivery_attempts(void) { + const char *topic = "kip932_max_delivery"; + const int msgcnt = 3; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed_a, consumed_b, consumed_c; + + SUB_TEST(); + + /* Set max delivery attempts to 2 and a short session timeout + * so locks expire quickly after consumer destruction. */ + rd_kafka_mock_sharegroup_set_max_delivery_attempts(ctx.mcluster, 2); + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 500); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Delivery 1: Consumer A acquires and "crashes" (no ack). */ + consumer = new_share_consumer(ctx.bootstraps, "sg-max-delivery"); + subscribe_topics(consumer, &topic, 1); + consumed_a = consume_n(consumer, msgcnt, 50); + TEST_SAY("max_delivery: A consumed %d/%d (delivery 1)\n", + consumed_a, msgcnt); + rd_kafka_share_destroy(consumer); + usleep(1500 * 1000); /* wait for lock expiry */ + + /* Delivery 2: Consumer B acquires same records again (delivery_count + * reaches 2 = limit) and "crashes". */ + consumer = new_share_consumer(ctx.bootstraps, "sg-max-delivery"); + subscribe_topics(consumer, &topic, 1); + consumed_b = consume_n(consumer, msgcnt, 50); + TEST_SAY("max_delivery: B consumed %d/%d (delivery 2)\n", + consumed_b, msgcnt); + rd_kafka_share_destroy(consumer); + usleep(1500 * 1000); /* wait for lock expiry */ + + /* Delivery 3 attempt: Consumer C should get 0 records because + * all records have been archived (delivery_count >= max). 
*/ + consumer = new_share_consumer(ctx.bootstraps, "sg-max-delivery"); + subscribe_topics(consumer, &topic, 1); + consumed_c = consume_n(consumer, 1, 10); + TEST_SAY("max_delivery: C consumed %d/0 (should be archived)\n", + consumed_c); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == msgcnt && consumed_b == msgcnt && + consumed_c == 0, + "Expected A=%d B=%d C=0, got A=%d B=%d C=%d", + msgcnt, msgcnt, consumed_a, consumed_b, consumed_c); + SUB_TEST_PASS(); +} + +/** + * @brief Verify that record_lock_duration_ms controls how long acquired + * records stay locked, independently of session_timeout_ms. + * Sets a short lock duration (300ms) with a longer session timeout + * (10s). Consumer A acquires records and "crashes". After the short + * lock duration expires, Consumer B should be able to acquire them + * even though A's session hasn't timed out yet. + */ +static void do_test_record_lock_duration(void) { + const char *topic = "kip932_lock_duration"; + const int msgcnt = 3; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer; + int consumed_a, consumed_b; + + SUB_TEST(); + + /* Long session timeout, short record lock duration. */ + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 10000); + rd_kafka_mock_sharegroup_set_record_lock_duration(ctx.mcluster, 300); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer A acquires records, then crashes (no close). */ + consumer = new_share_consumer(ctx.bootstraps, "sg-lock-duration"); + subscribe_topics(consumer, &topic, 1); + consumed_a = consume_n(consumer, msgcnt, 50); + TEST_SAY("lock_duration: A consumed %d/%d\n", consumed_a, msgcnt); + rd_kafka_share_destroy(consumer); + + /* Wait for record lock to expire (300ms + margin), + * but NOT session timeout (10s). 
*/ + usleep(800 * 1000); + + /* Consumer B should get the records because locks have expired + * even though A's session is still technically alive. */ + consumer = new_share_consumer(ctx.bootstraps, "sg-lock-duration"); + subscribe_topics(consumer, &topic, 1); + consumed_b = consume_n(consumer, msgcnt, 50); + TEST_SAY("lock_duration: B consumed %d/%d\n", consumed_b, msgcnt); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == msgcnt && consumed_b == msgcnt, + "Expected A=%d B=%d, got A=%d B=%d", + msgcnt, msgcnt, consumed_a, consumed_b); + SUB_TEST_PASS(); +} + +/** + * @brief Multi-consumer lock expiry test. + * + * Consumer A acquires records, then crashes (destroyed without close). + * After the lock expiry timeout, consumer B should be able to pick up + * the same records because the proactive lock-expiry scan releases them. + */ +static void do_test_multi_consumer_lock_expiry(void) { + const char *topic = "kip932_multi_consumer_lock"; + const int msgcnt = 3; + test_ctx_t ctx = test_ctx_new(); + rd_kafka_share_t *consumer_a, *consumer_b; + int consumed_a, consumed_b; + + SUB_TEST(); + + /* Use a short session/lock timeout so the test runs quickly. */ + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 500); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer A: subscribe and consume all records (acquires locks). */ + consumer_a = + new_share_consumer(ctx.bootstraps, "sg-multi-consumer-lock"); + subscribe_topics(consumer_a, &topic, 1); + consumed_a = consume_n(consumer_a, msgcnt, 50); + TEST_SAY("multi_consumer: A consumed %d/%d\n", consumed_a, msgcnt); + + /* Simulate crash: destroy consumer A without calling close. + * The session will time out and the proactive lock-expiry + * timer will release A's locks. 
*/ + rd_kafka_share_destroy(consumer_a); + + /* Wait for locks to expire (session_timeout=500ms, add margin). */ + usleep(1500 * 1000); + + /* Consumer B: joins the same share group, should get the same + * records once the locks have been released. */ + consumer_b = + new_share_consumer(ctx.bootstraps, "sg-multi-consumer-lock"); + subscribe_topics(consumer_b, &topic, 1); + consumed_b = consume_n(consumer_b, msgcnt, 50); + TEST_SAY("multi_consumer: B consumed %d/%d\n", consumed_b, msgcnt); + + rd_kafka_share_consumer_close(consumer_b); + rd_kafka_share_destroy(consumer_b); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == msgcnt && consumed_b == msgcnt, + "Expected A=%d B=%d, got A=%d B=%d", + msgcnt, msgcnt, consumed_a, consumed_b); + SUB_TEST_PASS(); +} + + +int main_0156_kip932_sharefetch_mockbroker(int argc, char **argv) { + TEST_SKIP_MOCK_CLUSTER(0); + + /* Positive scenarios */ + do_test_basic_consume(); + do_test_followup_fetch(); + do_test_multi_partition(); + do_test_multi_topic(); + do_test_empty_topic_no_records(); + do_test_sharefetch_session_expiry_rtt(); + do_test_forgotten_topics(); + do_test_multi_batch_consume(); + do_test_max_delivery_attempts(); + do_test_record_lock_duration(); + do_test_multi_consumer_lock_expiry(); + + /* Negative scenarios */ + do_test_sharefetch_invalid_session_epoch(); + do_test_sharefetch_unknown_topic_or_part(); + do_test_sghb_coord_unavailable(); + do_test_topic_error_unknown_topic_or_part(); + do_test_unknown_topic_subscription(); + do_test_empty_fetch_no_records(); + do_test_member_validation(); + + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 243309dfd6..88fa3eee3d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -146,6 +146,7 @@ set( 0153-memberid.c 0154-share-consumer.c 0155-share_group_heartbeat_mock.c + 0156-kip932-sharefetch_mockbroker.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 
fda75fac32..300cbeb079 100644 --- a/tests/test.c +++ b/tests/test.c @@ -274,6 +274,7 @@ _TEST_DECL(0152_rebootstrap_local); _TEST_DECL(0153_memberid); _TEST_DECL(0154_share_consumer); _TEST_DECL(0155_share_group_heartbeat_mock); +_TEST_DECL(0156_kip932_sharefetch_mockbroker); /* Manual tests */ _TEST_DECL(8000_idle); @@ -544,6 +545,7 @@ struct test tests[] = { _TEST(0153_memberid, TEST_F_LOCAL), _TEST(0154_share_consumer, 0, TEST_BRKVER(0, 4, 0, 0)), _TEST(0155_share_group_heartbeat_mock, TEST_F_LOCAL), + _TEST(0156_kip932_sharefetch_mockbroker, TEST_F_LOCAL), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL),