Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/cppkafka/kafka_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ class CPPKAFKA_API KafkaHandleBase {

void set_handle(rd_kafka_t* handle);
void check_error(rd_kafka_resp_err_t error) const;
void check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const;
rd_kafka_conf_t* get_configuration_handle();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
Expand Down
32 changes: 21 additions & 11 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,16 @@ void Consumer::unsubscribe() {
}

void Consumer::assign(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
// If the list is empty, then we need to use a null pointer
auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get();
rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle);
check_error(error);
rd_kafka_resp_err_t error;
if (topic_partitions.empty()) {
error = rd_kafka_assign(get_handle(), nullptr);
check_error(error);
}
else {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
check_error(error, topic_list_handle.get());
}
}

void Consumer::unassign() {
Expand Down Expand Up @@ -178,15 +183,15 @@ Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) cons
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
static_cast<int>(get_timeout().count()));
check_error(error);
check_error(error, topic_list_handle.get());
return convert(topic_list_handle);
}

TopicPartitionList
Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
return convert(topic_list_handle);
}

Expand Down Expand Up @@ -284,10 +289,15 @@ void Consumer::commit(const Message& msg, bool async) {

void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
rd_kafka_resp_err_t error;
error = rd_kafka_commit(get_handle(),
!topic_partitions ? nullptr : convert(*topic_partitions).get(),
async ? 1 : 0);
check_error(error);
if (topic_partitions == nullptr) {
error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0);
check_error(error);
}
else {
TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions);
error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0);
check_error(error, topic_list_handle.get());
}
}

void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
Expand Down
21 changes: 18 additions & 3 deletions src/kafka_handle_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(),
topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
}

void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
}

void KafkaHandleBase::set_timeout(milliseconds timeout) {
Expand Down Expand Up @@ -145,7 +145,7 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
const int timeout = static_cast<int>(timeout_ms_.count());
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
timeout);
check_error(result);
check_error(result, topic_list_handle.get());
return convert(topic_list_handle);
}

Expand Down Expand Up @@ -220,6 +220,21 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const {
}
}

void KafkaHandleBase::check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const {
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);
}
if (list_ptr) {
//check if any partition has errors
for (int i = 0; i < list_ptr->cnt; ++i) {
if (list_ptr->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);
}
}
}
}

rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
return config_.get_handle();
}
Expand Down