@@ -103,7 +103,7 @@ class CPPKAFKA_API BufferedProducer {
103103 /* *
104104 * Callback to indicate a message failed to be flushed
105105 */
106- using FlushFailureCallback = std::function<bool (const Builder &, Error error)>;
106+ using FlushFailureCallback = std::function<bool (const MessageBuilder &, Error error)>;
107107
108108 /* *
109109 * \brief Constructs a buffered producer using the provided configuration
@@ -368,24 +368,22 @@ class CPPKAFKA_API BufferedProducer {
368368 if (!has_internal_data_ && (max_number_retries_ > 0 )) {
369369 has_internal_data_ = true ; // enable once
370370 }
371- if (has_internal_data_) {
372- // Add message tracker
371+ if (has_internal_data_ && !builder. internal () ) {
372+ // Add message tracker only if it hasn't been added before
373373 TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
374374 builder.internal (tracker);
375375 return tracker;
376376 }
377377 return nullptr ;
378378 }
379-
380379 template <typename BuilderType>
381380 void do_add_message (BuilderType&& builder, MessagePriority priority, bool do_flush);
382- void do_add_message (const Message& message, MessagePriority priority, bool do_flush);
383- template <typename MessageType>
384- void produce_message (MessageType&& message);
381+ template <typename BuilderType>
382+ void produce_message (BuilderType&& builder);
385383 Configuration prepare_configuration (Configuration config);
386384 void on_delivery_report (const Message& message);
387- template <typename MessageType >
388- void async_produce (MessageType && message, bool throw_on_error);
385+ template <typename BuilderType >
386+ void async_produce (BuilderType && message, bool throw_on_error);
389387
390388 // Members
391389 Producer producer_;
@@ -465,7 +463,7 @@ void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
465463
466464template <typename BufferType>
467465void BufferedProducer<BufferType>::produce(const Message& message) {
468- async_produce (message, true );
466+ async_produce (MessageBuilder ( message) , true );
469467}
470468
471469template <typename BufferType>
@@ -545,13 +543,6 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
545543 }
546544}
547545
548- template <typename BufferType>
549- void BufferedProducer<BufferType>::do_add_message(const Message& message,
550- MessagePriority priority,
551- bool do_flush) {
552- do_add_messsage (MessageBuilder (message), priority, do_flush);
553- }
554-
555546template <typename BufferType>
556547Producer& BufferedProducer<BufferType>::get_producer() {
557548 return producer_;
@@ -614,11 +605,14 @@ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallba
614605}
615606
616607template <typename BufferType>
617- template <typename MessageType>
618- void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
608+ template <typename BuilderType>
609+ void BufferedProducer<BufferType>::produce_message(BuilderType&& builder) {
610+ using builder_type = typename std::decay<BuilderType>::type;
619611 while (true ) {
620612 try {
621- producer_.produce (std::forward<MessageType>(message));
613+ MessageInternalGuard<builder_type> internal_guard (const_cast <builder_type&>(builder));
614+ producer_.produce (builder);
615+ internal_guard.release ();
622616 // Sent successfully
623617 ++pending_acks_;
624618 break ;
@@ -636,23 +630,23 @@ void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
636630}
637631
638632template <typename BufferType>
639- template <typename MessageType >
640- void BufferedProducer<BufferType>::async_produce(MessageType && message , bool throw_on_error) {
633+ template <typename BuilderType >
634+ void BufferedProducer<BufferType>::async_produce(BuilderType && builder , bool throw_on_error) {
641635 try {
642636 TestParameters* test_params = get_test_parameters ();
643637 if (test_params && test_params->force_produce_error_ ) {
644638 throw HandleException (Error (RD_KAFKA_RESP_ERR_UNKNOWN));
645639 }
646- produce_message (std::forward<MessageType>(message ));
640+ produce_message (std::forward<BuilderType>(builder ));
647641 }
648642 catch (const HandleException& ex) {
649643 // If we have a flush failure callback and it returns true, we retry producing this message later
650644 CallbackInvoker<FlushFailureCallback> callback (" flush failure" , flush_failure_callback_, &producer_);
651- if (!callback || callback (std::forward<MessageType>(message ), ex.get_error ())) {
652- TrackerPtr tracker = std::static_pointer_cast<Tracker>(message .internal ());
645+ if (!callback || callback (std::forward<BuilderType>(builder ), ex.get_error ())) {
646+ TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder .internal ());
653647 if (tracker && tracker->num_retries_ > 0 ) {
654648 --tracker->num_retries_ ;
655- do_add_message (std::forward<MessageType>(message ), MessagePriority::High, false );
649+ do_add_message (std::forward<BuilderType>(builder ), MessagePriority::High, false );
656650 return ;
657651 }
658652 }
@@ -675,7 +669,8 @@ template <typename BufferType>
675669void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
676670 // Get tracker data
677671 TestParameters* test_params = get_test_parameters ();
678- TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal ());
672+ TrackerPtr tracker = has_internal_data_ ?
673+ std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->internal_ ) : nullptr ;
679674 bool should_retry = false ;
680675 if (message.get_error () || (test_params && test_params->force_delivery_error_ )) {
681676 // We should produce this message again if we don't have a produce failure callback
0 commit comments