
MockBroker: Implement ShareFetch API#5302

Merged
Ankith L (Ankith-Confluent) merged 21 commits into dev_kip-932_queues-for-kafka from kraina-mock-broker-share-fetch
Feb 15, 2026

Conversation

@k-raina
Member

Summary

  • Implemented KIP‑932 ShareFetch in the mock broker, including session management, partition metadata (SPSO/SPEO), inflight record state, acquisition logic, and response construction.

  • Wired ShareFetch/ShareGroupHeartbeat ApiVersions, fixed client ShareFetch request versioning, and ensured SGHB sends subscription topics.

  • Added a comprehensive ShareFetch mock‑broker test suite (share consumer) with positive/negative scenarios and RTT‑expiry coverage.
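The session handling mentioned in the summary can be illustrated with a small epoch check. This is a minimal sketch and not the code from this PR: `share_session_t`, `epoch_check_t` and `share_session_check_epoch` are hypothetical names, and the epoch rules (0 opens or resets a session, -1 closes it, any other value must equal the epoch the broker expects next, wrapping from INT32_MAX back to 1) are an assumption based on my reading of KIP-932.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result codes for this sketch. */
typedef enum {
        EPOCH_OK,
        EPOCH_SESSION_NOT_FOUND,
        EPOCH_INVALID
} epoch_check_t;

/* Hypothetical, minimal view of a share fetch session. */
typedef struct {
        int32_t next_epoch; /* epoch expected in the next request */
        int closed;         /* non-zero once the member sent epoch -1 */
} share_session_t;

/* Validate the SessionEpoch of a request against the session state.
 * `session` may be NULL when no session exists for the member. */
epoch_check_t share_session_check_epoch(share_session_t *session,
                                        int32_t request_epoch) {
        if (request_epoch == 0) {
                /* Open or reset: the caller creates the session if NULL. */
                if (session) {
                        session->next_epoch = 1;
                        session->closed     = 0;
                }
                return EPOCH_OK;
        }

        if (!session || session->closed)
                return EPOCH_SESSION_NOT_FOUND;

        if (request_epoch == -1) {
                /* Final request: close the session. */
                session->closed = 1;
                return EPOCH_OK;
        }

        if (request_epoch != session->next_epoch)
                return EPOCH_INVALID;

        /* Advance, wrapping to 1 so that 0 and -1 keep their meaning. */
        session->next_epoch = session->next_epoch == INT32_MAX
                                  ? 1
                                  : session->next_epoch + 1;
        return EPOCH_OK;
}
```

A replayed request (same epoch sent twice) is rejected with `EPOCH_INVALID`, which is the kind of negative scenario a mock-broker test would likely want to cover.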

ShareFetch Workflow

[1] Parse Request
  - Read GroupId, MemberId, SessionEpoch
  - Read Wait/Min/Max/MaxRecords/BatchSize
  - Read Topics -> Partitions -> AckBatches/AckTypes
  - Skip partition/topic/top-level tags

        |
        v

[2] Resolve ShareGroup
  - Lookup by GroupId
  - Create new group if missing
  - Start session timer (if new)

        |
        v

[3] Session Management
  - Find session by member/session id
  - Validate SessionEpoch
  - Update session partitions if changed
  - Update last-activity timestamp

        |
        v

[4] Partition Metadata + Acquisition
  - Validate topic + partition exist
  - Get/create SharePartitionMeta (SPSO/SPEO)
  - Prune archived inflight records
  - Acquire available offsets with caps

        |
        v

[5] Build ShareFetch Response
  - Write ThrottleTime, Error, LockTimeout
  - For each topic/partition:
      * Errors + CurrentLeader
      * Records (msgset)
      * AcquiredRecords
      * Tags
  - Write NodeEndpoints + top-level tags

        |
        v

[6] Client Parse + Delivery
  - Parse response + records
  - Store acquired records for ack
  - Deliver messages to app
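Step [4] of the workflow above ("Acquire available offsets with caps") can be sketched roughly as follows. This is an illustration only, not the PR's implementation: `rec_t`, `share_acquire`, the numeric `owner` handle and the `max_delivery_count` parameter are all assumptions made for the example.

```c
#include <stddef.h>
#include <stdint.h>

typedef enum { REC_AVAILABLE, REC_ACQUIRED, REC_ARCHIVED } rec_state_t;

/* Hypothetical per-record inflight state. */
typedef struct {
        int64_t offset;
        rec_state_t state;
        int64_t lock_expiry_ts; /* absolute time at which the lock expires */
        int delivery_count;
        int owner; /* hypothetical numeric member handle */
} rec_t;

/* Acquire up to max_records AVAILABLE records for `owner`.
 * Records that already reached max_delivery_count are archived instead
 * of being delivered again. Returns the number of records acquired. */
size_t share_acquire(rec_t *recs, size_t cnt, int owner, int64_t now,
                     int64_t lock_timeout, size_t max_records,
                     int max_delivery_count) {
        size_t acquired = 0;

        for (size_t i = 0; i < cnt && acquired < max_records; i++) {
                rec_t *r = &recs[i];

                if (r->state != REC_AVAILABLE)
                        continue;

                if (r->delivery_count >= max_delivery_count) {
                        r->state = REC_ARCHIVED;
                        continue;
                }

                r->state          = REC_ACQUIRED;
                r->owner          = owner;
                r->lock_expiry_ts = now + lock_timeout;
                r->delivery_count++;
                acquired++;
        }

        return acquired;
}
```

The loop stops as soon as the MaxRecords-style cap is hit, so later available offsets stay untouched for the next fetch or for another member.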

High-level Classes and Data Structures

MockCluster
  |
  +-- shareGroups : TAILQ<MockShareGroup>
         |
         +-- id : String
         +-- groupEpoch : int
         +-- sessionTimeoutMs : int
         |
         +-- fetchSessions : TAILQ<ShareFetchSession>
         |       |
         |       +-- memberId : String
         |       +-- sessionId : int
         |       +-- sessionEpoch : int
         |       +-- lastActivityTs : long
         |       +-- partitions : List<TopicPartition>
         |
         +-- partitions : TAILQ<SharePartitionMeta>
                 |
                 +-- topicId : UUID
                 +-- partition : int
                 +-- spso : long
                 +-- speo : long
                 +-- inflight : TAILQ<RecordState>
                         |
                         +-- offset : long
                         +-- ownerMemberId : String
                         +-- lockExpiryTs : long
                         +-- deliveryCount : int
                         +-- state : RecordStateState
                                   { AVAILABLE | ACQUIRED | ARCHIVED }
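As a rough C rendering of the tree above, the nesting maps naturally onto `TAILQ` lists from `<sys/queue.h>`. The type and field names below are illustrative and are not the identifiers used in the PR; the topic UUID is left out to keep the sketch short, and `share_partition_release_expired` is a hypothetical helper showing how `lockExpiryTs` might drive the ACQUIRED to AVAILABLE transition.

```c
#include <stddef.h>
#include <stdint.h>
#include <sys/queue.h>

typedef enum {
        RECORD_AVAILABLE,
        RECORD_ACQUIRED,
        RECORD_ARCHIVED
} record_state_e;

typedef struct record_state_s {
        TAILQ_ENTRY(record_state_s) link;
        int64_t offset;
        const char *owner_member_id; /* NULL when not acquired */
        int64_t lock_expiry_ts;
        int delivery_count;
        record_state_e state;
} record_state_t;

typedef struct share_partition_meta_s {
        TAILQ_ENTRY(share_partition_meta_s) link;
        int32_t partition;
        int64_t spso; /* share-partition start offset */
        int64_t speo; /* share-partition end offset */
        TAILQ_HEAD(, record_state_s) inflight;
} share_partition_meta_t;

typedef struct share_fetch_session_s {
        TAILQ_ENTRY(share_fetch_session_s) link;
        const char *member_id;
        int32_t session_epoch;
        int64_t last_activity_ts;
} share_fetch_session_t;

typedef struct mock_share_group_s {
        TAILQ_ENTRY(mock_share_group_s) link;
        const char *id;
        int32_t group_epoch;
        int32_t session_timeout_ms;
        TAILQ_HEAD(, share_fetch_session_s) fetch_sessions;
        TAILQ_HEAD(, share_partition_meta_s) partitions;
} mock_share_group_t;

/* Return ACQUIRED records whose lock has expired to AVAILABLE.
 * Returns the number of records released. */
int share_partition_release_expired(share_partition_meta_t *pmeta,
                                    int64_t now) {
        record_state_t *state;
        int released = 0;

        TAILQ_FOREACH(state, &pmeta->inflight, link) {
                if (state->state != RECORD_ACQUIRED ||
                    state->lock_expiry_ts > now)
                        continue;
                state->state           = RECORD_AVAILABLE;
                state->owner_member_id = NULL;
                released++;
        }

        return released;
}
```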

@k-raina Kaushik Raina (k-raina) requested a review from a team as a code owner February 6, 2026 13:01
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.


 ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_ShareFetch,
-                                                  1, 1, NULL);
+                                                  0, 0, NULL);

The Java version of ShareFetch supports version (1, 1) and above, so I think this should be the same.
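To make the concern concrete: version selection boils down to intersecting the client's requested range with the range the broker advertises. The sketch below is a stand-alone illustration; `api_version_range_t` and `pick_api_version` are hypothetical and are not librdkafka functions, and the broker ranges used in the usage note are assumptions.

```c
#include <stdint.h>

/* Hypothetical: the [min, max] range a broker advertises for one API. */
typedef struct {
        int16_t min_ver;
        int16_t max_ver;
} api_version_range_t;

/* Pick the highest version supported by both sides,
 * or -1 if the two ranges do not overlap. */
int16_t pick_api_version(api_version_range_t broker, int16_t client_min,
                         int16_t client_max) {
        int16_t lo = broker.min_ver > client_min ? broker.min_ver : client_min;
        int16_t hi = broker.max_ver < client_max ? broker.max_ver : client_max;

        return lo <= hi ? hi : -1;
}
```

Under the assumption that a broker advertises ShareFetch only from v1, a client range of (0, 0) yields -1 here, while (1, 1) yields 1.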

rd_kafka_mock_cgrp_classic_t *mcgrp_classic;
rd_kafka_mock_cgrp_consumer_t *mcgrp_consumer;
rd_kafka_mock_sharegroup_t *mshgrp;
rd_kafka_mock_sgrp_t *msgrp;

This is actually a duplicate:
rd_kafka_mock_sharegroup_t already exists.

while ((mshgrp = TAILQ_FIRST(&mcluster->sharegrps)))
rd_kafka_mock_sharegroup_destroy(mshgrp);

while ((msgrp = TAILQ_FIRST(&mcluster->sgrps_share)))

Same here, this is also a duplicate.

TAILQ_INIT(&mcluster->cgrps_consumer);

rd_kafka_mock_sharegrps_init(mcluster);
TAILQ_INIT(&mcluster->sgrps_share);

The init is already present:
rd_kafka_mock_sharegrps_init(mcluster);

* @locks mcluster->lock MUST be held.
*/
rd_kafka_mock_sgrp_t *
rd_kafka_mock_sgrp_find(rd_kafka_mock_cluster_t *mcluster,

This is already present in src/rdkafka_mock_sharegrp.c

rd_kafka_mock_sgrp_find(rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *GroupId);
rd_kafka_mock_sgrp_t *
rd_kafka_mock_sgrp_get(rd_kafka_mock_cluster_t *mcluster,

find, get, and destroy are duplicates

}

rd_kafka_buf_skip_tags(rkbuf);
/* ForgottenToppars */

I didn't understand why this code block is present in the metadata_handler.
Shouldn't this be part of the ShareFetch handler?

TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, tmp) {
if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED)
continue;
if (state->offset <= pmeta->speo)

The messages will never be pruned, because the offset can never be greater than speo.
So it should be state->offset >= pmeta->spso instead.
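One way to read this suggestion: an archived record only becomes prunable once SPSO has moved past it, so records at or above SPSO are kept. The sketch below shows that reading on a stand-alone list; `rec_t`, `pmeta_t`, `pmeta_add` and `pmeta_prune_archived` are hypothetical names, and the loop walks the list manually rather than relying on `TAILQ_FOREACH_SAFE`, which not every `<sys/queue.h>` provides.

```c
#include <stdint.h>
#include <stdlib.h>
#include <sys/queue.h>

typedef enum { REC_AVAILABLE, REC_ACQUIRED, REC_ARCHIVED } rec_state_e;

typedef struct rec_s {
        TAILQ_ENTRY(rec_s) link;
        int64_t offset;
        rec_state_e state;
} rec_t;

typedef struct {
        int64_t spso;
        int64_t speo;
        TAILQ_HEAD(, rec_s) inflight;
} pmeta_t;

/* Append a record to the inflight list. Returns NULL on allocation failure. */
rec_t *pmeta_add(pmeta_t *pmeta, int64_t offset, rec_state_e state) {
        rec_t *rec = calloc(1, sizeof(*rec));
        if (!rec)
                return NULL;
        rec->offset = offset;
        rec->state  = state;
        TAILQ_INSERT_TAIL(&pmeta->inflight, rec, link);
        return rec;
}

/* Free ARCHIVED records that lie below SPSO. Returns the number pruned. */
int pmeta_prune_archived(pmeta_t *pmeta) {
        rec_t *rec, *next;
        int pruned = 0;

        for (rec = TAILQ_FIRST(&pmeta->inflight); rec; rec = next) {
                next = TAILQ_NEXT(rec, link); /* saved before removal */

                if (rec->state != REC_ARCHIVED)
                        continue;
                if (rec->offset >= pmeta->spso)
                        continue; /* still inside [SPSO, SPEO]: keep */

                TAILQ_REMOVE(&pmeta->inflight, rec, link);
                free(rec);
                pruned++;
        }

        return pruned;
}
```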

rd_kafka_buf_write_kbytes(
resp, mset ? &mset->bytes : NULL);
/* Response: AcquiredRecords */
rd_kafka_buf_write_arraycnt(resp, 0);

Is there a reason for it to be 0?

Member Author

Yes, we have kept the acknowledgement workflow separate.
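For context on what a non-empty array at this position could carry: as I understand the ShareFetch response schema, each AcquiredRecords entry is a (FirstOffset, LastOffset, DeliveryCount) range. The sketch below coalesces a sorted list of acquired offsets into such ranges. It is an assumption-laden illustration and says nothing about how this PR fills the field; `acquired_range_t` and `coalesce_acquired` are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical in-memory form of one AcquiredRecords entry. */
typedef struct {
        int64_t first_offset;
        int64_t last_offset;
        int16_t delivery_count;
} acquired_range_t;

/* Merge consecutive offsets that share a delivery count into ranges.
 * `offsets` must be sorted ascending. Writes at most out_size ranges
 * and returns the number written. */
size_t coalesce_acquired(const int64_t *offsets,
                         const int16_t *delivery_counts, size_t cnt,
                         acquired_range_t *out, size_t out_size) {
        size_t n = 0;

        for (size_t i = 0; i < cnt; i++) {
                if (n > 0 && out[n - 1].last_offset + 1 == offsets[i] &&
                    out[n - 1].delivery_count == delivery_counts[i]) {
                        out[n - 1].last_offset = offsets[i]; /* extend */
                        continue;
                }

                if (n == out_size)
                        break; /* no room for another range */

                out[n].first_offset   = offsets[i];
                out[n].last_offset    = offsets[i];
                out[n].delivery_count = delivery_counts[i];
                n++;
        }

        return n;
}
```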

}

static const rd_kafka_mock_msgset_t *
rd_kafka_mock_sgrp_first_acquired_msgset(

Why are we sending only the first message set?
Shouldn't we send all the sets in the acquired offset range?
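A sketch of what "all the sets in the acquired offset range" could look like: select every message set whose offset span overlaps the acquired range, not just the first match. `msgset_t` and `msgsets_overlapping` are hypothetical stand-ins and do not reflect the actual `rd_kafka_mock_msgset_t` layout.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a mock message set: the offsets it covers. */
typedef struct {
        int64_t first_offset;
        int64_t last_offset;
} msgset_t;

/* Collect pointers to all message sets overlapping [first, last].
 * Writes at most out_size entries and returns the number written. */
size_t msgsets_overlapping(const msgset_t *sets, size_t cnt, int64_t first,
                           int64_t last, const msgset_t **out,
                           size_t out_size) {
        size_t n = 0;

        for (size_t i = 0; i < cnt && n < out_size; i++) {
                /* Skip sets entirely before or entirely after the range. */
                if (sets[i].last_offset < first ||
                    sets[i].first_offset > last)
                        continue;
                out[n++] = &sets[i];
        }

        return n;
}
```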

rd_free(mcgrp);
}

/**

Can we move these functions to rdkafka_mock_sharegrp.c?

* @brief Destroy share-partition metadata.
*/
static void
rd_kafka_mock_sgrp_partmeta_destroy(rd_kafka_mock_sgrp_partmeta_t *pmeta) {

This function is not used anywhere.
What is it for?

Actually, sharegroup_destroy() handles it, so I guess this is not required.

session->member_id);

TAILQ_REMOVE(&mshgrp->fetch_sessions, session, link);
mshgrp->fetch_session_cnt--;

I think this requires a lock to modify share-group-related data?
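Whether a lock is needed depends on which threads can reach this code, which the snippet alone does not show. If the list can be touched from more than one thread, the removal and the counter update would have to happen under the same lock. A minimal sketch with a POSIX mutex, using hypothetical types (`session_t`, `sharegroup_t`) in place of the mock broker's own structures and locking primitives:

```c
#include <pthread.h>
#include <stdlib.h>
#include <sys/queue.h>

typedef struct session_s {
        TAILQ_ENTRY(session_s) link;
        int id;
} session_t;

typedef struct {
        pthread_mutex_t lock; /* protects sessions and session_cnt */
        TAILQ_HEAD(, session_s) sessions;
        int session_cnt;
} sharegroup_t;

void sharegroup_init(sharegroup_t *grp) {
        pthread_mutex_init(&grp->lock, NULL);
        TAILQ_INIT(&grp->sessions);
        grp->session_cnt = 0;
}

/* Add a session under the lock. Returns NULL on allocation failure. */
session_t *sharegroup_session_add(sharegroup_t *grp, int id) {
        session_t *session = calloc(1, sizeof(*session));
        if (!session)
                return NULL;
        session->id = id;

        pthread_mutex_lock(&grp->lock);
        TAILQ_INSERT_TAIL(&grp->sessions, session, link);
        grp->session_cnt++;
        pthread_mutex_unlock(&grp->lock);
        return session;
}

/* Unlink and free a session; list and counter change atomically. */
void sharegroup_session_remove(sharegroup_t *grp, session_t *session) {
        pthread_mutex_lock(&grp->lock);
        TAILQ_REMOVE(&grp->sessions, session, link);
        grp->session_cnt--;
        pthread_mutex_unlock(&grp->lock);
        free(session);
}
```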

const rd_kafka_mock_partition_t *mpart,
const rd_kafkap_str_t *member_id,
rd_ts_t now) {
const rd_kafka_mock_msgset_t *msgsets[256];

Is 256 intentional?
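If the fixed bound turns out to be arbitrary, one alternative is a small growable pointer list, so a partition with more than 256 message sets is not silently truncated. This is only a sketch of that option; `ptr_list_t` and its helpers are hypothetical, and the initial capacity of 16 is an arbitrary choice.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical growable list of borrowed pointers. */
typedef struct {
        const void **items;
        size_t cnt;
        size_t capacity;
} ptr_list_t;

/* Append an item, doubling the capacity when full.
 * Returns 0 on success, -1 on allocation failure (list unchanged). */
int ptr_list_add(ptr_list_t *list, const void *item) {
        if (list->cnt == list->capacity) {
                size_t new_cap   = list->capacity ? list->capacity * 2 : 16;
                const void **tmp =
                    realloc(list->items, new_cap * sizeof(*tmp));
                if (!tmp)
                        return -1;
                list->items    = tmp;
                list->capacity = new_cap;
        }

        list->items[list->cnt++] = item;
        return 0;
}

/* Release the list storage; the items themselves are not owned. */
void ptr_list_destroy(ptr_list_t *list) {
        free(list->items);
        list->items    = NULL;
        list->cnt      = 0;
        list->capacity = 0;
}
```

A fixed stack array is still a reasonable choice for a mock, provided the bound is checked and documented.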

Member

Hey Kaushik Raina (@k-raina)
Thanks for the PR!
LGTM!

Base automatically changed from anl_MB_SGHB to dev_kip-932_queues-for-kafka February 13, 2026 06:32
@airlock-confluentinc airlock-confluentinc bot force-pushed the kraina-mock-broker-share-fetch branch from 6f6aba9 to 01874dd Compare February 13, 2026 11:17
@Ankith-Confluent Ankith L (Ankith-Confluent) merged commit cac0c73 into dev_kip-932_queues-for-kafka Feb 15, 2026
1 of 2 checks passed
@Ankith-Confluent Ankith L (Ankith-Confluent) deleted the kraina-mock-broker-share-fetch branch February 15, 2026 08:44
airlock-confluentinc bot pushed a commit that referenced this pull request Feb 16, 2026
* Mock Broker : Implement share fetch

* ShareGroupHeartBeat MVP

* Add sharegroup session management and manual assignment features

- Introduced functions to set session timeout and heartbeat interval for sharegroups.
- Implemented manual target assignment for sharegroup members.
- Enhanced connection handling to clear states for closed connections.

* Add function to retrieve member IDs from a sharegroup and update assignment handling

* Mock Broker : Implement share fetch

* Fix SGHB return and fetcher api version

* Add test for share fetch API

* Fix session workflow

* Update API version

* Add handling for ForgottenTopicsData

* Add lock expiry logic

* Remove duplicates with sghb

* Address feedbacks

* Implement max delivery attempts and record lock

* Add member validation for share fetch

* Remove tmp produce code

* Implement feedback

* Minor

* Add test to CMakeList

* Fix minor test failure

---------

Co-authored-by: Ankith-Confluent <[email protected]>
airlock-confluentinc bot pushed a commit that referenced this pull request Mar 2, 2026