Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Property | C/P | Range | Default | Importance | Description
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
client.id | * | | rdkafka@{hostname} | low | Client identifier. Fallback default value is "rdkafka" if current host name cannot be retrieved. <br>*Type: string*
metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's `max.message.bytes` limit (see Apache Kafka documentation). <br>*Type: integer*
Expand Down
6 changes: 0 additions & 6 deletions examples/kafkatest_verifiable_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,12 +627,6 @@ int main(int argc, char **argv) {
exit(1);
}

{
char hostname[128];
gethostname(hostname, sizeof(hostname) - 1);
conf->set("client.id", std::string("rdkafka@") + hostname, errstr);
}

conf->set("log.thread.name", "true", errstr);

/* auto commit is explicitly enabled with --enable-autocommit */
Expand Down
6 changes: 4 additions & 2 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef SSIZE_T ssize_t;
#endif
#define RD_UNUSED
#define RD_INLINE __inline
#define RD_NOINLINE __declspec(noinline)
#define RD_DEPRECATED __declspec(deprecated)
#define RD_FORMAT(...)
#undef RD_EXPORT
Expand All @@ -87,8 +88,9 @@ typedef SSIZE_T ssize_t;
#else
#include <sys/socket.h> /* for sockaddr, .. */

#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
#define RD_NOINLINE __attribute__((noinline))
#define RD_EXPORT
#define RD_DEPRECATED __attribute__((deprecated))

Expand Down
68 changes: 67 additions & 1 deletion src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "rdkafka_int.h"
#include "rdkafka_feature.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_transport.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_assignor.h"
#include "rdkafka_sasl_oauthbearer.h"
Expand Down Expand Up @@ -386,7 +387,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{0x2000, "oidc", _UNSUPPORTED_OIDC},
{0, NULL}}},
{_RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
"Client identifier.", .sdef = "rdkafka"},
"Client identifier. Fallback default value is \"rdkafka\" "
"if current host name cannot be retrieved.",
.sdef = "rdkafka@{hostname}" /* placeholder value */},
{_RK_GLOBAL | _RK_HIDDEN, "client.software.name", _RK_C_STR, _RK(sw_name),
"Client software name as reported to broker version >= v2.4.0. "
"Broker-side character restrictions apply, as of broker version "
Expand Down Expand Up @@ -2445,11 +2448,74 @@ static void rd_kafka_defaultconf_set(int scope, void *conf) {
}
}

static const char *rd_kafka_client_id = NULL;

static void rd_kafka_default_client_id_impl(void) {
const int stem_len = 8; /* strlen("rdkafka@") */
/* 256 = maximum hostname length among most platforms
however, some platforms have lesser value;
8 = stem length, merely `strlen("rdkafka@")';
1 = space for NUL byte.
*/
static char client_id_buf[256 + 8 + 1];
int res;

(void)memset(client_id_buf, 0, sizeof(client_id_buf));
/* strcpy(client_id_buf, "rdkafka@"); */
(void)memcpy(client_id_buf, "rdkafka@", stem_len);

/* required on Windows before calling gethostname();
* on other platforms it does nothing (for now). */
rd_kafka_transport_init();

res = gethostname(client_id_buf + stem_len,
sizeof(client_id_buf) - stem_len - 1);
if (res != 0) {
/* if gethostname() is failed for whatever reason,
* trim "client_id_buf": produces "rdkafka". */
client_id_buf[stem_len - 1] = 0;
}
rd_kafka_client_id = (const char *)client_id_buf;
}

#ifdef _WIN32
static RD_NOINLINE
BOOL CALLBACK rd_kafka_default_client_id_once(PINIT_ONCE init_once,
PVOID parameter,
PVOID *context) {
rd_kafka_default_client_id_impl();
return TRUE;
}
#else
static RD_NOINLINE
void rd_kafka_default_client_id_once(void) {
rd_kafka_default_client_id_impl();
}
#endif

static RD_NOINLINE
void rd_kafka_default_client_id(rd_kafka_conf_t *conf) {
const char *client_id;
#ifdef _WIN32
static INIT_ONCE _once = INIT_ONCE_STATIC_INIT;
(void)InitOnceExecuteOnce(&_once, rd_kafka_default_client_id_once, NULL,
NULL);
#else
static pthread_once_t _once = PTHREAD_ONCE_INIT;
(void)pthread_once(&_once, rd_kafka_default_client_id_once);
#endif
client_id = rd_kafka_client_id;
if (!client_id)
return;
(void)rd_kafka_conf_set(conf, "client.id", client_id, NULL, 0);
}

rd_kafka_conf_t *rd_kafka_conf_new(void) {
rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) &&
*"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
rd_kafka_default_client_id(conf);
rd_kafka_anyconf_clear_all_is_modified(conf);
return conf;
}
Expand Down
33 changes: 32 additions & 1 deletion src/rdkafka_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@

#include <errno.h>

#ifndef _WIN32
#include <pthread.h>
#endif

/* AIX doesn't have MSG_DONTWAIT */
#ifndef MSG_DONTWAIT
#define MSG_DONTWAIT MSG_NONBLOCK
Expand Down Expand Up @@ -1301,9 +1305,36 @@ void rd_kafka_transport_term (void) {
}
#endif

void rd_kafka_transport_init(void) {
#ifdef _WIN32
static RD_NOINLINE
BOOL CALLBACK rd_kafka_transport_init_once(PINIT_ONCE init_once,
PVOID parameter,
PVOID *context) {
WSADATA d;
(void)WSAStartup(MAKEWORD(2, 2), &d);
return TRUE;
}

RD_NOINLINE
void rd_kafka_transport_init(void) {
static INIT_ONCE _once = INIT_ONCE_STATIC_INIT;
(void)InitOnceExecuteOnce(&_once, rd_kafka_transport_init_once, NULL,
NULL);
}
#else
/* example/skeleton for future usage */
#if 0
static RD_NOINLINE
void rd_kafka_transport_init_once(void) {
}
#endif

RD_NOINLINE
void rd_kafka_transport_init(void) {
/* example/skeleton for future usage */
#if 0
static pthread_once_t _once = PTHREAD_ONCE_INIT;
(void)pthread_once(&_once, rd_kafka_transport_init_once);
#endif
}
#endif
1 change: 1 addition & 0 deletions src/rdposix.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
#define RD_NOINLINE __attribute__((noinline))
#define RD_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#define RD_NORETURN __attribute__((noreturn))
#define RD_IS_CONSTANT(p) __builtin_constant_p((p))
Expand Down
3 changes: 2 additions & 1 deletion src/rdwin32.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ struct msghdr {
#endif

#define RD_UNUSED
#define RD_INLINE __inline
#define RD_INLINE __inline
#define RD_NOINLINE __declspec(noinline)
#define RD_WARN_UNUSED_RESULT
#define RD_NORETURN __declspec(noreturn)
#define RD_IS_CONSTANT(p) (0)
Expand Down
3 changes: 1 addition & 2 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2217,8 +2217,7 @@ rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf) {
test_socket_enable(conf);
#endif
} else {
if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka"))
test_conf_set(conf, "client.id", test_curr->name);
test_conf_set(conf, "client.id", test_curr->name);
}

if (mode == RD_KAFKA_CONSUMER && test_consumer_group_protocol_str) {
Expand Down