diff --git a/CONFIGURATION.md b/CONFIGURATION.md index ddf795a9e2..00efc65589 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -155,6 +155,7 @@ enable.partition.eof | C | true, false | false check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
*Type: boolean* client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`.
*Type: string* max.poll.records | C | 1 .. 2147483647 | 500 | low | tba description,
*Type: integer* +share.acknowledgement.mode | C | implicit, explicit | implicit | low | Acknowledgement mode for share consumers. 'implicit' - messages are implicitly acknowledged when the next poll is called. 'explicit' - messages must be explicitly acknowledged using rd_kafka_share_acknowledge*() APIs.
*Type: enum value* transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0.
*Type: string* transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods.
*Type: integer* enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible.
*Type: boolean* diff --git a/examples/Makefile b/examples/Makefile index e9d64e30d4..adc16bba02 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,7 +1,8 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ rdkafka_complex_consumer_example rdkafka_complex_consumer_example_cpp \ kafkatest_verifiable_client \ - producer consumer share_consumer idempotent_producer transactions \ + producer consumer share_consumer share_consumer_commit_async \ + idempotent_producer transactions \ delete_records \ openssl_engine_example_cpp \ list_consumer_groups \ @@ -58,6 +59,10 @@ share_consumer: ../src/librdkafka.a share_consumer.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) +share_consumer_commit_async: ../src/librdkafka.a share_consumer_commit_async.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + idempotent_producer: ../src/librdkafka.a idempotent_producer.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) diff --git a/examples/share_consumer_commit_async.c b/examples/share_consumer_commit_async.c new file mode 100644 index 0000000000..3588047565 --- /dev/null +++ b/examples/share_consumer_commit_async.c @@ -0,0 +1,261 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019-2022, Magnus Edenhill + * 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Share consumer example using rd_kafka_share_commit_async() to + * explicitly acknowledge and commit records between polls. + * + * Usage: + * share_consumer_commit_async <brokers> <group.id> <topic1> [topic2 ...] + * + * This example demonstrates: + * - Consuming records with rd_kafka_share_consume_batch() + * - Explicitly acknowledging individual records with + * rd_kafka_share_acknowledge_type() using ACCEPT or RELEASE + * (RELEASE at ~50% rate to simulate redelivery) + * - Committing acknowledgements asynchronously with + * rd_kafka_share_commit_async() at ~10% rate mid-batch + */ + +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 199309L +#endif + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> +#include <ctype.h> +#include <time.h> + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" + + +static volatile sig_atomic_t run = 1; + +/** + * @brief Signal termination of program + */ +static void stop(int sig) { + run = 0; +} + + +/** + * @returns 1 if all bytes are printable, else 0.
+ */ +static int is_printable(const char *buf, size_t size) { + size_t i; + + for (i = 0; i < size; i++) + if (!isprint((int)buf[i])) + return 0; + + return 1; +} + + +int main(int argc, char **argv) { + rd_kafka_share_t *rkshare; + rd_kafka_conf_t *conf; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *brokers; + const char *groupid; + char **topics; + int topic_cnt; + rd_kafka_topic_partition_list_t *subscription; + int i; + + if (argc < 4) { + fprintf(stderr, + "%% Usage: " + "%s <brokers> <group.id> <topic1> [topic2 ..]\n", + argv[0]); + return 1; + } + + brokers = argv[1]; + groupid = argv[2]; + topics = &argv[3]; + topic_cnt = argc - 3; + + conf = rd_kafka_conf_new(); + + if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + if (rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + if (!rkshare) { + fprintf(stderr, "%% Failed to create new share consumer: %s\n", + errstr); + return 1; + } + + conf = NULL; + + subscription = rd_kafka_topic_partition_list_new(topic_cnt); + for (i = 0; i < topic_cnt; i++) + rd_kafka_topic_partition_list_add(subscription, topics[i], + RD_KAFKA_PARTITION_UA); + + err = rd_kafka_share_subscribe(rkshare, subscription); + if (err) { + fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", + subscription->cnt, rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(subscription); + rd_kafka_share_destroy(rkshare); + return 1; + } + + fprintf(stderr, + "%% Subscribed to %d topic(s), " + "waiting for rebalance
and messages...\n", + subscription->cnt); + + rd_kafka_topic_partition_list_destroy(subscription); + + signal(SIGINT, stop); + + srand((unsigned int)time(NULL)); + + rd_kafka_message_t *rkmessages[10001]; + while (run) { + size_t rcvd_msgs = 0; + rd_kafka_error_t *error; + + printf("Calling rd_kafka_share_consume_batch()\n"); + error = rd_kafka_share_consume_batch(rkshare, 3000, rkmessages, + &rcvd_msgs); + + if (error) { + fprintf(stderr, "%% Consume error: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + continue; + } + + if (rcvd_msgs == 0) + continue; + + printf("Received %zu messages\n", rcvd_msgs); + + for (i = 0; i < (int)rcvd_msgs; i++) { + rd_kafka_message_t *rkm = rkmessages[i]; + + if (rkm->err) { + fprintf(stderr, "%% Consumer error: %d: %s\n", + rkm->err, rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + continue; + } + + /* Randomly RELEASE ~50% of messages, + * ACCEPT the rest. */ + rd_kafka_share_ack_type_t ack_type = + ((rand() % 100) < 50) + ? RD_KAFKA_SHARE_ACK_TYPE_RELEASE + : RD_KAFKA_SHARE_ACK_TYPE_ACCEPT; + + printf("Message on %s [%" PRId32 "] at offset %" PRId64 + " -> %s", + rd_kafka_topic_name(rkm->rkt), rkm->partition, + rkm->offset, + ack_type == RD_KAFKA_SHARE_ACK_TYPE_ACCEPT + ? "ACCEPT" + : "RELEASE"); + + if (rkm->key && is_printable(rkm->key, rkm->key_len)) + printf(" Key: %.*s", (int)rkm->key_len, + (const char *)rkm->key); + + if (rkm->payload && + is_printable(rkm->payload, rkm->len)) + printf(" Value: %.*s", (int)rkm->len, + (const char *)rkm->payload); + + printf("\n"); + + err = rd_kafka_share_acknowledge_type(rkshare, rkm, + ack_type); + if (err) + fprintf(stderr, + "%% Acknowledge error for " + "%s [%" PRId32 "] @ %" PRId64 ": %s\n", + rd_kafka_topic_name(rkm->rkt), + rkm->partition, rkm->offset, + rd_kafka_err2str(err)); + + rd_kafka_message_destroy(rkm); + + /* Randomly commit ~10% of the time to + * exercise async commit mid-batch. 
*/ + if (run && (rand() % 100) < 10) { + printf( + "Calling " + "rd_kafka_share_commit_async()\n"); + error = rd_kafka_share_commit_async(rkshare); + if (error) { + fprintf(stderr, + "%% Commit async " + "error: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + } + } + } + + fprintf(stderr, "%% Closing share consumer\n"); + rd_kafka_share_consumer_close(rkshare); + + rd_kafka_share_destroy(rkshare); + + return 0; +} diff --git a/src/Makefile b/src/Makefile index 4de1b48359..ffd253f3b9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -58,6 +58,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \ rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \ rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \ rdvarint.c rdbuf.c rdmap.c rdunittest.c rdunittest_fetcher.c \ + rdunittest_acknowledge.c \ rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c rdkafka_mock_sharegrp.c \ rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \ rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \ diff --git a/src/rdkafka.c b/src/rdkafka.c index be4edfd1cd..68e9aa564c 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -57,6 +57,7 @@ #include "rdkafka_interceptor.h" #include "rdkafka_idempotence.h" #include "rdkafka_sasl_oauthbearer.h" +#include "rdkafka_share_acknowledgement.h" #if WITH_OAUTHBEARER_OIDC #include "rdkafka_sasl_oauthbearer_oidc.h" #endif @@ -738,15 +739,51 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { "the consumer group"), _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH, "Broker: The member epoch is stale"), + _ERR_DESC(RD_KAFKA_RESP_ERR_MISMATCHED_ENDPOINT_TYPE, + "Broker: The request was sent to an endpoint of the wrong type"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ENDPOINT_TYPE, + "Broker: This endpoint type is not supported yet"), + _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_CONTROLLER_ID, + "Broker: This controller ID is not known"), _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_SUBSCRIPTION_ID, "Broker: Client 
sent a push telemetry request with an invalid or " "outdated subscription ID"), _ERR_DESC(RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE, "Broker: Client sent a push telemetry request larger than the " "maximum size the broker will accept"), + _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REGISTRATION, + "Broker: The controller has considered the broker registration " + "to be invalid"), + _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_ABORTABLE, + "Broker: The server encountered an error with the transaction"), + _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD_STATE, + "Broker: The record state is invalid"), + _ERR_DESC(RD_KAFKA_RESP_ERR_SHARE_SESSION_NOT_FOUND, + "Broker: The share session was not found"), + _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SHARE_SESSION_EPOCH, + "Broker: The share session epoch is invalid"), + _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_STATE_EPOCH, + "Broker: The share-group state epoch did not match"), + _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_VOTER_KEY, + "Broker: The voter key doesn't match the receiving replica's " + "key"), + _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_VOTER, + "Broker: The voter is already part of the set of voters"), + _ERR_DESC(RD_KAFKA_RESP_ERR_VOTER_NOT_FOUND, + "Broker: The voter is not part of the set of voters"), + _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REGULAR_EXPRESSION, + "Broker: The regular expression is not valid"), _ERR_DESC(RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED, "Broker: Client metadata is stale, " "client should rebootstrap to obtain new metadata"), + _ERR_DESC(RD_KAFKA_RESP_ERR_STREAMS_INVALID_TOPOLOGY, + "Broker: The supplied topology is invalid"), + _ERR_DESC(RD_KAFKA_RESP_ERR_STREAMS_INVALID_TOPOLOGY_EPOCH, + "Broker: The supplied topology epoch is invalid"), + _ERR_DESC(RD_KAFKA_RESP_ERR_STREAMS_TOPOLOGY_FENCED, + "Broker: The supplied topology epoch is outdated"), + _ERR_DESC(RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED, + "Broker: The limit of share sessions has been reached"), _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)}; @@ -3001,10 +3038,9 @@ 
rd_kafka_share_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, /* Set backpointer from rk to rkshare for access in retry handlers */ rk->rk_rkshare = rkshare; - /* Inflight acks map keyed by topic_id + partition */ + /* Inflight acks map keyed by topic name + partition */ RD_MAP_INIT(&rkshare->rkshare_inflight_acks, 16, - rd_kafka_topic_partition_by_id_cmp, - rd_kafka_topic_partition_hash_by_id, + rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, rd_kafka_share_ack_batches_destroy_free); @@ -3053,8 +3089,11 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, /* Criteria to choose a broker: * 1. It should be the leader of a partition. - * 2. A share-fetch op must not already be enqueued on it. */ - if (rktp->rktp_leader) { + * 2. A share-fetch op must not already be enqueued on it. + * 3. The broker or instance must not be terminating. */ + if (rktp->rktp_leader && + !rd_kafka_broker_or_instance_terminating( + rktp->rktp_leader)) { /* TODO: We're only going to access * rkb_share_fetch_enqueued from the main thread, except * when it's being calloc'd and destroyed. Is it safe to @@ -3162,10 +3201,11 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, rkcg->rkcg_share.share_should_fetch_ops_in_flight_cnt--; /* - * Step 1: If records were fetched, reset the global fetch guard - * so the next FANOUT can select a new fetch broker. + * Step 1: If records were fetched and broker is not terminating, + * reset the global fetch guard so the next FANOUT can select + * a new fetch broker. */ - if (records_fetched) + if (records_fetched && rko_orig->rko_err != RD_KAFKA_RESP_ERR__DESTROY) rkcg->rkcg_share.share_fetch_more_records = rd_false; /* @@ -3180,7 +3220,8 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, * assignments are empty) and reset fetch_more_records. 
*/ if (should_fetch && !records_fetched && - rkcg->rkcg_share.share_fetch_more_records) { + rkcg->rkcg_share.share_fetch_more_records && + !rd_kafka_terminating(rk)) { rd_kafka_broker_t *selected_rkb; selected_rkb = rd_kafka_share_select_broker(rk, rkcg); @@ -3206,7 +3247,8 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, * send an ack-only SHARE_FETCH op to it. */ if (reply_rkb->rkb_share_async_ack_details && - !reply_rkb->rkb_share_fetch_enqueued) { + !reply_rkb->rkb_share_fetch_enqueued && + !rd_kafka_broker_or_instance_terminating(reply_rkb)) { rd_kafka_dbg(rk, CGRP, "SHARE", "Enqueuing ack-only SHARE_FETCH to broker %s " "to flush pending acks", @@ -3273,6 +3315,9 @@ rd_kafka_share_find_ack_batch(rd_list_t *ack_list, int i; RD_LIST_FOREACH(existing, ack_list, i) { + /* TODO KIP-932: We might need to use leader id and epoch + * as well to understand about the stale topic partition + * information */ if (rd_kafka_topic_partition_by_id_cmp(existing->rktpar, rktpar) == 0) return existing; @@ -3589,9 +3634,19 @@ rd_kafka_error_t *rd_kafka_share_consume_batch( rd_kafka_q_serve(rk->rk_rep, RD_POLL_NOWAIT, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); - /* Implicit ack: acknowledge all ACQUIRED records from previous - * poll as ACCEPT, then extract ack_details for sending. */ - rd_kafka_share_ack_all(rk->rk_rkshare); + /* In implicit ack mode, convert all ACQUIRED records to ACCEPT + * before extracting ack details. In explicit mode, the app has + * already acknowledged records via the acknowledge APIs, so + * only extract what's been explicitly acknowledged. 
*/ + if (!rk->rk_conf.share.explicit_acks) + rd_kafka_share_ack_all(rkshare); + else if (rkshare->rkshare_unacked_cnt > 0) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__STATE, + "%" PRId64 + " records from previous poll have not " + "been acknowledged", + rkshare->rkshare_unacked_cnt); rd_list_t *ack_batches = rd_kafka_share_build_ack_details(rk->rk_rkshare); @@ -3618,6 +3673,160 @@ rd_kafka_error_t *rd_kafka_share_consume_batch( return error; } +rd_kafka_resp_err_t +rd_kafka_share_acknowledge(rd_kafka_share_t *rkshare, + const rd_kafka_message_t *rkmessage) { + return rd_kafka_share_acknowledge_type(rkshare, rkmessage, + RD_KAFKA_SHARE_ACK_TYPE_ACCEPT); +} + +rd_kafka_resp_err_t +rd_kafka_share_acknowledge_type(rd_kafka_share_t *rkshare, + const rd_kafka_message_t *rkmessage, + rd_kafka_share_ack_type_t type) { + if (!rkshare || !rkmessage) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* Explicit acknowledge APIs require explicit acknowledgement mode */ + if (!rkshare->rkshare_rk->rk_conf.share.explicit_acks) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* Validate type - only ACCEPT, REJECT, RELEASE allowed */ + if (type < RD_KAFKA_SHARE_ACK_TYPE_ACCEPT || + type > RD_KAFKA_SHARE_ACK_TYPE_REJECT) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + return rd_kafka_share_inflight_ack_update( + rkshare, rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, + rkmessage->offset, + (rd_kafka_share_internal_acknowledgement_type)type); +} + +rd_kafka_resp_err_t +rd_kafka_share_acknowledge_offset(rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset, + rd_kafka_share_ack_type_t type) { + if (!rkshare || !topic) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* Explicit acknowledge APIs require explicit acknowledgement mode */ + if (!rkshare->rkshare_rk->rk_conf.share.explicit_acks) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* Validate type - ACCEPT, RELEASE, REJECT allowed */ + if (type < RD_KAFKA_SHARE_ACK_TYPE_ACCEPT || + type > 
RD_KAFKA_SHARE_ACK_TYPE_REJECT) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + return rd_kafka_share_inflight_ack_update( + rkshare, topic, partition, offset, + (rd_kafka_share_internal_acknowledgement_type)type); +} + +/** + * @brief Main thread handler for SHARE_COMMIT_ASYNC_FANOUT op. + * + * Segregates acks by partition leader and sends ack-only + * SHARE_FETCH ops to the respective brokers. + * + * @locality main thread + */ +rd_kafka_op_res_t rd_kafka_share_commit_async_fanout_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_broker_t *rkb; + rd_bool_t has_acks = + (rko->rko_u.share_commit_async_fanout.ack_batches && + rd_list_cnt(rko->rko_u.share_commit_async_fanout.ack_batches) > 0); + + if (!has_acks) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "No acks to commit in " + "SHARE_COMMIT_ASYNC_FANOUT"); + return RD_KAFKA_OP_RES_HANDLED; + } + + /* Step 1: Segregate acks by partition leader. */ + rd_kafka_share_segregate_acks_by_leader( + rk, rko->rko_u.share_commit_async_fanout.ack_batches); + + rd_list_destroy(rko->rko_u.share_commit_async_fanout.ack_batches); + rko->rko_u.share_commit_async_fanout.ack_batches = NULL; + + /* Step 2: Send ack-only SHARE_FETCH ops to brokers + * that have pending acks. */ + rd_kafka_rdlock(rk); + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { + if (rd_kafka_broker_or_instance_terminating(rkb) || + RD_KAFKA_BROKER_IS_LOGICAL(rkb)) + continue; + + if (!rkb->rkb_share_async_ack_details) + continue; + + /* If a request is already in-flight on this broker, + * acks stay cached in rkb_share_async_ack_details + * and will be sent with the next request. */ + if (rkb->rkb_share_fetch_enqueued) + continue; + + rd_kafka_share_enqueue_fetch_op(rk, rkb, + rd_false /* ack-only */); + } + rd_kafka_rdunlock(rk); + + return RD_KAFKA_OP_RES_HANDLED; +} + +/** + * @brief Asynchronously commit all pending acknowledgements. 
+ * + * Builds ack details from the inflight map and enqueues a + * SHARE_COMMIT_ASYNC_FANOUT op on the main thread to send + * them to brokers. Does not fetch new records. + * + * In implicit ack mode, all ACQUIRED records are first converted + * to ACCEPT before building the ack details. + * + * @param rkshare Share consumer instance. + * @returns NULL on success, or an rd_kafka_error_t* on failure. + * + * @locality app thread + */ +rd_kafka_error_t *rd_kafka_share_commit_async(rd_kafka_share_t *rkshare) { + rd_kafka_t *rk = rkshare->rkshare_rk; + rd_kafka_op_t *rko; + rd_list_t *ack_batches; + + /* Drain rk_rep for all pending callbacks (non-blocking) */ + rd_kafka_q_serve(rk->rk_rep, RD_POLL_NOWAIT, 0, RD_KAFKA_Q_CB_CALLBACK, + rd_kafka_poll_cb, NULL); + + /* In implicit ack mode, convert all ACQUIRED records to ACCEPT + * before extracting ack details. In explicit mode, the app has + * already acknowledged records via the acknowledge APIs, so + * only extract what's been explicitly acknowledged. */ + if (!rk->rk_conf.share.explicit_acks) + rd_kafka_share_ack_all(rk->rk_rkshare); + ack_batches = rd_kafka_share_build_ack_details(rk->rk_rkshare); + + if (!ack_batches) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "No pending acknowledgements to commit"); + return NULL; + } + + rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT, + rd_kafka_share_commit_async_fanout_op); + rko->rko_u.share_commit_async_fanout.ack_batches = ack_batches; + + rd_kafka_q_enq(rk->rk_ops, rko); + + return NULL; +} + /** * Schedules a rebootstrap of the cluster immediately. * diff --git a/src/rdkafka.h b/src/rdkafka.h index 08f1764c43..bc6c321571 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -651,15 +651,54 @@ typedef enum { RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR = 112, /** The member epoch is stale */ RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113, + /** The request was sent to an endpoint of the wrong type. 
*/ + RD_KAFKA_RESP_ERR_MISMATCHED_ENDPOINT_TYPE = 114, + /** This endpoint type is not supported yet. */ + RD_KAFKA_RESP_ERR_UNSUPPORTED_ENDPOINT_TYPE = 115, + /** This controller ID is not known. */ + RD_KAFKA_RESP_ERR_UNKNOWN_CONTROLLER_ID = 116, /** Client sent a push telemetry request with an invalid or outdated * subscription ID. */ RD_KAFKA_RESP_ERR_UNKNOWN_SUBSCRIPTION_ID = 117, /** Client sent a push telemetry request larger than the maximum size * the broker will accept. */ RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE = 118, + /** The controller has considered the broker registration + * to be invalid. */ + RD_KAFKA_RESP_ERR_INVALID_REGISTRATION = 119, + /** The server encountered an error with the transaction. + * The client can abort the transaction to continue using + * this transactional ID. */ + RD_KAFKA_RESP_ERR_TRANSACTION_ABORTABLE = 120, + /** The record state is invalid. The acknowledgement of delivery + * could not be completed. */ + RD_KAFKA_RESP_ERR_INVALID_RECORD_STATE = 121, + /** The share session was not found. */ + RD_KAFKA_RESP_ERR_SHARE_SESSION_NOT_FOUND = 122, + /** The share session epoch is invalid. */ + RD_KAFKA_RESP_ERR_INVALID_SHARE_SESSION_EPOCH = 123, + /** The share coordinator rejected the request because the + * share-group state epoch did not match. */ + RD_KAFKA_RESP_ERR_FENCED_STATE_EPOCH = 124, + /** The voter key doesn't match the receiving replica's key. */ + RD_KAFKA_RESP_ERR_INVALID_VOTER_KEY = 125, + /** The voter is already part of the set of voters. */ + RD_KAFKA_RESP_ERR_DUPLICATE_VOTER = 126, + /** The voter is not part of the set of voters. */ + RD_KAFKA_RESP_ERR_VOTER_NOT_FOUND = 127, + /** The regular expression is not valid. */ + RD_KAFKA_RESP_ERR_INVALID_REGULAR_EXPRESSION = 128, /** Client metadata is stale, * client should rebootstrap to obtain new metadata. */ RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED = 129, + /** The supplied topology is invalid. 
*/ + RD_KAFKA_RESP_ERR_STREAMS_INVALID_TOPOLOGY = 130, + /** The supplied topology epoch is invalid. */ + RD_KAFKA_RESP_ERR_STREAMS_INVALID_TOPOLOGY_EPOCH = 131, + /** The supplied topology epoch is outdated. */ + RD_KAFKA_RESP_ERR_STREAMS_TOPOLOGY_FENCED = 132, + /** The limit of share sessions has been reached. */ + RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED = 133, RD_KAFKA_RESP_ERR_END_ALL, } rd_kafka_resp_err_t; @@ -3105,6 +3144,89 @@ rd_kafka_share_consume_batch(rd_kafka_share_t *rkshare, rd_kafka_message_t **rkmessages /* out */, size_t *rkmessages_size /* out */); +/** + * @enum rd_kafka_share_ack_type_t + * @brief Share consumer acknowledgement types (exposed to Public APIs). + */ +typedef enum rd_kafka_share_ack_type_s { + RD_KAFKA_SHARE_ACK_TYPE_ACCEPT = 1, /**< Accept the message */ + RD_KAFKA_SHARE_ACK_TYPE_RELEASE = 2, /**< Release the message for + * redelivery */ + RD_KAFKA_SHARE_ACK_TYPE_REJECT = 3 /**< Reject the message */ +} rd_kafka_share_ack_type_t; + +/** + * @brief Acknowledge a message with ACCEPT type. + * + * This is equivalent to calling rd_kafka_share_acknowledge_type() with + * RD_KAFKA_SHARE_ACK_TYPE_ACCEPT. + * + * @param rkshare Share consumer handle. + * @param rkmessage Message to acknowledge. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR__INVALID_ARG if parameters are invalid, + * RD_KAFKA_RESP_ERR__STATE if message is not in ACQUIRED state. + */ +RD_EXPORT +rd_kafka_resp_err_t +rd_kafka_share_acknowledge(rd_kafka_share_t *rkshare, + const rd_kafka_message_t *rkmessage); + +/** + * @brief Acknowledge a message with specified acknowledgement type. + * + * @param rkshare Share consumer handle. + * @param rkmessage Message to acknowledge. + * @param type Acknowledgement type (ACCEPT, RELEASE, or REJECT).
+ * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR__INVALID_ARG if parameters are invalid or type is + * invalid, RD_KAFKA_RESP_ERR__STATE if message is not in ACQUIRED state. + */ +RD_EXPORT +rd_kafka_resp_err_t +rd_kafka_share_acknowledge_type(rd_kafka_share_t *rkshare, + const rd_kafka_message_t *rkmessage, + rd_kafka_share_ack_type_t type); + +/** + * @brief Acknowledge an offset directly with specified acknowledgement type. + * + * @param rkshare Share consumer handle. + * @param topic Topic name. + * @param partition Partition id. + * @param offset Offset to acknowledge. + * @param type Acknowledgement type (ACCEPT, RELEASE, or REJECT). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR__INVALID_ARG if parameters are invalid or type is + * invalid, RD_KAFKA_RESP_ERR__STATE if offset is a GAP offset. + */ +RD_EXPORT +rd_kafka_resp_err_t +rd_kafka_share_acknowledge_offset(rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset, + rd_kafka_share_ack_type_t type); + +/** + * @brief Asynchronously commit all pending acknowledgements. + * + * Sends all pending acknowledgements (from rd_kafka_share_acknowledge* + * calls) to their respective brokers without fetching new records. + * This function returns immediately — the acknowledgements are sent + * asynchronously via the broker threads. + * + * @param rkshare Share consumer instance. + * + * @returns NULL on success, or an rd_kafka_error_t* on failure. + * The returned error must be freed with rd_kafka_error_destroy(). + */ +RD_EXPORT +rd_kafka_error_t *rd_kafka_share_commit_async(rd_kafka_share_t *rkshare); + /** * @brief Destroy Kafka handle. 
* diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index dd57d7ac47..b2ca07ee3a 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3593,6 +3593,18 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { "Ignoring SHARE_FETCH op: " "instance or broker is terminating"); rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY); + } else if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP) { + /* TODO KIP-932: The main thread should check + * broker state before enqueuing SHARE_FETCH + * ops, or back off after receiving ERR__STATE + * replies, to avoid flooding the broker thread + * with ops that are immediately rejected. */ + rd_kafka_dbg( + rkb->rkb_rk, BROKER, "SHAREFETCH", + "Ignoring SHARE_FETCH op: " + "broker not up (state %s)", + rd_kafka_broker_state_names[rkb->rkb_state]); + rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__STATE); } else if (rkb->rkb_fetching) { rd_kafka_dbg(rkb->rkb_rk, BROKER, "SHAREFETCH", "Ignoring SHARE_FETCH op: " diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index dd6f7a071b..6f367481e6 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1513,6 +1513,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = { _RK(share.is_share_consumer), "tba description", 0, 1, 0}, {_RK_GLOBAL | _RK_CONSUMER, "max.poll.records", _RK_C_INT, _RK(share.max_poll_records), "tba description,", 1, INT_MAX, 500}, + {_RK_GLOBAL | _RK_CONSUMER, "share.acknowledgement.mode", _RK_C_S2I, + _RK(share.explicit_acks), + "Acknowledgement mode for share consumers. " + "'implicit' - messages are implicitly acknowledged when the next poll " + "is called. 
'explicit' - messages must be explicitly acknowledged " + "using rd_kafka_share_acknowledge*() APIs.", + .vdef = 0, .s2i = {{0, "implicit"}, {1, "explicit"}}}, /* Global producer properties */ diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index ea9e216efa..866f5cf80f 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -196,7 +196,7 @@ typedef enum { /* Increase in steps of 64 as needed. * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */ -#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 35) +#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 36) /** * @struct rd_kafka_anyconf_t @@ -473,6 +473,9 @@ struct rd_kafka_conf_s { struct { int is_share_consumer; /**< Is this a share consumer? */ int max_poll_records; /**< Max records returned per poll */ + int explicit_acks; /**< Acknowledgement mode + * (share.acknowledgement.mode). 0 = + * "implicit" (default), 1 = "explicit" */ } share; /* diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index 331fc3b6e3..941782606c 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -1244,7 +1244,6 @@ static rd_kafka_resp_err_t rd_kafka_share_fetch_reply_handle_partition( } /* Initialize batches_out with NULL rktpar for unknown topic */ batches_out->rktpar = NULL; - rd_kafka_q_destroy(temp_fetchq); rd_kafka_buf_skip_tags(rkbuf); goto done; } @@ -1372,8 +1371,6 @@ static rd_kafka_resp_err_t rd_kafka_share_fetch_reply_handle_partition( rd_kafka_op_destroy(rko); } - rd_kafka_q_destroy_owner(temp_fetchq); - rd_kafka_buf_skip_tags(rkbuf); // Partition tags goto done; @@ -1382,6 +1379,7 @@ static rd_kafka_resp_err_t rd_kafka_share_fetch_reply_handle_partition( err = rkbuf->rkbuf_err; done: + RD_IF_FREE(temp_fetchq, rd_kafka_q_destroy_owner); if (rktp) rd_kafka_toppar_destroy(rktp); /* from toppar_get() */ return err; @@ -1422,8 +1420,14 @@ rd_kafka_share_fetch_reply_handle(rd_kafka_broker_t *rkb, if (ErrorCode) { rd_rkb_log(rkb, LOG_ERR, "SHAREFETCH", - "ShareFetch response error %d: '%.*s'", ErrorCode, + 
"ShareFetch response error %s: '%.*s'", + rd_kafka_err2name(ErrorCode), RD_KAFKAP_STR_PR(&ErrorStr)); + /* TODO KIP-932: Check if the response buffer still needs + * to be parsed in some error cases. For example, + * UNKNOWN_TOPIC_ID may require removing partitions from + * the session, and acknowledgements for that topic id + * should also be destroyed. */ return ErrorCode; } @@ -1536,10 +1540,100 @@ rd_kafka_share_fetch_reply_handle(rd_kafka_broker_t *rkb, /** - * TODO KIP-932: Implement. + * @brief Reset the share fetch session for a broker. + * + * Called when the broker returns SHARE_SESSION_NOT_FOUND, + * INVALID_SHARE_SESSION_EPOCH, or SHARE_SESSION_LIMIT_REACHED, + * indicating the session is lost or cannot be created. + * Resets the epoch to 0 and moves all toppars_in_session back + * to toppars_to_add so the next request re-establishes the + * full session. + * + * Any acknowledgements present in \p rko_orig are destroyed since + * the broker did not process them (session was invalid). + * + * @param rkb Broker whose session is being reset. + * @param rko_orig The SHARE_FETCH op whose ack_details need cleanup. + * + * @locality broker thread */ -// static void rd_kafak_broker_session_reset(rd_kafka_broker_t *rkb) { -// } +static void rd_kafka_broker_session_reset(rd_kafka_broker_t *rkb, + rd_kafka_op_t *rko_orig) { + rd_kafka_toppar_t *rktp, *tmp_rktp; + + rd_rkb_dbg(rkb, FETCH, "SHAREFETCH", + "Resetting share session: epoch %" PRId32 + " -> 0, " + "moving %d toppars_in_session to toppars_to_add", + rkb->rkb_share_fetch_session.epoch, + rkb->rkb_share_fetch_session.toppars_in_session_cnt); + + rkb->rkb_share_fetch_session.epoch = 0; + + /* Remove toppars_to_forget from toppars_in_session — these + * were pending removal and should not be re-added to the + * new session. 
*/ + if (rkb->rkb_share_fetch_session.toppars_to_forget) { + rd_kafka_toppar_t *forget_rktp; + int i; + + RD_LIST_FOREACH(forget_rktp, + rkb->rkb_share_fetch_session.toppars_to_forget, + i) { + TAILQ_FOREACH_SAFE( + rktp, + &rkb->rkb_share_fetch_session.toppars_in_session, + rktp_rkb_session_link, tmp_rktp) { + if (rktp == forget_rktp) { + TAILQ_REMOVE( + &rkb->rkb_share_fetch_session + .toppars_in_session, + rktp, rktp_rkb_session_link); + rkb->rkb_share_fetch_session + .toppars_in_session_cnt--; + rd_kafka_toppar_destroy( + rktp); /* from session list */ + break; + } + } + } + + rd_list_destroy(rkb->rkb_share_fetch_session.toppars_to_forget); + rkb->rkb_share_fetch_session.toppars_to_forget = NULL; + } + + /* Move remaining toppars_in_session to toppars_to_add so they + * get sent as new additions in the next request (epoch 0). */ + TAILQ_FOREACH_SAFE(rktp, + &rkb->rkb_share_fetch_session.toppars_in_session, + rktp_rkb_session_link, tmp_rktp) { + TAILQ_REMOVE(&rkb->rkb_share_fetch_session.toppars_in_session, + rktp, rktp_rkb_session_link); + rkb->rkb_share_fetch_session.toppars_in_session_cnt--; + + if (!rkb->rkb_share_fetch_session.toppars_to_add) + rkb->rkb_share_fetch_session.toppars_to_add = + rd_list_new(1, rd_kafka_toppar_destroy_free); + + if (!rd_list_find(rkb->rkb_share_fetch_session.toppars_to_add, + rktp, rd_list_cmp_ptr)) + rd_list_add(rkb->rkb_share_fetch_session.toppars_to_add, + rktp); /* transfer ref */ + else + rd_kafka_toppar_destroy(rktp); /* from session list */ + } + + /* Destroy acknowledgements from the failed request — the broker + * did not process them because the session was invalid. + * TODO KIP-932: When acknowledgement callbacks are implemented, + * report these acks as failed to the application via the + * callback instead of silently dropping them. 
*/ + if (rko_orig->rko_u.share_fetch.ack_details) { + rd_list_destroy(rko_orig->rko_u.share_fetch.ack_details); + rko_orig->rko_u.share_fetch.ack_details = NULL; + } +} + static void rd_kafka_broker_session_update_epoch(rd_kafka_broker_t *rkb) { if (rkb->rkb_share_fetch_session.epoch == -1) { rd_kafka_dbg( @@ -1710,6 +1804,11 @@ static void rd_kafka_broker_share_fetch_reply(rd_kafka_t *rk, if (err == RD_KAFKA_RESP_ERR__DESTROY) { /* TODO KIP-932: Check what is needed out of the below */ rd_kafka_broker_session_update(rkb); + if (rko_orig->rko_u.share_fetch.ack_details) { + rd_list_destroy( + rko_orig->rko_u.share_fetch.ack_details); + rko_orig->rko_u.share_fetch.ack_details = NULL; + } rd_kafka_op_reply(rko_orig, err); return; /* Terminating */ } @@ -1723,44 +1822,55 @@ static void rd_kafka_broker_share_fetch_reply(rd_kafka_t *rk, rd_kafka_broker_session_update(rkb); - /* TODO: Check if this error handling is required here or in the main - * thread. */ if (unlikely(err)) { - char tmp[128]; - - rd_rkb_dbg(rkb, MSG, "FETCH", "Fetch reply: %s", + rd_rkb_dbg(rkb, FETCH, "SHAREFETCH", "Fetch reply: %s", rd_kafka_err2str(err)); switch (err) { + case RD_KAFKA_RESP_ERR_SHARE_SESSION_NOT_FOUND: + case RD_KAFKA_RESP_ERR_INVALID_SHARE_SESSION_EPOCH: + case RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED: + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: + /* Session is invalid, lost, cannot be created, + * or connection/request failed. + * Reset session state so the next request + * re-establishes a new session (epoch 0). 
*/ + rd_kafka_broker_session_reset(rkb, rko_orig); + break; + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID: + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID: { + char tmp[128]; /* Request metadata information update */ rd_snprintf(tmp, sizeof(tmp), "FetchRequest failed: %s", rd_kafka_err2str(err)); rd_kafka_metadata_refresh_known_topics( rkb->rkb_rk, NULL, rd_true /*force*/, tmp); - /* FALLTHRU */ - - case RD_KAFKA_RESP_ERR__TRANSPORT: - case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: - case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: - /* The fetch is already intervalled from - * consumer_serve() so dont retry. */ break; + } default: + /* TODO KIP-932: Fatal error handling. + * Propagate fatal errors to the application + * and do not retry. */ break; } - /* - * TODO KIP-932: Check if this needed or not. If yes, check the - * working. - */ - rd_kafka_broker_fetch_backoff(rkb, err); - /* FALLTHRU */ + /* There is no retry for ShareFetch RPC at the broker + * thread level. */ + } + + /* Destroy ack_details before replying — on success the acks + * have been sent to the broker, on error they are unprocessable. + * TODO KIP-932: When acknowledgement callbacks are implemented, + * report failed acks to the application via the callback. 
*/ + if (rko_orig->rko_u.share_fetch.ack_details) { + rd_list_destroy(rko_orig->rko_u.share_fetch.ack_details); + rko_orig->rko_u.share_fetch.ack_details = NULL; } if (rko_orig) @@ -2335,6 +2445,16 @@ void rd_kafka_broker_share_fetch(rd_kafka_broker_t *rkb, int32_t max_records = 0; rd_list_t *ack_details = rko_orig->rko_u.share_fetch.ack_details; rd_bool_t has_ack_details = ack_details && rd_list_cnt(ack_details) > 0; + rd_list_t *toppars_to_add = NULL; + rd_list_t *toppars_to_forget = NULL; + + if (!rko_orig->rko_u.share_fetch.should_fetch && !has_ack_details) { + rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREFETCH", + "Not sending Share Fetch Request: " + "no fetch requested and no acknowledgements"); + rd_kafka_op_reply(rko_orig, RD_KAFKA_RESP_ERR__NOOP); + return; + } if (!rkcg->rkcg_member_id) { rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREFETCH", @@ -2344,33 +2464,20 @@ void rd_kafka_broker_share_fetch(rd_kafka_broker_t *rkb, } if (rko_orig->rko_u.share_fetch.should_fetch) { - max_records = rkb->rkb_rk->rk_conf.share.max_poll_records; + max_records = rkb->rkb_rk->rk_conf.share.max_poll_records; + toppars_to_add = rkb->rkb_share_fetch_session.toppars_to_add; + toppars_to_forget = + rkb->rkb_share_fetch_session.toppars_to_forget; } - if (rkb->rkb_share_fetch_session.toppars_to_add || - rkb->rkb_share_fetch_session.toppars_to_forget || - rko_orig->rko_u.share_fetch.should_fetch || has_ack_details) { - rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREFETCH", - "Sending Share Fetch Request with%s%s%s%s", - has_ack_details ? " acknowledgements," : "", - rkb->rkb_share_fetch_session.toppars_to_add - ? " new topics," - : "", - rkb->rkb_share_fetch_session.toppars_to_forget - ? " forgotten toppars," - : "", - rko_orig->rko_u.share_fetch.should_fetch - ? 
" fetching messages" - : ""); - } else { - rd_kafka_dbg( - rkb->rkb_rk, FETCH, "SHAREFETCH", - "Not sending Share Fetch Request since there are no " - "new topics to add, acknowledgements, forgotten toppars" - " or messages to fetch"); - rd_kafka_op_reply(rko_orig, RD_KAFKA_RESP_ERR__NOOP); - return; - } + rd_kafka_dbg(rkb->rkb_rk, FETCH, "SHAREFETCH", + "Sending Share Fetch Request with%s%s%s%s", + has_ack_details ? " acknowledgements," : "", + toppars_to_add ? " new topics," : "", + toppars_to_forget ? " forgotten toppars," : "", + rko_orig->rko_u.share_fetch.should_fetch + ? " fetching messages" + : ""); rd_kafka_ShareFetchRequest( rkb, rkcg->rkcg_group_id, /* group_id */ @@ -2379,13 +2486,11 @@ void rd_kafka_broker_share_fetch(rd_kafka_broker_t *rkb, rkb->rkb_rk->rk_conf.fetch_wait_max_ms, rkb->rkb_rk->rk_conf.fetch_min_bytes, rkb->rkb_rk->rk_conf.fetch_max_bytes, max_records, - max_records, /* TODO KIP-932: Check if this is correct for batch - size or not */ - rkb->rkb_share_fetch_session - .toppars_to_add, /* toppars to add to session */ - rkb->rkb_share_fetch_session - .toppars_to_forget, /* forgetting toppars */ - rko_orig, /* rko (carries ack_details) */ + max_records, /* TODO KIP-932: Check if this is correct for batch + size or not */ + toppars_to_add, /* toppars to add to session */ + toppars_to_forget, /* forgetting toppars */ + rko_orig, /* rko (carries ack_details) */ now); } diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 26406e8fc4..8f60de5bc0 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -131,6 +131,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { [RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE] = "REPLY:SHARE_SESSION_PARTITION_REMOVE", [RD_KAFKA_OP_SHARE_FETCH_RESPONSE] = "REPLY:SHARE_FETCH_RESPONSE", + [RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT] = + "REPLY:SHARE_COMMIT_ASYNC_FANOUT", }; if (type & RD_KAFKA_OP_REPLY) @@ -302,6 +304,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { 
[RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_SHARE_FETCH_RESPONSE] = sizeof(rko->rko_u.share_fetch_response), + [RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT] = + sizeof(rko->rko_u.share_commit_async_fanout), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -534,6 +538,11 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { rd_list_destroy); break; + case RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT: + RD_IF_FREE(rko->rko_u.share_commit_async_fanout.ack_batches, + rd_list_destroy); + break; + case RD_KAFKA_OP_SHARE_FETCH_RESPONSE: { if (rko->rko_u.share_fetch_response.message_rkos) { /* Messages not handed to app, destroy them. */ diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 02435597ba..f9ef7c841a 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -192,8 +192,11 @@ typedef enum { RD_KAFKA_OP_SHARE_FETCH, /**< broker op: Issue share fetch request if applicable. */ RD_KAFKA_OP_SHARE_FETCH_FANOUT, /**< fanout share fetch operation */ - RD_KAFKA_OP_SHARE_SESSION_PARTITION_ADD, /**< share session: - * add partition */ + RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT, /**< fanout share commit async + * operation (ack-only, + * no fetch) */ + RD_KAFKA_OP_SHARE_SESSION_PARTITION_ADD, /**< share session: + * add partition */ RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE, /**< share session: * remove partition */ RD_KAFKA_OP_SHARE_FETCH_RESPONSE, /**< Share fetch response containing @@ -791,6 +794,16 @@ struct rd_kafka_op_s { rd_list_t *ack_batches; } share_fetch_fanout; + struct { + /** List of all acknowledgement batches to commit. + * Type: rd_kafka_share_ack_batches_t* + * Built from inflight ack map, will be segregated + * by leader and sent to respective brokers. + * Set to NULL after ownership is transferred + * to per-broker ack_details. */ + rd_list_t *ack_batches; + } share_commit_async_fanout; + /** * Share fetch response - single rko containing all messages * and partition ack info from one broker response. 
diff --git a/src/rdkafka_share_acknowledgement.c b/src/rdkafka_share_acknowledgement.c index 243269430e..285a2306da 100644 --- a/src/rdkafka_share_acknowledgement.c +++ b/src/rdkafka_share_acknowledgement.c @@ -1,8 +1,7 @@ /* * librdkafka - The Apache Kafka C/C++ library * - * Copyright (c) 2015-2022, Magnus Edenhill, - * 2026, Confluent Inc. + * Copyright (c) 2026, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,7 +52,6 @@ void rd_kafka_share_ack_batch_entry_destroy( if (!entry) return; rd_free(entry->types); - rd_free(entry); } static void rd_kafka_share_ack_batch_entry_destroy_free(void *ptr) { @@ -156,9 +154,11 @@ void rd_kafka_share_build_inflight_acks_map(rd_kafka_share_t *rkshare, rd_dassert(batches->rktpar != NULL); - key = rd_kafka_topic_partition_new_with_topic_id( - rd_kafka_topic_partition_get_topic_id(batches->rktpar), - batches->rktpar->partition); + key = rd_kafka_topic_partition_new(batches->rktpar->topic, + batches->rktpar->partition); + rd_kafka_topic_partition_set_topic_id( + key, + rd_kafka_topic_partition_get_topic_id(batches->rktpar)); /* Each topic-partition is always a new entry (no overwrites). 
*/ @@ -357,10 +357,12 @@ rd_list_t *rd_kafka_share_build_ack_details(rd_kafka_share_t *rkshare) { /* If no ACQUIRED offsets remain, mark for removal */ if (rd_list_cnt(&inflight_batches->entries) == 0) { rd_kafka_topic_partition_t *del_key = - rd_kafka_topic_partition_new_with_topic_id( - rd_kafka_topic_partition_get_topic_id( - inflight_batches->rktpar), + rd_kafka_topic_partition_new( + inflight_batches->rktpar->topic, inflight_batches->rktpar->partition); + rd_kafka_topic_partition_set_topic_id( + del_key, rd_kafka_topic_partition_get_topic_id( + inflight_batches->rktpar)); rd_list_add(&keys_to_delete, del_key); } @@ -386,3 +388,52 @@ rd_list_t *rd_kafka_share_build_ack_details(rd_kafka_share_t *rkshare) { return ack_details; } + +rd_kafka_resp_err_t rd_kafka_share_inflight_ack_update( + rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset, + rd_kafka_share_internal_acknowledgement_type type) { + rd_kafka_topic_partition_t *lookup_key; + rd_kafka_share_ack_batches_t *batches; + rd_kafka_share_ack_batch_entry_t *entry; + int i; + int64_t idx; + + /* Find partition in inflight_acks map */ + lookup_key = rd_kafka_topic_partition_new(topic, partition); + + batches = RD_MAP_GET(&rkshare->rkshare_inflight_acks, lookup_key); + rd_kafka_topic_partition_destroy(lookup_key); + if (!batches) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* Find entry containing offset */ + RD_LIST_FOREACH(entry, &batches->entries, i) { + if (offset >= entry->start_offset && + offset <= entry->end_offset) { + /* Found the entry containing this offset */ + idx = offset - entry->start_offset; + + /* GAP records cannot be acknowledged */ + if (entry->types[idx] == + RD_KAFKA_SHARE_INTERNAL_ACK_GAP) + return RD_KAFKA_RESP_ERR__STATE; + + /* Decrement unacked count when transitioning + * from ACQUIRED to an explicit ack type. 
*/ + if (entry->types[idx] == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED) + rkshare->rkshare_unacked_cnt--; + + /* Update the type */ + entry->types[idx] = type; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + /* Offset not found in any entry */ + return RD_KAFKA_RESP_ERR__INVALID_ARG; +} diff --git a/src/rdkafka_share_acknowledgement.h b/src/rdkafka_share_acknowledgement.h index d7c3bee034..94627b72ab 100644 --- a/src/rdkafka_share_acknowledgement.h +++ b/src/rdkafka_share_acknowledgement.h @@ -1,8 +1,7 @@ /* * librdkafka - The Apache Kafka C/C++ library * - * Copyright (c) 2015-2022, Magnus Edenhill, - * 2026, Confluent Inc. + * Copyright (c) 2026, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -163,4 +162,24 @@ void rd_kafka_share_ack_all(rd_kafka_share_t *rkshare); */ rd_list_t *rd_kafka_share_build_ack_details(rd_kafka_share_t *rkshare); +/** + * @brief Update ack type for any offset except GAP offsets. + * + * @param rkshare Share consumer handle + * @param topic Topic name + * @param partition Partition id + * @param offset Offset to acknowledge + * @param type New acknowledgement type (ACCEPT, REJECT, RELEASE) + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success + * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if partition/offset not found + * @returns RD_KAFKA_RESP_ERR__STATE if record is GAP offset + */ +rd_kafka_resp_err_t rd_kafka_share_inflight_ack_update( + rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset, + rd_kafka_share_internal_acknowledgement_type type); + #endif /* _RDKAFKA_SHARE_ACKNOWLEDGEMENT_H_ */ diff --git a/src/rdunittest.c b/src/rdunittest.c index 65a10f62ec..9799bbcc87 100644 --- a/src/rdunittest.c +++ b/src/rdunittest.c @@ -424,6 +424,7 @@ extern int unittest_scram(void); extern int unittest_assignors(void); extern int unittest_map(void); extern int unittest_fetcher_share_filter_forward(void); +extern int unittest_share_acknowledge(void); 
#if WITH_CURL extern int unittest_http(void); #endif @@ -486,6 +487,7 @@ int rd_unittest(void) { {"telemetry_decode", unittest_telemetry_decode}, {"fetcher_share_filter_forward", unittest_fetcher_share_filter_forward}, + {"share_acknowledge", unittest_share_acknowledge}, {"feature", unittest_feature}, #if WITH_SSL {"ssl", unittest_ssl}, diff --git a/src/rdunittest_acknowledge.c b/src/rdunittest_acknowledge.c new file mode 100644 index 0000000000..3721d87bb6 --- /dev/null +++ b/src/rdunittest_acknowledge.c @@ -0,0 +1,648 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2026, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +/** + * @brief Unit tests for Share Consumer acknowledge APIs + * + * Tests the following public APIs: + * 1. rd_kafka_share_acknowledge() - Acknowledge delivered record with ACCEPT + * 2. rd_kafka_share_acknowledge_type() - Acknowledge delivered record with type + * 3. rd_kafka_share_acknowledge_offset() - Acknowledge error record by offset + */ + +#include "rd.h" +#include "rdunittest.h" +#include "rdkafka_int.h" +#include "rdkafka_partition.h" + +/* Test topic_id for unit tests (non-zero UUID) */ +static const rd_kafka_Uuid_t ut_topic_id_t1 = {.most_significant_bits = 1, + .least_significant_bits = 1}; +static const rd_kafka_Uuid_t ut_topic_id_t2 = {.most_significant_bits = 2, + .least_significant_bits = 2}; + +/** + * @brief Create a rd_kafka_share_t instance for testing using the proper + * share consumer API with explicit acknowledgement mode. + */ +static rd_kafka_share_t *ut_ack_create_share_consumer(void) { + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + char errstr[128]; + + if (rd_kafka_conf_set(conf, "group.id", "ut-share-ack", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + rd_kafka_conf_destroy(conf); + return NULL; + } + + /* Enable explicit acknowledgement mode for testing acknowledge APIs */ + if (rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + rd_kafka_conf_destroy(conf); + return NULL; + } + + rd_kafka_share_t *rkshare = + rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + /* conf is consumed by rd_kafka_share_consumer_new on success */ + + return rkshare; +} + +/** + * @brief Register a topic with the rd_kafka_t instance and set its topic_id. + * + * This is needed for rd_kafka_share_acknowledge_offset() to find the topic + * by name and resolve its topic_id. 
+ */ +static rd_kafka_topic_t *ut_ack_register_topic(rd_kafka_t *rk, + const char *topic, + rd_kafka_Uuid_t topic_id) { + rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL); + if (!rkt) + return NULL; + + /* Set the topic_id on the registered topic */ + rkt->rkt_topic_id = topic_id; + + return rkt; +} + +/** + * @brief Clear all entries from the inflight acks map. + * + * Used between tests to reset state while keeping the same rkshare. + */ +static void ut_ack_clear_inflight_map(rd_kafka_share_t *rkshare) { + if (!rkshare) + return; + + /* RD_MAP_CLEAR will call the free functions for keys and values */ + RD_MAP_CLEAR(&rkshare->rkshare_inflight_acks); +} + +/** + * @brief Add a partition with acquired offsets to the rkshare inflight map. + * + * Creates an entry with all offsets in ACQUIRED state (delivered records). + * Delivered records can be acknowledged via record-based APIs. + */ +static void ut_ack_add_partition(rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t start_offset, + int64_t end_offset) { + rd_kafka_topic_partition_private_t *parpriv; + rd_kafka_share_ack_batches_t *batches = rd_calloc(1, sizeof(*batches)); + + batches->rktpar = rd_kafka_topic_partition_new(topic, partition); + parpriv = rd_kafka_topic_partition_private_new(); + batches->rktpar->_private = parpriv; + + batches->response_leader_id = 1; + batches->response_leader_epoch = 1; + + int64_t size = end_offset - start_offset + 1; + batches->response_msgs_count = (int32_t)size; + + rd_list_init(&batches->entries, 1, NULL); + + rd_kafka_share_ack_batch_entry_t *entry = rd_calloc(1, sizeof(*entry)); + entry->start_offset = start_offset; + entry->end_offset = end_offset; + entry->size = size; + entry->types_cnt = (int32_t)size; + entry->types = rd_calloc(size, sizeof(*entry->types)); + + /* Initialize all offsets to ACQUIRED */ + for (int64_t i = 0; i < size; i++) { + entry->types[i] = RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED; + } + + 
rd_list_add(&batches->entries, entry); + + rd_kafka_topic_partition_t *key = + rd_kafka_topic_partition_new(topic, partition); + RD_MAP_SET(&rkshare->rkshare_inflight_acks, key, batches); +} + + +/** + * @brief Set a specific offset as a GAP record. + */ +static void ut_ack_set_gap(rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset) { + rd_kafka_topic_partition_t *lookup_key = + rd_kafka_topic_partition_new(topic, partition); + + rd_kafka_share_ack_batches_t *batches = + RD_MAP_GET(&rkshare->rkshare_inflight_acks, lookup_key); + rd_kafka_topic_partition_destroy(lookup_key); + + if (!batches) + return; + + rd_kafka_share_ack_batch_entry_t *entry; + int i; + RD_LIST_FOREACH(entry, &batches->entries, i) { + if (offset >= entry->start_offset && + offset <= entry->end_offset) { + int64_t idx = offset - entry->start_offset; + entry->types[idx] = RD_KAFKA_SHARE_INTERNAL_ACK_GAP; + return; + } + } +} + +/** + * @brief Get the ack type for a specific offset in the inflight map. + */ +static rd_kafka_share_internal_acknowledgement_type +ut_ack_get_type(rd_kafka_share_t *rkshare, + const char *topic, + int32_t partition, + int64_t offset) { + rd_kafka_topic_partition_t *lookup_key = + rd_kafka_topic_partition_new(topic, partition); + + rd_kafka_share_ack_batches_t *batches = + RD_MAP_GET(&rkshare->rkshare_inflight_acks, lookup_key); + rd_kafka_topic_partition_destroy(lookup_key); + + if (!batches) + return -99; /* Invalid marker */ + + rd_kafka_share_ack_batch_entry_t *entry; + int i; + RD_LIST_FOREACH(entry, &batches->entries, i) { + if (offset >= entry->start_offset && + offset <= entry->end_offset) { + int64_t idx = offset - entry->start_offset; + return entry->types[idx]; + } + } + + return -99; /* Invalid marker */ +} + +/** + * @brief Create a mock rd_kafka_message_t for testing. + * + * Uses a properly registered rd_kafka_topic_t to avoid issues with + * rd_kafka_topic_name() when the acknowledge APIs are called. 
+ */ +static rd_kafka_message_t *ut_ack_create_message(rd_kafka_topic_t *rkt, + int32_t partition, + int64_t offset) { + rd_kafka_message_t *rkmessage = rd_calloc(1, sizeof(*rkmessage)); + rkmessage->rkt = rkt; + rkmessage->partition = partition; + rkmessage->offset = offset; + return rkmessage; +} + +/** + * @brief Destroy a mock rd_kafka_message_t. + */ +static void ut_ack_destroy_message(rd_kafka_message_t *rkmessage) { + if (rkmessage) + rd_free(rkmessage); +} + +/** + * @brief Test rd_kafka_share_acknowledge() - Basic ACCEPT acknowledgement. + * + * Verifies that rd_kafka_share_acknowledge() correctly updates an offset from + * ACQUIRED to ACCEPT state, and that adjacent offsets remain unchanged. + */ +static int ut_case_acknowledge_accept(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + /* Add partition with offsets 0-9 in ACQUIRED state */ + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + /* Create a mock message for offset 5 */ + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 5); + + /* Verify offset 5 is currently ACQUIRED */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "offset 5 should be ACQUIRED before acknowledge"); + + /* Call rd_kafka_share_acknowledge */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge(rkshare, msg); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "acknowledge failed: %s", rd_kafka_err2str(err)); + + /* Verify offset 5 is now ACCEPT */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACCEPT, + "offset 5 should be ACCEPT after acknowledge"); + + /* Verify other offsets are still ACQUIRED */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 4) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "offset 4 should still be ACQUIRED"); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 6) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "offset 6 should still be ACQUIRED"); + + 
ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test rd_kafka_share_acknowledge_type() - Acknowledge with various + * types. + * + * Tests that rd_kafka_share_acknowledge_type() correctly updates the offset + * to the specified type (REJECT or RELEASE). + */ +static int ut_case_acknowledge_type_reject(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 3); + + /* Acknowledge with REJECT */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge_type( + rkshare, msg, RD_KAFKA_SHARE_ACK_TYPE_REJECT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "acknowledge_type REJECT failed: %s", + rd_kafka_err2str(err)); + + /* Verify type changed to REJECT */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 3) == + RD_KAFKA_SHARE_INTERNAL_ACK_REJECT, + "offset 3 should be REJECT"); + + ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test rd_kafka_share_acknowledge_type() with RELEASE type. 
+ */ +static int ut_case_acknowledge_type_release(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 7); + + /* Acknowledge with RELEASE */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge_type( + rkshare, msg, RD_KAFKA_SHARE_ACK_TYPE_RELEASE); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "acknowledge_type RELEASE failed: %s", + rd_kafka_err2str(err)); + + /* Verify type changed to RELEASE */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 7) == + RD_KAFKA_SHARE_INTERNAL_ACK_RELEASE, + "offset 7 should be RELEASE"); + + ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test re-acknowledgement with record-based APIs. + * + * Verifies that re-acknowledging a delivered record with record-based APIs + * succeeds and updates the type. This is allowed before commit. 
+ */ +static int ut_case_reacknowledge_delivered(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 5); + + /* First acknowledge with ACCEPT */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge_type( + rkshare, msg, RD_KAFKA_SHARE_ACK_TYPE_ACCEPT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "first acknowledge failed: %s", rd_kafka_err2str(err)); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACCEPT, + "offset 5 should be ACCEPT"); + + /* Re-acknowledge with REJECT - should succeed */ + err = rd_kafka_share_acknowledge_type(rkshare, msg, + RD_KAFKA_SHARE_ACK_TYPE_REJECT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "re-acknowledge should succeed, got %s", + rd_kafka_err2str(err)); + + /* Verify type changed to REJECT */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_REJECT, + "offset 5 should be REJECT after re-acknowledge"); + + /* Re-acknowledge again with RELEASE */ + err = rd_kafka_share_acknowledge_type(rkshare, msg, + RD_KAFKA_SHARE_ACK_TYPE_RELEASE); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "second re-acknowledge should succeed"); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_RELEASE, + "offset 5 should be RELEASE after second re-acknowledge"); + + ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test error case - GAP records cannot be acknowledged. + * + * Verifies that GAP records cannot be acknowledged by any API. 
+ */ +static int ut_case_error_gap_record(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + /* Set offset 5 as a GAP record */ + ut_ack_set_gap(rkshare, topic, 0, 5); + + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 5); + + /* Try to acknowledge GAP with record-based API - should fail */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge(rkshare, msg); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__STATE, + "expected STATE error for GAP record, got %s", + rd_kafka_err2str(err)); + + /* Verify GAP record is unchanged */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_GAP, + "offset 5 should still be GAP"); + + ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test error case - Invalid parameters (NULL). + * + * Verifies that NULL rkshare, message, or topic parameters return + * RD_KAFKA_RESP_ERR__INVALID_ARG error. 
+ */ +static int ut_case_error_null_parameters(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 5); + + /* Test NULL rkshare */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge(NULL, msg); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected INVALID_ARG for NULL rkshare, got %s", + rd_kafka_err2str(err)); + + /* Test NULL message */ + err = rd_kafka_share_acknowledge(rkshare, NULL); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected INVALID_ARG for NULL message, got %s", + rd_kafka_err2str(err)); + + /* Test NULL topic in acknowledge_offset */ + err = rd_kafka_share_acknowledge_offset(rkshare, NULL, 0, 5, + RD_KAFKA_SHARE_ACK_TYPE_ACCEPT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected INVALID_ARG for NULL topic, got %s", + rd_kafka_err2str(err)); + + ut_ack_destroy_message(msg); + + RD_UT_PASS(); +} + +/** + * @brief Test error case - Invalid acknowledgement type. + * + * Verifies that invalid acknowledgement types (e.g., GAP which is + * internal-only, or arbitrary values like 99) return + * RD_KAFKA_RESP_ERR__INVALID_ARG error. Also verifies the offset remains in + * ACQUIRED state after the failed attempt. 
+ */ +static int ut_case_error_invalid_type(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt) { + const char *topic = rd_kafka_topic_name(rkt); + + ut_ack_add_partition(rkshare, topic, 0, 0, 9); + + rd_kafka_message_t *msg = ut_ack_create_message(rkt, 0, 5); + + /* Test invalid type value (e.g., 99) */ + rd_kafka_resp_err_t err = rd_kafka_share_acknowledge_type( + rkshare, msg, (rd_kafka_share_ack_type_t)99); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected INVALID_ARG for invalid type, got %s", + rd_kafka_err2str(err)); + + /* Test type 0 (GAP - not allowed in public API) */ + err = rd_kafka_share_acknowledge_type(rkshare, msg, + (rd_kafka_share_ack_type_t)0); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, + "expected INVALID_ARG for GAP type, got %s", + rd_kafka_err2str(err)); + + /* Verify offset is still ACQUIRED (not modified) */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "offset 5 should still be ACQUIRED after invalid type"); + + ut_ack_destroy_message(msg); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + +/** + * @brief Test multiple partitions with record-based APIs. + * + * Tests acknowledging delivered records across multiple topics and partitions + * using record-based APIs. Verifies that each partition's acknowledgements + * are tracked independently. 
+ */ +static int ut_case_acknowledge_multiple_partitions(rd_kafka_share_t *rkshare, + rd_kafka_topic_t *rkt_t1, + rd_kafka_topic_t *rkt_t2) { + const char *topic1 = rd_kafka_topic_name(rkt_t1); + const char *topic2 = rd_kafka_topic_name(rkt_t2); + + /* Add multiple partitions with delivered records */ + ut_ack_add_partition(rkshare, topic1, 0, 0, 9); + ut_ack_add_partition(rkshare, topic1, 1, 100, 109); + ut_ack_add_partition(rkshare, topic2, 0, 50, 59); + + rd_kafka_message_t *msg1 = ut_ack_create_message(rkt_t1, 0, 5); + rd_kafka_message_t *msg2 = ut_ack_create_message(rkt_t1, 1, 105); + rd_kafka_message_t *msg3 = ut_ack_create_message(rkt_t2, 0, 55); + + rd_kafka_resp_err_t err; + + /* Acknowledge across partitions using record-based APIs */ + err = rd_kafka_share_acknowledge_type(rkshare, msg1, + RD_KAFKA_SHARE_ACK_TYPE_ACCEPT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, "T1-0 offset 5 failed"); + + err = rd_kafka_share_acknowledge_type(rkshare, msg2, + RD_KAFKA_SHARE_ACK_TYPE_REJECT); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "T1-1 offset 105 failed"); + + err = rd_kafka_share_acknowledge_type(rkshare, msg3, + RD_KAFKA_SHARE_ACK_TYPE_RELEASE); + RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "T2-0 offset 55 failed"); + + /* Verify each partition independently */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic1, 0, 5) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACCEPT, + "T1-0 offset 5 should be ACCEPT"); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic1, 1, 105) == + RD_KAFKA_SHARE_INTERNAL_ACK_REJECT, + "T1-1 offset 105 should be REJECT"); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic2, 0, 55) == + RD_KAFKA_SHARE_INTERNAL_ACK_RELEASE, + "T2-0 offset 55 should be RELEASE"); + + /* Verify other offsets unchanged */ + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic1, 0, 4) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "T1-0 offset 4 should be ACQUIRED"); + RD_UT_ASSERT(ut_ack_get_type(rkshare, topic1, 1, 104) == + RD_KAFKA_SHARE_INTERNAL_ACK_ACQUIRED, + "T1-1 
offset 104 should be ACQUIRED"); + + ut_ack_destroy_message(msg1); + ut_ack_destroy_message(msg2); + ut_ack_destroy_message(msg3); + ut_ack_clear_inflight_map(rkshare); + + RD_UT_PASS(); +} + + +/** + * @brief Main entry point for Share Consumer acknowledge API unit tests. + */ +int unittest_share_acknowledge(void) { + rd_kafka_share_t *rkshare; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt_t1, *rkt_t2; + + RD_UT_SAY("==============================================="); + RD_UT_SAY("Share Consumer Acknowledge API Unit Tests"); + RD_UT_SAY("==============================================="); + + rkshare = ut_ack_create_share_consumer(); + RD_UT_ASSERT(rkshare != NULL, "Failed to create rd_kafka_share_t"); + + rk = rkshare->rkshare_rk; + + /* Register topics T1 and T2 upfront for all tests that need them. + * These will be destroyed at the end of the test suite. */ + rkt_t1 = ut_ack_register_topic(rk, "T1", ut_topic_id_t1); + RD_UT_ASSERT(rkt_t1 != NULL, "Failed to register topic T1"); + rkt_t2 = ut_ack_register_topic(rk, "T2", ut_topic_id_t2); + RD_UT_ASSERT(rkt_t2 != NULL, "Failed to register topic T2"); + +/* Macro for test cleanup on failure */ +#define UT_ACK_CLEANUP_AND_FAIL() \ + do { \ + rd_kafka_topic_destroy(rkt_t1); \ + rd_kafka_topic_destroy(rkt_t2); \ + rd_kafka_share_consumer_close(rkshare); \ + rd_kafka_share_destroy(rkshare); \ + return 1; \ + } while (0) + + /* Record-based API tests (delivered records) */ + RD_UT_SAY("Testing rd_kafka_share_acknowledge() (ACCEPT)..."); + if (ut_case_acknowledge_accept(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + RD_UT_SAY("Testing rd_kafka_share_acknowledge_type() (REJECT)..."); + if (ut_case_acknowledge_type_reject(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + RD_UT_SAY("Testing rd_kafka_share_acknowledge_type() (RELEASE)..."); + if (ut_case_acknowledge_type_release(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + /* Re-acknowledgement tests */ + RD_UT_SAY("Testing re-acknowledgement of delivered 
records..."); + if (ut_case_reacknowledge_delivered(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + /* Error case tests */ + RD_UT_SAY("Testing error: GAP records cannot be acknowledged..."); + if (ut_case_error_gap_record(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + RD_UT_SAY("Testing error: NULL parameters..."); + if (ut_case_error_null_parameters(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + RD_UT_SAY("Testing error: invalid type..."); + if (ut_case_error_invalid_type(rkshare, rkt_t1)) + UT_ACK_CLEANUP_AND_FAIL(); + + /* Multi-partition test */ + RD_UT_SAY("Testing multiple partitions..."); + if (ut_case_acknowledge_multiple_partitions(rkshare, rkt_t1, rkt_t2)) + UT_ACK_CLEANUP_AND_FAIL(); + +#undef UT_ACK_CLEANUP_AND_FAIL + + rd_kafka_topic_destroy(rkt_t1); + rd_kafka_topic_destroy(rkt_t2); + rd_kafka_share_consumer_close(rkshare); + rd_kafka_share_destroy(rkshare); + RD_UT_PASS(); +} diff --git a/src/rdunittest_fetcher.c b/src/rdunittest_fetcher.c index e9fad0744d..3a60f3ec74 100644 --- a/src/rdunittest_fetcher.c +++ b/src/rdunittest_fetcher.c @@ -1,8 +1,7 @@ /* * librdkafka - Apache Kafka C library * - * Copyright (c) 2012-2022, Magnus Edenhill - * 2026, Confluent Inc. + * Copyright (c) 2026, Confluent Inc. * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -54,15 +53,17 @@ static rd_kafka_t *ut_mock_rk; static rd_kafka_share_t *ut_create_mock_rkshare(void) { - rd_kafka_share_t *rkshare = rd_calloc(1, sizeof(*rkshare)); - rkshare->rkshare_rk = ut_mock_rk; - rkshare->rkshare_unacked_cnt = 0; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + char errstr[128]; - RD_MAP_INIT(&rkshare->rkshare_inflight_acks, 16, - rd_kafka_topic_partition_by_id_cmp, - rd_kafka_topic_partition_hash_by_id, - rd_kafka_topic_partition_destroy_free, - rd_kafka_share_ack_batches_destroy_free); + if (rd_kafka_conf_set(conf, "group.id", "ut-share-ack", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + rd_kafka_conf_destroy(conf); + return NULL; + } + + rd_kafka_share_t *rkshare = + rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); return rkshare; } @@ -70,8 +71,8 @@ static rd_kafka_share_t *ut_create_mock_rkshare(void) { static void ut_destroy_mock_rkshare(rd_kafka_share_t *rkshare) { if (!rkshare) return; - RD_MAP_DESTROY(&rkshare->rkshare_inflight_acks); - rd_free(rkshare); + rd_kafka_share_consumer_close(rkshare); + rd_kafka_share_destroy(rkshare); } static rd_kafka_topic_partition_t * @@ -86,10 +87,8 @@ ut_create_rktpar_with_id(const char *topic, static void ut_add_batches_to_map(rd_kafka_share_t *rkshare, rd_kafka_share_ack_batches_t *batches) { - rd_kafka_topic_partition_t *key = - rd_kafka_topic_partition_new_with_topic_id( - rd_kafka_topic_partition_get_topic_id(batches->rktpar), - batches->rktpar->partition); + rd_kafka_topic_partition_t *key = rd_kafka_topic_partition_new( + batches->rktpar->topic, batches->rktpar->partition); RD_MAP_SET(&rkshare->rkshare_inflight_acks, key, batches); }