Skip to content

Commit 9f6556d

Browse files
authored
Merge pull request #125 from mfontanini/events
Event implementation
2 parents b0ddced + 46481d8 commit 9f6556d

File tree

16 files changed

+407
-35
lines changed

16 files changed

+407
-35
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ compiler:
77
- clang
88

99
env:
10+
- RDKAFKA_VERSION=v0.9.4
1011
- RDKAFKA_VERSION=v0.11.5
1112

1213
os:

include/cppkafka/configuration.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "clonable_ptr.h"
4343
#include "configuration_base.h"
4444
#include "macros.h"
45+
#include "event.h"
4546

4647
namespace cppkafka {
4748

@@ -78,6 +79,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
7879
const std::string& message)>;
7980
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
8081
using SocketCallback = std::function<int(int domain, int type, int protocol)>;
82+
using BackgroundEventCallback = std::function<void(KafkaHandleBase& handle, Event)>;
8183

8284
using ConfigurationBase<Configuration>::set;
8385
using ConfigurationBase<Configuration>::get;
@@ -142,6 +144,13 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
142144
*/
143145
Configuration& set_socket_callback(SocketCallback callback);
144146

147+
#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
148+
/**
149+
* Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb)
150+
*/
151+
Configuration& set_background_event_callback(BackgroundEventCallback callback);
152+
#endif
153+
145154
/**
146155
* Sets the default topic configuration
147156
*/
@@ -204,6 +213,11 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
204213
*/
205214
const SocketCallback& get_socket_callback() const;
206215

216+
/**
217+
* Gets the background event callback
218+
*/
219+
const BackgroundEventCallback& get_background_event_callback() const;
220+
207221
/**
208222
* Gets the default topic configuration
209223
*/
@@ -229,6 +243,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
229243
LogCallback log_callback_;
230244
StatsCallback stats_callback_;
231245
SocketCallback socket_callback_;
246+
BackgroundEventCallback background_event_callback_;
232247
};
233248

234249
} // cppkafka

