Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
fa126b4
Add documentation for new sub claim config
prashah-confluent Mar 2, 2026
9367a9b
Add new config in rdkafka_conf
prashah-confluent Mar 2, 2026
442bf87
Modify logic based on subclaim name value
prashah-confluent Mar 2, 2026
47588bd
Add unit tests
prashah-confluent Mar 2, 2026
16b6a39
Fix style format errors for the modified files
prashah-confluent Mar 2, 2026
d9b9195
Add integration tests for sub claim name
prashah-confluent Mar 3, 2026
0172920
Fix style check for 0126-oauthbearer_oidc
prashah-confluent Mar 3, 2026
f3c70a2
Add new trivup version 0.15.0 and update the dependency in requiremen…
prashah-confluent Mar 4, 2026
c3dd826
Remove redundant comments
prashah-confluent Mar 4, 2026
6b53ca4
Update Change log
prashah-confluent Mar 6, 2026
d00ec20
Remove the link for PR as GH adds it automatically
prashah-confluent Mar 12, 2026
6b36a83
Add validation for subclaim string configuration
prashah-confluent Mar 12, 2026
583981b
Remove tests as preconditions will already be validated before
prashah-confluent Mar 12, 2026
4d3c24d
Function should fail agnostic of reason. Corrected the comment
prashah-confluent Mar 12, 2026
3c3b9bf
Modify integration test to fail at configuration finalization
prashah-confluent Mar 12, 2026
92e16bd
Fix comment for configuration.md consistency
prashah-confluent Mar 13, 2026
e11f97c
Add back the default value for sub_claim_name configuration
prashah-confluent Mar 13, 2026
2b16b0c
Rebuilt CONFIGURATION.md and fix style check
prashah-confluent Mar 13, 2026
432fca4
Fix style check
prashah-confluent Mar 13, 2026
2d1d8c3
Add unit tests for rd_kafka_conf_validate_str
prashah-confluent Mar 13, 2026
3822efb
Fix style check
prashah-confluent Mar 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# librdkafka v2.14.0

librdkafka v2.14.0 is a feature release:

