Skip to content

Commit 6092d23

Browse files
k-rainaAnkith-Confluent
authored andcommitted
MockBroker: Implement ShareFetch API (#5302)
* Mock Broker : Implement share fetch * ShareGroupHeartBeat MVP * Add sharegroup session management and manual assignment features - Introduced functions to set session timeout and heartbeat interval for sharegroups. - Implemented manual target assignment for sharegroup members. - Enhanced connection handling to clear states for closed connections. * Add function to retrieve member IDs from a sharegroup and update assignment handling * Mock Broker : Implement share fetch * Fix SGHB return and fetcher api version * Add test for share fetch API * Fix session workflow * Update API version * Add handing for ForgottenTopicsData * Add lock expiry logic * Remove duplicates with sghb * Address feedbacks * Implement max delivery attempts and record lock * Add member validation for share fetch * Remove tmp produce code * Implement feedback * Minor * Add test to CMakeList * Fix minor test failure --------- Co-authored-by: Ankith-Confluent <[email protected]>
1 parent 6c00542 commit 6092d23

8 files changed

Lines changed: 1988 additions & 4 deletions

src/rdkafka_mock.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,32 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_heartbeat_interval(
621621
rd_kafka_mock_cluster_t *mcluster,
622622
int heartbeat_interval_ms);
623623

624+
/**
625+
* @brief Set the maximum number of times a record can be acquired
626+
* before it is automatically archived (dead-lettered).
627+
*
628+
* Default is 5. Set to 0 for unlimited deliveries.
629+
*
630+
* @param mcluster Mock cluster instance.
631+
* @param max_attempts Maximum delivery attempts per record.
632+
*/
633+
RD_EXPORT void rd_kafka_mock_sharegroup_set_max_delivery_attempts(
634+
rd_kafka_mock_cluster_t *mcluster,
635+
int max_attempts);
636+
637+
/**
638+
* @brief Set the per-record lock duration in milliseconds.
639+
*
640+
* When a record is acquired, its lock expires after this duration.
641+
* Default is 0, which falls back to the session timeout value.
642+
*
643+
* @param mcluster Mock cluster instance.
644+
* @param lock_duration_ms Lock duration in milliseconds.
645+
*/
646+
RD_EXPORT void rd_kafka_mock_sharegroup_set_record_lock_duration(
647+
rd_kafka_mock_cluster_t *mcluster,
648+
int lock_duration_ms);
649+
624650
/**
625651
* @brief Set a manual target assignment for a sharegroup.
626652
*

0 commit comments

Comments
 (0)