Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,15 @@ class CPPKAFKA_API BufferedProducer {
};
using TrackerPtr = std::shared_ptr<Tracker>;

// Returns existing tracker or creates new one
template <typename BuilderType>
TrackerPtr add_tracker(BuilderType& builder) {
if (has_internal_data_ && !builder.internal()) {
// Add message tracker only if it hasn't been added before
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
builder.internal(tracker);
return tracker;
TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
if (has_internal_data_) {
if (!builder.internal()) {
// Add message tracker only if it hasn't been added before
builder.internal(std::make_shared<Tracker>(sender, max_number_retries_));
}
return std::static_pointer_cast<Tracker>(builder.internal());
}
return nullptr;
}
Expand Down Expand Up @@ -469,15 +471,15 @@ void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {

template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) {
add_tracker(builder);
add_tracker(SenderType::Async, builder);
do_add_message(move(builder), MessagePriority::Low, true);
}

template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
add_tracker(builder_clone);
add_tracker(SenderType::Async, builder_clone);
async_produce(builder_clone, true);
}
else {
Expand All @@ -489,7 +491,7 @@ template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
TrackerPtr tracker = add_tracker(builder_clone);
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
// produce until we succeed or we reach max retry limit
std::future<bool> should_retry;
do {
Expand Down Expand Up @@ -527,8 +529,16 @@ void BufferedProducer<BufferType>::async_flush() {

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
async_flush();
wait_for_acks();
CounterGuard<size_t> counter_guard(flushes_in_progress_);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that this is trying to preserve the ordering but this is extremely inefficient. I'm not sure this should be the default behavior. Many applications just care about the entire batch of messages to be produced, they don't care too much about the exact ordering in which that happens. Maybe this belongs on a separate method or something like that.

QueueType flush_queue; // flush from temporary queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(messages_, flush_queue);
}
while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
}

template <typename BufferType>
Expand Down
15 changes: 14 additions & 1 deletion tests/producer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ void flusher_run(BufferedProducer<string>& producer,
producer.flush();
}

void async_flusher_run(BufferedProducer<string>& producer,
int& exit_flag,
int num_flush) {
while (!exit_flag) {
if (producer.get_buffer_size() >= (size_t)num_flush) {
producer.async_flush();
}
this_thread::sleep_for(milliseconds(10));
}
producer.async_flush();
producer.wait_for_acks();
}

void clear_run(BufferedProducer<string>& producer,
condition_variable& clear) {
mutex m;
Expand Down Expand Up @@ -377,7 +390,7 @@ TEST_CASE("replay async messages with errors", "[producer][buffered_producer][as
ErrorProducer<string> producer(make_producer_config(),
BufferedProducer<string>::TestParameters{false, true});
producer.set_max_number_retries(num_retries);
thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
thread flusher_thread(async_flusher_run, ref(producer), ref(exit_flag), 0);
string payload = "Hello world";
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
this_thread::sleep_for(milliseconds(2000));
Expand Down