* [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-ClientConfiguration) Extend SASL/OAUTHBEARER to support OIDC claim mapping beyond the default `sub` claim (#5336).

# librdkafka v2.13.2

librdkafka v2.13.2 is a maintenance release:
Expand Down
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ sasl.oauthbearer.client.secret | * | |
sasl.oauthbearer.scope | * | | | low | Client use this to specify the scope of the access request to the broker. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
sasl.oauthbearer.extensions | * | | | low | Allow additional information to be provided to the broker. Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
sasl.oauthbearer.token.endpoint.url | * | | | low | OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
sasl.oauthbearer.sub.claim.name | * | | sub | low | JWT claim name to use as the subject (principal) when validating OIDC access tokens. Must be present in the JWT payload with a non-empty value. Should match the broker's `sasl.oauthbearer.sub.claim.name` configuration for consistent authentication. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
sasl.oauthbearer.grant.type | * | client_credentials, urn:ietf:params:oauth:grant-type:jwt-bearer | client_credentials | low | OAuth grant type to use when communicating with the identity provider. <br>*Type: enum value*
sasl.oauthbearer.assertion.algorithm | * | RS256, ES256 | RS256 | low | Algorithm the client should use to sign the assertion sent to the identity provider and in the OAuth alg header in the JWT assertion. <br>*Type: enum value*
sasl.oauthbearer.assertion.private.key.file | * | | | low | Path to client's private key (PEM) used for authentication when using the JWT assertion. <br>*Type: string*
Expand Down
43 changes: 43 additions & 0 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,20 @@ rd_kafka_conf_validate_partitioner(const struct rd_kafka_property *prop,
!strcmp(val, "fnv1a_random");
}

/**
* @brief Validate that a string is non-null, non-empty, and not
* whitespace-only.
*/
static rd_bool_t rd_kafka_conf_validate_str(const char *value) {
const char *p;
if (!value || !*value)
return rd_false;
for (p = value; *p; p++) {
if (!isspace((int)*p))
return rd_true;
}
return rd_false;
}

/**
* librdkafka configuration property definitions.
Expand Down Expand Up @@ -1121,6 +1135,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. "
"Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
_UNSUPPORTED_OIDC},
{_RK_GLOBAL, "sasl.oauthbearer.sub.claim.name", _RK_C_STR,
_RK(sasl.oauthbearer.sub_claim_name),
"JWT claim name to use as the subject (principal) when validating "
"OIDC access tokens. Must be present in the JWT payload with a "
"non-empty value. Should match the broker's "
"`sasl.oauthbearer.sub.claim.name` configuration for consistent "
"authentication. "
"Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
.sdef = "sub", _UNSUPPORTED_OIDC},
{
_RK_GLOBAL,
"sasl.oauthbearer.grant.type",
Expand Down Expand Up @@ -4133,6 +4156,13 @@ const char *rd_kafka_conf_finalize_oauthbearer_oidc(rd_kafka_conf_t *conf) {
conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
conf->sasl.enable_callback_queue = 1;
}

if (rd_kafka_conf_is_modified(conf,
"sasl.oauthbearer.sub.claim.name") &&
!rd_kafka_conf_validate_str(conf->sasl.oauthbearer.sub_claim_name))
return "`sasl.oauthbearer.sub.claim.name` must be "
"non-empty and not contain only whitespace";

return NULL;
}

Expand Down Expand Up @@ -4879,6 +4909,19 @@ int unittest_conf(void) {

rd_kafka_conf_destroy(conf);

/* Verify rd_kafka_conf_validate_str */
RD_UT_ASSERT(!rd_kafka_conf_validate_str(NULL), "NULL must be invalid");
RD_UT_ASSERT(!rd_kafka_conf_validate_str(""),
"empty string must be invalid");
RD_UT_ASSERT(!rd_kafka_conf_validate_str(" "),
"whitespace-only string must be invalid");
RD_UT_ASSERT(!rd_kafka_conf_validate_str("\t\n"),
"tab/newline-only string must be invalid");
RD_UT_ASSERT(rd_kafka_conf_validate_str("sub"),
"\"sub\" must be valid");
RD_UT_ASSERT(rd_kafka_conf_validate_str(" sub "),
"\" sub \" must be valid");

RD_UT_PASS();
}

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ struct rd_kafka_conf_s {


char *extensions_str;
char *sub_claim_name;
rd_bool_t builtin_token_refresh_cb;
/* SASL/OAUTHBEARER token refresh event callback */
void (*token_refresh_cb)(rd_kafka_t *rk,
Expand Down
166 changes: 151 additions & 15 deletions src/rdkafka_sasl_oauthbearer_oidc.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ static char *rd_kafka_oidc_assertion_read_from_file(const char *file_path) {
*/
static char *rd_kafka_oidc_token_try_validate(cJSON *json,
const char *field,
const char *sub_claim_name,
char **sub,
double *exp,
char *errstr,
Expand Down Expand Up @@ -729,19 +730,26 @@ static char *rd_kafka_oidc_token_try_validate(cJSON *json,
goto fail;
}

jwt_sub = cJSON_GetObjectItem(payloads, "sub");
rd_dassert(sub_claim_name && *sub_claim_name);

jwt_sub = cJSON_GetObjectItem(payloads, sub_claim_name);
if (jwt_sub == NULL) {
rd_snprintf(errstr, errstr_size,
"Expected JSON JWT response with "
"\"sub\" field");
"\"%s\" field",
sub_claim_name);
goto fail;
}

*sub = cJSON_GetStringValue(jwt_sub);
if (*sub == NULL) {
if (*sub == NULL || **sub == '\0') {
/* Reset to NULL to prevent a dangling pointer to cJSON
* internal memory after cJSON_Delete(payloads) */
*sub = NULL;
rd_snprintf(errstr, errstr_size,
"Expected JSON JWT response with "
"valid \"sub\" field");
"valid \"%s\" field (non-empty value required)",
sub_claim_name);
goto fail;
}
*sub = rd_strdup(*sub);
Expand Down Expand Up @@ -857,13 +865,14 @@ void rd_kafka_oidc_token_jwt_bearer_refresh_cb(rd_kafka_t *rk,
* This function will try to validate the `access_token` and then the
* `id_token`.
*/
jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
&exp, validate_errstr,
sizeof(validate_errstr));
jwt_token = rd_kafka_oidc_token_try_validate(
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
&sub, &exp, validate_errstr, sizeof(validate_errstr));
if (!jwt_token)
jwt_token = rd_kafka_oidc_token_try_validate(
json, "id_token", &sub, &exp, validate_errstr,
sizeof(validate_errstr));
json, "id_token",
rk->rk_conf.sasl.oauthbearer.sub_claim_name, &sub, &exp,
validate_errstr, sizeof(validate_errstr));

if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, validate_errstr);
Expand Down Expand Up @@ -965,9 +974,9 @@ void rd_kafka_oidc_token_client_credentials_refresh_cb(
goto done;
}

jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
&exp, set_token_errstr,
sizeof(set_token_errstr));
jwt_token = rd_kafka_oidc_token_try_validate(
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
&sub, &exp, set_token_errstr, sizeof(set_token_errstr));
if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
goto done;
Expand Down Expand Up @@ -1083,9 +1092,9 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
goto done;
}

jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
&exp, set_token_errstr,
sizeof(set_token_errstr));
jwt_token = rd_kafka_oidc_token_try_validate(
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
&sub, &exp, set_token_errstr, sizeof(set_token_errstr));
if (!jwt_token) {
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
goto done;
Expand Down Expand Up @@ -1312,6 +1321,132 @@ static int ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope(void) {
RD_UT_PASS();
}

/** All JWTs use a fake signature since token_try_validate() only
* decodes and inspects the payload; signature verification is done
* separately by the broker.
*
* JWT payloads (decoded):
* JWT_SUB_ONLY:
* {"exp":9999999999,"iat":1000000000,"sub":"subject"}
*
* JWT_MULTI_CLAIMS:
* {"exp":9999999999,"iat":1000000000,"sub":"subject",
* "client_id":"client_id_123","azp":"azp_123"}
*
* JWT_EMPTY_SUB:
* {"exp":9999999999,"iat":1000000000,"sub":""}
*
* JWT_MISSING_SUB:
* {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"}
*/
/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject"} */
#define UT_JWT_SUB_ONLY \
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
"." \
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic" \
"3ViIjoic3ViamVjdCJ9" \
"." \
"fakesignature"
/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject",
* "client_id":"client_id_123","azp":"azp_123"} */
#define UT_JWT_MULTI_CLAIMS \
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
"." \
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoic3ViamVj" \
"dCIsImNsaWVudF9pZCI6ImNsaWVudF9pZF8xMjMiLCJhenAiOiJhenBfMTIzIn0" \
"." \
"fakesignature"
/* payload: {"exp":9999999999,"iat":1000000000,"sub":""} */
#define UT_JWT_EMPTY_SUB \
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
"." \
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoiIn0" \
"." \
"fakesignature"
/* payload: {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"} */
#define UT_JWT_MISSING_SUB \
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
"." \
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwiY2xpZW50X2lkIjoi" \
"Y2xpZW50X2lkXzEyMyJ9" \
"." \
"fakesignature"

/**
* @brief Verifies the extraction logic of the subject from the configured JWT
* claim name, falls back to "sub" when unconfigured, and rejects missing or
* empty claim values.
*/
static int ut_sasl_oauthbearer_oidc_sub_claim_name(void) {

const struct {
const char *test_name;
const char *jwt;
const char *sub_claim_name;
rd_bool_t expect_success;
const char *expected_sub;
} tests[] = {
{"Explicit 'sub' claim name", UT_JWT_SUB_ONLY, "sub", rd_true,
"subject"},
{"Custom 'client_id' claim", UT_JWT_MULTI_CLAIMS, "client_id",
rd_true, "client_id_123"},
{"Custom 'azp' claim", UT_JWT_MULTI_CLAIMS, "azp", rd_true,
"azp_123"},
{"Custom 'client_id' claim succeeds without sub",
UT_JWT_MISSING_SUB, "client_id", rd_true, "client_id_123"},
{"Missing 'sub' claim fails", UT_JWT_MISSING_SUB, "sub", rd_false,
NULL},
{"Empty 'sub' value fails", UT_JWT_EMPTY_SUB, "sub", rd_false,
NULL},
{"Nonexistent claim name fails", UT_JWT_SUB_ONLY, "nonexistent",
rd_false, NULL},
};

unsigned int i;

RD_UT_BEGIN();

for (i = 0; i < RD_ARRAYSIZE(tests); i++) {
char *sub = NULL;
double exp_v = 0;
char errstr[256];
char *result;
cJSON *json;
char access_token_json[2048];

rd_snprintf(access_token_json, sizeof(access_token_json),
"{\"access_token\":\"%s\"}", tests[i].jwt);
json = cJSON_Parse(access_token_json);
RD_UT_ASSERT(json != NULL, "[%s] Failed to build test JSON",
tests[i].test_name);

result = rd_kafka_oidc_token_try_validate(
json, "access_token", tests[i].sub_claim_name, &sub, &exp_v,
errstr, sizeof(errstr));

if (tests[i].expect_success) {
RD_UT_ASSERT(result != NULL,
"[%s] Expected success but got error: %s",
tests[i].test_name, errstr);
RD_UT_ASSERT(sub != NULL, "[%s] Expected non-NULL sub",
tests[i].test_name);
RD_UT_ASSERT(!strcmp(sub, tests[i].expected_sub),
"[%s] Expected sub '%s', got '%s'",
tests[i].test_name, tests[i].expected_sub,
sub);
} else {
RD_UT_ASSERT(result == NULL,
"[%s] Expected failure but got sub '%s'",
tests[i].test_name, sub ? sub : "(null)");
}

RD_IF_FREE(sub, rd_free);
cJSON_Delete(json);
}

RD_UT_PASS();
}


/**
* @brief make sure the jwt is able to be extracted from HTTP(S) requests
Expand All @@ -1323,6 +1458,7 @@ int unittest_sasl_oauthbearer_oidc(void) {
fails += ut_sasl_oauthbearer_oidc_with_empty_key();
fails += ut_sasl_oauthbearer_oidc_post_fields();
fails += ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope();
fails += ut_sasl_oauthbearer_oidc_sub_claim_name();
return fails;
}

Expand Down
Loading