Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/cppkafka/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
Configuration& set_default_topic_configuration(TopicConfiguration config);

/**
* Returns true iff the given property name has been set
* Returns true if the given property name has been set
*/
bool has_property(const std::string& name) const;

Expand Down
1 change: 1 addition & 0 deletions include/cppkafka/cppkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <cppkafka/macros.h>
#include <cppkafka/message.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/message_internal.h>
#include <cppkafka/metadata.h>
#include <cppkafka/producer.h>
#include <cppkafka/queue.h>
Expand Down
18 changes: 15 additions & 3 deletions include/cppkafka/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
namespace cppkafka {

class MessageTimestamp;
struct Internal;

/**
* \brief Thin wrapper over a rdkafka message handle
Expand All @@ -56,6 +57,8 @@ class MessageTimestamp;
*/
class CPPKAFKA_API Message {
public:
friend class MessageInternal;
using InternalPtr = std::shared_ptr<Internal>;
/**
* Constructs a message that won't take ownership of the given pointer
*/
Expand Down Expand Up @@ -134,14 +137,13 @@ class CPPKAFKA_API Message {
}

/**
* \brief Gets the private data.
* \brief Gets the private user data.
*
* This should only be used on messages produced by a Producer that were set a private data
* attribute
*/
void* get_user_data() const {
assert(handle_);
return handle_->_private;
return user_data_;
}

/**
Expand All @@ -164,17 +166,27 @@ class CPPKAFKA_API Message {
rd_kafka_message_t* get_handle() const {
return handle_.get();
}

/**
* Internal private const data accessor (internal use only)
*/
InternalPtr internal() const {
return internal_;
}
private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;

struct NonOwningTag { };

Message(rd_kafka_message_t* handle, NonOwningTag);
Message(HandlePtr handle);
Message& load_internal();

HandlePtr handle_;
Buffer payload_;
Buffer key_;
void* user_data_;
InternalPtr internal_;
};

using MessageList = std::vector<Message>;
Expand Down
39 changes: 34 additions & 5 deletions include/cppkafka/message_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ class BasicMessageBuilder {
* Gets the message's user data pointer
*/
void* user_data() const;

/**
* Private data accessor (internal use only)
*/
Message::InternalPtr internal() const;
Concrete& internal(Message::InternalPtr internal);

private:
void construct_buffer(BufferType& lhs, const BufferType& rhs);
Concrete& get_concrete();
Expand All @@ -176,11 +183,13 @@ class BasicMessageBuilder {
BufferType payload_;
std::chrono::milliseconds timestamp_{0};
void* user_data_;
Message::InternalPtr internal_;
};

template <typename T, typename C>
BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
: topic_(std::move(topic)) {
: topic_(std::move(topic)),
user_data_(nullptr) {
}

template <typename T, typename C>
Expand All @@ -190,16 +199,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
std::chrono::milliseconds(0)),
user_data_(message.get_user_data())
{

user_data_(message.get_user_data()),
internal_(message.internal()) {
}

template <typename T, typename C>
template <typename U, typename V>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
user_data_(rhs.user_data()) {
user_data_(rhs.user_data()),
internal_(rhs.internal()) {
get_concrete().construct_buffer(key_, rhs.key());
get_concrete().construct_buffer(payload_, rhs.payload());
}
Expand Down Expand Up @@ -292,6 +301,17 @@ void* BasicMessageBuilder<T, C>::user_data() const {
return user_data_;
}

template <typename T, typename C>
Message::InternalPtr BasicMessageBuilder<T, C>::internal() const {
return internal_;
}

template <typename T, typename C>
C& BasicMessageBuilder<T, C>::internal(Message::InternalPtr internal) {
internal_ = internal;
return get_concrete();
}

template <typename T, typename C>
void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
lhs = rhs;
Expand Down Expand Up @@ -328,6 +348,15 @@ class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
void construct_buffer(Buffer& lhs, const T& rhs) {
lhs = Buffer(rhs);
}

MessageBuilder clone() const {
return std::move(MessageBuilder(topic()).
key(Buffer(key().get_data(), key().get_size())).
payload(Buffer(payload().get_data(), payload().get_size())).
timestamp(timestamp()).
user_data(user_data()).
internal(internal()));
}
};

/**
Expand Down
86 changes: 86 additions & 0 deletions include/cppkafka/message_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef CPPKAFKA_MESSAGE_INTERNAL_H
#define CPPKAFKA_MESSAGE_INTERNAL_H

#include <memory>

namespace cppkafka {

class Message;

class Internal {
public:
virtual ~Internal() = default;
};
using InternalPtr = std::shared_ptr<Internal>;

/**
* \brief Private message data structure
*/
class MessageInternal {
public:
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
static std::unique_ptr<MessageInternal> load(Message& message);
void* get_user_data() const;
InternalPtr get_internal() const;
private:
void* user_data_;
InternalPtr internal_;
};

template <typename BuilderType>
class MessageInternalGuard {
public:
MessageInternalGuard(BuilderType& builder)
: builder_(builder),
user_data_(builder.user_data()) {
if (builder_.internal()) {
// Swap contents with user_data
ptr_.reset(new MessageInternal(user_data_, builder_.internal()));
builder_.user_data(ptr_.get()); //overwrite user data
}
}
~MessageInternalGuard() {
//Restore user data
builder_.user_data(user_data_);
}
void release() {
ptr_.release();
}
private:
BuilderType& builder_;
std::unique_ptr<MessageInternal> ptr_;
void* user_data_;
};

}

#endif //CPPKAFKA_MESSAGE_INTERNAL_H
Loading