Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6e1d53c
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
09aa64a
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
1f9b74a
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
ca35c22
fix: initialize AWS provider in sync mode for MSK IAM
kalavt Nov 26, 2025
4c19da4
fix(aws): force credential refresh in provider refresh functions
kalavt Nov 27, 2025
9aa3ebc
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 27, 2025
35bcf13
fix(aws): Minor leak on empty_payload_hex when canonical request buil…
kalavt Nov 27, 2025
d45dab6
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
8434f7d
fix(aws): AWS MSK IAM authentication failures caused by stale credent…
kalavt Nov 27, 2025
05ecb6d
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
862a4ec
fix(aws): AWS MSK IAM authentication failures on low traffic and Miss…
kalavt Nov 28, 2025
6dde002
fix(aws): Fix potential overflow in md_lifetime_ms on 32‑bit time_t
kalavt Nov 28, 2025
f343778
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
b34bff6
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
3bbbde2
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
8892291
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/aws/flb_aws_credentials_ec2.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) {
int ret = -1;

flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider");

/* Force credential refresh by marking as expired */
implementation->next_refresh = 0;

if (try_lock_provider(provider)) {
ret = get_creds_ec2(implementation);
unlock_provider(provider);
Expand Down
3 changes: 1 addition & 2 deletions src/aws/flb_aws_credentials_profile.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path,

if (flb_read_file(credentials_path, &buf, &size) < 0) {
if (errno == ENOENT) {
AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist",
credentials_path);
AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path);
} else {
flb_errno();
AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s",
Expand Down
7 changes: 7 additions & 0 deletions src/aws/flb_aws_credentials_sts.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ int refresh_fn_sts(struct flb_aws_provider *provider) {
struct flb_aws_provider_sts *implementation = provider->implementation;

flb_debug("[aws_credentials] Refresh called on the STS provider");

/* Force credential refresh by marking as expired */
implementation->next_refresh = 0;

if (try_lock_provider(provider)) {
ret = sts_assume_role_request(implementation->sts_client,
Expand Down Expand Up @@ -480,6 +483,10 @@ int refresh_fn_eks(struct flb_aws_provider *provider) {
struct flb_aws_provider_eks *implementation = provider->implementation;

flb_debug("[aws_credentials] Refresh called on the EKS provider");

/* Force credential refresh by marking as expired */
implementation->next_refresh = 0;

if (try_lock_provider(provider)) {
ret = assume_with_web_identity(implementation);
unlock_provider(provider);
Expand Down
193 changes: 121 additions & 72 deletions src/aws/flb_aws_msk_iam.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_kafka.h>
#include <fluent-bit/flb_aws_credentials.h>
#include <fluent-bit/aws/flb_aws_msk_iam.h>
#include <fluent-bit/tls/flb_tls.h>

#include <fluent-bit/flb_signv4.h>
#include <rdkafka.h>
Expand All @@ -37,11 +38,13 @@
#include <string.h>
#include <time.h>

/* Lightweight config - NO persistent AWS provider */
/* Lightweight config - provider manages credential caching and refresh internally */
struct flb_aws_msk_iam {
struct flb_config *flb_config; /* For creating AWS provider on-demand */
struct flb_config *flb_config;
flb_sds_t region;
flb_sds_t cluster_arn;
struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */
struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */
};

/* Utility functions - same as before */
Expand Down Expand Up @@ -162,12 +165,11 @@ static char *extract_region(const char *arn)
return out;
}

/* Stateless payload generator - creates AWS provider on demand */
/* Payload generator - builds MSK IAM authentication payload */
static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
const char *host)
const char *host,
struct flb_aws_credentials *creds)
{
struct flb_aws_provider *temp_provider = NULL;
struct flb_aws_credentials *creds = NULL;
flb_sds_t payload = NULL;
int encode_result;
char *p;
Expand Down Expand Up @@ -214,37 +216,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
return NULL;
}

flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
host, config->region);

/* Create AWS provider on-demand */
temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
config->region, NULL, NULL,
flb_aws_client_generator(),
NULL);
if (!temp_provider) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider");
return NULL;
}

if (temp_provider->provider_vtable->init(temp_provider) != 0) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider");
flb_aws_provider_destroy(temp_provider);
return NULL;
}
flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
host, config->region);

