Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,14 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* This can return one or more messages
*
* \param max_batch_size The maximum amount of messages expected
* \param alloc The optionally supplied allocator for allocating messages
*
* \return A list of messages
*/
MessageList poll_batch(size_t max_batch_size);
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size);

/**
* \brief Polls for a batch of messages
Expand All @@ -391,10 +395,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*
* \param max_batch_size The maximum amount of messages expected
* \param timeout The timeout for this operation
* \param alloc The optionally supplied allocator for allocating messages
*
* \return A list of messages
*/
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);

/**
* \brief Get the global event queue servicing this consumer corresponding to
Expand Down Expand Up @@ -430,6 +440,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
static Queue get_queue(rd_kafka_queue_t* handle);
void close();
void commit(const Message& msg, bool async);
void commit(const TopicPartitionList* topic_partitions, bool async);
Expand All @@ -440,6 +451,30 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
RebalanceErrorCallback rebalance_error_callback_;
};

// Implementations
template <typename Allocator>
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
const Allocator& alloc) {
return poll_batch(max_batch_size, get_timeout(), alloc);
}

template <typename Allocator>
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) {
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle() , timeout.count(), raw_messages.data(),
raw_messages.size());
if (result == -1) {
check_error(rd_kafka_last_error());
// on the off-chance that check_error() does not throw an error
return std::vector<Message, Allocator>(alloc);
}
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
}

} // cppkafka

#endif // CPP_KAFKA_CONSUMER_H
42 changes: 40 additions & 2 deletions include/cppkafka/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,14 @@ class CPPKAFKA_API Queue {
*
* \param max_batch_size The max number of messages to consume if available
*
* \param alloc The optionally supplied allocator for the message list
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size) const;
template <typename Allocator>
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
const Allocator& alloc) const;
std::vector<Message> consume_batch(size_t max_batch_size) const;

/**
* \brief Consumes a batch of messages from this queue
Expand All @@ -151,9 +156,16 @@ class CPPKAFKA_API Queue {
*
* \param timeout The timeout to be used on this call
*
* \param alloc The optionally supplied allocator for the message list
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
template <typename Allocator>
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) const;
std::vector<Message> consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) const;

/**
* Indicates whether this queue is valid (not null)
Expand All @@ -178,6 +190,32 @@ class CPPKAFKA_API Queue {

using QueueList = std::vector<Queue>;

template <typename Allocator>
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
const Allocator& alloc) const {
return consume_batch(max_batch_size, timeout_ms_, alloc);
}

template <typename Allocator>
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) const {
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (result == -1) {
rd_kafka_resp_err_t error = rd_kafka_last_error();
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw QueueException(error);
}
return std::vector<Message, Allocator>(alloc);
}
// Build message list
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
}

} // cppkafka

#endif //CPPKAFKA_QUEUE_H
Loading