diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index cb466aeb..5c7d56d1 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -145,7 +145,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { Configuration& set_default_topic_configuration(TopicConfiguration config); /** - * Returns true iff the given property name has been set + * Returns true if the given property name has been set */ bool has_property(const std::string& name) const; diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 2473d1d6..c1c68853 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index a15f9474..4226c093 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -43,6 +43,7 @@ namespace cppkafka { class MessageTimestamp; +struct Internal; /** * \brief Thin wrapper over a rdkafka message handle @@ -56,6 +57,8 @@ class MessageTimestamp; */ class CPPKAFKA_API Message { public: + friend class MessageInternal; + using InternalPtr = std::shared_ptr; /** * Constructs a message that won't take ownership of the given pointer */ @@ -134,14 +137,13 @@ class CPPKAFKA_API Message { } /** - * \brief Gets the private data. + * \brief Gets the private user data. * * This should only be used on messages produced by a Producer that were set a private data * attribute */ void* get_user_data() const { - assert(handle_); - return handle_->_private; + return user_data_; } /** @@ -164,6 +166,13 @@ class CPPKAFKA_API Message { rd_kafka_message_t* get_handle() const { return handle_.get(); } + + /** + * Internal private const data accessor (internal use only) + */ + InternalPtr internal() const { + return internal_; + } private: using HandlePtr = std::unique_ptr; @@ -171,10 +180,13 @@ class CPPKAFKA_API Message { Message(rd_kafka_message_t* handle, NonOwningTag); Message(HandlePtr handle); + Message& load_internal(); HandlePtr handle_; Buffer payload_; Buffer key_; + void* user_data_; + InternalPtr internal_; }; using MessageList = std::vector; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 59b33657..d09a6022 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -166,6 +166,13 @@ class BasicMessageBuilder { * Gets the message's user data pointer */ void* user_data() const; + + /** + * Private data accessor (internal use only) + */ + Message::InternalPtr internal() const; + Concrete& internal(Message::InternalPtr internal); + private: void construct_buffer(BufferType& lhs, const BufferType& rhs); Concrete& get_concrete(); @@ -176,11 +183,13 @@ class BasicMessageBuilder { BufferType payload_; std::chrono::milliseconds timestamp_{0}; void* user_data_; + Message::InternalPtr internal_; }; template BasicMessageBuilder::BasicMessageBuilder(std::string topic) -: topic_(std::move(topic)) { +: topic_(std::move(topic)), + user_data_(nullptr) { } template @@ -190,16 +199,16 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : std::chrono::milliseconds(0)), - user_data_(message.get_user_data()) -{ - + user_data_(message.get_user_data()), + internal_(message.internal()) { } template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) : topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), - user_data_(rhs.user_data()) { + user_data_(rhs.user_data()), + internal_(rhs.internal()) { get_concrete().construct_buffer(key_, rhs.key()); get_concrete().construct_buffer(payload_, rhs.payload()); } @@ -292,6 +301,17 @@ void* BasicMessageBuilder::user_data() const { return user_data_; } +template +Message::InternalPtr BasicMessageBuilder::internal() const { + return internal_; +} + +template +C& BasicMessageBuilder::internal(Message::InternalPtr internal) { + internal_ = internal; + return get_concrete(); +} + template void BasicMessageBuilder::construct_buffer(T& lhs, const T& rhs) { lhs = rhs; @@ -328,6 +348,15 @@ class MessageBuilder : public BasicMessageBuilder { void construct_buffer(Buffer& lhs, const T& rhs) { lhs = Buffer(rhs); } + + MessageBuilder clone() const { + return std::move(MessageBuilder(topic()). + key(Buffer(key().get_data(), key().get_size())). + payload(Buffer(payload().get_data(), payload().get_size())). + timestamp(timestamp()). + user_data(user_data()). + internal(internal())); + } }; /** diff --git a/include/cppkafka/message_internal.h b/include/cppkafka/message_internal.h new file mode 100644 index 00000000..06e99535 --- /dev/null +++ b/include/cppkafka/message_internal.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_MESSAGE_INTERNAL_H +#define CPPKAFKA_MESSAGE_INTERNAL_H + +#include + +namespace cppkafka { + +class Message; + +class Internal { +public: + virtual ~Internal() = default; +}; +using InternalPtr = std::shared_ptr; + +/** + * \brief Private message data structure + */ +class MessageInternal { +public: + MessageInternal(void* user_data, std::shared_ptr internal); + static std::unique_ptr load(Message& message); + void* get_user_data() const; + InternalPtr get_internal() const; +private: + void* user_data_; + InternalPtr internal_; +}; + +template +class MessageInternalGuard { +public: + MessageInternalGuard(BuilderType& builder) + : builder_(builder), + user_data_(builder.user_data()) { + if (builder_.internal()) { + // Swap contents with user_data + ptr_.reset(new MessageInternal(user_data_, builder_.internal())); + builder_.user_data(ptr_.get()); //overwrite user data + } + } + ~MessageInternalGuard() { + //Restore user data + builder_.user_data(user_data_); + } + void release() { + ptr_.release(); + } +private: + BuilderType& builder_; + std::unique_ptr ptr_; + void* user_data_; +}; + +} + +#endif //CPPKAFKA_MESSAGE_INTERNAL_H diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index c9bcb65b..cef799a3 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -39,10 +39,11 @@ #include #include #include +#include #include #include "../producer.h" -#include "../message.h" #include "../detail/callback_invoker.h" +#include "../message_internal.h" namespace cppkafka { @@ -103,7 +104,7 @@ class CPPKAFKA_API BufferedProducer { /** * Callback to indicate a message failed to be flushed */ - using FlushFailureCallback = std::function; + using FlushFailureCallback = std::function; /** * \brief Constructs a buffered producer using the provided configuration @@ -113,7 +114,7 @@ class CPPKAFKA_API BufferedProducer { BufferedProducer(Configuration config); /** - * \brief Adds a message to the producer's buffer. + * \brief Adds a message to the producer's buffer. * * The message won't be sent until flush is called. * @@ -122,7 +123,7 @@ class CPPKAFKA_API BufferedProducer { void add_message(const MessageBuilder& builder); /** - * \brief Adds a message to the producer's buffer. + * \brief Adds a message to the producer's buffer. * * The message won't be sent until flush is called. * @@ -145,6 +146,18 @@ class CPPKAFKA_API BufferedProducer { */ void produce(const MessageBuilder& builder); + /** + * \brief Produces a message synchronously without buffering it + * + * In case of failure, the message will be replayed until 'max_number_retries' is reached + * or until the user ProduceFailureCallback returns false. + * + * \param builder The builder that contains the message to be produced + * + * \remark This method throws cppkafka::HandleException on failure + */ + void sync_produce(const MessageBuilder& builder); + /** * \brief Produces a message asynchronously without buffering it * @@ -221,6 +234,13 @@ class CPPKAFKA_API BufferedProducer { */ size_t get_total_messages_produced() const; + /** + * \brief Get the total number of messages dropped since the beginning + * + * \return The number of messages + */ + size_t get_total_messages_dropped() const; + /** * \brief Get the total outstanding flush operations in progress * @@ -230,6 +250,20 @@ class CPPKAFKA_API BufferedProducer { * \return The number of outstanding flush operations. */ size_t get_flushes_in_progress() const; + + /** + * \brief Sets the maximum number of retries per message until giving up + * + * Default is 5 + */ + void set_max_number_retries(size_t max_number_retries); + + /** + * \brief Gets the max number of retries + * + * \return The number of retries + */ + size_t get_max_number_retries() const; /** * Gets the Producer object @@ -285,9 +319,29 @@ class CPPKAFKA_API BufferedProducer { */ void set_flush_failure_callback(FlushFailureCallback callback); + struct TestParameters { + bool force_delivery_error_; + bool force_produce_error_; + }; +protected: + //For testing purposes only +#ifdef KAFKA_TEST_INSTANCE + void set_test_parameters(TestParameters *test_params) { + test_params_ = test_params; + } + TestParameters* get_test_parameters() { + return test_params_; + } +#else + TestParameters* get_test_parameters() { + return nullptr; + } +#endif + private: using QueueType = std::deque; enum class MessagePriority { Low, High }; + enum class SenderType { Sync, Async }; template struct CounterGuard{ @@ -295,13 +349,39 @@ class CPPKAFKA_API BufferedProducer { ~CounterGuard() { --counter_; } std::atomic& counter_; }; - + + struct Tracker : public Internal { + Tracker(SenderType sender, size_t num_retries) + : sender_(sender), num_retries_(num_retries) + {} + std::future get_new_future() { + should_retry_ = std::promise(); //reset shared data + return should_retry_.get_future(); //issue new future + } + SenderType sender_; + std::promise should_retry_; + size_t num_retries_; + }; + using TrackerPtr = std::shared_ptr; + + template + TrackerPtr add_tracker(BuilderType& builder) { + if (has_internal_data_ && !builder.internal()) { + // Add message tracker only if it hasn't been added before + TrackerPtr tracker = std::make_shared(SenderType::Async, max_number_retries_); + builder.internal(tracker); + return tracker; + } + return nullptr; + } template void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); - template - void produce_message(const MessageType& message); + template + void produce_message(BuilderType&& builder); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); + template + void async_produce(BuilderType&& message, bool throw_on_error); // Members Producer producer_; @@ -314,6 +394,12 @@ class CPPKAFKA_API BufferedProducer { std::atomic pending_acks_{0}; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; + std::atomic total_messages_dropped_{0}; + int max_number_retries_{0}; + bool has_internal_data_{false}; +#ifdef KAFKA_TEST_INSTANCE + TestParameters* test_params_; +#endif }; template @@ -330,26 +416,58 @@ template BufferedProducer::BufferedProducer(Configuration config) : producer_(prepare_configuration(std::move(config))) { producer_.set_payload_policy(get_default_payload_policy()); +#ifdef KAFKA_TEST_INSTANCE + test_params_ = nullptr; +#endif } template void BufferedProducer::add_message(const MessageBuilder& builder) { - do_add_message(builder, MessagePriority::Low, true); + add_message(Builder(builder)); //make ConcreteBuilder } template void BufferedProducer::add_message(Builder builder) { + add_tracker(builder); do_add_message(move(builder), MessagePriority::Low, true); } template void BufferedProducer::produce(const MessageBuilder& builder) { - produce_message(builder); + if (has_internal_data_) { + MessageBuilder builder_clone(builder.clone()); + add_tracker(builder_clone); + async_produce(builder_clone, true); + } + else { + async_produce(builder, true); + } +} + +template +void BufferedProducer::sync_produce(const MessageBuilder& builder) { + if (has_internal_data_) { + MessageBuilder builder_clone(builder.clone()); + TrackerPtr tracker = add_tracker(builder_clone); + // produce until we succeed or we reach max retry limit + std::future should_retry; + do { + should_retry = tracker->get_new_future(); + produce_message(builder_clone); + wait_for_acks(); + } + while (should_retry.get()); + } + else { + // produce once + produce_message(builder); + wait_for_acks(); + } } template void BufferedProducer::produce(const Message& message) { - produce_message(message); + async_produce(MessageBuilder(message), true); } template @@ -361,16 +479,7 @@ void BufferedProducer::flush() { std::swap(messages_, flush_queue); } while (!flush_queue.empty()) { - try { - produce_message(flush_queue.front()); - } - catch (const HandleException& ex) { - // If we have a flush failure callback and it returns true, we retry producing this message later - CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); - if (callback && callback(flush_queue.front(), ex.get_error())) { - do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false); - } - } + async_produce(std::move(flush_queue.front()), false); flush_queue.pop_front(); } wait_for_acks(); @@ -427,10 +536,10 @@ void BufferedProducer::do_add_message(BuilderType&& builder, { std::lock_guard lock(mutex_); if (priority == MessagePriority::High) { - messages_.emplace_front(std::move(builder)); + messages_.emplace_front(std::forward(builder)); } else { - messages_.emplace_back(std::move(builder)); + messages_.emplace_back(std::forward(builder)); } } if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { @@ -458,11 +567,29 @@ size_t BufferedProducer::get_total_messages_produced() const { return total_messages_produced_; } +template +size_t BufferedProducer::get_total_messages_dropped() const { + return total_messages_dropped_; +} + template size_t BufferedProducer::get_flushes_in_progress() const { return flushes_in_progress_; } +template +void BufferedProducer::set_max_number_retries(size_t max_number_retries) { + if (!has_internal_data_ && (max_number_retries > 0)) { + has_internal_data_ = true; //enable once + } + max_number_retries_ = max_number_retries; +} + +template +size_t BufferedProducer::get_max_number_retries() const { + return max_number_retries_; +} + template typename BufferedProducer::Builder BufferedProducer::make_builder(std::string topic) { @@ -485,11 +612,14 @@ void BufferedProducer::set_flush_failure_callback(FlushFailureCallba } template -template -void BufferedProducer::produce_message(const MessageType& message) { +template +void BufferedProducer::produce_message(BuilderType&& builder) { + using builder_type = typename std::decay::type; while (true) { try { - producer_.produce(message); + MessageInternalGuard internal_guard(const_cast(builder)); + producer_.produce(builder); + internal_guard.release(); // Sent successfully ++pending_acks_; break; @@ -506,6 +636,34 @@ void BufferedProducer::produce_message(const MessageType& message) { } } +template +template +void BufferedProducer::async_produce(BuilderType&& builder, bool throw_on_error) { + try { + TestParameters* test_params = get_test_parameters(); + if (test_params && test_params->force_produce_error_) { + throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN)); + } + produce_message(builder); + } + catch (const HandleException& ex) { + // If we have a flush failure callback and it returns true, we retry producing this message later + CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); + if (!callback || callback(builder, ex.get_error())) { + TrackerPtr tracker = std::static_pointer_cast(builder.internal()); + if (tracker && tracker->num_retries_ > 0) { + --tracker->num_retries_; + do_add_message(std::forward(builder), MessagePriority::High, false); + return; + } + } + ++total_messages_dropped_; + if (throw_on_error) { + throw; + } + } +} + template Configuration BufferedProducer::prepare_configuration(Configuration config) { using std::placeholders::_2; @@ -516,13 +674,31 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { - if (message.get_error()) { + //Get tracker data + TestParameters* test_params = get_test_parameters(); + TrackerPtr tracker = has_internal_data_ ? + std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : nullptr; + bool should_retry = false; + if (message.get_error() || (test_params && test_params->force_delivery_error_)) { // We should produce this message again if we don't have a produce failure callback // or we have one but it returns true CallbackInvoker callback("produce failure", produce_failure_callback_, &producer_); if (!callback || callback(message)) { - // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) - do_add_message(Builder(message), MessagePriority::High, false); + // Check if we have reached the maximum retry limit + if (tracker && tracker->num_retries_ > 0) { + --tracker->num_retries_; + if (tracker->sender_ == SenderType::Async) { + // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) + do_add_message(Builder(message), MessagePriority::High, false); + } + should_retry = true; + } + else { + ++total_messages_dropped_; + } + } + else { + ++total_messages_dropped_; } } else { @@ -531,6 +707,10 @@ void BufferedProducer::on_delivery_report(const Message& message) { // Increment the total successful transmissions ++total_messages_produced_; } + // Signal producers + if (tracker) { + tracker->should_retry_.set_value(should_retry); + } // Decrement the expected acks --pending_acks_; assert(pending_acks_ != (size_t)-1); // Prevent underflow diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2e893a83..1525b1c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ set(SOURCES buffer.cpp queue.cpp message.cpp + message_internal.cpp topic_partition.cpp topic_partition_list.cpp metadata.cpp diff --git a/src/configuration.cpp b/src/configuration.cpp index 92a81df7..061adc7d 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -40,10 +40,8 @@ using std::map; using std::move; using std::vector; using std::initializer_list; - -using boost::optional; - using std::chrono::milliseconds; +using boost::optional; namespace cppkafka { diff --git a/src/message.cpp b/src/message.cpp index d9c0870e..3ac6c070 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -28,6 +28,7 @@ */ #include "message.h" +#include "message_internal.h" using std::chrono::milliseconds; @@ -42,7 +43,8 @@ Message Message::make_non_owning(rd_kafka_message_t* handle) { } Message::Message() -: handle_(nullptr, nullptr) { +: handle_(nullptr, nullptr), + user_data_(nullptr) { } @@ -51,15 +53,25 @@ Message::Message(rd_kafka_message_t* handle) } -Message::Message(rd_kafka_message_t* handle, NonOwningTag) +Message::Message(rd_kafka_message_t* handle, NonOwningTag) : Message(HandlePtr(handle, &dummy_deleter)) { } -Message::Message(HandlePtr handle) +Message::Message(HandlePtr handle) : handle_(move(handle)), payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), - key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) { + key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()), + user_data_(handle_ ? handle_->_private : nullptr) { +} + +Message& Message::load_internal() { + if (user_data_) { + MessageInternal* mi = static_cast(user_data_); + user_data_ = mi->get_user_data(); + internal_ = mi->get_internal(); + } + return *this; } // MessageTimestamp diff --git a/src/message_internal.cpp b/src/message_internal.cpp new file mode 100644 index 00000000..012b6514 --- /dev/null +++ b/src/message_internal.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +#include "message_internal.h" +#include "message.h" +#include "message_builder.h" + +namespace cppkafka { + +// MessageInternal + +MessageInternal::MessageInternal(void* user_data, + std::shared_ptr internal) +: user_data_(user_data), + internal_(internal) { +} + +std::unique_ptr MessageInternal::load(Message& message) { + return std::unique_ptr(message.load_internal().get_handle() ? + static_cast(message.get_handle()->_private) : nullptr); +} + +void* MessageInternal::get_user_data() const { + return user_data_; +} + +InternalPtr MessageInternal::get_internal() const { + return internal_; +} + +} diff --git a/src/producer.cpp b/src/producer.cpp index e4d47d4f..4081b538 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -28,13 +28,16 @@ */ #include +#include #include "producer.h" #include "exceptions.h" -#include "message.h" +#include "message_internal.h" using std::move; using std::string; using std::chrono::milliseconds; +using std::unique_ptr; +using std::get; namespace cppkafka { diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 007f2e67..d3128fc2 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -74,6 +74,7 @@ void flusher_run(BufferedProducer& producer, if (producer.get_buffer_size() >= (size_t)num_flush) { producer.flush(); } + this_thread::sleep_for(milliseconds(10)); } producer.flush(); } @@ -86,6 +87,36 @@ void clear_run(BufferedProducer& producer, producer.clear(); } +vector dr_data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; +void dr_callback(const Message& message) { + static int i = 0; + if (!message || message.is_eof()) return; + CHECK(message.get_user_data() == &dr_data[i]); + CHECK(*static_cast(message.get_user_data()) == dr_data[i]); + ++i; +} + +bool dr_failure_callback(const Message& message) { + if (!message || message.is_eof()) return true; + CHECK(message.get_user_data() == &dr_data[0]); + CHECK(*static_cast(message.get_user_data()) == dr_data[0]); + return true; //always retry +} + +template +class ErrorProducer : public BufferedProducer +{ +public: + ErrorProducer(Configuration config, + typename BufferedProducer::TestParameters params) : + BufferedProducer(config), + params_(params) { + this->set_test_parameters(¶ms_); + } +private: + typename BufferedProducer::TestParameters params_; +}; + TEST_CASE("simple production", "[producer]") { int partition = 0; @@ -271,6 +302,91 @@ TEST_CASE("multiple messages", "[producer]") { } } +TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") { + size_t message_count = 10; + set payloads; + + // Create a consumer and subscribe to this topic + Consumer consumer(make_consumer_config()); + consumer.subscribe({ KAFKA_TOPICS[0] }); + ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS); + + // Now create a producer and produce a message + BufferedProducer producer(make_producer_config()); + producer.set_produce_success_callback(dr_callback); + const string payload_base = "Hello world "; + for (size_t i = 0; i < message_count; ++i) { + const string payload = payload_base + to_string(i); + payloads.insert(payload); + producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload).user_data(&dr_data[i])); + } + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == message_count); + for (size_t i = 0; i < messages.size(); ++i) { + const auto& message = messages[i]; + CHECK(message.get_topic() == KAFKA_TOPICS[0]); + CHECK(payloads.erase(message.get_payload()) == 1); + CHECK(!!message.get_error() == false); + CHECK(!!message.get_key() == false); + CHECK(message.get_partition() >= 0); + CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS); + } +} + +TEST_CASE("replay sync messages with errors", "[producer][buffered_producer][sync]") { + size_t num_retries = 4; + + // Create a consumer and subscribe to this topic + Consumer consumer(make_consumer_config()); + consumer.subscribe({ KAFKA_TOPICS[0] }); + ConsumerRunner runner(consumer, 2*(num_retries+1), KAFKA_NUM_PARTITIONS); + + // Now create a producer and produce a message + ErrorProducer producer(make_producer_config(), BufferedProducer::TestParameters{true, false}); + producer.set_produce_failure_callback(dr_failure_callback); + producer.set_max_number_retries(num_retries); + string payload = "Hello world"; + MessageBuilder builder(KAFKA_TOPICS[0]); + builder.payload(payload).user_data(&dr_data[0]); + + //Produce the same message twice + producer.sync_produce(builder); + producer.sync_produce(builder); + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == 2*(num_retries+1)); + for (size_t i = 0; i < messages.size(); ++i) { + const auto& message = messages[i]; + CHECK(message.get_topic() == KAFKA_TOPICS[0]); + CHECK(message.get_payload() == payload); + CHECK(!!message.get_error() == false); + CHECK(!!message.get_key() == false); + CHECK(message.get_partition() >= 0); + CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS); + } +} + +TEST_CASE("replay async messages with errors", "[producer][buffered_producer][async]") { + size_t num_retries = 4; + int exit_flag = 0; + + // Now create a producer and produce a message + ErrorProducer producer(make_producer_config(), + BufferedProducer::TestParameters{false, true}); + producer.set_max_number_retries(num_retries); + thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0); + string payload = "Hello world"; + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload)); + this_thread::sleep_for(milliseconds(2000)); + exit_flag = 1; + flusher_thread.join(); + REQUIRE(producer.get_total_messages_produced() == 0); + CHECK(producer.get_total_messages_dropped() == 1); +} + TEST_CASE("buffered producer", "[producer][buffered_producer]") { int partition = 0;