From 6e1d53c2a80ad918883521fd36a0aada5a0133fa Mon Sep 17 00:00:00 2001 From: Arbin Date: Wed, 26 Nov 2025 12:31:31 +0800 Subject: [PATCH 01/14] aws: optimize MSK IAM authentication and credential management Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 342 ++++++++++++++++++++++++++++++-------- 1 file changed, 274 insertions(+), 68 deletions(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf8af7d0cc8..42be1466c9c 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -37,11 +38,18 @@ #include #include -/* Lightweight config - NO persistent AWS provider */ +/* Lightweight config with credential caching to prevent principal changes */ struct flb_aws_msk_iam { - struct flb_config *flb_config; /* For creating AWS provider on-demand */ + struct flb_config *flb_config; flb_sds_t region; flb_sds_t cluster_arn; + struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */ + struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */ + + /* Credential caching to maintain consistent principal during re-authentication */ + struct flb_aws_credentials *cached_creds; /* Cached AWS credentials */ + time_t creds_expiration; /* Credential expiration time */ + pthread_mutex_t creds_lock; /* Thread-safe access to cached credentials */ }; /* Utility functions - same as before */ @@ -162,12 +170,153 @@ static char *extract_region(const char *arn) return out; } -/* Stateless payload generator - creates AWS provider on demand */ -static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host) +/* + * Duplicate AWS credentials structure + * Returns NULL on failure + */ +static struct flb_aws_credentials* duplicate_credentials(struct flb_aws_credentials *src) { - struct flb_aws_provider *temp_provider = NULL; + struct flb_aws_credentials *dst; + + if (!src) { + return NULL; + } + + dst = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!dst) { + return NULL; + } + + if (src->access_key_id) { + dst->access_key_id = flb_sds_create(src->access_key_id); + if (!dst->access_key_id) { + flb_free(dst); + return NULL; + } + } + + if (src->secret_access_key) { + dst->secret_access_key = flb_sds_create(src->secret_access_key); + if (!dst->secret_access_key) { + if (dst->access_key_id) { + flb_sds_destroy(dst->access_key_id); + } + flb_free(dst); + return NULL; + } + } + + if (src->session_token) { + dst->session_token = flb_sds_create(src->session_token); + if (!dst->session_token) { + if (dst->access_key_id) { + flb_sds_destroy(dst->access_key_id); + } + if (dst->secret_access_key) { + flb_sds_destroy(dst->secret_access_key); + } + flb_free(dst); + return NULL; + } + } + + return dst; +} + +/* + * Get cached credentials or refresh if expired + * This function ensures the same AWS temporary credentials (with the same session ID) + * are reused across multiple token refreshes, preventing "principal change" errors. + * + * Returns a COPY of credentials that the caller must destroy. + * Returns NULL on failure. + */ +static struct flb_aws_credentials* get_cached_or_refresh_credentials( + struct flb_aws_msk_iam *config, time_t *expiration) +{ + time_t now; struct flb_aws_credentials *creds = NULL; + struct flb_aws_credentials *creds_copy = NULL; + int needs_refresh = FLB_FALSE; + + now = time(NULL); + + pthread_mutex_lock(&config->creds_lock); + + /* Check if cached credentials are still valid */ + if (config->cached_creds && + config->creds_expiration > now + FLB_AWS_REFRESH_WINDOW) { + /* Credentials are still valid, return a copy */ + creds_copy = duplicate_credentials(config->cached_creds); + if (expiration) { + *expiration = config->creds_expiration; + } + pthread_mutex_unlock(&config->creds_lock); + + if (creds_copy) { + flb_info("[aws_msk_iam] reusing cached AWS credentials (valid until %ld, %ld seconds remaining)", + config->creds_expiration, config->creds_expiration - now); + } + return creds_copy; + } + + needs_refresh = FLB_TRUE; + pthread_mutex_unlock(&config->creds_lock); + + /* Credentials expired or don't exist, need to refresh */ + if (needs_refresh) { + flb_info("[aws_msk_iam] AWS credentials expired or not cached, fetching new credentials"); + + /* Get new credentials using the long-lived provider */ + creds = config->provider->provider_vtable->get_credentials(config->provider); + if (!creds) { + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); + return NULL; + } + + /* Update cache with new credentials */ + pthread_mutex_lock(&config->creds_lock); + + if (config->cached_creds) { + flb_aws_credentials_destroy(config->cached_creds); + config->cached_creds = NULL; + } + + config->cached_creds = duplicate_credentials(creds); + if (!config->cached_creds) { + pthread_mutex_unlock(&config->creds_lock); + flb_error("[aws_msk_iam] failed to cache credentials"); + flb_aws_credentials_destroy(creds); + return NULL; + } + + /* + * Set expiration time. AWS temporary credentials typically last 1 hour. + * We use a conservative estimate if we can't determine the exact expiration. + */ + config->creds_expiration = now + 3600; /* Default: 1 hour */ + + if (expiration) { + *expiration = config->creds_expiration; + } + + pthread_mutex_unlock(&config->creds_lock); + + flb_info("[aws_msk_iam] successfully cached new AWS credentials (valid until %ld, %ld seconds remaining)", + config->creds_expiration, config->creds_expiration - now); + + /* Return the credentials (caller owns them) */ + return creds; + } + + return NULL; +} + +/* Payload generator using cached credentials to maintain consistent principal */ +static flb_sds_t build_msk_iam_payload_with_creds(struct flb_aws_msk_iam *config, + const char *host, + struct flb_aws_credentials *creds) +{ flb_sds_t payload = NULL; int encode_result; char *p; @@ -214,37 +363,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, return NULL; } - flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", + flb_info("[aws_msk_iam] build_msk_iam_payload_with_creds: generating payload for host: %s, region: %s", host, config->region); - /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (!temp_provider) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); - return NULL; - } - - if (temp_provider->provider_vtable->init(temp_provider) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - /* Get credentials */ - creds = temp_provider->provider_vtable->get_credentials(temp_provider); + /* Validate credentials */ if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); + flb_error("[aws_msk_iam] build_msk_iam_payload_with_creds: credentials are NULL"); return NULL; } if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); + flb_error("[aws_msk_iam] build_msk_iam_payload_with_creds: incomplete credentials"); return NULL; } @@ -547,12 +676,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return payload; @@ -594,18 +717,12 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return NULL; } -/* Stateless callback - creates AWS provider on-demand for each refresh */ +/* OAuth token refresh callback with credential caching */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -622,7 +739,6 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, struct flb_aws_msk_iam *config; struct flb_aws_credentials *creds = NULL; struct flb_kafka_opaque *kafka_opaque; - struct flb_aws_provider *temp_provider = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; @@ -644,45 +760,50 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, return; } - /* Determine host endpoint */ + /* + * Use MSK generic endpoint for IAM authentication. + * AWS MSK IAM supports both cluster-specific and generic regional endpoints. + * Generic endpoints are recommended as they work across all brokers in the region. + */ if (config->cluster_arn) { arn_len = strlen(config->cluster_arn); suffix_len = strlen(s3_suffix); if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK Serverless generic endpoint: %s", host); } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); - /* Generate payload using stateless function - creates and destroys AWS provider internally */ - payload = build_msk_iam_payload(config, host); + /* + * CRITICAL FIX: Use cached credentials to maintain consistent principal + * This prevents "Cannot change principals during re-authentication" errors + */ + creds = get_cached_or_refresh_credentials(config, NULL); + if (!creds) { + flb_error("[aws_msk_iam] failed to get AWS credentials (cached or refreshed)"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); + return; + } + + /* Generate payload using cached credentials */ + payload = build_msk_iam_payload_with_creds(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); + flb_aws_credentials_destroy(creds); rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); return; } - /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - } - } - now = time(NULL); md_lifetime_ms = (now + 900) * 1000; @@ -703,14 +824,10 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_info("[aws_msk_iam] OAuth bearer token successfully set"); } - /* Clean up everything immediately - no memory leaks possible! */ + /* Clean up - credentials and payload */ if (creds) { flb_aws_credentials_destroy(creds); } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } - if (payload) { flb_sds_destroy(payload); } @@ -771,6 +888,74 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_info("[aws_msk_iam] extracted region: %s", ctx->region); + /* Create TLS instance for AWS credentials (STS) - CRITICAL FIX */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + FLB_LOG_DEBUG, + NULL, /* vhost */ + NULL, /* ca_path */ + NULL, /* ca_file */ + NULL, /* crt_file */ + NULL, /* key_file */ + NULL); /* key_passwd */ + if (!ctx->cred_tls) { + flb_error("[aws_msk_iam] failed to create TLS instance for AWS credentials"); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + flb_info("[aws_msk_iam] TLS instance created for AWS credentials"); + + /* Initialize credential caching fields */ + ctx->cached_creds = NULL; + ctx->creds_expiration = 0; + + /* Initialize mutex for thread-safe credential access */ + if (pthread_mutex_init(&ctx->creds_lock, NULL) != 0) { + flb_error("[aws_msk_iam] failed to initialize credentials mutex"); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + flb_info("[aws_msk_iam] Credential cache initialized with mutex protection"); + + /* Create AWS provider once - will be reused for credential refresh */ + ctx->provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + ctx->region, + NULL, /* sts_endpoint */ + NULL, /* proxy */ + flb_aws_client_generator(), + NULL); /* profile */ + if (!ctx->provider) { + flb_error("[aws_msk_iam] failed to create AWS credentials provider"); + pthread_mutex_destroy(&ctx->creds_lock); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + /* Initialize provider */ + if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { + flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); + flb_aws_provider_destroy(ctx->provider); + pthread_mutex_destroy(&ctx->creds_lock); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully"); + /* Set the callback and opaque */ rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); flb_kafka_opaque_set(opaque, NULL, ctx); @@ -781,7 +966,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con return ctx; } -/* Simple destroy - just config cleanup, no AWS provider to leak! */ +/* Destroy MSK IAM config - includes cached credentials cleanup */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { @@ -790,7 +975,26 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_info("[aws_msk_iam] destroying MSK IAM config"); - /* NO AWS provider to destroy! */ + /* Clean up cached credentials */ + if (ctx->cached_creds) { + flb_aws_credentials_destroy(ctx->cached_creds); + ctx->cached_creds = NULL; + } + + /* Destroy AWS provider */ + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } + + /* Destroy mutex */ + pthread_mutex_destroy(&ctx->creds_lock); + + /* Clean up TLS instance */ + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + + /* Clean up other resources */ if (ctx->region) { flb_sds_destroy(ctx->region); } @@ -798,4 +1002,6 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_sds_destroy(ctx->cluster_arn); } flb_free(ctx); + + flb_info("[aws_msk_iam] MSK IAM config destroyed, cached credentials and provider cleared"); } From 09aa64ac05363e5811d5283a577d5dc5e88912b2 Mon Sep 17 00:00:00 2001 From: Arbin Date: Wed, 26 Nov 2025 06:28:16 +0000 Subject: [PATCH 02/14] aws: optimize MSK IAM authentication and credential management Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 198 +++----------------------------------- 1 file changed, 12 insertions(+), 186 deletions(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index 42be1466c9c..fb2bfec0931 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -38,18 +38,13 @@ #include #include -/* Lightweight config with credential caching to prevent principal changes */ +/* Lightweight config - provider manages credential caching and refresh internally */ struct flb_aws_msk_iam { struct flb_config *flb_config; flb_sds_t region; flb_sds_t cluster_arn; struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */ struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */ - - /* Credential caching to maintain consistent principal during re-authentication */ - struct flb_aws_credentials *cached_creds; /* Cached AWS credentials */ - time_t creds_expiration; /* Credential expiration time */ - pthread_mutex_t creds_lock; /* Thread-safe access to cached credentials */ }; /* Utility functions - same as before */ @@ -170,150 +165,8 @@ static char *extract_region(const char *arn) return out; } -/* - * Duplicate AWS credentials structure - * Returns NULL on failure - */ -static struct flb_aws_credentials* duplicate_credentials(struct flb_aws_credentials *src) -{ - struct flb_aws_credentials *dst; - - if (!src) { - return NULL; - } - - dst = flb_calloc(1, sizeof(struct flb_aws_credentials)); - if (!dst) { - return NULL; - } - - if (src->access_key_id) { - dst->access_key_id = flb_sds_create(src->access_key_id); - if (!dst->access_key_id) { - flb_free(dst); - return NULL; - } - } - - if (src->secret_access_key) { - dst->secret_access_key = flb_sds_create(src->secret_access_key); - if (!dst->secret_access_key) { - if (dst->access_key_id) { - flb_sds_destroy(dst->access_key_id); - } - flb_free(dst); - return NULL; - } - } - - if (src->session_token) { - dst->session_token = flb_sds_create(src->session_token); - if (!dst->session_token) { - if (dst->access_key_id) { - flb_sds_destroy(dst->access_key_id); - } - if (dst->secret_access_key) { - flb_sds_destroy(dst->secret_access_key); - } - flb_free(dst); - return NULL; - } - } - - return dst; -} - -/* - * Get cached credentials or refresh if expired - * This function ensures the same AWS temporary credentials (with the same session ID) - * are reused across multiple token refreshes, preventing "principal change" errors. - * - * Returns a COPY of credentials that the caller must destroy. - * Returns NULL on failure. - */ -static struct flb_aws_credentials* get_cached_or_refresh_credentials( - struct flb_aws_msk_iam *config, time_t *expiration) -{ - time_t now; - struct flb_aws_credentials *creds = NULL; - struct flb_aws_credentials *creds_copy = NULL; - int needs_refresh = FLB_FALSE; - - now = time(NULL); - - pthread_mutex_lock(&config->creds_lock); - - /* Check if cached credentials are still valid */ - if (config->cached_creds && - config->creds_expiration > now + FLB_AWS_REFRESH_WINDOW) { - /* Credentials are still valid, return a copy */ - creds_copy = duplicate_credentials(config->cached_creds); - if (expiration) { - *expiration = config->creds_expiration; - } - pthread_mutex_unlock(&config->creds_lock); - - if (creds_copy) { - flb_info("[aws_msk_iam] reusing cached AWS credentials (valid until %ld, %ld seconds remaining)", - config->creds_expiration, config->creds_expiration - now); - } - return creds_copy; - } - - needs_refresh = FLB_TRUE; - pthread_mutex_unlock(&config->creds_lock); - - /* Credentials expired or don't exist, need to refresh */ - if (needs_refresh) { - flb_info("[aws_msk_iam] AWS credentials expired or not cached, fetching new credentials"); - - /* Get new credentials using the long-lived provider */ - creds = config->provider->provider_vtable->get_credentials(config->provider); - if (!creds) { - flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); - return NULL; - } - - /* Update cache with new credentials */ - pthread_mutex_lock(&config->creds_lock); - - if (config->cached_creds) { - flb_aws_credentials_destroy(config->cached_creds); - config->cached_creds = NULL; - } - - config->cached_creds = duplicate_credentials(creds); - if (!config->cached_creds) { - pthread_mutex_unlock(&config->creds_lock); - flb_error("[aws_msk_iam] failed to cache credentials"); - flb_aws_credentials_destroy(creds); - return NULL; - } - - /* - * Set expiration time. AWS temporary credentials typically last 1 hour. - * We use a conservative estimate if we can't determine the exact expiration. - */ - config->creds_expiration = now + 3600; /* Default: 1 hour */ - - if (expiration) { - *expiration = config->creds_expiration; - } - - pthread_mutex_unlock(&config->creds_lock); - - flb_info("[aws_msk_iam] successfully cached new AWS credentials (valid until %ld, %ld seconds remaining)", - config->creds_expiration, config->creds_expiration - now); - - /* Return the credentials (caller owns them) */ - return creds; - } - - return NULL; -} - -/* Payload generator using cached credentials to maintain consistent principal */ -static flb_sds_t build_msk_iam_payload_with_creds(struct flb_aws_msk_iam *config, +/* Payload generator - builds MSK IAM authentication payload */ +static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, const char *host, struct flb_aws_credentials *creds) { @@ -785,18 +638,18 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); /* - * CRITICAL FIX: Use cached credentials to maintain consistent principal - * This prevents "Cannot change principals during re-authentication" errors + * Get credentials from provider. The provider handles caching and expiration internally. + * The provider automatically manages credential refresh when needed. */ - creds = get_cached_or_refresh_credentials(config, NULL); + creds = config->provider->provider_vtable->get_credentials(config->provider); if (!creds) { - flb_error("[aws_msk_iam] failed to get AWS credentials (cached or refreshed)"); + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); return; } - /* Generate payload using cached credentials */ - payload = build_msk_iam_payload_with_creds(config, host, creds); + /* Generate payload using credentials from provider */ + payload = build_msk_iam_payload(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); flb_aws_credentials_destroy(creds); @@ -908,22 +761,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_info("[aws_msk_iam] TLS instance created for AWS credentials"); - /* Initialize credential caching fields */ - ctx->cached_creds = NULL; - ctx->creds_expiration = 0; - - /* Initialize mutex for thread-safe credential access */ - if (pthread_mutex_init(&ctx->creds_lock, NULL) != 0) { - flb_error("[aws_msk_iam] failed to initialize credentials mutex"); - flb_tls_destroy(ctx->cred_tls); - flb_sds_destroy(ctx->region); - flb_sds_destroy(ctx->cluster_arn); - flb_free(ctx); - return NULL; - } - - flb_info("[aws_msk_iam] Credential cache initialized with mutex protection"); - /* Create AWS provider once - will be reused for credential refresh */ ctx->provider = flb_standard_chain_provider_create(config, ctx->cred_tls, @@ -934,7 +771,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con NULL); /* profile */ if (!ctx->provider) { flb_error("[aws_msk_iam] failed to create AWS credentials provider"); - pthread_mutex_destroy(&ctx->creds_lock); flb_tls_destroy(ctx->cred_tls); flb_sds_destroy(ctx->region); flb_sds_destroy(ctx->cluster_arn); @@ -946,7 +782,6 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); flb_aws_provider_destroy(ctx->provider); - pthread_mutex_destroy(&ctx->creds_lock); flb_tls_destroy(ctx->cred_tls); flb_sds_destroy(ctx->region); flb_sds_destroy(ctx->cluster_arn); @@ -966,7 +801,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con return ctx; } -/* Destroy MSK IAM config - includes cached credentials cleanup */ +/* Destroy MSK IAM config */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { @@ -975,20 +810,11 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_info("[aws_msk_iam] destroying MSK IAM config"); - /* Clean up cached credentials */ - if (ctx->cached_creds) { - flb_aws_credentials_destroy(ctx->cached_creds); - ctx->cached_creds = NULL; - } - - /* Destroy AWS provider */ + /* Destroy AWS provider (provider manages its own credential caching) */ if (ctx->provider) { flb_aws_provider_destroy(ctx->provider); } - /* Destroy mutex */ - pthread_mutex_destroy(&ctx->creds_lock); - /* Clean up TLS instance */ if (ctx->cred_tls) { flb_tls_destroy(ctx->cred_tls); @@ -1003,5 +829,5 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) } flb_free(ctx); - flb_info("[aws_msk_iam] MSK IAM config destroyed, cached credentials and provider cleared"); + flb_info("[aws_msk_iam] MSK IAM config destroyed"); } From 1f9b74a15deedf620be3ded571b23b516bb7ee30 Mon Sep 17 00:00:00 2001 From: Arbin Date: Wed, 26 Nov 2025 16:50:55 +0800 Subject: [PATCH 03/14] aws: optimize MSK IAM authentication and credential management Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index fb2bfec0931..f8db0c2ecee 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -663,12 +663,16 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, err = rd_kafka_oauthbearer_set_token(rk, payload, md_lifetime_ms, - creds ? creds->access_key_id : "unknown", + creds->access_key_id, NULL, 0, errstr, sizeof(errstr)); + /* Destroy credentials immediately after use (standard pattern) */ + flb_aws_credentials_destroy(creds); + creds = NULL; + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); @@ -677,10 +681,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_info("[aws_msk_iam] OAuth bearer token successfully set"); } - /* Clean up - credentials and payload */ - if (creds) { - flb_aws_credentials_destroy(creds); - } + /* Clean up - payload only (creds already destroyed) */ if (payload) { flb_sds_destroy(payload); } From ca35c2254ffef20e962a97a1d6d441800f5c00c4 Mon Sep 17 00:00:00 2001 From: Arbin Date: Wed, 26 Nov 2025 09:31:09 +0000 Subject: [PATCH 04/14] fix: initialize AWS provider in sync mode for MSK IAM - Switch provider to sync mode before initialization to prevent hanging - Initialize provider with sync mode (required before event loop is available) - Switch back to async mode after successful initialization - Follows pattern used by other AWS credential providers This fixes potential credential initialization failures in IRSA/EKS deployments where HTTP requests during init would hang without the event loop. Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index f8db0c2ecee..eb4d1cc0b0a 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -779,7 +779,8 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con return NULL; } - /* Initialize provider */ + /* Initialize provider in sync mode (required before event loop is available) */ + ctx->provider->provider_vtable->sync(ctx->provider); if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); flb_aws_provider_destroy(ctx->provider); @@ -789,6 +790,8 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_free(ctx); return NULL; } + /* Switch back to async mode */ + ctx->provider->provider_vtable->async(ctx->provider); flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully"); From 4c19da4065244bae1291bb015148a0bc5d0bd3ed Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 27 Nov 2025 12:20:03 +0800 Subject: [PATCH 05/14] fix(aws): force credential refresh in provider refresh functions - Add force refresh logic to EC2, STS, and EKS credential providers - Set next_refresh to 0 in refresh functions to ensure immediate credential update - Fixes MSK IAM authentication failures after ~1 hour due to stale credentials - Aligns with AWS SDK behavior where refresh() means force refresh This resolves the issue where OAuth token refresh (every ~15 minutes) would not actually refresh AWS credentials until next_refresh time was reached (typically 1 hour later), causing MSK connection failures with 'Access denied' errors. The fix ensures that every OAuth callback will fetch fresh credentials from AWS, matching the behavior of official AWS SDKs (Python, Java). Signed-off-by: Arbin --- src/aws/flb_aws_credentials_ec2.c | 4 ++++ src/aws/flb_aws_credentials_profile.c | 3 +-- src/aws/flb_aws_credentials_sts.c | 7 +++++++ src/aws/flb_aws_msk_iam.c | 21 ++++++++++++++------- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index 2722e26d223..e56dc467fbd 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -130,6 +130,10 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { int ret = -1; flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + + /* Force credential refresh by marking as expired */ + implementation->next_refresh = 0; + if (try_lock_provider(provider)) { ret = get_creds_ec2(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_credentials_profile.c b/src/aws/flb_aws_credentials_profile.c index 48cb9299572..7ad7099ff45 100644 --- a/src/aws/flb_aws_credentials_profile.c +++ b/src/aws/flb_aws_credentials_profile.c @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path, if (flb_read_file(credentials_path, &buf, &size) < 0) { if (errno == ENOENT) { - AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", - credentials_path); + AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path); } else { flb_errno(); AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 554fac20353..5fbac774cf7 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -175,6 +175,9 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { struct flb_aws_provider_sts *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the STS provider"); + + /* Force credential refresh by marking as expired */ + implementation->next_refresh = 0; if (try_lock_provider(provider)) { ret = sts_assume_role_request(implementation->sts_client, @@ -480,6 +483,10 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { struct flb_aws_provider_eks *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the EKS provider"); + + /* Force credential refresh by marking as expired */ + implementation->next_refresh = 0; + if (try_lock_provider(provider)) { ret = assume_with_web_identity(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index eb4d1cc0b0a..cc65093dacd 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -216,17 +216,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, return NULL; } - flb_info("[aws_msk_iam] build_msk_iam_payload_with_creds: generating payload for host: %s, region: %s", - host, config->region); + flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", + host, config->region); /* Validate credentials */ if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload_with_creds: credentials are NULL"); + flb_error("[aws_msk_iam] build_msk_iam_payload: credentials are NULL"); return NULL; } if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload_with_creds: incomplete credentials"); + flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); return NULL; } @@ -635,12 +635,19 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } - flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); /* - * Get credentials from provider. The provider handles caching and expiration internally. - * The provider automatically manages credential refresh when needed. + * Refresh credentials before generating OAuth token. + * This is necessary because provider's passive refresh only triggers when + * get_credentials is called and detects expiration. However, OAuth tokens + * are refreshed every ~15 minutes while IAM credentials expire after ~1 hour. + * If OAuth callbacks are spaced far apart, the passive refresh may not trigger + * before credentials expire, causing authentication failures. */ + config->provider->provider_vtable->refresh(config->provider); + + /* Get credentials from provider */ creds = config->provider->provider_vtable->get_credentials(config->provider); if (!creds) { flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); From 35bcf13bbac2f0991b6ade0aa5f280c5f60601e3 Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 27 Nov 2025 12:33:50 +0800 Subject: [PATCH 06/14] fix(aws): Minor leak on empty_payload_hex when canonical request build fails Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cc65093dacd..7da72441ab5 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -570,6 +570,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } + if (empty_payload_hex) { + flb_sds_destroy(empty_payload_hex); + } return NULL; } From d45dab6419cae802531865c6295e333e4953dd13 Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 27 Nov 2025 12:46:00 +0800 Subject: [PATCH 07/14] aws: optimize MSK IAM authentication and credential management Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index 7da72441ab5..38910bb3515 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -648,7 +648,10 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, * If OAuth callbacks are spaced far apart, the passive refresh may not trigger * before credentials expire, causing authentication failures. */ - config->provider->provider_vtable->refresh(config->provider); + int rc = config->provider->provider_vtable->refresh(config->provider); + if (rc < 0) { + flb_warn("[aws_msk_iam] AWS provider refresh() failed (rc=%d), continuing to get_credentials()", rc); + } /* Get credentials from provider */ creds = config->provider->provider_vtable->get_credentials(config->provider); @@ -829,7 +832,7 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_aws_provider_destroy(ctx->provider); } - /* Clean up TLS instance */ + /* Clean up TLS instance - caller owns TLS lifecycle with flb_standard_chain_provider_create */ if (ctx->cred_tls) { flb_tls_destroy(ctx->cred_tls); } From 8434f7ddc4551f31b750908e7361d40231bb9611 Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 27 Nov 2025 13:51:29 +0800 Subject: [PATCH 08/14] fix(aws): AWS MSK IAM authentication failures caused by stale credentials Signed-off-by: Arbin --- src/aws/flb_aws_credentials_ec2.c | 11 ++++++++--- src/aws/flb_aws_credentials_http.c | 8 ++++++++ src/aws/flb_aws_credentials_sts.c | 22 ++++++++++++++++------ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index e56dc467fbd..d4ca79befc4 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -131,10 +131,15 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); - /* Force credential refresh by marking as expired */ - implementation->next_refresh = 0; - if (try_lock_provider(provider)) { + /* Force credential refresh by clearing cache and setting expired time */ + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + } + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = get_creds_ec2(implementation); unlock_provider(provider); } diff --git a/src/aws/flb_aws_credentials_http.c b/src/aws/flb_aws_credentials_http.c index 8ba78b788fd..b7da7f0d2d9 100644 --- a/src/aws/flb_aws_credentials_http.c +++ b/src/aws/flb_aws_credentials_http.c @@ -158,6 +158,14 @@ int refresh_fn_http(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the http provider"); if (try_lock_provider(provider)) { + /* Force credential refresh by clearing cache and setting expired time */ + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + } + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = http_credentials_request(implementation); unlock_provider(provider); } diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 5fbac774cf7..ec130762cdc 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -176,10 +176,15 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the STS provider"); - /* Force credential refresh by marking as expired */ - implementation->next_refresh = 0; - if (try_lock_provider(provider)) { + /* Force credential refresh by clearing cache and setting expired time */ + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + } + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, &implementation->next_refresh); @@ -484,10 +489,15 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EKS provider"); - /* Force credential refresh by marking as expired */ - implementation->next_refresh = 0; - if (try_lock_provider(provider)) { + /* Force credential refresh by clearing cache and setting expired time */ + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + } + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = assume_with_web_identity(implementation); unlock_provider(provider); } From 05ecb6dabd2f50a5b4730f2b16460e2f89e8f3f4 Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 27 Nov 2025 14:11:19 +0800 Subject: [PATCH 09/14] aws: optimize MSK IAM authentication and credential management Signed-off-by: Arbin --- src/aws/flb_aws_credentials_ec2.c | 5 ----- src/aws/flb_aws_credentials_http.c | 5 ----- src/aws/flb_aws_credentials_sts.c | 10 ---------- 3 files changed, 20 deletions(-) diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index d4ca79befc4..1d3ad695b8c 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -132,11 +132,6 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); if (try_lock_provider(provider)) { - /* Force credential refresh by clearing cache and setting expired time */ - if (implementation->creds) { - flb_aws_credentials_destroy(implementation->creds); - implementation->creds = NULL; - } /* Set to 1 (epoch start) to trigger immediate refresh via time check */ implementation->next_refresh = 1; diff --git a/src/aws/flb_aws_credentials_http.c b/src/aws/flb_aws_credentials_http.c index b7da7f0d2d9..a4ceeca2c74 100644 --- a/src/aws/flb_aws_credentials_http.c +++ b/src/aws/flb_aws_credentials_http.c @@ -158,11 +158,6 @@ int refresh_fn_http(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the http provider"); if (try_lock_provider(provider)) { - /* Force credential refresh by clearing cache and setting expired time */ - if (implementation->creds) { - flb_aws_credentials_destroy(implementation->creds); - implementation->creds = NULL; - } /* Set to 1 (epoch start) to trigger immediate refresh via time check */ implementation->next_refresh = 1; diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index ec130762cdc..7546adfcc94 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -177,11 +177,6 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the STS provider"); if (try_lock_provider(provider)) { - /* Force credential refresh by clearing cache and setting expired time */ - if (implementation->creds) { - flb_aws_credentials_destroy(implementation->creds); - implementation->creds = NULL; - } /* Set to 1 (epoch start) to trigger immediate refresh via time check */ implementation->next_refresh = 1; @@ -490,11 +485,6 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EKS provider"); if (try_lock_provider(provider)) { - /* Force credential refresh by clearing cache and setting expired time */ - if (implementation->creds) { - flb_aws_credentials_destroy(implementation->creds); - implementation->creds = NULL; - } /* Set to 1 (epoch start) to trigger immediate refresh via time check */ implementation->next_refresh = 1; From 862a4ece7da41cfcbbe3563c8bd805d8aff4a32d Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 28 Nov 2025 08:41:03 +0800 Subject: [PATCH 10/14] fix(aws): AWS MSK IAM authentication failures on low traffic and Missing TLS support Signed-off-by: Arbin --- src/aws/flb_aws_credentials_ec2.c | 3 - src/aws/flb_aws_credentials_http.c | 3 - src/aws/flb_aws_credentials_sts.c | 6 - src/aws/flb_aws_msk_iam.c | 294 ++++++++--------------------- 4 files changed, 74 insertions(+), 232 deletions(-) diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index 1d3ad695b8c..9aa1444f1fb 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -132,9 +132,6 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); if (try_lock_provider(provider)) { - /* Set to 1 (epoch start) to trigger immediate refresh via time check */ - implementation->next_refresh = 1; - ret = get_creds_ec2(implementation); unlock_provider(provider); } diff --git a/src/aws/flb_aws_credentials_http.c b/src/aws/flb_aws_credentials_http.c index a4ceeca2c74..8ba78b788fd 100644 --- a/src/aws/flb_aws_credentials_http.c +++ b/src/aws/flb_aws_credentials_http.c @@ -158,9 +158,6 @@ int refresh_fn_http(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the http provider"); if (try_lock_provider(provider)) { - /* Set to 1 (epoch start) to trigger immediate refresh via time check */ - implementation->next_refresh = 1; - ret = http_credentials_request(implementation); unlock_provider(provider); } diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 7546adfcc94..155a41d3998 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -177,9 +177,6 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the STS provider"); if (try_lock_provider(provider)) { - /* Set to 1 (epoch start) to trigger immediate refresh via time check */ - implementation->next_refresh = 1; - ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, &implementation->next_refresh); @@ -485,9 +482,6 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { flb_debug("[aws_credentials] Refresh called on the EKS provider"); if (try_lock_provider(provider)) { - /* Set to 1 (epoch start) to trigger immediate refresh via time check */ - implementation->next_refresh = 1; - ret = assume_with_web_identity(implementation); unlock_provider(provider); } diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index 38910bb3515..e6358d32175 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -38,16 +38,22 @@ #include #include -/* Lightweight config - provider manages credential caching and refresh internally */ +/* + * Fixed token lifetime of 3 minutes. + * This short lifetime ensures that idle Kafka connections (e.g., low-traffic inputs) + * will quickly detect token expiration when new data arrives and trigger a refresh callback, + * preventing "Access denied" errors from using expired tokens on idle connections. + */ +#define MSK_IAM_TOKEN_LIFETIME_SECONDS 180 + struct flb_aws_msk_iam { struct flb_config *flb_config; flb_sds_t region; flb_sds_t cluster_arn; - struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */ - struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */ + struct flb_tls *cred_tls; + struct flb_aws_provider *provider; }; -/* Utility functions - same as before */ static int to_encode(char c) { if ((c >= '0' && c <= '9') || @@ -167,8 +173,8 @@ static char *extract_region(const char *arn) /* Payload generator - builds MSK IAM authentication payload */ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host, - struct flb_aws_credentials *creds) + const char *host, + struct flb_aws_credentials *creds) { flb_sds_t payload = NULL; int encode_result; @@ -207,26 +213,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, /* Validate inputs */ if (!config || !config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set or invalid"); return NULL; } if (!host || strlen(host) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: host is required"); - return NULL; - } - - flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", - host, config->region); - - /* Validate credentials */ - if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: credentials are NULL"); + flb_error("[aws_msk_iam] host is required"); return NULL; } - if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); + if (!creds || !creds->access_key_id || !creds->secret_access_key) { + flb_error("[aws_msk_iam] invalid or incomplete credentials"); return NULL; } @@ -251,19 +248,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: Encode the action parameter */ action_enc = uri_encode_params("kafka-cluster:Connect", 21); if (!action_enc) { goto error; } - /* Build canonical query string with ACTION parameter first (alphabetical order) */ + /* Build canonical query string */ query = flb_sds_create_size(8192); if (!query) { goto error; } - /* note: Action must be FIRST in alphabetical order */ query = flb_sds_printf(&query, "Action=%s&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=%s" "&X-Amz-Date=%s&X-Amz-Expires=900", @@ -272,27 +267,23 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Add session token if present (before SignedHeaders alphabetically) */ + /* Add session token if present */ if (creds->session_token && flb_sds_len(creds->session_token) > 0) { session_token_enc = uri_encode_params(creds->session_token, flb_sds_len(creds->session_token)); if (!session_token_enc) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to encode session token"); goto error; } tmp = flb_sds_printf(&query, "&X-Amz-Security-Token=%s", session_token_enc); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append session token to query"); goto error; } query = tmp; } - /* Add SignedHeaders LAST (alphabetically after Security-Token) */ tmp = flb_sds_printf(&query, "&X-Amz-SignedHeaders=host"); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append SignedHeaders"); goto error; } query = tmp; @@ -303,10 +294,8 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: MSK IAM canonical request format - use SHA256 of empty string, not UNSIGNED-PAYLOAD */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) "", 0, empty_payload_hash, sizeof(empty_payload_hash)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash empty payload"); goto error; } @@ -320,17 +309,15 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, query, host, empty_payload_hex); flb_sds_destroy(empty_payload_hex); - empty_payload_hex = NULL; /* Prevent double-free */ + empty_payload_hex = NULL; if (!canonical) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to build canonical request"); goto error; } - /* Hash canonical request immediately */ + /* Hash canonical request */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) canonical, flb_sds_len(canonical), sha256_buf, sizeof(sha256_buf)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash canonical request"); goto error; } @@ -366,34 +353,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, len = strlen(datestamp); if (hmac_sha256_sign(key_date, (unsigned char *) key, flb_sds_len(key), (unsigned char *) datestamp, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign date"); goto error; } - /* Clean up key immediately after use - prevent double-free */ flb_sds_destroy(key); key = NULL; len = strlen(config->region); if (hmac_sha256_sign(key_region, key_date, 32, (unsigned char *) config->region, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign region"); goto error; } if (hmac_sha256_sign(key_service, key_region, 32, (unsigned char *) "kafka-cluster", 13) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign service"); goto error; } if (hmac_sha256_sign(key_signing, key_service, 32, (unsigned char *) "aws4_request", 12) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create signing key"); goto error; } if (hmac_sha256_sign(sig, key_signing, 32, (unsigned char *) string_to_sign, flb_sds_len(string_to_sign)) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign request"); goto error; } @@ -402,85 +383,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Append signature to query */ tmp = flb_sds_printf(&query, "&X-Amz-Signature=%s", hexsig); if (!tmp) { goto error; } query = tmp; - /* Build the complete presigned URL */ - presigned_url = flb_sds_create_size(16384); - if (!presigned_url) { - goto error; - } - - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); - if (!presigned_url) { - goto error; - } - - /* Base64 URL encode the presigned URL */ - url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ - - payload = flb_sds_create_size(encoded_len); - if (!payload) { - goto error; - } - - encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, - (const unsigned char*) presigned_url, url_len); - if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); - goto error; - } - flb_sds_len_set(payload, actual_encoded_len); - - /* Convert to Base64 URL encoding (replace + with -, / with _, remove padding =) */ - p = payload; - while (*p) { - if (*p == '+') { - *p = '-'; - } - else if (*p == '/') { - *p = '_'; - } - p++; - } - - /* Remove padding */ - len = flb_sds_len(payload); - while (len > 0 && payload[len-1] == '=') { - len--; - } - flb_sds_len_set(payload, len); - payload[len] = '\0'; - - /* Build the complete presigned URL */ - flb_sds_destroy(presigned_url); + /* Build complete presigned URL */ presigned_url = flb_sds_create_size(16384); if (!presigned_url) { goto error; } - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); + presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s&User-Agent=fluent-bit-msk-iam", + host, query); if (!presigned_url) { goto error; } - /* Add User-Agent parameter to the signed URL (like Go implementation) */ - tmp = flb_sds_printf(&presigned_url, "&User-Agent=fluent-bit-msk-iam"); - if (!tmp) { - goto error; - } - presigned_url = tmp; - - /* Base64 URL encode the presigned URL (RawURLEncoding - no padding like Go) */ + /* Base64 URL encode */ url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ + encoded_len = ((url_len + 2) / 3) * 4 + 1; - flb_sds_destroy(payload); payload = flb_sds_create_size(encoded_len); if (!payload) { goto error; @@ -489,14 +413,12 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, (const unsigned char *) presigned_url, url_len); if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); goto error; } - /* Update the SDS length to match actual encoded length */ flb_sds_len_set(payload, actual_encoded_len); - /* Convert to Base64 URL encoding AND remove padding (RawURLEncoding like Go) */ + /* Convert to Base64 URL encoding and remove padding */ p = payload; while (*p) { if (*p == '+') { @@ -508,7 +430,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, p++; } - /* Remove ALL padding (RawURLEncoding) */ final_len = flb_sds_len(payload); while (final_len > 0 && payload[final_len-1] == '=') { final_len--; @@ -516,7 +437,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, flb_sds_len_set(payload, final_len); payload[final_len] = '\0'; - /* Clean up before successful return */ + /* Clean up */ flb_sds_destroy(credential); flb_sds_destroy(credential_enc); flb_sds_destroy(canonical); @@ -533,52 +454,24 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, return payload; error: - /* Clean up everything - check for NULL to prevent double-free */ - if (credential) { - flb_sds_destroy(credential); - } - if (credential_enc) { - flb_sds_destroy(credential_enc); - } - if (canonical) { - flb_sds_destroy(canonical); - } - if (hexhash) { - flb_sds_destroy(hexhash); - } - if (string_to_sign) { - flb_sds_destroy(string_to_sign); - } - if (hexsig) { - flb_sds_destroy(hexsig); - } - if (query) { - flb_sds_destroy(query); - } - if (action_enc) { - flb_sds_destroy(action_enc); - } - if (presigned_url) { - flb_sds_destroy(presigned_url); - } - if (key) { /* Only destroy if not already destroyed */ - flb_sds_destroy(key); - } - if (payload) { - flb_sds_destroy(payload); - } - if (session_token_enc) { - flb_sds_destroy(session_token_enc); - } - if (empty_payload_hex) { - flb_sds_destroy(empty_payload_hex); - } + if (credential) flb_sds_destroy(credential); + if (credential_enc) flb_sds_destroy(credential_enc); + if (canonical) flb_sds_destroy(canonical); + if (hexhash) flb_sds_destroy(hexhash); + if (string_to_sign) flb_sds_destroy(string_to_sign); + if (hexsig) flb_sds_destroy(hexsig); + if (query) flb_sds_destroy(query); + if (action_enc) flb_sds_destroy(action_enc); + if (presigned_url) flb_sds_destroy(presigned_url); + if (key) flb_sds_destroy(key); + if (payload) flb_sds_destroy(payload); + if (session_token_enc) flb_sds_destroy(session_token_enc); + if (empty_payload_hex) flb_sds_destroy(empty_payload_hex); return NULL; } - -/* OAuth token refresh callback with credential caching */ +/* OAuth token refresh callback */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -587,7 +480,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_sds_t payload = NULL; rd_kafka_resp_err_t err; char errstr[512]; - int64_t now; + time_t now; int64_t md_lifetime_ms; const char *s3_suffix = "-s3"; size_t arn_len; @@ -599,61 +492,44 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, kafka_opaque = (struct flb_kafka_opaque *) opaque; if (!kafka_opaque || !kafka_opaque->msk_iam_ctx) { - flb_error("[aws_msk_iam] oauthbearer_token_refresh_cb: invalid opaque context"); + flb_error("[aws_msk_iam] invalid opaque context"); rd_kafka_oauthbearer_set_token_failure(rk, "invalid context"); return; } - flb_debug("[aws_msk_iam] running OAuth bearer token refresh callback"); - - /* get the msk_iam config (not persistent context!) */ config = kafka_opaque->msk_iam_ctx; - /* validate region (mandatory) */ if (!config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set"); rd_kafka_oauthbearer_set_token_failure(rk, "region not set"); return; } - /* - * Use MSK generic endpoint for IAM authentication. - * AWS MSK IAM supports both cluster-specific and generic regional endpoints. - * Generic endpoints are recommended as they work across all brokers in the region. - */ + /* Determine MSK endpoint */ if (config->cluster_arn) { arn_len = strlen(config->cluster_arn); suffix_len = strlen(s3_suffix); if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_debug("[aws_msk_iam] using MSK Serverless generic endpoint: %s", host); } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } - flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + flb_debug("[aws_msk_iam] OAuth token refresh callback triggered"); - /* - * Refresh credentials before generating OAuth token. - * This is necessary because provider's passive refresh only triggers when - * get_credentials is called and detects expiration. However, OAuth tokens - * are refreshed every ~15 minutes while IAM credentials expire after ~1 hour. - * If OAuth callbacks are spaced far apart, the passive refresh may not trigger - * before credentials expire, causing authentication failures. - */ - int rc = config->provider->provider_vtable->refresh(config->provider); - if (rc < 0) { - flb_warn("[aws_msk_iam] AWS provider refresh() failed (rc=%d), continuing to get_credentials()", rc); + /* Refresh credentials */ + if (config->provider->provider_vtable->refresh(config->provider) < 0) { + flb_warn("[aws_msk_iam] credential refresh failed, will retry on next callback"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed"); + return; } - /* Get credentials from provider */ + /* Get credentials */ creds = config->provider->provider_vtable->get_credentials(config->provider); if (!creds) { flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); @@ -661,7 +537,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, return; } - /* Generate payload using credentials from provider */ + /* Generate payload */ payload = build_msk_iam_payload(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); @@ -670,8 +546,16 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, return; } + /* + * Set OAuth token with fixed 3-minute lifetime. + * librdkafka will trigger a refresh callback before the token expires. + * For idle connections, the refresh may be delayed until new data arrives, + * at which point librdkafka detects the expired token and triggers the callback. + * The short 3-minute lifetime ensures credentials (typically 60 minutes) are still + * valid when the callback is eventually triggered, allowing successful token regeneration. + */ now = time(NULL); - md_lifetime_ms = (now + 900) * 1000; + md_lifetime_ms = (now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, @@ -682,25 +566,23 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, errstr, sizeof(errstr)); - /* Destroy credentials immediately after use (standard pattern) */ flb_aws_credentials_destroy(creds); - creds = NULL; if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); } else { - flb_info("[aws_msk_iam] OAuth bearer token successfully set"); + flb_info("[aws_msk_iam] OAuth bearer token successfully set with %d second lifetime", + MSK_IAM_TOKEN_LIFETIME_SECONDS); } - /* Clean up - payload only (creds already destroyed) */ if (payload) { flb_sds_destroy(payload); } } -/* Register callback with lightweight config - keeps your current interface */ +/* Register OAuth callback */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, const char *cluster_arn, @@ -709,26 +591,21 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con struct flb_aws_msk_iam *ctx; char *region_str; - flb_info("[aws_msk_iam] registering OAuth callback with cluster ARN: %s", cluster_arn); - if (!cluster_arn) { flb_error("[aws_msk_iam] cluster ARN is required"); return NULL; } - /* Allocate lightweight config - NO AWS provider! */ ctx = flb_calloc(1, sizeof(struct flb_aws_msk_iam)); if (!ctx) { flb_errno(); return NULL; } - /* Store the flb_config for on-demand provider creation */ ctx->flb_config = config; ctx->cluster_arn = flb_sds_create(cluster_arn); if (!ctx->cluster_arn) { - flb_error("[aws_msk_iam] failed to create cluster ARN string"); flb_free(ctx); return NULL; } @@ -736,7 +613,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con /* Extract region */ region_str = extract_region(cluster_arn); if (!region_str || strlen(region_str) == 0) { - flb_error("[aws_msk_iam] failed to extract region from cluster ARN: %s", cluster_arn); + flb_error("[aws_msk_iam] failed to extract region from ARN"); flb_sds_destroy(ctx->cluster_arn); flb_free(ctx); if (region_str) flb_free(region_str); @@ -747,42 +624,31 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_free(region_str); if (!ctx->region) { - flb_error("[aws_msk_iam] failed to create region string"); flb_sds_destroy(ctx->cluster_arn); flb_free(ctx); return NULL; } - flb_info("[aws_msk_iam] extracted region: %s", ctx->region); - - /* Create TLS instance for AWS credentials (STS) - CRITICAL FIX */ + /* Create TLS instance */ ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, FLB_TRUE, FLB_LOG_DEBUG, - NULL, /* vhost */ - NULL, /* ca_path */ - NULL, /* ca_file */ - NULL, /* crt_file */ - NULL, /* key_file */ - NULL); /* key_passwd */ + NULL, NULL, NULL, NULL, NULL, NULL); if (!ctx->cred_tls) { - flb_error("[aws_msk_iam] failed to create TLS instance for AWS credentials"); + flb_error("[aws_msk_iam] failed to create TLS instance"); flb_sds_destroy(ctx->region); flb_sds_destroy(ctx->cluster_arn); flb_free(ctx); return NULL; } - flb_info("[aws_msk_iam] TLS instance created for AWS credentials"); - - /* Create AWS provider once - will be reused for credential refresh */ + /* Create AWS provider */ ctx->provider = flb_standard_chain_provider_create(config, ctx->cred_tls, ctx->region, - NULL, /* sts_endpoint */ - NULL, /* proxy */ + NULL, NULL, flb_aws_client_generator(), - NULL); /* profile */ + NULL); if (!ctx->provider) { flb_error("[aws_msk_iam] failed to create AWS credentials provider"); flb_tls_destroy(ctx->cred_tls); @@ -792,7 +658,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con return NULL; } - /* Initialize provider in sync mode (required before event loop is available) */ + /* Initialize provider */ ctx->provider->provider_vtable->sync(ctx->provider); if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); @@ -803,18 +669,13 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_free(ctx); return NULL; } - /* Switch back to async mode */ ctx->provider->provider_vtable->async(ctx->provider); - flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully"); - - /* Set the callback and opaque */ + /* Register callback */ rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); flb_kafka_opaque_set(opaque, NULL, ctx); rd_kafka_conf_set_opaque(kconf, opaque); - flb_info("[aws_msk_iam] OAuth callback registered successfully"); - return ctx; } @@ -825,19 +686,14 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) return; } - flb_info("[aws_msk_iam] destroying MSK IAM config"); - - /* Destroy AWS provider (provider manages its own credential caching) */ if (ctx->provider) { flb_aws_provider_destroy(ctx->provider); } - /* Clean up TLS instance - caller owns TLS lifecycle with flb_standard_chain_provider_create */ if (ctx->cred_tls) { flb_tls_destroy(ctx->cred_tls); } - /* Clean up other resources */ if (ctx->region) { flb_sds_destroy(ctx->region); } @@ -845,6 +701,4 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_sds_destroy(ctx->cluster_arn); } flb_free(ctx); - - flb_info("[aws_msk_iam] MSK IAM config destroyed"); } From 6dde002facaf074bd03d5bd56f7aec3b8b884d66 Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 28 Nov 2025 09:08:39 +0800 Subject: [PATCH 11/14] =?UTF-8?q?fix(aws):=20Fix=20potential=20overflow=20?= =?UTF-8?q?in=20md=5Flifetime=5Fms=20on=2032=E2=80=91bit=20time=5Ft?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Arbin --- src/aws/flb_aws_msk_iam.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index e6358d32175..cf220892079 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -555,7 +555,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, * valid when the callback is eventually triggered, allowing successful token regeneration. */ now = time(NULL); - md_lifetime_ms = (now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; + md_lifetime_ms = ((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, From f34377857f4c5f727a6f21628792a7ebcdd92163 Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 28 Nov 2025 17:18:01 +0800 Subject: [PATCH 12/14] fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections and Add TLS support Signed-off-by: Arbin --- plugins/out_kafka/kafka_config.c | 22 ++++++++++++++++++++++ src/aws/flb_aws_msk_iam.c | 18 +++++++----------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b4bb9be6acf..b5eb12ace15 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -243,6 +243,28 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, return NULL; } +#ifdef FLB_HAVE_AWS_MSK_IAM + /* + * Enable SASL background callbacks for MSK IAM to ensure OAuth tokens + * are refreshed automatically even on idle connections. + * This eliminates the need for the application to call rd_kafka_poll() + * regularly for token refresh to occur. + */ + if (ctx->msk_iam) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " + "OAuth tokens will be refreshed automatically in background thread"); + } + } +#endif + #ifdef FLB_HAVE_AVRO_ENCODER /* Config AVRO */ tmp = flb_output_get_property("schema_str", ins); diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf220892079..c90c3c468d2 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -39,12 +39,10 @@ #include /* - * Fixed token lifetime of 3 minutes. - * This short lifetime ensures that idle Kafka connections (e.g., low-traffic inputs) - * will quickly detect token expiration when new data arrives and trigger a refresh callback, - * preventing "Access denied" errors from using expired tokens on idle connections. + * OAuth token lifetime of 5 minutes (industry standard). + * Matches AWS Go SDK and Kafka Connect implementations. */ -#define MSK_IAM_TOKEN_LIFETIME_SECONDS 180 +#define MSK_IAM_TOKEN_LIFETIME_SECONDS 300 struct flb_aws_msk_iam { struct flb_config *flb_config; @@ -547,12 +545,10 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, } /* - * Set OAuth token with fixed 3-minute lifetime. - * librdkafka will trigger a refresh callback before the token expires. - * For idle connections, the refresh may be delayed until new data arrives, - * at which point librdkafka detects the expired token and triggers the callback. - * The short 3-minute lifetime ensures credentials (typically 60 minutes) are still - * valid when the callback is eventually triggered, allowing successful token regeneration. + * Set OAuth token with fixed 5-minute lifetime (AWS industry standard). + * librdkafka's background thread will automatically trigger a refresh callback + * at 80% of the token's lifetime (4 minutes) to ensure the token never expires, + * even on completely idle connections. */ now = time(NULL); md_lifetime_ms = ((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; From b34bff6ed9c438bbd7347d024c60612791c5659e Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 28 Nov 2025 17:31:42 +0800 Subject: [PATCH 13/14] fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections and Add TLS support Signed-off-by: Arbin --- plugins/out_kafka/kafka_config.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b5eb12ace15..d4052889f7e 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -214,6 +214,13 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + /* + * Enable SASL queue for background callbacks BEFORE registering OAuth callback. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections where rd_kafka_poll() is not called. + */ + rd_kafka_conf_enable_sasl_queue(ctx->conf, 1); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, ctx->conf, ctx->aws_msk_iam_cluster_arn, @@ -247,21 +254,19 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, /* * Enable SASL background callbacks for MSK IAM to ensure OAuth tokens * are refreshed automatically even on idle connections. - * This eliminates the need for the application to call rd_kafka_poll() - * regularly for token refresh to occur. */ if (ctx->msk_iam) { rd_kafka_error_t *error; error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); if (error) { - flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s", + flb_plg_error(ctx->ins, "failed to enable SASL background callbacks: %s", rd_kafka_error_string(error)); rd_kafka_error_destroy(error); + flb_out_kafka_destroy(ctx); + return NULL; } - else { - flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " - "OAuth tokens will be refreshed automatically in background thread"); - } + flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " + "OAuth tokens will be refreshed automatically in background thread"); } #endif From 3bbbde2f4e84ec958a0808980c76743b0805dbc6 Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 28 Nov 2025 17:35:14 +0800 Subject: [PATCH 14/14] fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections and Add TLS support Signed-off-by: Arbin --- plugins/out_kafka/kafka_config.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index d4052889f7e..dca5bf958c9 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -259,14 +259,15 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, rd_kafka_error_t *error; error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); if (error) { - flb_plg_error(ctx->ins, "failed to enable SASL background callbacks: %s", + flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s. " + "OAuth tokens may not refresh on idle connections.", rd_kafka_error_string(error)); rd_kafka_error_destroy(error); - flb_out_kafka_destroy(ctx); - return NULL; } - flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " - "OAuth tokens will be refreshed automatically in background thread"); + else { + flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " + "OAuth tokens will be refreshed automatically in background thread"); + } } #endif