Skip to content

Commit 0851cac

Browse files
author
accelerated
committed
added flag for flush
1 parent 1676df3 commit 0851cac

File tree

1 file changed

+48
-14
lines changed

1 file changed

+48
-14
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -185,20 +185,28 @@ class CPPKAFKA_API BufferedProducer {
185185
* This will send all messages and keep waiting until all of them are acknowledged (this is
186186
* done by calling wait_for_acks).
187187
*
188+
* \param preserve_order If set to True, each message in the queue will be flushed only when the previous
189+
* message ack is received. This may result in performance degradation as messages
190+
* are sent one at a time. This calls sync_produce() on each message in the buffer.
191+
* If set to False, all messages are flushed in one batch before waiting for acks,
192+
* however message reordering may occur if librdkafka setting 'messages.sent.max.retries > 0'.
193+
*
188194
* \remark Although it is possible to call flush from multiple threads concurrently, better
189195
* performance is achieved when called from the same thread or when serialized
190196
* with respect to other threads.
191197
*/
192-
void flush();
198+
void flush(bool preserve_order = false);
193199

194200
/**
195201
* \brief Flushes the buffered messages and waits up to 'timeout'
196202
*
197203
* \param timeout The maximum time to wait until all acks are received
198204
*
205+
* \param preserve_order True to preserve message ordering, False otherwise. See flush above for more details.
206+
*
199207
* \return True if the operation completes and all acks have been received.
200208
*/
201-
bool flush(std::chrono::milliseconds timeout);
209+
bool flush(std::chrono::milliseconds timeout, bool preserve_order = false);
202210

203211
/**
204212
* Waits for produced message's acknowledgements from the brokers
@@ -528,23 +536,49 @@ void BufferedProducer<BufferType>::async_flush() {
528536
}
529537

530538
template <typename BufferType>
531-
void BufferedProducer<BufferType>::flush() {
532-
CounterGuard<size_t> counter_guard(flushes_in_progress_);
533-
QueueType flush_queue; // flush from temporary queue
534-
{
535-
std::lock_guard<std::mutex> lock(mutex_);
536-
std::swap(messages_, flush_queue);
539+
void BufferedProducer<BufferType>::flush(bool preserve_order) {
540+
if (preserve_order) {
541+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
542+
QueueType flush_queue; // flush from temporary queue
543+
{
544+
std::lock_guard<std::mutex> lock(mutex_);
545+
std::swap(messages_, flush_queue);
546+
}
547+
while (!flush_queue.empty()) {
548+
sync_produce(flush_queue.front());
549+
flush_queue.pop_front();
550+
}
537551
}
538-
while (!flush_queue.empty()) {
539-
sync_produce(flush_queue.front());
540-
flush_queue.pop_front();
552+
else {
553+
async_flush();
554+
wait_for_acks();
541555
}
542556
}
543557

544558
template <typename BufferType>
545-
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
546-
async_flush();
547-
return wait_for_acks(timeout);
559+
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
560+
bool preserve_order) {
561+
if (preserve_order) {
562+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
563+
QueueType flush_queue; // flush from temporary queue
564+
{
565+
std::lock_guard<std::mutex> lock(mutex_);
566+
std::swap(messages_, flush_queue);
567+
}
568+
auto start_time = std::chrono::high_resolution_clock::now();
569+
while (!flush_queue.empty()) {
570+
sync_produce(flush_queue.front());
571+
flush_queue.pop_front();
572+
if (std::chrono::duration_cast<std::chrono::milliseconds>
573+
(std::chrono::high_resolution_clock::now() - start_time) > timeout) {
574+
break;
575+
}
576+
}
577+
}
578+
else {
579+
async_flush();
580+
return wait_for_acks(timeout);
581+
}
548582
}
549583

550584
template <typename BufferType>

0 commit comments

Comments
 (0)