Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ enable.partition.eof | C | true, false | false
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
max.poll.records | C | 1 .. 2147483647 | 500 | low | The maximum number of records returned in a single call to the consumer poll. <br>*Type: integer*
share.acknowledgement.mode | C | implicit, explicit | implicit | low | Acknowledgement mode for share consumers. 'implicit' - messages are implicitly acknowledged when the next poll is called. 'explicit' - messages must be explicitly acknowledged using rd_kafka_share_acknowledge*() APIs. <br>*Type: enum value*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
Expand Down
7 changes: 6 additions & 1 deletion examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
rdkafka_complex_consumer_example rdkafka_complex_consumer_example_cpp \
kafkatest_verifiable_client \
producer consumer share_consumer idempotent_producer transactions \
producer consumer share_consumer share_consumer_commit_async \
idempotent_producer transactions \
delete_records \
openssl_engine_example_cpp \
list_consumer_groups \
Expand Down Expand Up @@ -58,6 +59,10 @@ share_consumer: ../src/librdkafka.a share_consumer.c
$(CC) $(CPPFLAGS) $(CFLAGS) [email protected] -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

# Build the explicit-ack share consumer example against the static librdkafka.
# ($@ expands to the target name, so $@.c is share_consumer_commit_async.c)
share_consumer_commit_async: ../src/librdkafka.a share_consumer_commit_async.c
	$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
		../src/librdkafka.a $(LIBS)

idempotent_producer: ../src/librdkafka.a idempotent_producer.c
$(CC) $(CPPFLAGS) $(CFLAGS) [email protected] -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)
Expand Down
261 changes: 261 additions & 0 deletions examples/share_consumer_commit_async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/**
* Share consumer example using rd_kafka_share_commit_async() to
* explicitly acknowledge and commit records between polls.
*
* Usage:
* share_consumer_commit_async <broker> <group.id> <topic1> [topic2 ...]
*
* This example demonstrates:
* - Consuming records with rd_kafka_share_consume_batch()
* - Explicitly acknowledging individual records with
* rd_kafka_share_acknowledge_type() using ACCEPT or RELEASE
* (RELEASE at ~50% rate to simulate redelivery)
* - Committing acknowledgements asynchronously with
* rd_kafka_share_commit_async() at ~10% rate mid-batch
*/

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 199309L
#endif

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <time.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"


/* Main-loop run flag; cleared by the SIGINT handler to request shutdown.
 * sig_atomic_t + volatile is the only portable way to share state with a
 * signal handler. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler: request termination of the program.
 */
static void stop(int sig) {
        (void)sig; /* Unused: any trapped signal means "shut down". */
        run       = 0;
}


/**
 * @returns 1 if all bytes in \p buf are printable, else 0.
 *
 * Each byte is converted through unsigned char before the <ctype.h>
 * call: passing a negative char value (other than EOF) to isprint()
 * is undefined behavior (CERT STR37-C), and plain char may be signed.
 */
static int is_printable(const char *buf, size_t size) {
        size_t i;

        for (i = 0; i < size; i++)
                if (!isprint((unsigned char)buf[i]))
                        return 0;

        return 1;
}


/**
 * @brief Share consumer example entry point.
 *
 * Creates a share consumer in explicit acknowledgement mode, subscribes
 * to the topics given on the command line, then loops:
 *   - polls a batch with rd_kafka_share_consume_batch(),
 *   - acknowledges each record with ACCEPT or RELEASE (~50/50) via
 *     rd_kafka_share_acknowledge_type(),
 *   - occasionally (~10% of records) flushes acknowledgements with
 *     rd_kafka_share_commit_async() to exercise mid-batch commits.
 *
 * @returns 0 on clean shutdown (SIGINT), 1 on setup failure.
 */
