Skip to content

Commit b0ddced

Browse files
acceleratedmfontanini
authored andcommitted
Message timestamp refactoring and log level changes (#133)
* Message timestamp refactoring and log level changes * Changes per code review
1 parent 451d602 commit b0ddced

10 files changed

Lines changed: 148 additions & 60 deletions

File tree

include/cppkafka/cppkafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include <cppkafka/message.h>
4949
#include <cppkafka/message_builder.h>
5050
#include <cppkafka/message_internal.h>
51+
#include <cppkafka/message_timestamp.h>
5152
#include <cppkafka/metadata.h>
5253
#include <cppkafka/producer.h>
5354
#include <cppkafka/queue.h>

include/cppkafka/kafka_handle_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "topic_configuration.h"
4646
#include "configuration.h"
4747
#include "macros.h"
48+
#include "logging.h"
4849

4950
namespace cppkafka {
5051

@@ -107,6 +108,11 @@ class CPPKAFKA_API KafkaHandleBase {
107108
* \param timeout The timeout to be set
108109
*/
109110
void set_timeout(std::chrono::milliseconds timeout);
111+
112+
/**
113+
* \brief Sets the log level
114+
*/
115+
void set_log_level(LogLevel level);
110116

111117
/**
112118
* \brief Adds one or more brokers to this handle's broker list

include/cppkafka/message.h

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@
4040
#include "macros.h"
4141
#include "error.h"
4242
#include "header_list.h"
43+
#include "message_timestamp.h"
4344

4445
namespace cppkafka {
4546

46-
class MessageTimestamp;
4747
class Internal;
4848

4949
/**
@@ -175,7 +175,7 @@ class CPPKAFKA_API Message {
175175
*
176176
* If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned.
177177
*/
178-
inline boost::optional<MessageTimestamp> get_timestamp() const;
178+
boost::optional<MessageTimestamp> get_timestamp() const;
179179

180180
/**
181181
* \brief Gets the message latency in microseconds as measured from the produce() call.
@@ -226,49 +226,6 @@ class CPPKAFKA_API Message {
226226

227227
using MessageList = std::vector<Message>;
228228

229-
/**
230-
* Represents a message's timestamp
231-
*/
232-
class CPPKAFKA_API MessageTimestamp {
233-
public:
234-
/**
235-
* The timestamp type
236-
*/
237-
enum TimestampType {
238-
CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME,
239-
LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME
240-
};
241-
242-
/**
243-
* Constructs a timestamp object using a 'duration'.
244-
*/
245-
MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type);
246-
247-
/**
248-
* Gets the timestamp value. If the timestamp was created with a 'time_point',
249-
* the duration represents the number of milliseconds since epoch.
250-
*/
251-
std::chrono::milliseconds get_timestamp() const;
252-
253-
/**
254-
* Gets the timestamp type
255-
*/
256-
TimestampType get_type() const;
257-
private:
258-
std::chrono::milliseconds timestamp_;
259-
TimestampType type_;
260-
};
261-
262-
boost::optional<MessageTimestamp> Message::get_timestamp() const {
263-
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
264-
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
265-
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
266-
return {};
267-
}
268-
return MessageTimestamp(std::chrono::milliseconds(timestamp),
269-
static_cast<MessageTimestamp::TimestampType>(type));
270-
}
271-
272229
} // cppkafka
273230

274231
#endif // CPPKAFKA_MESSAGE_H
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_MESSAGE_TIMESTAMP_H
31+
#define CPPKAFKA_MESSAGE_TIMESTAMP_H
32+
33+
#include <chrono>
34+
#include <boost/optional.hpp>
35+
#include <librdkafka/rdkafka.h>
36+
#include "macros.h"
37+
38+
namespace cppkafka {
39+
40+
/**
41+
* Represents a message's timestamp
42+
*/
43+
class CPPKAFKA_API MessageTimestamp {
44+
friend class Message;
45+
public:
46+
/**
47+
* The timestamp type
48+
*/
49+
enum TimestampType {
50+
CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME,
51+
LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME
52+
};
53+
54+
/**
55+
* Gets the timestamp value. If the timestamp was created with a 'time_point',
56+
* the duration represents the number of milliseconds since epoch.
57+
*/
58+
std::chrono::milliseconds get_timestamp() const;
59+
60+
/**
61+
* Gets the timestamp type
62+
*/
63+
TimestampType get_type() const;
64+
private:
65+
MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type);
66+
67+
std::chrono::milliseconds timestamp_;
68+
TimestampType type_;
69+
};
70+
71+
} // cppkafka
72+
73+
#endif //CPPKAFKA_MESSAGE_TIMESTAMP_H

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ set(SOURCES
77
buffer.cpp
88
queue.cpp
99
message.cpp
10+
message_timestamp.cpp
1011
message_internal.cpp
1112
topic_partition.cpp
1213
topic_partition_list.cpp

src/consumer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Consumer::Consumer(Configuration config)
7979
}
8080
rd_kafka_poll_set_consumer(ptr);
8181
set_handle(ptr);
82+
set_log_level(LogLevel::LogErr);
8283
}
8384

8485
Consumer::~Consumer() {

src/kafka_handle_base.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ void KafkaHandleBase::set_timeout(milliseconds timeout) {
8383
timeout_ms_ = timeout;
8484
}
8585

86+
void KafkaHandleBase::set_log_level(LogLevel level) {
87+
rd_kafka_set_log_level(handle_.get(), static_cast<int>(level));
88+
}
89+
8690
void KafkaHandleBase::add_brokers(const string& brokers) {
8791
rd_kafka_brokers_add(handle_.get(), brokers.data());
8892
}

src/message.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,20 +84,14 @@ Message& Message::load_internal() {
8484
return *this;
8585
}
8686

87-
// MessageTimestamp
88-
89-
MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type)
90-
: timestamp_(timestamp),
91-
type_(type) {
92-
93-
}
94-
95-
milliseconds MessageTimestamp::get_timestamp() const {
96-
return timestamp_;
97-
}
98-
99-
MessageTimestamp::TimestampType MessageTimestamp::get_type() const {
100-
return type_;
87+
boost::optional<MessageTimestamp> Message::get_timestamp() const {
88+
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
89+
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
90+
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
91+
return {};
92+
}
93+
return MessageTimestamp(std::chrono::milliseconds(timestamp),
94+
static_cast<MessageTimestamp::TimestampType>(type));
10195
}
10296

10397
} // cppkafka

src/message_timestamp.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#include "message_timestamp.h"
31+
32+
using std::chrono::milliseconds;
33+
34+
namespace cppkafka {
35+
36+
MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type)
37+
: timestamp_(timestamp),
38+
type_(type) {
39+
40+
}
41+
42+
milliseconds MessageTimestamp::get_timestamp() const {
43+
return timestamp_;
44+
}
45+
46+
MessageTimestamp::TimestampType MessageTimestamp::get_type() const {
47+
return type_;
48+
}
49+
50+
} // cppkafka
51+

src/producer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ Producer::Producer(Configuration config)
5252
if (!ptr) {
5353
throw Exception("Failed to create producer handle: " + string(error_buffer));
5454
}
55-
rd_kafka_set_log_level(ptr, 7);
5655
set_handle(ptr);
56+
set_log_level(LogLevel::LogErr);
5757
}
5858

5959
void Producer::set_payload_policy(PayloadPolicy policy) {

0 commit comments

Comments
 (0)