3737#include < unordered_set>
3838#include < unordered_map>
3939#include < map>
40+ #include < boost/thread/mutex.hpp>
41+ #include < boost/thread/shared_mutex.hpp>
42+ #include < atomic>
4043#include < boost/optional.hpp>
4144#include " ../producer.h"
4245#include " ../message.h"
@@ -126,6 +129,29 @@ class CPPKAFKA_API BufferedProducer {
126129 * Clears any buffered messages
127130 */
128131 void clear ();
132+
133+ /* *
134+ * \brief Returns the current size of the buffer
135+ *
136+ * \return Number of unsent messages in the buffer
137+ */
138+ size_t get_buffer_size () const ;
139+
140+ /* *
141+ * \brief Returns the total number of messages ack-ed by the broker
142+ *
143+ * \return The total number of messages since the beginning or since the last roll-over
144+ *
145+ * \remark Call get_rollover_count() to get the number of times the counter has rolled over
146+ */
147+ size_t get_total_messages_acked () const ;
148+
149+ /* *
150+ * \brief Roll-over counter for get_total_messages_acked
151+ *
152+ * \return The number of rolls
153+ */
154+ uint16_t get_rollover_count () const ;
129155
130156 /* *
131157 * Gets the Producer object
@@ -152,6 +178,7 @@ class CPPKAFKA_API BufferedProducer {
152178 * \param callback The callback to be set
153179 */
154180 void set_produce_failure_callback (ProduceFailureCallback callback);
181+
155182private:
156183 using QueueType = std::queue<Builder>;
157184
@@ -163,9 +190,12 @@ class CPPKAFKA_API BufferedProducer {
163190
164191 Producer producer_;
165192 QueueType messages_;
193+ mutable boost::mutex exclusive_access_;
194+ mutable boost::shared_mutex shared_access_;
166195 ProduceFailureCallback produce_failure_callback_;
167- size_t expected_acks_{0 };
168- size_t messages_acked_{0 };
196+ std::atomic_ulong expected_acks_{0 };
197+ std::atomic_ullong total_messages_acked_{0 };
198+ std::atomic_ushort rollover_counter_{0 };
169199};
170200
171201template <typename BufferType>
@@ -187,22 +217,25 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
187217template <typename BufferType>
188218void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
189219 produce_message (builder);
190- expected_acks_++;
191220}
192221
193222template <typename BufferType>
194223void BufferedProducer<BufferType>::flush() {
195- while (!messages_.empty ()) {
196- produce_message (messages_.front ());
197- messages_.pop ();
224+ {
225+ boost::shared_lock<boost::shared_mutex> grant (shared_access_);
226+ size_t num_messages = messages_.size ();
227+ while (num_messages--) {
228+ produce_message (messages_.front ());
229+ boost::lock_guard<boost::mutex> require (exclusive_access_);
230+ messages_.pop ();
231+ }
198232 }
199-
200233 wait_for_acks ();
201234}
202235
203236template <typename BufferType>
204237void BufferedProducer<BufferType>::wait_for_acks() {
205- while (messages_acked_ < expected_acks_ ) {
238+ while (expected_acks_ > 0 ) {
206239 try {
207240 producer_.flush ();
208241 }
@@ -216,22 +249,20 @@ void BufferedProducer<BufferType>::wait_for_acks() {
216249 }
217250 }
218251 }
219- expected_acks_ = 0 ;
220- messages_acked_ = 0 ;
221252}
222253
223254template <typename BufferType>
224255void BufferedProducer<BufferType>::clear() {
256+ boost::unique_lock<boost::shared_mutex> restrict (shared_access_);
225257 QueueType tmp;
226258 std::swap (tmp, messages_);
227- expected_acks_ = 0 ;
228- messages_acked_ = 0 ;
229259}
230260
231261template <typename BufferType>
232262template <typename BuilderType>
233263void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
234- expected_acks_++;
264+ boost::shared_lock<boost::shared_mutex> grant (shared_access_);
265+ boost::lock_guard<boost::mutex> require (exclusive_access_);
235266 messages_.push (std::move (builder));
236267}
237268
@@ -245,6 +276,21 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
245276 return producer_;
246277}
247278
279+ template <typename BufferType>
280+ size_t BufferedProducer<BufferType>::get_buffer_size() const {
281+ return messages_.size ();
282+ }
283+
284+ template <typename BufferType>
285+ size_t BufferedProducer<BufferType>::get_total_messages_acked() const {
286+ return total_messages_acked_;
287+ }
288+
289+ template <typename BufferType>
290+ uint16_t BufferedProducer<BufferType>::get_rollover_count() const {
291+ return rollover_counter_;
292+ }
293+
248294template <typename BufferType>
249295typename BufferedProducer<BufferType>::Builder
250296BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -275,6 +321,8 @@ void BufferedProducer<BufferType>::produce_message(const MessageBuilder& builder
275321 }
276322 }
277323 }
324+ // Sent successfully
325+ ++expected_acks_;
278326}
279327
280328template <typename BufferType>
@@ -287,6 +335,10 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
287335
288336template <typename BufferType>
289337void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
338+ // Decrement the expected acks
339+ --expected_acks_;
340+ assert (expected_acks_ != (unsigned long )-1 ); // Prevent underflow
341+
290342 // We should produce this message again if it has an error and we either don't have a
291343 // produce failure callback or we have one but it returns true
292344 bool should_produce = message.get_error () &&
@@ -305,9 +357,12 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
305357 produce_message (builder);
306358 return ;
307359 }
308- // If production was successful or the produce failure callback returned false, then
309- // let's consider it to be acked
310- messages_acked_++;
360+
361+ // Increment the total successful transmissions
362+ ++total_messages_acked_;
363+ if (total_messages_acked_ == 0 ) {
364+ ++rollover_counter_;
365+ }
311366}
312367
313368} // cppkafka
0 commit comments