diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index a241e7b6..0ee3f93d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -482,9 +482,10 @@ class CPPKAFKA_API BufferedProducer { #endif private: - enum class MessagePriority { Low, High }; enum class SenderType { Sync, Async }; - + enum class QueueKind { Retry, Regular }; + enum class FlushAction { DontFlush, DoFlush }; + template struct CounterGuard{ CounterGuard(std::atomic& counter) : counter_(counter) { ++counter_; } @@ -519,18 +520,21 @@ class CPPKAFKA_API BufferedProducer { return nullptr; } template - void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); + void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action); template void produce_message(BuilderType&& builder); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); template void async_produce(BuilderType&& message, bool throw_on_error); - + static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); + // Members Producer producer_; QueueType messages_; + QueueType retry_messages_; mutable std::mutex mutex_; + mutable std::mutex retry_mutex_; ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; ProduceTerminationCallback produce_termination_callback_; @@ -565,7 +569,8 @@ template BufferedProducer::BufferedProducer(Configuration config, const Allocator& alloc) : producer_(prepare_configuration(std::move(config))), - messages_(alloc) { + messages_(alloc), + retry_messages_(alloc) { producer_.set_payload_policy(get_default_payload_policy()); #ifdef KAFKA_TEST_INSTANCE test_params_ = nullptr; @@ -580,7 +585,7 @@ void BufferedProducer::add_message(const MessageBuilder& template void BufferedProducer::add_message(Builder builder) { add_tracker(SenderType::Async, builder); - do_add_message(move(builder), MessagePriority::Low, true); + do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); } template @@ -624,30 +629,36 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::async_flush() { CounterGuard counter_guard(flushes_in_progress_); - QueueType flush_queue; // flush from temporary queue + auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void { - std::lock_guard lock(mutex_); - std::swap(messages_, flush_queue); - } - while (!flush_queue.empty()) { - async_produce(std::move(flush_queue.front()), false); - flush_queue.pop_front(); - } + QueueType flush_queue; // flush from temporary queue + swap_queues(queue, flush_queue, mutex); + + while (!flush_queue.empty()) { + async_produce(std::move(flush_queue.front()), false); + flush_queue.pop_front(); + } + }; + queue_flusher(retry_messages_, retry_mutex_); + queue_flusher(messages_, mutex_); } template void BufferedProducer::flush(bool preserve_order) { if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); - QueueType flush_queue; // flush from temporary queue + auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void { - std::lock_guard lock(mutex_); - std::swap(messages_, flush_queue); - } - while (!flush_queue.empty()) { - sync_produce(flush_queue.front()); - flush_queue.pop_front(); - } + QueueType flush_queue; // flush from temporary queue + swap_queues(queue, flush_queue, mutex); + + while (!flush_queue.empty()) { + sync_produce(flush_queue.front()); + flush_queue.pop_front(); + } + }; + queue_flusher(retry_messages_, retry_mutex_); + queue_flusher(messages_, mutex_); } else { async_flush(); @@ -661,25 +672,42 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue + swap_queues(messages_, flush_queue, mutex_); + QueueType retry_flush_queue; // flush from temporary retry queue + swap_queues(retry_messages_, retry_flush_queue, retry_mutex_); + + auto queue_flusher = [this](QueueType& queue)->bool { - std::lock_guard lock(mutex_); - std::swap(messages_, flush_queue); - } + if (!queue.empty()) { + sync_produce(queue.front()); + queue.pop_front(); + return true; + } + return false; + }; auto remaining = timeout; auto start_time = std::chrono::high_resolution_clock::now(); do { - sync_produce(flush_queue.front()); - flush_queue.pop_front(); + if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) { + break; + } // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } while (!flush_queue.empty() && (remaining.count() > 0)); + } while (remaining.count() > 0); // Re-enqueue remaining messages in original order - if (!flush_queue.empty()) { - std::lock_guard lock(mutex_); - messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end())); - } + auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void + { + if (!src_queue.empty()) { + std::lock_guard lock(mutex); + dst_queue.insert(dst_queue.begin(), + std::make_move_iterator(src_queue.begin()), + std::make_move_iterator(src_queue.end())); + } + }; + re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); + re_enqueuer(flush_queue, messages_, mutex_); } else { async_flush(); @@ -732,14 +760,15 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise template void BufferedProducer::clear() { - std::lock_guard lock(mutex_); QueueType tmp; - std::swap(tmp, messages_); + swap_queues(messages_, tmp, mutex_); + QueueType retry_tmp; + swap_queues(retry_messages_, retry_tmp, retry_mutex_); } template size_t BufferedProducer::get_buffer_size() const { - return messages_.size(); + return messages_.size() + retry_messages_.size(); } template @@ -769,18 +798,20 @@ BufferedProducer::get_flush_method() const { template template void BufferedProducer::do_add_message(BuilderType&& builder, - MessagePriority priority, - bool do_flush) { - { + QueueKind queue_kind, + FlushAction flush_action) { + if (queue_kind == QueueKind::Retry) { + std::lock_guard lock(retry_mutex_); + retry_messages_.emplace_back(std::forward(builder)); + } + else { std::lock_guard lock(mutex_); - if (priority == MessagePriority::High) { - messages_.emplace_front(std::forward(builder)); - } - else { - messages_.emplace_back(std::forward(builder)); - } + messages_.emplace_back(std::forward(builder)); } - if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + + // Flush the queues only if a regular message is added. Retry messages may be added + // from rdkafka callbacks, and flush/async_flush is a user-level call + if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) { if (flush_method_ == FlushMethod::Sync) { flush(); } @@ -928,7 +959,7 @@ void BufferedProducer::async_produce(BuilderType&& builde TrackerPtr tracker = std::static_pointer_cast(builder.internal()); if (tracker && tracker->num_retries_ > 0) { --tracker->num_retries_; - do_add_message(std::forward(builder), MessagePriority::High, false); + do_add_message(std::forward(builder), QueueKind::Retry, FlushAction::DontFlush); return; } } @@ -967,7 +998,7 @@ void BufferedProducer::on_delivery_report(const Message& --tracker->num_retries_; if (tracker->sender_ == SenderType::Async) { // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) - do_add_message(Builder(message), MessagePriority::High, false); + do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush); } should_retry = true; } @@ -999,6 +1030,13 @@ void BufferedProducer::on_delivery_report(const Message& } } +template +void BufferedProducer::swap_queues(BufferedProducer::QueueType & queue1, BufferedProducer::QueueType & queue2, std::mutex & mutex) +{ + std::lock_guard lock(mutex); + std::swap(queue1, queue2); +} + } // cppkafka #endif // CPPKAFKA_BUFFERED_PRODUCER_H