Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*/
void unassign();

/**
* \brief Pauses all consumption
*/
void pause();

/**
* \brief Resumes all consumption
*/
void resume();

/**
* \brief Commits the current partition assignment
*
Expand Down Expand Up @@ -364,7 +374,6 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);

void close();
void commit(const Message& msg, bool async);
void commit(const TopicPartitionList* topic_partitions, bool async);
Expand Down
11 changes: 11 additions & 0 deletions include/cppkafka/topic_partition_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <iosfwd>
#include <algorithm>
#include <vector>
#include <set>
#include <librdkafka/rdkafka.h>
#include "macros.h"

Expand All @@ -54,6 +55,16 @@ CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_part
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);

// Extracts a partition list subset belonging to the provided topics (case-insensitive)
CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions,
const std::set<std::string>& topics);

// Extracts a partition list subset belonging to the provided partition ids
// Note: this assumes that all topic partitions in the original list belong to the same topic
// otherwise the partition ids may not be unique
CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions,
const std::set<int>& ids);

CPPKAFKA_API std::ostream& operator<<(std::ostream& output, const TopicPartitionList& rhs);

} // cppkafka
Expand Down
12 changes: 12 additions & 0 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*
*/
#include <sstream>
#include <algorithm>
#include <cctype>
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
Expand All @@ -39,6 +41,8 @@ using std::move;
using std::make_tuple;
using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;

namespace cppkafka {

Expand Down Expand Up @@ -125,6 +129,14 @@ void Consumer::unassign() {
check_error(error);
}

void Consumer::pause() {
pause_partitions(get_assignment());
}

void Consumer::resume() {
resume_partitions(get_assignment());
}

void Consumer::commit() {
commit(nullptr, false);
}
Expand Down
34 changes: 34 additions & 0 deletions src/topic_partition_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
*/

#include <iostream>
#include <string>
#include "topic_partition_list.h"
#include "topic_partition.h"
#include "exceptions.h"

using std::vector;
using std::set;
using std::ostream;
using std::string;

namespace cppkafka {

Expand Down Expand Up @@ -67,6 +70,37 @@ TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
}

TopicPartitionList find_matches(const TopicPartitionList& partitions,
const set<string>& topics) {
TopicPartitionList subset;
for (const auto& partition : partitions) {
for (const auto& topic : topics) {
if (topic.size() == partition.get_topic().size()) {
// compare both strings
bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(),
[](char c1, char c2)->bool {
return toupper(c1) == toupper(c2);
});
if (match) {
subset.emplace_back(partition);
}
}
}
}
return subset;
}

TopicPartitionList find_matches(const TopicPartitionList& partitions,
const set<int>& ids) {
TopicPartitionList subset;
for (const auto& partition : partitions) {
if (ids.count(partition.get_partition()) > 0) {
subset.emplace_back(partition);
}
}
return subset;
}

ostream& operator<<(ostream& output, const TopicPartitionList& rhs) {
output << "[ ";
for (auto iter = rhs.begin(); iter != rhs.end(); ++iter) {
Expand Down
43 changes: 43 additions & 0 deletions tests/topic_partition_list_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "cppkafka/topic_partition.h"

using std::ostringstream;
using std::set;
using std::string;

using namespace cppkafka;

Expand Down Expand Up @@ -42,3 +44,44 @@ TEST_CASE("topic partition list to string", "[topic_partition]") {
output << list;
CHECK(output.str() == "[ foo[-1:#], bar[2:#], foobar[3:4] ]");
}

TEST_CASE("find matches by topic", "[topic_partition]") {
const TopicPartitionList list = {
{ "foo", 0 },
{ "bar", 3 },
{ "fb", 1 },
{ "foo", 1 },
{ "fb", 2 },
{ "other", 1 },
{ "a", 1 }
};

const TopicPartitionList expected = {
{ "foo", 0 },
{ "fb", 1 },
{ "foo", 1 },
{ "fb", 2 },
};
const TopicPartitionList subset = find_matches(list, set<string>{"foo", "fb"});
CHECK(subset == expected);
}

TEST_CASE("find matches by id", "[topic_partition]") {
const TopicPartitionList list = {
{ "foo", 2 },
{ "foo", 3 },
{ "foo", 4 },
{ "foo", 5 },
{ "foo", 6 },
{ "foo", 7 },
{ "foo", 8 }
};

const TopicPartitionList expected = {
{ "foo", 2 },
{ "foo", 5 },
{ "foo", 8 },
};
const TopicPartitionList subset = find_matches(list, set<int>{2,5,8});
CHECK(subset == expected);
}