Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/cppkafka/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs);
*/
CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs);

/**
* Compares Buffer objects lexicographically
*/
CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs);

} // cppkafka

#endif // CPPKAFKA_BUFFER_H
27 changes: 22 additions & 5 deletions include/cppkafka/clonable_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ template <typename T, typename Deleter, typename Cloner>
class ClonablePtr {
public:
/**
* Creates an instance
* \brief Creates an instance
*
* \param ptr The pointer to be wrapped
* \param deleter The deleter functor
Expand All @@ -60,17 +60,27 @@ class ClonablePtr {
* \param rhs The pointer to be copied
*/
ClonablePtr(const ClonablePtr& rhs)
: handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
std::unique_ptr<T, Deleter>(nullptr, nullptr)),
cloner_(rhs.cloner_) {

}

/**
* Copies and assigns the given pointer
* \brief Copies and assigns the given pointer
*
* \param rhs The pointer to be copied
*/
ClonablePtr& operator=(const ClonablePtr& rhs) {
handle_.reset(cloner_(rhs.handle_.get()));
if (this == &rhs) {
return *this;
}
if (rhs.cloner_) {
handle_.reset(rhs.cloner_(rhs.handle_.get()));
}
else {
handle_.reset();
}
return *this;
}

Expand All @@ -79,11 +89,18 @@ class ClonablePtr {
~ClonablePtr() = default;

/**
* Getter for the internal pointer
* \brief Getter for the internal pointer
*/
T* get() const {
return handle_.get();
}

/**
* \brief Indicates whether this clonable pointer is valid (not null)
*/
explicit operator bool() const {
return static_cast<bool>(handle_);
}
private:
std::unique_ptr<T, Deleter> handle_;
Cloner cloner_;
Expand Down
154 changes: 154 additions & 0 deletions include/cppkafka/header.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef CPPKAFKA_HEADER_H
#define CPPKAFKA_HEADER_H

#include "buffer.h"
#include <string>
#include <assert.h>

namespace cppkafka {

template <typename BufferType>
class Header {
public:
using ValueType = BufferType;
Header() = default;

Header(const std::string& name,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since name is being stored here, I would normally take it by value and then std::move it.

Copy link
Contributor Author

@accelerated accelerated Oct 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferType is also stored...shouldn't it also in this case be passed by value and moved ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pls comment on this...I'm not sure if you also want the BufferType to be passed by value instead of lvalue/rvalue ?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given a BufferType can't be moved automatically, I don't think this one should take a value. Probably just have both constructors take a string by value.

const BufferType& value);

Header(const std::string& name,
BufferType&& value);

const std::string& get_name() const;

const BufferType& get_value() const;

BufferType& get_value();

operator bool() const;

private:
template <typename T>
T make_value(const T& other);

Buffer make_value(const Buffer& other);

std::string name_;
BufferType value_;
};

template <typename BufferType>
bool operator==(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator!=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs == rhs);
}

template <typename BufferType>
bool operator<(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator>(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator<=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs > rhs);
}

template <typename BufferType>
bool operator>=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs < rhs);
}

template <typename BufferType>
Header<BufferType>::Header(const std::string& name,
const BufferType& value)
: name_(name),
value_(make_value(value)) {
assert(!name.empty());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't played around with headers but just in case: is it impossible for a header to have an empty name? It looks like the header is defined as just a string in the protocol so I don't know if the brokers validate the length before accepting them. I know this wouldn't make sense but I'd rather be sure this won't backfire.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I will remove the assert and I will add a null-name test to see if kafka likes it.

}

template <typename BufferType>
Header<BufferType>::Header(const std::string& name,
BufferType&& value)
: name_(name),
value_(std::move(value)) {
assert(!name.empty());
}

template <typename BufferType>
const std::string& Header<BufferType>::get_name() const {
return name_;
}

template <typename BufferType>
const BufferType& Header<BufferType>::get_value() const {
return value_;
}

template <typename BufferType>
BufferType& Header<BufferType>::get_value() {
return value_;
}

template <typename BufferType>
Header<BufferType>::operator bool() const {
return !value_.empty();
}

template <>
inline
Header<Buffer>::operator bool() const {
return value_.get_size() > 0;
}

template <typename BufferType>
template <typename T>
T Header<BufferType>::make_value(const T& other) {
return other;
}

template <typename BufferType>
Buffer Header<BufferType>::make_value(const Buffer& other) {
return Buffer(other.get_data(), other.get_size());
}

} //namespace cppkafka

#endif //CPPKAFKA_HEADER_H
Loading