diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412efd..8b10bf235e 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -4,7 +4,7 @@
Property | C/P | Range | Default | Importance | Description
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support.
*Type: CSV flags*
-client.id | * | | rdkafka | low | Client identifier.
*Type: string*
+client.id | * | | rdkafka@{hostname} | low | Client identifier. Fallback default value is "rdkafka" if current host name cannot be retrieved.
*Type: string*
metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string*
bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string*
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's `max.message.bytes` limit (see Apache Kafka documentation).
*Type: integer*
diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp
index 1afd1347ed..de2536f079 100644
--- a/examples/kafkatest_verifiable_client.cpp
+++ b/examples/kafkatest_verifiable_client.cpp
@@ -627,12 +627,6 @@ int main(int argc, char **argv) {
exit(1);
}
- {
- char hostname[128];
- gethostname(hostname, sizeof(hostname) - 1);
- conf->set("client.id", std::string("rdkafka@") + hostname, errstr);
- }
-
conf->set("log.thread.name", "true", errstr);
/* auto commit is explicitly enabled with --enable-autocommit */
diff --git a/src/rdkafka.h b/src/rdkafka.h
index c77f4d7925..89a839e34d 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -68,6 +68,7 @@ typedef SSIZE_T ssize_t;
#endif
#define RD_UNUSED
#define RD_INLINE __inline
+#define RD_NOINLINE __declspec(noinline)
#define RD_DEPRECATED __declspec(deprecated)
#define RD_FORMAT(...)
#undef RD_EXPORT
@@ -87,8 +88,9 @@ typedef SSIZE_T ssize_t;
#else
#include /* for sockaddr, .. */
-#define RD_UNUSED __attribute__((unused))
-#define RD_INLINE inline
+#define RD_UNUSED __attribute__((unused))
+#define RD_INLINE inline
+#define RD_NOINLINE __attribute__((noinline))
#define RD_EXPORT
#define RD_DEPRECATED __attribute__((deprecated))
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 0e346e7456..4b29682450 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -38,6 +38,7 @@
#include "rdkafka_int.h"
#include "rdkafka_feature.h"
#include "rdkafka_interceptor.h"
+#include "rdkafka_transport.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_assignor.h"
#include "rdkafka_sasl_oauthbearer.h"
@@ -386,7 +387,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{0x2000, "oidc", _UNSUPPORTED_OIDC},
{0, NULL}}},
{_RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
- "Client identifier.", .sdef = "rdkafka"},
+ "Client identifier. Fallback default value is \"rdkafka\" "
+ "if current host name cannot be retrieved.",
+ .sdef = "rdkafka@{hostname}" /* placeholder value */},
{_RK_GLOBAL | _RK_HIDDEN, "client.software.name", _RK_C_STR, _RK(sw_name),
"Client software name as reported to broker version >= v2.4.0. "
"Broker-side character restrictions apply, as of broker version "
@@ -2445,11 +2448,74 @@ static void rd_kafka_defaultconf_set(int scope, void *conf) {
}
}
+static const char *rd_kafka_client_id = NULL;
+
+static void rd_kafka_default_client_id_impl(void) {
+ const int stem_len = 8; /* strlen("rdkafka@") */
+ /* 256 = maximum hostname length among most platforms
+ however, some platforms have lesser value;
+ 8 = stem length, merely `strlen("rdkafka@")';
+ 1 = space for NUL byte.
+ */
+ static char client_id_buf[256 + 8 + 1];
+ int res;
+
+ (void)memset(client_id_buf, 0, sizeof(client_id_buf));
+ /* strcpy(client_id_buf, "rdkafka@"); */
+ (void)memcpy(client_id_buf, "rdkafka@", stem_len);
+
+ /* required on Windows before calling gethostname();
+ * on other platforms it does nothing (for now). */
+ rd_kafka_transport_init();
+
+ res = gethostname(client_id_buf + stem_len,
+ sizeof(client_id_buf) - stem_len - 1);
+ if (res != 0) {
+ /* if gethostname() is failed for whatever reason,
+ * trim "client_id_buf": produces "rdkafka". */
+ client_id_buf[stem_len - 1] = 0;
+ }
+ rd_kafka_client_id = (const char *)client_id_buf;
+}
+
+#ifdef _WIN32
+static RD_NOINLINE
+BOOL CALLBACK rd_kafka_default_client_id_once(PINIT_ONCE init_once,
+ PVOID parameter,
+ PVOID *context) {
+ rd_kafka_default_client_id_impl();
+ return TRUE;
+}
+#else
+static RD_NOINLINE
+void rd_kafka_default_client_id_once(void) {
+ rd_kafka_default_client_id_impl();
+}
+#endif
+
+static RD_NOINLINE
+void rd_kafka_default_client_id(rd_kafka_conf_t *conf) {
+ const char *client_id;
+#ifdef _WIN32
+ static INIT_ONCE _once = INIT_ONCE_STATIC_INIT;
+ (void)InitOnceExecuteOnce(&_once, rd_kafka_default_client_id_once, NULL,
+ NULL);
+#else
+ static pthread_once_t _once = PTHREAD_ONCE_INIT;
+ (void)pthread_once(&_once, rd_kafka_default_client_id_once);
+#endif
+ client_id = rd_kafka_client_id;
+ if (!client_id)
+ return;
+ (void)rd_kafka_conf_set(conf, "client.id", client_id, NULL, 0);
+}
+
rd_kafka_conf_t *rd_kafka_conf_new(void) {
rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) &&
*"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
+ rd_kafka_default_client_id(conf);
rd_kafka_anyconf_clear_all_is_modified(conf);
return conf;
}
diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c
index b7c691d762..3ee652b1ef 100644
--- a/src/rdkafka_transport.c
+++ b/src/rdkafka_transport.c
@@ -43,6 +43,10 @@
#include
+#ifndef _WIN32
+#include
+#endif
+
/* AIX doesn't have MSG_DONTWAIT */
#ifndef MSG_DONTWAIT
#define MSG_DONTWAIT MSG_NONBLOCK
@@ -1301,9 +1305,36 @@ void rd_kafka_transport_term (void) {
}
#endif
-void rd_kafka_transport_init(void) {
#ifdef _WIN32
+static RD_NOINLINE
+BOOL CALLBACK rd_kafka_transport_init_once(PINIT_ONCE init_once,
+ PVOID parameter,
+ PVOID *context) {
WSADATA d;
(void)WSAStartup(MAKEWORD(2, 2), &d);
+ return TRUE;
+}
+
+RD_NOINLINE
+void rd_kafka_transport_init(void) {
+ static INIT_ONCE _once = INIT_ONCE_STATIC_INIT;
+ (void)InitOnceExecuteOnce(&_once, rd_kafka_transport_init_once, NULL,
+ NULL);
+}
+#else
+/* example/skeleton for future usage */
+#if 0
+static RD_NOINLINE
+void rd_kafka_transport_init_once(void) {
+}
+#endif
+
+RD_NOINLINE
+void rd_kafka_transport_init(void) {
+/* example/skeleton for future usage */
+#if 0
+ static pthread_once_t _once = PTHREAD_ONCE_INIT;
+ (void)pthread_once(&_once, rd_kafka_transport_init_once);
#endif
}
+#endif
diff --git a/src/rdposix.h b/src/rdposix.h
index 0af5948168..3a80b93a2d 100644
--- a/src/rdposix.h
+++ b/src/rdposix.h
@@ -57,6 +57,7 @@
#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
+#define RD_NOINLINE __attribute__((noinline))
#define RD_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#define RD_NORETURN __attribute__((noreturn))
#define RD_IS_CONSTANT(p) __builtin_constant_p((p))
diff --git a/src/rdwin32.h b/src/rdwin32.h
index 40ea43a7ac..9e1d70208d 100644
--- a/src/rdwin32.h
+++ b/src/rdwin32.h
@@ -75,7 +75,8 @@ struct msghdr {
#endif
#define RD_UNUSED
-#define RD_INLINE __inline
+#define RD_INLINE __inline
+#define RD_NOINLINE __declspec(noinline)
#define RD_WARN_UNUSED_RESULT
#define RD_NORETURN __declspec(noreturn)
#define RD_IS_CONSTANT(p) (0)
diff --git a/tests/test.c b/tests/test.c
index 51c6e2a293..e6af8d4f87 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -2217,8 +2217,7 @@ rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf) {
test_socket_enable(conf);
#endif
} else {
- if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka"))
- test_conf_set(conf, "client.id", test_curr->name);
+ test_conf_set(conf, "client.id", test_curr->name);
}
if (mode == RD_KAFKA_CONSUMER && test_consumer_group_protocol_str) {