/* Get credentials */
creds = temp_provider->provider_vtable->get_credentials(temp_provider);
/* Validate credentials */
if (!creds) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials");
flb_aws_provider_destroy(temp_provider);
flb_error("[aws_msk_iam] build_msk_iam_payload: credentials are NULL");
return NULL;
}

if (!creds->access_key_id || !creds->secret_access_key) {
flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials");
flb_aws_credentials_destroy(creds);
flb_aws_provider_destroy(temp_provider);
return NULL;
}

Expand Down Expand Up @@ -547,12 +529,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
if (session_token_enc) {
flb_sds_destroy(session_token_enc);
}
if (creds) {
flb_aws_credentials_destroy(creds);
}
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
}

return payload;

Expand Down Expand Up @@ -594,18 +570,15 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
if (session_token_enc) {
flb_sds_destroy(session_token_enc);
}
if (creds) {
flb_aws_credentials_destroy(creds);
}
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
if (empty_payload_hex) {
flb_sds_destroy(empty_payload_hex);
}

return NULL;
}


/* Stateless callback - creates AWS provider on-demand for each refresh */
/* OAuth token refresh callback with credential caching */
static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
const char *oauthbearer_config,
void *opaque)
Expand All @@ -622,7 +595,6 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
struct flb_aws_msk_iam *config;
struct flb_aws_credentials *creds = NULL;
struct flb_kafka_opaque *kafka_opaque;
struct flb_aws_provider *temp_provider = NULL;
(void) oauthbearer_config;

kafka_opaque = (struct flb_kafka_opaque *) opaque;
Expand All @@ -644,57 +616,76 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
return;
}

/* Determine host endpoint */
/*
* Use MSK generic endpoint for IAM authentication.
* AWS MSK IAM supports both cluster-specific and generic regional endpoints.
* Generic endpoints are recommended as they work across all brokers in the region.
*/
if (config->cluster_arn) {
arn_len = strlen(config->cluster_arn);
suffix_len = strlen(s3_suffix);
if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) {
snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region);
flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host);
flb_debug("[aws_msk_iam] using MSK Serverless generic endpoint: %s", host);
}
else {
snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region);
flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host);
flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host);
}
}
else {
snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region);
flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host);
flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host);
}

flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host);
flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host);

/* Generate payload using stateless function - creates and destroys AWS provider internally */
payload = build_msk_iam_payload(config, host);
/*
* Refresh credentials before generating OAuth token.
* This is necessary because provider's passive refresh only triggers when
* get_credentials is called and detects expiration. However, OAuth tokens
* are refreshed every ~15 minutes while IAM credentials expire after ~1 hour.
* If OAuth callbacks are spaced far apart, the passive refresh may not trigger
* before credentials expire, causing authentication failures.
*/
int rc = config->provider->provider_vtable->refresh(config->provider);
if (rc < 0) {
flb_warn("[aws_msk_iam] AWS provider refresh() failed (rc=%d), continuing to get_credentials()", rc);
}

/* Get credentials from provider */
creds = config->provider->provider_vtable->get_credentials(config->provider);
if (!creds) {
flb_error("[aws_msk_iam] failed to get AWS credentials from provider");
rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed");
return;
}

/* Generate payload using credentials from provider */
payload = build_msk_iam_payload(config, host, creds);
if (!payload) {
flb_error("[aws_msk_iam] failed to generate MSK IAM payload");
flb_aws_credentials_destroy(creds);
rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed");
return;
}

/* Get credentials for principal (create temporary provider just for this) */
temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
config->region, NULL, NULL,
flb_aws_client_generator(),
NULL);
if (temp_provider) {
if (temp_provider->provider_vtable->init(temp_provider) == 0) {
creds = temp_provider->provider_vtable->get_credentials(temp_provider);
}
}

