diff --git a/CHANGELOG.md b/CHANGELOG.md
index 514e189349..ff2b7438e4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+# librdkafka v2.14.0
+
+librdkafka v2.14.0 is a feature release:
+
+* [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-ClientConfiguration) Extend SASL/OAUTHBEARER to support OIDC claim mapping beyond the default `sub` claim (#5336).
+
# librdkafka v2.13.2
librdkafka v2.13.2 is a maintenance release:
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412efd..336830ecad 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -108,6 +108,7 @@ sasl.oauthbearer.client.secret | * | |
sasl.oauthbearer.scope | * | | | low | Client use this to specify the scope of the access request to the broker. Only used when `sasl.oauthbearer.method` is set to "oidc".
*Type: string*
sasl.oauthbearer.extensions | * | | | low | Allow additional information to be provided to the broker. Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".Only used when `sasl.oauthbearer.method` is set to "oidc".
*Type: string*
sasl.oauthbearer.token.endpoint.url | * | | | low | OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. Only used when `sasl.oauthbearer.method` is set to "oidc".
*Type: string*
+sasl.oauthbearer.sub.claim.name | * | | sub | low | JWT claim name to use as the subject (principal) when validating OIDC access tokens. Must be present in the JWT payload with a non-empty value. Should match the broker's `sasl.oauthbearer.sub.claim.name` configuration for consistent authentication. Only used when `sasl.oauthbearer.method` is set to "oidc".
*Type: string*
sasl.oauthbearer.grant.type | * | client_credentials, urn:ietf:params:oauth:grant-type:jwt-bearer | client_credentials | low | OAuth grant type to use when communicating with the identity provider.
*Type: enum value*
sasl.oauthbearer.assertion.algorithm | * | RS256, ES256 | RS256 | low | Algorithm the client should use to sign the assertion sent to the identity provider and in the OAuth alg header in the JWT assertion.
*Type: enum value*
sasl.oauthbearer.assertion.private.key.file | * | | | low | Path to client's private key (PEM) used for authentication when using the JWT assertion.
*Type: string*
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 0e346e7456..39438e7551 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -353,6 +353,20 @@ rd_kafka_conf_validate_partitioner(const struct rd_kafka_property *prop,
!strcmp(val, "fnv1a_random");
}
+/**
+ * @brief Validate that a string is non-null, non-empty, and not
+ * whitespace-only.
+ */
+static rd_bool_t rd_kafka_conf_validate_str(const char *value) {
+ const char *p;
+ if (!value || !*value)
+ return rd_false;
+ for (p = value; *p; p++) {
+ if (!isspace((int)*p))
+ return rd_true;
+ }
+ return rd_false;
+}
/**
* librdkafka configuration property definitions.
@@ -1121,6 +1135,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. "
"Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
_UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.sub.claim.name", _RK_C_STR,
+ _RK(sasl.oauthbearer.sub_claim_name),
+ "JWT claim name to use as the subject (principal) when validating "
+ "OIDC access tokens. Must be present in the JWT payload with a "
+ "non-empty value. Should match the broker's "
+ "`sasl.oauthbearer.sub.claim.name` configuration for consistent "
+ "authentication. "
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ .sdef = "sub", _UNSUPPORTED_OIDC},
{
_RK_GLOBAL,
"sasl.oauthbearer.grant.type",
@@ -4133,6 +4156,13 @@ const char *rd_kafka_conf_finalize_oauthbearer_oidc(rd_kafka_conf_t *conf) {
conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
conf->sasl.enable_callback_queue = 1;
}
+
+ if (rd_kafka_conf_is_modified(conf,
+ "sasl.oauthbearer.sub.claim.name") &&
+ !rd_kafka_conf_validate_str(conf->sasl.oauthbearer.sub_claim_name))
+ return "`sasl.oauthbearer.sub.claim.name` must be "
+ "non-empty and not contain only whitespace";
+
return NULL;
}
@@ -4879,6 +4909,19 @@ int unittest_conf(void) {
rd_kafka_conf_destroy(conf);
+ /* Verify rd_kafka_conf_validate_str */
+ RD_UT_ASSERT(!rd_kafka_conf_validate_str(NULL), "NULL must be invalid");
+ RD_UT_ASSERT(!rd_kafka_conf_validate_str(""),
+ "empty string must be invalid");
+ RD_UT_ASSERT(!rd_kafka_conf_validate_str(" "),
+ "whitespace-only string must be invalid");
+ RD_UT_ASSERT(!rd_kafka_conf_validate_str("\t\n"),
+ "tab/newline-only string must be invalid");
+ RD_UT_ASSERT(rd_kafka_conf_validate_str("sub"),
+ "\"sub\" must be valid");
+ RD_UT_ASSERT(rd_kafka_conf_validate_str(" sub "),
+ "\" sub \" must be valid");
+
RD_UT_PASS();
}
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 92e5193eb7..f15d428f55 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -376,6 +376,7 @@ struct rd_kafka_conf_s {
char *extensions_str;
+ char *sub_claim_name;
rd_bool_t builtin_token_refresh_cb;
/* SASL/OAUTHBEARER token refresh event callback */
void (*token_refresh_cb)(rd_kafka_t *rk,
diff --git a/src/rdkafka_sasl_oauthbearer_oidc.c b/src/rdkafka_sasl_oauthbearer_oidc.c
index b1d729638c..262a69adde 100644
--- a/src/rdkafka_sasl_oauthbearer_oidc.c
+++ b/src/rdkafka_sasl_oauthbearer_oidc.c
@@ -674,6 +674,7 @@ static char *rd_kafka_oidc_assertion_read_from_file(const char *file_path) {
*/
static char *rd_kafka_oidc_token_try_validate(cJSON *json,
const char *field,
+ const char *sub_claim_name,
char **sub,
double *exp,
char *errstr,
@@ -729,19 +730,26 @@ static char *rd_kafka_oidc_token_try_validate(cJSON *json,
goto fail;
}
- jwt_sub = cJSON_GetObjectItem(payloads, "sub");
+ rd_dassert(sub_claim_name && *sub_claim_name);
+
+ jwt_sub = cJSON_GetObjectItem(payloads, sub_claim_name);
if (jwt_sub == NULL) {
rd_snprintf(errstr, errstr_size,
"Expected JSON JWT response with "
- "\"sub\" field");
+ "\"%s\" field",
+ sub_claim_name);
goto fail;
}
*sub = cJSON_GetStringValue(jwt_sub);
- if (*sub == NULL) {
+ if (*sub == NULL || **sub == '\0') {
+ /* Reset to NULL to prevent a dangling pointer to cJSON
+ * internal memory after cJSON_Delete(payloads) */
+ *sub = NULL;
rd_snprintf(errstr, errstr_size,
"Expected JSON JWT response with "
- "valid \"sub\" field");
+ "valid \"%s\" field (non-empty value required)",
+ sub_claim_name);
goto fail;
}
*sub = rd_strdup(*sub);
@@ -857,13 +865,14 @@ void rd_kafka_oidc_token_jwt_bearer_refresh_cb(rd_kafka_t *rk,
* This function will try to validate the `access_token` and then the
* `id_token`.
*/
- jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
- &exp, validate_errstr,
- sizeof(validate_errstr));
+ jwt_token = rd_kafka_oidc_token_try_validate(
+ json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
+ &sub, &exp, validate_errstr, sizeof(validate_errstr));
if (!jwt_token)
jwt_token = rd_kafka_oidc_token_try_validate(
- json, "id_token", &sub, &exp, validate_errstr,
- sizeof(validate_errstr));
+ json, "id_token",
+ rk->rk_conf.sasl.oauthbearer.sub_claim_name, &sub, &exp,
+ validate_errstr, sizeof(validate_errstr));
if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, validate_errstr);
@@ -965,9 +974,9 @@ void rd_kafka_oidc_token_client_credentials_refresh_cb(
goto done;
}
- jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
- &exp, set_token_errstr,
- sizeof(set_token_errstr));
+ jwt_token = rd_kafka_oidc_token_try_validate(
+ json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
+ &sub, &exp, set_token_errstr, sizeof(set_token_errstr));
if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
goto done;
@@ -1083,9 +1092,9 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
goto done;
}
- jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
- &exp, set_token_errstr,
- sizeof(set_token_errstr));
+ jwt_token = rd_kafka_oidc_token_try_validate(
+ json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
+ &sub, &exp, set_token_errstr, sizeof(set_token_errstr));
if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
goto done;
@@ -1312,6 +1321,132 @@ static int ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope(void) {
RD_UT_PASS();
}
+/** All JWTs use a fake signature since token_try_validate() only
+ * decodes and inspects the payload; signature verification is done
+ * separately by the broker.
+ *
+ * JWT payloads (decoded):
+ * JWT_SUB_ONLY:
+ * {"exp":9999999999,"iat":1000000000,"sub":"subject"}
+ *
+ * JWT_MULTI_CLAIMS:
+ * {"exp":9999999999,"iat":1000000000,"sub":"subject",
+ * "client_id":"client_id_123","azp":"azp_123"}
+ *
+ * JWT_EMPTY_SUB:
+ * {"exp":9999999999,"iat":1000000000,"sub":""}
+ *
+ * JWT_MISSING_SUB:
+ * {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"}
+ */
+/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject"} */
+#define UT_JWT_SUB_ONLY \
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
+ "." \
+ "eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic" \
+ "3ViIjoic3ViamVjdCJ9" \
+ "." \
+ "fakesignature"
+/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject",
+ * "client_id":"client_id_123","azp":"azp_123"} */
+#define UT_JWT_MULTI_CLAIMS \
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
+ "." \
+ "eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoic3ViamVj" \
+ "dCIsImNsaWVudF9pZCI6ImNsaWVudF9pZF8xMjMiLCJhenAiOiJhenBfMTIzIn0" \
+ "." \
+ "fakesignature"
+/* payload: {"exp":9999999999,"iat":1000000000,"sub":""} */
+#define UT_JWT_EMPTY_SUB \
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
+ "." \
+ "eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoiIn0" \
+ "." \
+ "fakesignature"
+/* payload: {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"} */
+#define UT_JWT_MISSING_SUB \
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
+ "." \
+ "eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwiY2xpZW50X2lkIjoi" \
+ "Y2xpZW50X2lkXzEyMyJ9" \
+ "." \
+ "fakesignature"
+
+/**
+ * @brief Verifies the extraction logic of the subject from the configured JWT
+ * claim name, falls back to "sub" when unconfigured, and rejects missing or
+ * empty claim values.
+ */
+static int ut_sasl_oauthbearer_oidc_sub_claim_name(void) {
+
+ const struct {
+ const char *test_name;
+ const char *jwt;
+ const char *sub_claim_name;
+ rd_bool_t expect_success;
+ const char *expected_sub;
+ } tests[] = {
+ {"Explicit 'sub' claim name", UT_JWT_SUB_ONLY, "sub", rd_true,
+ "subject"},
+ {"Custom 'client_id' claim", UT_JWT_MULTI_CLAIMS, "client_id",
+ rd_true, "client_id_123"},
+ {"Custom 'azp' claim", UT_JWT_MULTI_CLAIMS, "azp", rd_true,
+ "azp_123"},
+ {"Custom 'client_id' claim succeeds without sub",
+ UT_JWT_MISSING_SUB, "client_id", rd_true, "client_id_123"},
+ {"Missing 'sub' claim fails", UT_JWT_MISSING_SUB, "sub", rd_false,
+ NULL},
+ {"Empty 'sub' value fails", UT_JWT_EMPTY_SUB, "sub", rd_false,
+ NULL},
+ {"Nonexistent claim name fails", UT_JWT_SUB_ONLY, "nonexistent",
+ rd_false, NULL},
+ };
+
+ unsigned int i;
+
+ RD_UT_BEGIN();
+
+ for (i = 0; i < RD_ARRAYSIZE(tests); i++) {
+ char *sub = NULL;
+ double exp_v = 0;
+ char errstr[256];
+ char *result;
+ cJSON *json;
+ char access_token_json[2048];
+
+ rd_snprintf(access_token_json, sizeof(access_token_json),
+ "{\"access_token\":\"%s\"}", tests[i].jwt);
+ json = cJSON_Parse(access_token_json);
+ RD_UT_ASSERT(json != NULL, "[%s] Failed to build test JSON",
+ tests[i].test_name);
+
+ result = rd_kafka_oidc_token_try_validate(
+ json, "access_token", tests[i].sub_claim_name, &sub, &exp_v,
+ errstr, sizeof(errstr));
+
+ if (tests[i].expect_success) {
+ RD_UT_ASSERT(result != NULL,
+ "[%s] Expected success but got error: %s",
+ tests[i].test_name, errstr);
+ RD_UT_ASSERT(sub != NULL, "[%s] Expected non-NULL sub",
+ tests[i].test_name);
+ RD_UT_ASSERT(!strcmp(sub, tests[i].expected_sub),
+ "[%s] Expected sub '%s', got '%s'",
+ tests[i].test_name, tests[i].expected_sub,
+ sub);
+ } else {
+ RD_UT_ASSERT(result == NULL,
+ "[%s] Expected failure but got sub '%s'",
+ tests[i].test_name, sub ? sub : "(null)");
+ }
+
+ RD_IF_FREE(sub, rd_free);
+ cJSON_Delete(json);
+ }
+
+ RD_UT_PASS();
+}
+
/**
* @brief make sure the jwt is able to be extracted from HTTP(S) requests
@@ -1323,6 +1458,7 @@ int unittest_sasl_oauthbearer_oidc(void) {
fails += ut_sasl_oauthbearer_oidc_with_empty_key();
fails += ut_sasl_oauthbearer_oidc_post_fields();
fails += ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope();
+ fails += ut_sasl_oauthbearer_oidc_sub_claim_name();
return fails;
}
diff --git a/tests/0126-oauthbearer_oidc.c b/tests/0126-oauthbearer_oidc.c
index 9a7790ef5a..4fc80cfa87 100644
--- a/tests/0126-oauthbearer_oidc.c
+++ b/tests/0126-oauthbearer_oidc.c
@@ -148,7 +148,6 @@ static void do_test_produce_consumer_with_OIDC_expired_token_should_fail(
SUB_TEST_PASS();
}
-
/**
* @brief After configiguring OIDC, make sure the
* authentication fails as expected.
@@ -494,6 +493,119 @@ void do_test_produce_consumer_with_OIDC_metadata_authentication(
}
}
+typedef enum oidc_configuration_sub_claim_variation_t {
+ /** Use default "sub" claim (backward compatibility). */
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_DEFAULT_SUB,
+ /** Explicitly set "sub" as the claim name. */
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_EXPLICIT_SUB,
+ /** Use custom claim name "client_id". */
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_CUSTOM_CLIENT_ID,
+ /** Set empty string "" — resets to default "sub" per librdkafka string
+ semantics. */
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_EMPTY_STRING,
+ /** Use a claim name that doesn't exist in the token (should fail). */
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_MISSING_CLAIM,
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION__CNT
+} oidc_configuration_sub_claim_variation_t;
+
+#define OIDC_CONFIGURATION_SUB_CLAIM_VARIATION__FIRST_FAILING \
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_MISSING_CLAIM
+
+static const char *oidc_configuration_sub_claim_variation_name(
+ oidc_configuration_sub_claim_variation_t variation) {
+ rd_assert(variation >=
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_DEFAULT_SUB &&
+ variation < OIDC_CONFIGURATION_SUB_CLAIM_VARIATION__CNT);
+ static const char *names[] = {
+ "default sub claim", "explicit sub claim", "custom client_id claim",
+ "empty string (defaults to sub)", "missing claim (should fail)"};
+ return names[variation];
+}
+
+/**
+ * @brief Configure OIDC with different subject claim name variations.
+ *
+ * Note: This test assumes the OIDC token provider returns tokens with
+ * standard claims including "sub" and "client_id".
+ * The test validates that librdkafka can extract the subject from
+ * different claims based on configuration.
+ */
+static rd_kafka_conf_t *oidc_configuration_sub_claim(
+ rd_kafka_conf_t *conf,
+ oidc_configuration_sub_claim_variation_t variation) {
+ conf = rd_kafka_conf_dup(conf);
+
+ switch (variation) {
+ case OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_DEFAULT_SUB:
+ break;
+ case OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_EXPLICIT_SUB:
+ test_conf_set(conf, "sasl.oauthbearer.sub.claim.name", "sub");
+ break;
+ case OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_CUSTOM_CLIENT_ID:
+ test_conf_set(conf, "sasl.oauthbearer.sub.claim.name",
+ "client_id");
+ break;
+ case OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_EMPTY_STRING:
+ test_conf_set(conf, "sasl.oauthbearer.sub.claim.name", "");
+ break;
+ case OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_MISSING_CLAIM:
+ test_conf_set(conf, "sasl.oauthbearer.sub.claim.name",
+ "nonexistent_claim");
+ break;
+ default:
+ rd_assert(!*"Unknown OIDC sub claim test variation");
+ }
+ return conf;
+}
+
+/**
+ * @brief Test producer and consumer with different subject claim name
+ * configurations.
+ *
+ * This test validates KIP-768 parity for sasl.oauthbearer.sub.claim.name:
+ * - Default behavior uses "sub" claim
+ * - Custom claim names can be configured
+ * - Missing configured claim causes validation failure
+ * - Non-empty claim value is enforced
+ */
+void do_test_produce_consumer_with_OIDC_sub_claim(rd_kafka_conf_t *conf) {
+ rd_kafka_conf_t *sub_claim_conf;
+ oidc_configuration_sub_claim_variation_t variation;
+
+ const char *url = test_getenv("VALID_OIDC_URL", NULL);
+
+ /* Check if we should skip sub claim tests based on environment */
+ if (!url) {
+ SUB_TEST_SKIP(
+ "VALID_OIDC_URL environment variable is not set, "
+ "skipping sub claim tests\n");
+ return;
+ }
+
+ for (variation = OIDC_CONFIGURATION_SUB_CLAIM_VARIATION_DEFAULT_SUB;
+ variation < OIDC_CONFIGURATION_SUB_CLAIM_VARIATION__CNT;
+ variation++) {
+ const char *test_name;
+ sub_claim_conf = oidc_configuration_sub_claim(conf, variation);
+
+ test_name = tsprintf(
+ "Sub claim variation: %s\n",
+ oidc_configuration_sub_claim_variation_name(variation));
+
+ if (variation <
+ OIDC_CONFIGURATION_SUB_CLAIM_VARIATION__FIRST_FAILING) {
+ /* These variations should succeed */
+ do_test_produce_consumer_with_OIDC(test_name,
+ sub_claim_conf);
+ } else {
+ /* These variations should fail */
+ do_test_produce_consumer_with_OIDC_should_fail(
+ test_name, sub_claim_conf);
+ }
+ rd_kafka_conf_destroy(sub_claim_conf);
+ }
+}
+
int main_0126_oauthbearer_oidc(int argc, char **argv) {
rd_kafka_conf_t *conf;
const char *sec;
@@ -521,6 +633,7 @@ int main_0126_oauthbearer_oidc(int argc, char **argv) {
do_test_produce_consumer_with_OIDC_expired_token_should_fail(conf);
do_test_produce_consumer_with_OIDC_jwt_bearer(conf);
do_test_produce_consumer_with_OIDC_metadata_authentication(conf);
+ do_test_produce_consumer_with_OIDC_sub_claim(conf);
rd_kafka_conf_destroy(conf);
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 48198911be..21b2cfeb4c 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -1,2 +1,2 @@
-trivup/trivup-0.14.0.tar.gz
+trivup/trivup-0.15.0.tar.gz
jsoncomment
diff --git a/tests/trivup/trivup-0.14.0.tar.gz b/tests/trivup/trivup-0.14.0.tar.gz
deleted file mode 100644
index a8965847a1..0000000000
Binary files a/tests/trivup/trivup-0.14.0.tar.gz and /dev/null differ
diff --git a/tests/trivup/trivup-0.15.0.tar.gz b/tests/trivup/trivup-0.15.0.tar.gz
new file mode 100644
index 0000000000..740833069f
Binary files /dev/null and b/tests/trivup/trivup-0.15.0.tar.gz differ