Skip to content

Commit 4d43d96

Browse files
author
accelerated
committed
Prevent MessageInternal structures if there is no delivery callback registered
1 parent 97447cd commit 4d43d96

1 file changed

Lines changed: 14 additions & 4 deletions

File tree

src/producer.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,20 @@ void Producer::produce(const MessageBuilder& builder) {
6767
const Buffer& payload = builder.payload();
6868
const Buffer& key = builder.key();
6969
const int policy = static_cast<int>(message_payload_policy_);
70-
unique_ptr<MessageInternal> internal_data(new MessageInternal(builder.user_data(), builder.internal()));
70+
void* opaque = builder.user_data();
71+
unique_ptr<MessageInternal> internal_data;
72+
if (get_configuration().get_delivery_report_callback()) {
73+
internal_data.reset(new MessageInternal(builder.user_data(), builder.internal()));
74+
opaque = internal_data.get();
75+
}
7176
auto result = rd_kafka_producev(get_handle(),
7277
RD_KAFKA_V_TOPIC(builder.topic().data()),
7378
RD_KAFKA_V_PARTITION(builder.partition()),
7479
RD_KAFKA_V_MSGFLAGS(policy),
7580
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
7681
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
7782
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
78-
RD_KAFKA_V_OPAQUE(internal_data.get()),
83+
RD_KAFKA_V_OPAQUE(opaque),
7984
RD_KAFKA_V_END);
8085
check_error(result);
8186
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
@@ -86,15 +91,20 @@ void Producer::produce(const Message& message) {
8691
const Buffer& key = message.get_key();
8792
const int policy = static_cast<int>(message_payload_policy_);
8893
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
89-
unique_ptr<MessageInternal> internal_data(new MessageInternal(message.get_user_data(), message.internal()));
94+
void* opaque = message.get_user_data();
95+
unique_ptr<MessageInternal> internal_data;
96+
if (get_configuration().get_delivery_report_callback()) {
97+
internal_data.reset(new MessageInternal(message.get_user_data(), message.internal()));
98+
opaque = internal_data.get();
99+
}
90100
auto result = rd_kafka_producev(get_handle(),
91101
RD_KAFKA_V_TOPIC(message.get_topic().data()),
92102
RD_KAFKA_V_PARTITION(message.get_partition()),
93103
RD_KAFKA_V_MSGFLAGS(policy),
94104
RD_KAFKA_V_TIMESTAMP(duration),
95105
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
96106
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
97-
RD_KAFKA_V_OPAQUE(internal_data.get()),
107+
RD_KAFKA_V_OPAQUE(opaque),
98108
RD_KAFKA_V_END);
99109
check_error(result);
100110
internal_data.release(); //data has been passed-on to rdkafka so we release ownership

0 commit comments

Comments
 (0)