Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,39 @@ class CPPKAFKA_API BufferedProducer {
* Callback to indicate a message failed to be produced by the broker.
*
* The returned bool indicates whether the BufferedProducer should try to produce
* the message again after each failure.
* the message again after each failure, subject to the maximum number of retries set. If this callback
* is not set or returns false or if the number of retries reaches zero, the ProduceTerminationCallback
* will be called.
*/
using ProduceFailureCallback = std::function<bool(const Message&)>;

/**
* Callback to indicate a message failed to be produced by the broker and was dropped.
*
* The application can use this callback to track delivery failure of messages similar to the
* FlushTerminationCallback. If the application is only interested in message dropped events,
* then ProduceFailureCallback should not be set.
*/
using ProduceTerminationCallback = std::function<void(const Message&)>;

/**
* Callback to indicate a message failed to be flushed
*
* If this callback returns true, the message will be re-enqueued and flushed again later subject
* to the maximum number of retries set. If this callback is not set or returns false or if the number of retries
* reaches zero, the FlushTerminationCallback will be called.
*/
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;

/**
* Callback to indicate a message was dropped after multiple flush attempts or when the retry count
* reaches zero.
*
* The application can use this callback to track delivery failure of messages similar to the
* ProduceTerminationCallback. If the application is only interested in message dropped events,
* then FlushFailureCallback should not be set.
*/
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;

/**
* \brief Constructs a buffered producer using the provided configuration
Expand Down Expand Up @@ -343,13 +368,24 @@ class CPPKAFKA_API BufferedProducer {
*
* \param callback The callback to be set
*
* \remark It is *highly* recommended to set this callback as your message may be produced
* indefinitely if there's a remote error.
*
* \warning Do not call any method on the BufferedProducer while inside this callback.
*/
void set_produce_failure_callback(ProduceFailureCallback callback);

/**
* \brief Sets the message produce termination callback
*
* This will be called when the delivery report callback is executed for a message having
* an error and after all retries have expired and the message is dropped.
*
* \param callback The callback to be set
*
* \remark If the application only tracks dropped messages, the set_produce_failure_callback() should not be set.
*
* \warning Do not call any method on the BufferedProducer while inside this callback.
*/
void set_produce_termination_callback(ProduceTerminationCallback callback);

/**
* \brief Sets the successful delivery callback
*
Expand All @@ -360,19 +396,33 @@ class CPPKAFKA_API BufferedProducer {
void set_produce_success_callback(ProduceSuccessCallback callback);

/**
* \brief Sets the local message produce failure callback
* \brief Sets the local flush failure callback
*
* This callback will be called when local message production fails during a flush() operation.
* Failure errors are typically payload too large, unknown topic or unknown partition.
* Note that if the callback returns false, the message will be dropped from the buffer,
* otherwise it will be re-enqueued for later retry.
* otherwise it will be re-enqueued for later retry subject to the message retry count.
*
* \param callback
*
* \warning Do not call any method on the BufferedProducer while inside this callback
*/
void set_flush_failure_callback(FlushFailureCallback callback);

/**
* \brief Sets the local flush termination callback
*
* This callback will be called when local message production fails during a flush() operation after
* all previous flush attempts have failed. The message will be dropped after this callback.
*
* \param callback
*
* \remark If the application only tracks dropped messages, the set_flush_failure_callback() should not be set.
*
* \warning Do not call any method on the BufferedProducer while inside this callback
*/
void set_flush_termination_callback(FlushTerminationCallback callback);

struct TestParameters {
bool force_delivery_error_;
bool force_produce_error_;
Expand Down Expand Up @@ -444,7 +494,9 @@ class CPPKAFKA_API BufferedProducer {
mutable std::mutex mutex_;
ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_;
ProduceTerminationCallback produce_termination_callback_;
FlushFailureCallback flush_failure_callback_;
FlushTerminationCallback flush_termination_callback_;
ssize_t max_buffer_size_{-1};
FlushMethod flush_method_{FlushMethod::Sync};
std::atomic<size_t> pending_acks_{0};
Expand Down Expand Up @@ -745,6 +797,11 @@ void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(Produ
produce_failure_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_termination_callback(ProduceTerminationCallback callback) {
produce_termination_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_success_callback(ProduceSuccessCallback callback) {
produce_success_callback_ = std::move(callback);
Expand All @@ -755,6 +812,11 @@ void BufferedProducer<BufferType, Allocator>::set_flush_failure_callback(FlushFa
flush_failure_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(FlushTerminationCallback callback) {
flush_termination_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
Expand Down Expand Up @@ -802,6 +864,9 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
}
}
++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) {
throw;
}
Expand Down Expand Up @@ -839,10 +904,14 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
}
else {
++total_messages_dropped_;
CallbackInvoker<ProduceTerminationCallback>
("produce termination", produce_termination_callback_, &producer_)(message);
}
}
else {
++total_messages_dropped_;
CallbackInvoker<ProduceTerminationCallback>
("produce termination", produce_termination_callback_, &producer_)(message);
}
}
else {
Expand Down