int main(int argc, char **argv) {
        /* Capacity of the per-poll message array; a poll may fill fewer. */
        enum { MAX_BATCH_MSGS = 10001 };
        rd_kafka_share_t *rkshare;
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t err;
        char errstr[512];
        const char *brokers;
        const char *groupid;
        char **topics;
        int topic_cnt;
        rd_kafka_topic_partition_list_t *subscription;
        rd_kafka_message_t *rkmessages[MAX_BATCH_MSGS];
        int i;

        if (argc < 4) {
                fprintf(stderr,
                        "%% Usage: "
                        "%s <broker> <group.id> <topic1> [topic2 ..]\n",
                        argv[0]);
                return 1;
        }

        brokers   = argv[1];
        groupid   = argv[2];
        topics    = &argv[3];
        topic_cnt = argc - 3;

        conf = rd_kafka_conf_new();

        /* On rd_kafka_conf_set() failure conf must be freed here: ownership
         * only transfers on a successful rd_kafka_share_consumer_new(). */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        /* Explicit mode: records are only acknowledged through the
         * rd_kafka_share_acknowledge*() APIs, never implicitly on poll. */
        if (rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
        if (!rkshare) {
                fprintf(stderr, "%% Failed to create new share consumer: %s\n",
                        errstr);
                return 1;
        }

        /* The configuration object is now owned by the consumer instance. */
        conf = NULL;

        subscription = rd_kafka_topic_partition_list_new(topic_cnt);
        for (i = 0; i < topic_cnt; i++)
                rd_kafka_topic_partition_list_add(subscription, topics[i],
                                                  RD_KAFKA_PARTITION_UA);

        err = rd_kafka_share_subscribe(rkshare, subscription);
        if (err) {
                fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n",
                        subscription->cnt, rd_kafka_err2str(err));
                rd_kafka_topic_partition_list_destroy(subscription);
                rd_kafka_share_destroy(rkshare);
                return 1;
        }

        fprintf(stderr,
                "%% Subscribed to %d topic(s), "
                "waiting for rebalance and messages...\n",
                subscription->cnt);

        rd_kafka_topic_partition_list_destroy(subscription);

        signal(SIGINT, stop);

        /* Seed rand() for the ACCEPT/RELEASE and commit-rate decisions. */
        srand((unsigned int)time(NULL));

        while (run) {
                size_t rcvd_msgs = 0;
                rd_kafka_error_t *error;

                printf("Calling rd_kafka_share_consume_batch()\n");
                error = rd_kafka_share_consume_batch(rkshare, 3000, rkmessages,
                                                     &rcvd_msgs);

                if (error) {
                        fprintf(stderr, "%% Consume error: %s\n",
                                rd_kafka_error_string(error));
                        rd_kafka_error_destroy(error);
                        continue;
                }

                if (rcvd_msgs == 0)
                        continue;

                printf("Received %zu messages\n", rcvd_msgs);

                for (i = 0; i < (int)rcvd_msgs; i++) {
                        rd_kafka_message_t *rkm = rkmessages[i];

                        /* Per-message errors are delivered as messages with
                         * rkm->err set; report and skip acknowledging. */
                        if (rkm->err) {
                                fprintf(stderr, "%% Consumer error: %d: %s\n",
                                        rkm->err, rd_kafka_message_errstr(rkm));
                                rd_kafka_message_destroy(rkm);
                                continue;
                        }

                        /* Randomly RELEASE ~50% of messages,
                         * ACCEPT the rest. */
                        rd_kafka_share_ack_type_t ack_type =
                            ((rand() % 100) < 50)
                                ? RD_KAFKA_SHARE_ACK_TYPE_RELEASE
                                : RD_KAFKA_SHARE_ACK_TYPE_ACCEPT;

                        printf("Message on %s [%" PRId32 "] at offset %" PRId64
                               " -> %s",
                               rd_kafka_topic_name(rkm->rkt), rkm->partition,
                               rkm->offset,
                               ack_type == RD_KAFKA_SHARE_ACK_TYPE_ACCEPT
                                   ? "ACCEPT"
                                   : "RELEASE");

                        /* Only print key/value when every byte is printable
                         * (payloads may be binary). */
                        if (rkm->key && is_printable(rkm->key, rkm->key_len))
                                printf(" Key: %.*s", (int)rkm->key_len,
                                       (const char *)rkm->key);

                        if (rkm->payload &&
                            is_printable(rkm->payload, rkm->len))
                                printf(" Value: %.*s", (int)rkm->len,
                                       (const char *)rkm->payload);

                        printf("\n");

                        err = rd_kafka_share_acknowledge_type(rkshare, rkm,
                                                              ack_type);
                        if (err)
                                fprintf(stderr,
                                        "%% Acknowledge error for "
                                        "%s [%" PRId32 "] @ %" PRId64 ": %s\n",
                                        rd_kafka_topic_name(rkm->rkt),
                                        rkm->partition, rkm->offset,
                                        rd_kafka_err2str(err));

                        rd_kafka_message_destroy(rkm);

                        /* Randomly commit ~10% of the time to
                         * exercise async commit mid-batch. */
                        if (run && (rand() % 100) < 10) {
                                printf(
                                    "Calling "
                                    "rd_kafka_share_commit_async()\n");
                                error = rd_kafka_share_commit_async(rkshare);
                                if (error) {
                                        fprintf(stderr,
                                                "%% Commit async "
                                                "error: %s\n",
                                                rd_kafka_error_string(error));
                                        rd_kafka_error_destroy(error);
                                }
                        }
                }
        }

        fprintf(stderr, "%% Closing share consumer\n");
        rd_kafka_share_consumer_close(rkshare);

        rd_kafka_share_destroy(rkshare);

        return 0;
}
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c rdunittest_fetcher.c \
rdunittest_acknowledge.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
Expand Down
Loading