Merged
82 changes: 64 additions & 18 deletions include/cppkafka/utils/buffered_producer.h
@@ -57,15 +57,25 @@ namespace cppkafka {
* When producing messages, this class will handle cases where the producer's queue is full so it
* will poll until the production is successful.
*
* \remark This class is thread safe.
*
* \remark Releasing buffers: For high-performance applications preferring a zero-copy approach
* (using PayloadPolicy::PASSTHROUGH_PAYLOAD - see warning below) it is very important to know when
* owned message buffers can safely be released. One way is to perform individual cleanup when
* ProduceSuccessCallback is called. If the application produces messages in batches or has
* bursty behavior, another way is to check that flush operations have fully completed with
* get_buffer_size()==0 && get_flushes_in_progress()==0. Note that get_pending_acks()==0 is
* not always a guarantee, as there is a very small window when flush() starts in which
* get_buffer_size()==0 && get_pending_acks()==0 but messages have not yet been sent to the
* remote broker. For applications producing messages without buffering, get_pending_acks()==0
* is sufficient.
*
* \warning Delivery Report Callback: This class makes internal use of this function and will
* overwrite anything the user has supplied as part of the configuration options. Instead, the user
* should call set_produce_success_callback() and set_produce_failure_callback() respectively.
*
* \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char>
* the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type
* cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka
* shall not make any internal copies of the message and it is the application's responsibility to free
* the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory
Expand Down Expand Up @@ -197,11 +207,28 @@ class CPPKAFKA_API BufferedProducer {
size_t get_buffer_size() const;

/**
* \brief Get the number of messages not yet acked by the broker
*
* \return The number of messages
*/
size_t get_pending_acks() const;

/**
* \brief Get the total number of messages successfully produced since the beginning
*
* \return The number of messages
*/
size_t get_total_messages_produced() const;

/**
* \brief Get the number of outstanding flush operations in progress
*
* Since flush() can be called from multiple threads concurrently, this counter indicates
* how many operations are currently in progress.
*
* \return The number of outstanding flush operations.
*/
size_t get_flushes_in_progress() const;
Owner:
Is this metric really worth having? You can't really make a decision based on it, as by the time the function returns any in-progress flush may have already finished. I imagine you added this because you found a use case for it. If so, do you mind sharing it?

Contributor Author (@accelerated), May 28, 2018:
Well, the idea is explained in the class header. By itself it's not too useful; you have to pair it with get_buffer_size(), and if both return 0 that's a good indication that everything has been pushed to the broker. The use case is that I'm writing a library on top of yours with a much higher level of abstraction (i.e. imagine dozens of producers and consumers, maybe more, and very generic so it's usable on multiple different projects). I'm taking in messages in native format, serializing them (via transformer callbacks the user registers) and buffering them in chunks because I'm pulling them from application queues, and then flushing them, so I need a deterministic way of knowing when to delete the original (non-serialized) data - held in unique ptrs. If it turns out that it's not a reliable process, or if I change my design later on, I will probably have some sort of correlation hash map based on the message handle and use that along with the success callback to delete each message as it's acked. But I kind of want to avoid all the look-ups.
In any case I'm using stack-based allocators for the native message types, and I want zero copy until the message is delivered. The application has to be as low-latency as possible. The same thing in reverse will happen on the consumer side, i.e. the message is deserialized once via another transformer callback and then moved out via unique ptrs, so my library relinquishes all ownership.

Otherwise, it may be good to wait for all flushes to end before shutting down the producer for example.

Contributor Author:
So to answer:

You can't really make a decision based on this as by the time the function returns, any in-progress flush may have already finished

Exactly and that's fine. I'm checking when flushes get to 0, I don't care about how many are at any point in time.
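The quiescence check discussed in this thread can be sketched in isolation. `MockProducer` below is a hypothetical stand-in that mirrors the PR's accessor names (`get_buffer_size()`, `get_flushes_in_progress()`), not the real `BufferedProducer`; `safe_to_release` expresses the condition under which PASSTHROUGH_PAYLOAD buffers may be freed per the header remark:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Stand-in exposing the same counters as BufferedProducer; the real class
// lives in cppkafka/utils/buffered_producer.h.
struct MockProducer {
    std::atomic<size_t> buffer_size{0};
    std::atomic<size_t> flushes_in_progress{0};
    size_t get_buffer_size() const { return buffer_size.load(); }
    size_t get_flushes_in_progress() const { return flushes_in_progress.load(); }
};

// True when no messages are buffered and no flush is running: per the header
// remark, only then is it safe to release owned message buffers (or to begin
// shutting the producer down).
bool safe_to_release(const MockProducer& p) {
    return p.get_buffer_size() == 0 && p.get_flushes_in_progress() == 0;
}
```

As the thread notes, the individual counter values are racy by nature; the pattern only relies on observing both reach zero after the application has stopped enqueuing.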


/**
* Gets the Producer object
@@ -260,6 +287,13 @@ class CPPKAFKA_API BufferedProducer {
private:
using QueueType = std::deque<Builder>;
enum class MessagePriority { Low, High };

template <typename T>
struct CounterGuard {
    // RAII helper: increments the counter on construction and decrements it on
    // destruction, so the count stays correct even if the scope unwinds early.
    CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
    ~CounterGuard() { --counter_; }
    std::atomic<T>& counter_;
};

template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
@@ -276,8 +310,9 @@ class CPPKAFKA_API BufferedProducer {
ProduceFailureCallback produce_failure_callback_;
FlushFailureCallback flush_failure_callback_;
ssize_t max_buffer_size_{-1};
std::atomic<size_t> pending_acks_{0};
std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0};
};

template <typename BufferType>
@@ -318,6 +353,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
Owner:
I wonder if there should be a loop here that makes sure that when flush exits, any previously added messages are indeed flushed. Right now if you flush and a message fails, it will be left in the queue and only flushed again after flush is called again. My expectation is that after calling flush, I actually flushed the writer and everything in it is written.

e.g.

while (true) {
    QueueType flush_queue;
    // {  lock & swap }
    if (flush_queue.empty()) {
        return;
    }
    // your loop...
    // wait for acks...
}

Contributor Author:
This is the kind of thing I am trying to avoid: if you have this ongoing flush while other threads (including delivery failures) keep enqueuing at the same or a higher rate, you have now created an open-faucet situation where nothing is buffered. My expectation is that 1) flush is called periodically, so re-enqueued messages will eventually be delivered, and 2) flush only flushes what's in the buffer at the time of the call, rather than behaving like an ongoing flush.

Owner:
Hmm, that's fair. I guess if you want to ensure everything is flushed, you as a user should loop calling flush while there are still messages pending.
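The caller-side "loop until drained" idea could look roughly like the following sketch. `DrainableQueue` is a toy stand-in (its names and re-enqueue behavior are illustrative, not the library's API): each flush() attempt sends everything buffered but bounces a shrinking number of failures back into the buffer, mimicking delivery failures being re-queued:

```cpp
#include <cassert>
#include <cstddef>

// Toy stand-in: each flush() delivers all buffered messages, but failures_left
// of them fail and are re-enqueued; one fewer fails on each subsequent attempt.
struct DrainableQueue {
    size_t buffered;
    size_t failures_left;
    void flush() {
        buffered = failures_left;              // failed messages bounce back
        if (failures_left > 0) --failures_left; // retries gradually succeed
    }
    size_t get_buffer_size() const { return buffered; }
};

// User-side drain loop: keep calling flush() until nothing remains buffered.
void flush_until_empty(DrainableQueue& q) {
    while (q.get_buffer_size() != 0) {
        q.flush();
    }
}
```

This terminates only if retries eventually succeed; a real application would likely bound the loop or give up after a retry budget.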

{
std::lock_guard<std::mutex> lock(mutex_);
@@ -341,7 +377,7 @@ void BufferedProducer<BufferType>::flush() {

template <typename BufferType>
void BufferedProducer<BufferType>::wait_for_acks() {
while (pending_acks_ > 0) {
try {
producer_.flush();
}
@@ -412,8 +448,18 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
}

template <typename BufferType>
size_t BufferedProducer<BufferType>::get_pending_acks() const {
return pending_acks_;
}

template <typename BufferType>
size_t BufferedProducer<BufferType>::get_total_messages_produced() const {
return total_messages_produced_;
}

template <typename BufferType>
size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
return flushes_in_progress_;
}

template <typename BufferType>
Expand Down Expand Up @@ -444,7 +490,7 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
try {
producer_.produce(message);
// Sent successfully
++pending_acks_;
break;
}
catch (const HandleException& ex) {
@@ -470,8 +516,8 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
// Decrement the pending acks
--pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow

// We should produce this message again if it has an error and we either don't have a
// produce failure callback or we have one but it returns true
@@ -487,7 +533,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
produce_success_callback_(message);
}
// Increment the total successful transmissions
++total_messages_produced_;
}
}
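The ack accounting in this hunk (increment pending_acks_ after a successful enqueue, decrement on every delivery report, count only successful deliveries toward total_messages_produced_) can be modelled in isolation. The struct below is illustrative; its member names mirror the PR's counters but it is not the library's code:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Toy model of the PR's counters: produce() mirrors ++pending_acks_ after a
// successful rdkafka enqueue; delivery_report() mirrors on_delivery_report().
struct AckModel {
    std::atomic<size_t> pending_acks{0};
    std::atomic<size_t> total_messages_produced{0};
    void produce() { ++pending_acks; }
    void delivery_report(bool success) {
        --pending_acks;                          // every report settles one send
        if (success) ++total_messages_produced;  // only acked sends count
    }
};
```

Note that failed deliveries still decrement pending_acks, which is why pending_acks==0 alone says nothing about how many messages actually reached the broker.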

8 changes: 6 additions & 2 deletions tests/producer_test.cpp
@@ -369,7 +369,9 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
}
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == num_messages);
REQUIRE(producer.get_flushes_in_progress() == 0);
REQUIRE(producer.get_pending_acks() == 0);
REQUIRE(producer.get_total_messages_produced() == num_messages);
REQUIRE(producer.get_buffer_size() == 0);
}

@@ -390,6 +392,8 @@ TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_produce
thread.join();
}

REQUIRE(producer.get_total_messages_produced() == 0);
REQUIRE(producer.get_flushes_in_progress() == 0);
REQUIRE(producer.get_pending_acks() == 0);
REQUIRE(producer.get_buffer_size() < num_messages);
}