Draft
Changes from all commits
Commits
43 commits
1157dfd
Prototype code to obtain assignments
milindl Jul 14, 2025
a536e2f
Implement polling logic to work with normal fetch
milindl Oct 6, 2025
c23dd64
Call sharegroupheartbeat when leaving group (#5247)
PratRanj07 Nov 20, 2025
de014cf
Add SHARE_FETCH op handling in broker thread
pranavrth Oct 23, 2025
1b3bbed
Add session epoch and partition management
pranavrth Nov 13, 2025
a6e6fc5
Improve refcount debugging for toppar destroy
pranavrth Nov 20, 2025
1fed6eb
Add session leave while decommissioning the broker.
pranavrth Jan 8, 2026
6c9dee4
Implicitly acknowledge all batches instead of just the first batch wh…
pranavrth Nov 20, 2025
f51dc67
* Create a new type for share consumer
pranavrth Nov 25, 2025
10db908
Fix UNKNOWN_ERROR from broker arising because broker was not acceptin…
pranavrth Jan 8, 2026
9151d2b
ShareGroupHeartBeat API (#5299)
Ankith-Confluent Feb 13, 2026
cac0c73
MockBroker: Implement ShareFetch API (#5302)
k-raina Feb 15, 2026
e921c5f
Added more tests for SGHB API (#5320)
Ankith-Confluent Feb 20, 2026
27e2aba
Add more tests to sharefetch API mock broker (#5321)
Ankith-Confluent Feb 20, 2026
846983b
[KIP-932]: Basic testing for consume and heartbeat flow (#5258)
PratRanj07 Feb 23, 2026
917561f
Initial Style fix for Share Consumer
pranavrth Feb 16, 2026
71d1e95
Style fixes for the latest changes
pranavrth Feb 23, 2026
931830f
Segregate acquired records and make acknowledgement mapping
PratRanj07 Feb 16, 2026
81f3fbf
style fix
PratRanj07 Feb 16, 2026
88610fe
remove next_msg_idx usage and return everything broker has sent
PratRanj07 Feb 16, 2026
a38a84b
combine all three acknowledgement batches structure into a common str…
PratRanj07 Feb 18, 2026
2c06711
A comment
PratRanj07 Feb 18, 2026
798dcd0
variable naming and relocation
PratRanj07 Feb 19, 2026
b56a873
requested changes
PratRanj07 Feb 22, 2026
dd05801
nit
PratRanj07 Feb 22, 2026
d775285
some comments changes
PratRanj07 Feb 23, 2026
a193959
send only one response rko and consumer err ops
PratRanj07 Feb 23, 2026
8294e00
loop for reading all the messages in all share fetch response ops
PratRanj07 Feb 24, 2026
0ee01cf
unnecessary changes
PratRanj07 Feb 24, 2026
0686d47
Refactoring code
PratRanj07 Feb 25, 2026
02f1f7a
Additional comments added
PratRanj07 Feb 25, 2026
75ac008
Naming comment
PratRanj07 Feb 25, 2026
cb889e9
Convert list to pointer
PratRanj07 Feb 25, 2026
5a5b02f
KIP-932: Redesign FANOUT and SHARE_FETCH reply flow
pranavrth Feb 25, 2026
53f64dd
KIP-932: Wire ack_details into ShareFetchRequest and broker thread
pranavrth Feb 25, 2026
5679c6e
KIP-932: Remove old rktp_share_acknowledge fields from toppar
pranavrth Feb 25, 2026
fcd4268
KIP-932: Separate rk_rep callback processing from consumer queue in poll
pranavrth Feb 26, 2026
ccd1f76
KIP-932: Fix ack_details resource cleanup and segregation refactor
pranavrth Feb 26, 2026
48278ac
KIP-932: Fix double-free and use-after-free in share consumer
pranavrth Feb 26, 2026
aa01760
KIP-932: Fix NULL deref in build_response_rko and increase example bu…
pranavrth Feb 26, 2026
a83578d
KIP-932: Fix broker lock deadlock in share_select_broker and add debu…
pranavrth Mar 1, 2026
bb72e04
KIP-932: Add global fetch-in-flight guard to prevent duplicate FANOUTs
pranavrth Mar 2, 2026
ddc4c07
Coloring Share Consumer example output for testing
pranavrth Mar 2, 2026
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
@@ -154,6 +154,7 @@ offset_commit_cb | C | |
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
max.poll.records | C | 1 .. 2147483647 | 500 | low | tba description, <br>*Type: integer*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
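The `max.poll.records` property added above caps how many records a single batch-consume call may return (its table entry still reads "tba description" in this draft). A minimal sketch of setting it, assuming only the long-standing `rd_kafka_conf_*` API and that the property is registered with the range shown in the table; this is not runnable without linking against librdkafka:

```c
#include <stdio.h>
#include "rdkafka.h" /* <librdkafka/rdkafka.h> outside the source tree */

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Cap each consume-batch call at 100 records instead of the
         * default 500 (value must be within 1 .. 2147483647). */
        if (rd_kafka_conf_set(conf, "max.poll.records", "100", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rd_kafka_conf_destroy(conf);
        return 0;
}
```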
3 changes: 2 additions & 1 deletion examples/.gitignore
@@ -22,4 +22,5 @@ alter_consumer_group_offsets
incremental_alter_configs
user_scram
list_offsets
elect_leaders
elect_leaders
share_consumer
6 changes: 5 additions & 1 deletion examples/Makefile
@@ -1,7 +1,7 @@
EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
rdkafka_complex_consumer_example rdkafka_complex_consumer_example_cpp \
kafkatest_verifiable_client \
producer consumer idempotent_producer transactions \
producer consumer share_consumer idempotent_producer transactions \
delete_records \
openssl_engine_example_cpp \
list_consumer_groups \
@@ -54,6 +54,10 @@ consumer: ../src/librdkafka.a consumer.c
$(CC) $(CPPFLAGS) $(CFLAGS) [email protected] -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

share_consumer: ../src/librdkafka.a share_consumer.c
$(CC) $(CPPFLAGS) $(CFLAGS) [email protected] -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

idempotent_producer: ../src/librdkafka.a idempotent_producer.c
$(CC) $(CPPFLAGS) $(CFLAGS) [email protected] -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)
298 changes: 298 additions & 0 deletions examples/share_consumer.c
@@ -0,0 +1,298 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/**
* Simple Apache Kafka share consumer (KIP-932)
* using the Kafka driver from librdkafka
* (https://github.com/confluentinc/librdkafka)
*/

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 199309L
#endif

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include <time.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
// #include <librdkafka/rdkafka.h>
#include "rdkafka.h"

/* ANSI color codes */
#define ANSI_RED "\033[31m"
#define ANSI_GREEN "\033[32m"
#define ANSI_YELLOW "\033[33m"
#define ANSI_CYAN "\033[36m"
#define ANSI_RESET "\033[0m"

#define TIME_BLOCK_MS(elapsed_var, expr) \
do { \
struct timespec __t0, __t1; \
if (clock_gettime(CLOCK_MONOTONIC, &__t0) != 0) \
perror("clock_gettime"); \
expr; \
if (clock_gettime(CLOCK_MONOTONIC, &__t1) != 0) \
perror("clock_gettime"); \
(elapsed_var) = (__t1.tv_sec - __t0.tv_sec) * 1000.0 + \
(__t1.tv_nsec - __t0.tv_nsec) / 1e6; \
} while (0)


static volatile sig_atomic_t run = 1;

/**
* @brief Signal termination of program
*/
static void stop(int sig) {
run = 0;
}



/**
* @returns 1 if all bytes are printable, else 0.
*/
static int is_printable(const char *buf, size_t size) {
size_t i;

for (i = 0; i < size; i++)
if (!isprint((unsigned char)buf[i]))
return 0;

return 1;
}


int main(int argc, char **argv) {
rd_kafka_share_t *rkshare; /* Share consumer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers; /* Argument: broker list */
const char *groupid; /* Argument: Consumer group id */
char **topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
int i;

/*
* Argument validation
*/
if (argc < 3) {
fprintf(stderr,
ANSI_RED "%% Usage: "
"%s <broker> <group.id> <topic1> "
"<topic2>..\n" ANSI_RESET,
argv[0]);
return 1;
}

brokers = argv[1];
groupid = argv[2];
topics = &argv[3];
topic_cnt = argc - 3;


/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();

/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, ANSI_RED "%s\n" ANSI_RESET, errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

if (rd_kafka_conf_set(conf, "debug",
"protocol",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, ANSI_RED "%s\n" ANSI_RESET, errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/* Set the consumer group id.
* All consumers sharing the same group id will join the same
* share group, and records from the subscribed topics'
* partitions will be distributed by the broker among the
* consumers in the group. */
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, ANSI_RED "%s\n" ANSI_RESET, errstr);
rd_kafka_conf_destroy(conf);
return 1;
}

/*
* Create a new share consumer instance.
*
* NOTE: rd_kafka_share_consumer_new() takes ownership of the conf
* object and the application must not reference it again after this
* call.
*/
rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
if (!rkshare) {
fprintf(stderr,
ANSI_RED "%% Failed to create new share consumer: "
"%s\n" ANSI_RESET,
errstr);
return 1;
}

conf = NULL; /* Configuration object is now owned, and freed,
* by the rd_kafka_t instance. */


/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
rd_kafka_topic_partition_list_add(subscription, topics[i],
/* the partition is ignored
* by subscribe() */
RD_KAFKA_PARTITION_UA);

/* Subscribe to the list of topics */
err = rd_kafka_share_subscribe(rkshare, subscription);
if (err) {
fprintf(stderr,
ANSI_RED "%% Failed to subscribe to %d topics: "
"%s\n" ANSI_RESET,
subscription->cnt, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_share_destroy(rkshare);
return 1;
}

fprintf(stderr,
ANSI_YELLOW "%% Subscribed to %d topic(s), "
"waiting for rebalance and messages...\n" ANSI_RESET,
subscription->cnt);

rd_kafka_topic_partition_list_destroy(subscription);


/* Signal handler for clean shutdown */
signal(SIGINT, stop);

/* Subscribing to topics will trigger a group rebalance
* which may take some time to finish, but there is no need
* for the application to handle this idle period in a special way
* since a rebalance may happen at any time.
* Start polling for messages. */

rd_kafka_message_t *rkmessages[10001];
while (run) {
rd_kafka_message_t *rkm = NULL;
size_t rcvd_msgs = 0;
int i;
rd_kafka_error_t *error;
double __elapsed_ms;

TIME_BLOCK_MS(__elapsed_ms,
error = rd_kafka_share_consume_batch(
rkshare, 3000, rkmessages, &rcvd_msgs));
fprintf(stdout,
ANSI_GREEN "%% rd_kafka_share_consume_batch() took "
"%.3f ms\n" ANSI_RESET,
__elapsed_ms);

if (error) {
fprintf(stdout,
ANSI_RED "%% Consume error: %s\n" ANSI_RESET,
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
continue;
}

fprintf(stdout,
ANSI_GREEN "%% Received %zu messages\n" ANSI_RESET,
rcvd_msgs);
for (i = 0; i < (int)rcvd_msgs; i++) {
rkm = rkmessages[i];

if (rkm->err) {
fprintf(stdout,
ANSI_RED "%% Consumer error: %d: "
"%s\n" ANSI_RESET,
rkm->err, rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}

/* Proper message. */
printf(ANSI_CYAN "Message received on %s [%" PRId32
"] at offset %" PRId64 ANSI_RESET,
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(ANSI_CYAN " Key: %.*s\n" ANSI_RESET,
(int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(ANSI_CYAN " Key: (%d bytes)\n" ANSI_RESET,
(int)rkm->key_len);

/* Print the message value/payload. */
if (rkm->payload &&
is_printable(rkm->payload, rkm->len))
printf(ANSI_CYAN " - Value: %.*s\n" ANSI_RESET,
(int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(ANSI_CYAN " - Value: (%d bytes)\n" ANSI_RESET,
(int)rkm->len);

rd_kafka_message_destroy(rkm);
}
}


/* Close the consumer: acknowledge outstanding records and leave
* the share group. */
fprintf(stderr, ANSI_YELLOW "%% Closing share consumer\n" ANSI_RESET);
rd_kafka_share_consumer_close(rkshare);


/* Destroy the consumer */
rd_kafka_share_destroy(rkshare);

return 0;
}
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
@@ -26,6 +26,7 @@ set(
rdkafka_offset.c
rdkafka_op.c
rdkafka_partition.c
rdkafka_share_acknowledgement.c
rdkafka_pattern.c
rdkafka_queue.c
rdkafka_range_assignor.c
@@ -51,6 +52,7 @@ set(
rdkafka_mock.c
rdkafka_mock_handlers.c
rdkafka_mock_cgrp.c
rdkafka_mock_sharegrp.c
rdkafka_error.c
rdkafka_fetcher.c
rdkafka_telemetry.c
7 changes: 4 additions & 3 deletions src/Makefile
@@ -41,7 +41,8 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_conf.c rdkafka_timer.c rdkafka_offset.c \
rdkafka_transport.c rdkafka_buf.c rdkafka_queue.c rdkafka_op.c \
rdkafka_request.c rdkafka_cgrp.c rdkafka_pattern.c \
rdkafka_partition.c rdkafka_subscription.c \
rdkafka_partition.c rdkafka_share_acknowledgement.c \
rdkafka_subscription.c \
rdkafka_assignment.c \
rdkafka_assignor.c rdkafka_range_assignor.c \
rdkafka_roundrobin_assignor.c rdkafka_sticky_assignor.c \
@@ -56,8 +57,8 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c rdunittest_fetcher.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
nanopb/pb_encode.c nanopb/pb_decode.c nanopb/pb_common.c \
3 changes: 1 addition & 2 deletions src/rdhdrhistogram.c
@@ -275,8 +275,7 @@ typedef struct rd_hdr_iter_s {
int64_t highestEquivalentValue;
} rd_hdr_iter_t;

#define RD_HDR_ITER_INIT(hdr) \
{ .hdr = hdr, .subBucketIdx = -1 }
#define RD_HDR_ITER_INIT(hdr) {.hdr = hdr, .subBucketIdx = -1}

static int rd_hdr_iter_next(rd_hdr_iter_t *it) {
const rd_hdr_histogram_t *hdr = it->hdr;