From 1a0a4e39530e98f698d7e16a87afb4f92370b64d Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Fri, 13 Feb 2026 13:28:32 +0300 Subject: [PATCH 1/5] Define RD_NOINLINE as opposite to RD_INLINE This allows to mark functions to not be inlined even if compiler decides that inlining is possible. Signed-off-by: Konstantin Demin --- src/rdkafka.h | 6 ++++-- src/rdposix.h | 1 + src/rdwin32.h | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/rdkafka.h b/src/rdkafka.h index c77f4d7925..89a839e34d 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -68,6 +68,7 @@ typedef SSIZE_T ssize_t; #endif #define RD_UNUSED #define RD_INLINE __inline +#define RD_NOINLINE __declspec(noinline) #define RD_DEPRECATED __declspec(deprecated) #define RD_FORMAT(...) #undef RD_EXPORT @@ -87,8 +88,9 @@ typedef SSIZE_T ssize_t; #else #include /* for sockaddr, .. */ -#define RD_UNUSED __attribute__((unused)) -#define RD_INLINE inline +#define RD_UNUSED __attribute__((unused)) +#define RD_INLINE inline +#define RD_NOINLINE __attribute__((noinline)) #define RD_EXPORT #define RD_DEPRECATED __attribute__((deprecated)) diff --git a/src/rdposix.h b/src/rdposix.h index 0af5948168..3a80b93a2d 100644 --- a/src/rdposix.h +++ b/src/rdposix.h @@ -57,6 +57,7 @@ #define RD_UNUSED __attribute__((unused)) #define RD_INLINE inline +#define RD_NOINLINE __attribute__((noinline)) #define RD_WARN_UNUSED_RESULT __attribute__((warn_unused_result)) #define RD_NORETURN __attribute__((noreturn)) #define RD_IS_CONSTANT(p) __builtin_constant_p((p)) diff --git a/src/rdwin32.h b/src/rdwin32.h index 40ea43a7ac..9e1d70208d 100644 --- a/src/rdwin32.h +++ b/src/rdwin32.h @@ -75,7 +75,8 @@ struct msghdr { #endif #define RD_UNUSED -#define RD_INLINE __inline +#define RD_INLINE __inline +#define RD_NOINLINE __declspec(noinline) #define RD_WARN_UNUSED_RESULT #define RD_NORETURN __declspec(noreturn) #define RD_IS_CONSTANT(p) (0) From 95d601d241c624b296e6aa07b035a9974d631323 Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Fri, 13 Feb 2026 13:28:32 +0300 Subject: [PATCH 2/5] Ensure rd_kafka_transport_init() is called once This change will raise minimum requirements for Windows builds due to InitOnceExecuteOnce() usage: Windows Vista / Windows Server 2008 are minimum supported systems. Signed-off-by: Konstantin Demin --- src/rdkafka_transport.c | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c index b7c691d762..3ee652b1ef 100644 --- a/src/rdkafka_transport.c +++ b/src/rdkafka_transport.c @@ -43,6 +43,10 @@ #include +#ifndef _WIN32 +#include +#endif + /* AIX doesn't have MSG_DONTWAIT */ #ifndef MSG_DONTWAIT #define MSG_DONTWAIT MSG_NONBLOCK @@ -1301,9 +1305,36 @@ void rd_kafka_transport_term (void) { } #endif -void rd_kafka_transport_init(void) { #ifdef _WIN32 +static RD_NOINLINE +BOOL CALLBACK rd_kafka_transport_init_once(PINIT_ONCE init_once, + PVOID parameter, + PVOID *context) { WSADATA d; (void)WSAStartup(MAKEWORD(2, 2), &d); + return TRUE; +} + +RD_NOINLINE +void rd_kafka_transport_init(void) { + static INIT_ONCE _once = INIT_ONCE_STATIC_INIT; + (void)InitOnceExecuteOnce(&_once, rd_kafka_transport_init_once, NULL, + NULL); +} +#else +/* example/skeleton for future usage */ +#if 0 +static RD_NOINLINE +void rd_kafka_transport_init_once(void) { +} +#endif + +RD_NOINLINE +void rd_kafka_transport_init(void) { +/* example/skeleton for future usage */ +#if 0 + static pthread_once_t _once = PTHREAD_ONCE_INIT; + (void)pthread_once(&_once, rd_kafka_transport_init_once); #endif } +#endif From 7c00c72e5d7eddd8c8111b672faaee169b8742e8 Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Fri, 13 Feb 2026 13:28:32 +0300 Subject: [PATCH 3/5] Adjust buffer size to avoid hostname truncation Also initialize buffer with zeros (with good intentions). Signed-off-by: Konstantin Demin --- examples/kafkatest_verifiable_client.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index 1afd1347ed..faadd6e44e 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -628,7 +628,7 @@ int main(int argc, char **argv) { } { - char hostname[128]; + char hostname[260] = { 0 }; gethostname(hostname, sizeof(hostname) - 1); conf->set("client.id", std::string("rdkafka@") + hostname, errstr); } From 0b16e689c6d064c91dc088b2c892ecee0ee67f2c Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Fri, 13 Feb 2026 13:28:32 +0300 Subject: [PATCH 4/5] Produce deterministic "client.id" by default Set "client.id" to "rdkafka@{hostname}" and fallback to "rdkafka" (previous default value) if there're problems with retrieving current host name. Signed-off-by: Konstantin Demin --- CONFIGURATION.md | 2 +- src/rdkafka_conf.c | 68 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 40b7412efd..8b10bf235e 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -4,7 +4,7 @@ Property | C/P | Range | Default | Importance | Description -----------------------------------------|-----|-----------------|--------------:|------------| -------------------------- builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support.
*Type: CSV flags* -client.id | * | | rdkafka | low | Client identifier.
*Type: string* +client.id | * | | rdkafka@{hostname} | low | Client identifier. Fallback default value is "rdkafka" if current host name cannot be retrieved.
*Type: string* metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string* bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string* message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's `max.message.bytes` limit (see Apache Kafka documentation).
*Type: integer* diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 0e346e7456..4b29682450 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -38,6 +38,7 @@ #include "rdkafka_int.h" #include "rdkafka_feature.h" #include "rdkafka_interceptor.h" +#include "rdkafka_transport.h" #include "rdkafka_idempotence.h" #include "rdkafka_assignor.h" #include "rdkafka_sasl_oauthbearer.h" @@ -386,7 +387,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = { {0x2000, "oidc", _UNSUPPORTED_OIDC}, {0, NULL}}}, {_RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str), - "Client identifier.", .sdef = "rdkafka"}, + "Client identifier. Fallback default value is \"rdkafka\" " + "if current host name cannot be retrieved.", + .sdef = "rdkafka@{hostname}" /* placeholder value */}, {_RK_GLOBAL | _RK_HIDDEN, "client.software.name", _RK_C_STR, _RK(sw_name), "Client software name as reported to broker version >= v2.4.0. " "Broker-side character restrictions apply, as of broker version " @@ -2445,11 +2448,74 @@ static void rd_kafka_defaultconf_set(int scope, void *conf) { } } +static const char *rd_kafka_client_id = NULL; + +static void rd_kafka_default_client_id_impl(void) { + const int stem_len = 8; /* strlen("rdkafka@") */ + /* 256 = maximum hostname length among most platforms + however, some platforms have lesser value; + 8 = stem length, merely `strlen("rdkafka@")'; + 1 = space for NUL byte. + */ + static char client_id_buf[256 + 8 + 1]; + int res; + + (void)memset(client_id_buf, 0, sizeof(client_id_buf)); + /* strcpy(client_id_buf, "rdkafka@"); */ + (void)memcpy(client_id_buf, "rdkafka@", stem_len); + + /* required on Windows before calling gethostname(); + * on other platforms it does nothing (for now). */ + rd_kafka_transport_init(); + + res = gethostname(client_id_buf + stem_len, + sizeof(client_id_buf) - stem_len - 1); + if (res != 0) { + /* if gethostname() is failed for whatever reason, + * trim "client_id_buf": produces "rdkafka". */ + client_id_buf[stem_len - 1] = 0; + } + rd_kafka_client_id = (const char *)client_id_buf; +} + +#ifdef _WIN32 +static RD_NOINLINE +BOOL CALLBACK rd_kafka_default_client_id_once(PINIT_ONCE init_once, + PVOID parameter, + PVOID *context) { + rd_kafka_default_client_id_impl(); + return TRUE; +} +#else +static RD_NOINLINE +void rd_kafka_default_client_id_once(void) { + rd_kafka_default_client_id_impl(); +} +#endif + +static RD_NOINLINE +void rd_kafka_default_client_id(rd_kafka_conf_t *conf) { + const char *client_id; +#ifdef _WIN32 + static INIT_ONCE _once = INIT_ONCE_STATIC_INIT; + (void)InitOnceExecuteOnce(&_once, rd_kafka_default_client_id_once, NULL, + NULL); +#else + static pthread_once_t _once = PTHREAD_ONCE_INIT; + (void)pthread_once(&_once, rd_kafka_default_client_id_once); +#endif + client_id = rd_kafka_client_id; + if (!client_id) + return; + (void)rd_kafka_conf_set(conf, "client.id", client_id, NULL, 0); +} + rd_kafka_conf_t *rd_kafka_conf_new(void) { rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf)); rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) && *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX"); rd_kafka_defaultconf_set(_RK_GLOBAL, conf); + rd_kafka_default_client_id(conf); rd_kafka_anyconf_clear_all_is_modified(conf); return conf; } From 76b1aa9e4acd570a3c13453d05b42b37da4f335a Mon Sep 17 00:00:00 2001 From: Konstantin Demin Date: Fri, 13 Feb 2026 13:28:32 +0300 Subject: [PATCH 5/5] Adjust "client.id" in examples and tests Signed-off-by: Konstantin Demin --- examples/kafkatest_verifiable_client.cpp | 6 ------ tests/test.c | 3 +-- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index faadd6e44e..de2536f079 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -627,12 +627,6 @@ int main(int argc, char **argv) { exit(1); } - { - char hostname[260] = { 0 }; - gethostname(hostname, sizeof(hostname) - 1); - conf->set("client.id", std::string("rdkafka@") + hostname, errstr); - } - conf->set("log.thread.name", "true", errstr); /* auto commit is explicitly enabled with --enable-autocommit */ diff --git a/tests/test.c b/tests/test.c index 51c6e2a293..e6af8d4f87 100644 --- a/tests/test.c +++ b/tests/test.c @@ -2217,8 +2217,7 @@ rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf) { test_socket_enable(conf); #endif } else { - if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka")) - test_conf_set(conf, "client.id", test_curr->name); + test_conf_set(conf, "client.id", test_curr->name); } if (mode == RD_KAFKA_CONSUMER && test_consumer_group_protocol_str) {