Skip to content

Commit 471be62

Browse files
author
accelerated
committed
Added pause/resume for producers
1 parent 157b7ec commit 471be62

5 files changed

Lines changed: 36 additions & 1 deletion

File tree

include/cppkafka/consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
202202
/**
203203
* \brief Resumes all consumption
204204
*/
205-
void resume();
205+
void resume();
206206

207207
/**
208208
* \brief Commits the current partition assignment

include/cppkafka/producer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,20 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
155155
* \param timeout The timeout used on this call
156156
*/
157157
void flush(std::chrono::milliseconds timeout);
158+
159+
/**
160+
* \brief Pauses production for this topic
161+
*
162+
* \param topic The topic name
163+
*/
164+
void pause(const std::string& topic);
165+
166+
/**
167+
* \brief Resumes production for this topic
168+
*
169+
* \param topic The topic name
170+
*/
171+
void resume(const std::string& topic);
158172
private:
159173
PayloadPolicy message_payload_policy_;
160174
};

include/cppkafka/topic_partition_list.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <set>
3838
#include <librdkafka/rdkafka.h>
3939
#include "macros.h"
40+
#include "metadata.h"
4041

4142
namespace cppkafka {
4243

@@ -53,6 +54,8 @@ using TopicPartitionList = std::vector<TopicPartition>;
5354
CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions);
5455
CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions);
5556
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
57+
CPPKAFKA_API TopicPartitionList convert(const std::string& topic,
58+
const std::vector<PartitionMetadata>& partition_metadata);
5659
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
5760

5861
// Extracts a partition list subset belonging to the provided topics (case-insensitive)

src/producer.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,12 @@ void Producer::flush(milliseconds timeout) {
114114
check_error(result);
115115
}
116116

117+
void Producer::pause(const std::string& topic) {
118+
pause_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
119+
}
120+
121+
void Producer::resume(const std::string& topic) {
122+
resume_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
123+
}
124+
117125
} // cppkafka

src/topic_partition_list.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,16 @@ TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) {
6666
return output;
6767
}
6868

69+
TopicPartitionList convert(const std::string& topic,
70+
const std::vector<PartitionMetadata>& partition_metadata)
71+
{
72+
TopicPartitionList output;
73+
for (const auto& meta : partition_metadata) {
74+
output.emplace_back(topic, meta.get_id());
75+
}
76+
return output;
77+
}
78+
6979
TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
7080
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
7181
}

0 commit comments

Comments
 (0)