now = time(NULL);
md_lifetime_ms = (now + 900) * 1000;

err = rd_kafka_oauthbearer_set_token(rk,
payload,
md_lifetime_ms,
creds ? creds->access_key_id : "unknown",
creds->access_key_id,
NULL,
0,
errstr,
sizeof(errstr));

/* Destroy credentials immediately after use (standard pattern) */
flb_aws_credentials_destroy(creds);
creds = NULL;

if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr);
rd_kafka_oauthbearer_set_token_failure(rk, errstr);
Expand All @@ -703,14 +694,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
flb_info("[aws_msk_iam] OAuth bearer token successfully set");
}

/* Clean up everything immediately - no memory leaks possible! */
if (creds) {
flb_aws_credentials_destroy(creds);
}
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
}

/* Clean up - payload only (creds already destroyed) */
if (payload) {
flb_sds_destroy(payload);
}
Expand Down Expand Up @@ -771,6 +755,59 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con

flb_info("[aws_msk_iam] extracted region: %s", ctx->region);

/* Create TLS instance for AWS credentials (STS) - CRITICAL FIX */
ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
FLB_TRUE,
FLB_LOG_DEBUG,
NULL, /* vhost */
NULL, /* ca_path */
NULL, /* ca_file */
NULL, /* crt_file */
NULL, /* key_file */
NULL); /* key_passwd */
if (!ctx->cred_tls) {
flb_error("[aws_msk_iam] failed to create TLS instance for AWS credentials");
flb_sds_destroy(ctx->region);
flb_sds_destroy(ctx->cluster_arn);
flb_free(ctx);
return NULL;
}

flb_info("[aws_msk_iam] TLS instance created for AWS credentials");

/* Create AWS provider once - will be reused for credential refresh */
ctx->provider = flb_standard_chain_provider_create(config,
ctx->cred_tls,
ctx->region,
NULL, /* sts_endpoint */
NULL, /* proxy */
flb_aws_client_generator(),
NULL); /* profile */
if (!ctx->provider) {
flb_error("[aws_msk_iam] failed to create AWS credentials provider");
flb_tls_destroy(ctx->cred_tls);
flb_sds_destroy(ctx->region);
flb_sds_destroy(ctx->cluster_arn);
flb_free(ctx);
return NULL;
}

/* Initialize provider in sync mode (required before event loop is available) */
ctx->provider->provider_vtable->sync(ctx->provider);
if (ctx->provider->provider_vtable->init(ctx->provider) != 0) {
flb_error("[aws_msk_iam] failed to initialize AWS credentials provider");
flb_aws_provider_destroy(ctx->provider);
flb_tls_destroy(ctx->cred_tls);
flb_sds_destroy(ctx->region);
flb_sds_destroy(ctx->cluster_arn);
flb_free(ctx);
return NULL;
}
/* Switch back to async mode */
ctx->provider->provider_vtable->async(ctx->provider);

flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully");

/* Set the callback and opaque */
rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb);
flb_kafka_opaque_set(opaque, NULL, ctx);
Expand All @@ -781,7 +818,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con
return ctx;
}

/* Simple destroy - just config cleanup, no AWS provider to leak! */
/* Destroy MSK IAM config */
void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)
{
if (!ctx) {
Expand All @@ -790,12 +827,24 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)

flb_info("[aws_msk_iam] destroying MSK IAM config");

/* NO AWS provider to destroy! */
/* Destroy AWS provider (provider manages its own credential caching) */
if (ctx->provider) {
flb_aws_provider_destroy(ctx->provider);
}

/* Clean up TLS instance - caller owns TLS lifecycle with flb_standard_chain_provider_create */
if (ctx->cred_tls) {
flb_tls_destroy(ctx->cred_tls);
}

/* Clean up other resources */
if (ctx->region) {
flb_sds_destroy(ctx->region);
}
if (ctx->cluster_arn) {
flb_sds_destroy(ctx->cluster_arn);
}
flb_free(ctx);

flb_info("[aws_msk_iam] MSK IAM config destroyed");
}