include/cppkafka/consumer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,6 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
461461
private:
462462
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
463463
rd_kafka_topic_partition_list_t *partitions, void *opaque);
464-
static Queue get_queue(rd_kafka_queue_t* handle);
465464
void close();
466465
void commit(const Message& msg, bool async);
467466
void commit(const TopicPartitionList* topic_partitions, bool async);
@@ -485,7 +484,7 @@ std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
485484
const Allocator& alloc) {
486485
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
487486
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
488-
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
487+
Queue queue = Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
489488
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle(),
490489
timeout.count(),
491490
raw_messages.data(),

include/cppkafka/cppkafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <cppkafka/configuration_option.h>
3838
#include <cppkafka/consumer.h>
3939
#include <cppkafka/error.h>
40+
#include <cppkafka/event.h>
4041
#include <cppkafka/exceptions.h>
4142
#include <cppkafka/group_information.h>
4243
#include <cppkafka/header.h>

include/cppkafka/event.h

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright (c) 2018, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_EVENT_H
31+
#define CPPKAFKA_EVENT_H
32+
33+
#include <memory>
34+
#include <string>
35+
#include <vector>
36+
#include "error.h"
37+
#include "message.h"
38+
#include "topic_partition.h"
39+
#include "topic_partition_list.h"
40+
41+
namespace cppkafka {
42+
43+
class Event {
44+
public:
45+
/**
46+
* Construct an Event from a rdkafka event handle and take ownership of it
47+
*
48+
* /param handle The handle to construct this event from
49+
*/
50+
Event(rd_kafka_event_t* handle);
51+
52+
/**
53+
* Returns the name of this event
54+
*/
55+
std::string get_name() const;
56+
57+
/**
58+
* Returns the type of this event
59+
*/
60+
rd_kafka_event_type_t get_type() const;
61+
62+
/**
63+
* \brief Gets the next message contained in this event.
64+
*
65+
* This call is only valid if the event type is one of:
66+
* * RD_KAFKA_EVENT_FETCH
67+
* * RD_KAFKA_EVENT_DR
68+
*
69+
* \note The returned message's lifetime *is tied to this Event*. That is, if the event
70+
* is free'd so will the contents of the message.
71+
*/
72+
Message get_next_message() const;
73+
74+
/**
75+
* \brief Gets all messages in this event (if any)
76+
*
77+
* This call is only valid if the event type is one of:
78+
* * RD_KAFKA_EVENT_FETCH
79+
* * RD_KAFKA_EVENT_DR
80+
*
81+
* \note The returned messages' lifetime *is tied to this Event*. That is, if the event
82+
* is free'd so will the contents of the messages.
83+
*
84+
* \return A vector containing 0 or more messages
85+
*/
86+
std::vector<Message> get_messages();
87+
88+
/**
89+
* \brief Gets all messages in this event (if any)
90+
*
91+
* This call is only valid if the event type is one of:
92+
* * RD_KAFKA_EVENT_FETCH
93+
* * RD_KAFKA_EVENT_DR
94+
*
95+
* \param allocator The allocator to use on the output vector
96+
*
97+
* \note The returned messages' lifetime *is tied to this Event*. That is, if the event
98+
* is free'd so will the contents of the messages.
99+
*
100+
* \return A vector containing 0 or more messages
101+
*/
102+
template <typename Allocator>
103+
std::vector<Message, Allocator> get_messages(const Allocator allocator);
104+
105+
/**
106+
* \brief Gets the number of messages contained in this event
107+
*
108+
* This call is only valid if the event type is one of:
109+
* * RD_KAFKA_EVENT_FETCH
110+
* * RD_KAFKA_EVENT_DR
111+
*/
112+
size_t get_message_count() const;
113+
114+
/**
115+
* \brief Returns the error in this event
116+
*/
117+
Error get_error() const;
118+
119+
/**
120+
* Gets the opaque pointer in this event
121+
*/
122+
void* get_opaque() const;
123+
124+
#if RD_KAFKA_VERSION >= RD_KAFKA_EVENT_STATS_SUPPORT_VERSION
125+
/**
126+
* \brief Gets the stats in this event
127+
*
128+
* This call is only valid if the event type is RD_KAFKA_EVENT_STATS
129+
*/
130+
std::string get_stats() const {
131+
return rd_kafka_event_stats(handle_.get());
132+
}
133+
#endif
134+
135+
/**
136+
* \brief Gets the topic/partition for this event
137+
*
138+
* This call is only valid if the event type is RD_KAFKA_EVENT_ERROR
139+
*/
140+
TopicPartition get_topic_partition() const;
141+
142+
/**
143+
* \brief Gets the list of topic/partitions in this event
144+
*
145+
* This call is only valid if the event type is one of:
146+
* * RD_KAFKA_EVENT_REBALANCE
147+
* * RD_KAFKA_EVENT_OFFSET_COMMIT
148+
*/
149+
TopicPartitionList get_topic_partition_list() const;
150+
151+
/**
152+
* Check whether this event is valid
153+
*
154+
* /return true iff this event has a valid (non-null) handle inside
155+
*/
156+
operator bool() const;
157+
private:
158+
using HandlePtr = std::unique_ptr<rd_kafka_event_t, decltype(&rd_kafka_event_destroy)>;
159+
160+
HandlePtr handle_;
161+
};
162+
163+
template <typename Allocator>
164+
std::vector<Message, Allocator> Event::get_messages(const Allocator allocator) {
165+
const size_t total_messages = get_message_count();
166+
std::vector<const rd_kafka_message_t*> raw_messages(total_messages);
167+
const auto messages_read = rd_kafka_event_message_array(handle_.get(),
168+
raw_messages.data(),
169+
total_messages);
170+
std::vector<Message, Allocator> output(allocator);
171+
output.reserve(messages_read);
172+
for (auto message : raw_messages) {
173+
output.emplace_back(Message::make_non_owning(const_cast<rd_kafka_message_t*>(message)));
174+
}
175+
return output;
176+
}
177+
178+
} // cppkafka
179+
180+
#endif // CPPKAFKA_EVENT_H

include/cppkafka/kafka_handle_base.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "configuration.h"
4747
#include "macros.h"
4848
#include "logging.h"
49+
#include "queue.h"
4950

5051
namespace cppkafka {
5152

@@ -239,6 +240,19 @@ class CPPKAFKA_API KafkaHandleBase {
239240
*/
240241
const Configuration& get_configuration() const;
241242

243+
#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
244+
/**
245+
* \brief Gets the background queue
246+
*
247+
* This translates into a call to rd_kafka_queue_get_background
248+
*
249+
* \return The background queue
250+
*/
251+
Queue get_background_queue() const {
252+
return Queue::make_queue(rd_kafka_queue_get_background(handle_.get()));
253+
}
254+
#endif
255+
242256
/**
243257
* \brief Gets the length of the out queue
244258
*

include/cppkafka/macros.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,8 @@
4646
// See: https://github.com/edenhill/librdkafka/issues/1792
4747
#define RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION 0x000b0500 //v0.11.5.00
4848
#define RD_KAFKA_HEADERS_SUPPORT_VERSION 0x000b0402 //v0.11.4.02
49+
#define RD_KAFKA_ADMIN_API_SUPPORT_VERSION 0x000b0500 //v0.11.5.00
50+
#define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
51+
#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
4952

5053
#endif // CPPKAFKA_MACROS_H

include/cppkafka/message.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,15 @@ class CPPKAFKA_API Message {
177177
*/
178178
boost::optional<MessageTimestamp> get_timestamp() const;
179179

180+
#if RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION
180181
/**
181182
* \brief Gets the message latency in microseconds as measured from the produce() call.
182183
*/
183184
std::chrono::microseconds get_latency() const {
184185
assert(handle_);
185186
return std::chrono::microseconds(rd_kafka_message_latency(handle_.get()));
186187
}
188+
#endif
187189

188190
/**
189191
* \brief Indicates whether this message is valid (not null)

include/cppkafka/queue.h

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <memory>
3232
#include <boost/optional.hpp>
3333
#include <librdkafka/rdkafka.h>
34+
#include "event.h"
3435
#include "macros.h"
3536
#include "message.h"
3637

@@ -51,7 +52,18 @@ class CPPKAFKA_API Queue {
5152
* \param handle The handle to be used
5253
*/
5354
static Queue make_non_owning(rd_kafka_queue_t* handle);
54-
55+
56+
/**
57+
* \brieef Creates a Queue object out of a handle.
58+
*
59+
* This will check what the rdkafka version is and will return either an owned
60+
* queue handle or a non owned one, depending on whether the current version
61+
* is >= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION (see macros.h)
62+
*
63+
* \param handle The handle to be used
64+
*/
65+
static Queue make_queue(rd_kafka_queue_t* handle);
66+
5567
/**
5668
* \brief Constructs an empty queue
5769
*
@@ -130,7 +142,7 @@ class CPPKAFKA_API Queue {
130142
* \return A message
131143
*/
132144
Message consume(std::chrono::milliseconds timeout) const;
133-
145+
134146
/**
135147
* \brief Consumes a batch of messages from this queue
136148
*
@@ -188,7 +200,23 @@ class CPPKAFKA_API Queue {
188200
*/
189201
std::vector<Message> consume_batch(size_t max_batch_size,
190202
std::chrono::milliseconds timeout) const;
191-
203+
204+
/**
205+
* \brief Extracts the next message in this Queue
206+
*
207+
* /return The latest event, if any
208+
*/
209+
Event next_event() const;
210+
211+
/**
212+
* \brief Extracts the next message in this Queue
213+
*
214+
* \param timeout The amount of time to wait for this operation to complete
215+
*
216+
* /return The latest event, if any
217+
*/
218+
Event next_event(std::chrono::milliseconds timeout) const;
219+
192220
/**
193221
* Indicates whether this queue is valid (not null)
194222
*/

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ set(SOURCES
1414
metadata.cpp
1515
group_information.cpp
1616
error.cpp
17+
event.cpp
1718

1819
kafka_handle_base.cpp
1920
producer.cpp

0 commit comments

Comments
 (0)