-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix: AWS MSK IAM Authentication Failures on Idle Connections and add TLS support #11211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
6e1d53c
09aa64a
1f9b74a
ca35c22
4c19da4
9aa3ebc
35bcf13
d45dab6
8434f7d
05ecb6d
862a4ec
6dde002
f343778
b34bff6
3bbbde2
8892291
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_kafka.h> | ||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_aws_credentials.h> | ||||||||||||||||||||||||||||||||
| #include <fluent-bit/aws/flb_aws_msk_iam.h> | ||||||||||||||||||||||||||||||||
| #include <fluent-bit/tls/flb_tls.h> | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_signv4.h> | ||||||||||||||||||||||||||||||||
| #include <rdkafka.h> | ||||||||||||||||||||||||||||||||
|
|
@@ -37,11 +38,13 @@ | |||||||||||||||||||||||||||||||
| #include <string.h> | ||||||||||||||||||||||||||||||||
| #include <time.h> | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Lightweight config - NO persistent AWS provider */ | ||||||||||||||||||||||||||||||||
| /* Lightweight config - provider manages credential caching and refresh internally */ | ||||||||||||||||||||||||||||||||
| struct flb_aws_msk_iam { | ||||||||||||||||||||||||||||||||
| struct flb_config *flb_config; /* For creating AWS provider on-demand */ | ||||||||||||||||||||||||||||||||
| struct flb_config *flb_config; | ||||||||||||||||||||||||||||||||
| flb_sds_t region; | ||||||||||||||||||||||||||||||||
| flb_sds_t cluster_arn; | ||||||||||||||||||||||||||||||||
| struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */ | ||||||||||||||||||||||||||||||||
| struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */ | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Utility functions - same as before */ | ||||||||||||||||||||||||||||||||
|
|
@@ -162,12 +165,11 @@ static char *extract_region(const char *arn) | |||||||||||||||||||||||||||||||
| return out; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Stateless payload generator - creates AWS provider on demand */ | ||||||||||||||||||||||||||||||||
| /* Payload generator - builds MSK IAM authentication payload */ | ||||||||||||||||||||||||||||||||
| static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, | ||||||||||||||||||||||||||||||||
| const char *host) | ||||||||||||||||||||||||||||||||
| const char *host, | ||||||||||||||||||||||||||||||||
| struct flb_aws_credentials *creds) | ||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||
| struct flb_aws_provider *temp_provider = NULL; | ||||||||||||||||||||||||||||||||
| struct flb_aws_credentials *creds = NULL; | ||||||||||||||||||||||||||||||||
| flb_sds_t payload = NULL; | ||||||||||||||||||||||||||||||||
| int encode_result; | ||||||||||||||||||||||||||||||||
| char *p; | ||||||||||||||||||||||||||||||||
|
|
@@ -214,37 +216,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, | |||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", | ||||||||||||||||||||||||||||||||
| host, config->region); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Create AWS provider on-demand */ | ||||||||||||||||||||||||||||||||
| temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, | ||||||||||||||||||||||||||||||||
| config->region, NULL, NULL, | ||||||||||||||||||||||||||||||||
| flb_aws_client_generator(), | ||||||||||||||||||||||||||||||||
| NULL); | ||||||||||||||||||||||||||||||||
| if (!temp_provider) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (temp_provider->provider_vtable->init(temp_provider) != 0) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", | ||||||||||||||||||||||||||||||||
| host, config->region); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Get credentials */ | ||||||||||||||||||||||||||||||||
| creds = temp_provider->provider_vtable->get_credentials(temp_provider); | ||||||||||||||||||||||||||||||||
| /* Validate credentials */ | ||||||||||||||||||||||||||||||||
| if (!creds) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] build_msk_iam_payload: credentials are NULL"); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (!creds->access_key_id || !creds->secret_access_key) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
@@ -547,12 +529,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, | |||||||||||||||||||||||||||||||
| if (session_token_enc) { | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(session_token_enc); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if (creds) { | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if (temp_provider) { | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| return payload; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
@@ -594,18 +570,12 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, | |||||||||||||||||||||||||||||||
| if (session_token_enc) { | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(session_token_enc); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if (creds) { | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if (temp_provider) { | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Stateless callback - creates AWS provider on-demand for each refresh */ | ||||||||||||||||||||||||||||||||
| /* OAuth token refresh callback with credential caching */ | ||||||||||||||||||||||||||||||||
| static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, | ||||||||||||||||||||||||||||||||
| const char *oauthbearer_config, | ||||||||||||||||||||||||||||||||
| void *opaque) | ||||||||||||||||||||||||||||||||
|
|
@@ -622,7 +592,6 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, | |||||||||||||||||||||||||||||||
| struct flb_aws_msk_iam *config; | ||||||||||||||||||||||||||||||||
| struct flb_aws_credentials *creds = NULL; | ||||||||||||||||||||||||||||||||
| struct flb_kafka_opaque *kafka_opaque; | ||||||||||||||||||||||||||||||||
| struct flb_aws_provider *temp_provider = NULL; | ||||||||||||||||||||||||||||||||
| (void) oauthbearer_config; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| kafka_opaque = (struct flb_kafka_opaque *) opaque; | ||||||||||||||||||||||||||||||||
|
|
@@ -644,57 +613,73 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, | |||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Determine host endpoint */ | ||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||
| * Use MSK generic endpoint for IAM authentication. | ||||||||||||||||||||||||||||||||
| * AWS MSK IAM supports both cluster-specific and generic regional endpoints. | ||||||||||||||||||||||||||||||||
| * Generic endpoints are recommended as they work across all brokers in the region. | ||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||
| if (config->cluster_arn) { | ||||||||||||||||||||||||||||||||
| arn_len = strlen(config->cluster_arn); | ||||||||||||||||||||||||||||||||
| suffix_len = strlen(s3_suffix); | ||||||||||||||||||||||||||||||||
| if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { | ||||||||||||||||||||||||||||||||
| snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); | ||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| flb_debug("[aws_msk_iam] using MSK Serverless generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||
| snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); | ||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||
| snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); | ||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); | ||||||||||||||||||||||||||||||||
| flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||
| * Refresh credentials before generating OAuth token. | ||||||||||||||||||||||||||||||||
| * This is necessary because provider's passive refresh only triggers when | ||||||||||||||||||||||||||||||||
| * get_credentials is called and detects expiration. However, OAuth tokens | ||||||||||||||||||||||||||||||||
| * are refreshed every ~15 minutes while IAM credentials expire after ~1 hour. | ||||||||||||||||||||||||||||||||
| * If OAuth callbacks are spaced far apart, the passive refresh may not trigger | ||||||||||||||||||||||||||||||||
| * before credentials expire, causing authentication failures. | ||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||
| config->provider->provider_vtable->refresh(config->provider); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Generate payload using stateless function - creates and destroys AWS provider internally */ | ||||||||||||||||||||||||||||||||
| payload = build_msk_iam_payload(config, host); | ||||||||||||||||||||||||||||||||
| /* Get credentials from provider */ | ||||||||||||||||||||||||||||||||
| creds = config->provider->provider_vtable->get_credentials(config->provider); | ||||||||||||||||||||||||||||||||
| if (!creds) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); | ||||||||||||||||||||||||||||||||
| rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); | ||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Generate payload using credentials from provider */ | ||||||||||||||||||||||||||||||||
| payload = build_msk_iam_payload(config, host, creds); | ||||||||||||||||||||||||||||||||
| if (!payload) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); | ||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Get credentials for principal (create temporary provider just for this) */ | ||||||||||||||||||||||||||||||||
| temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, | ||||||||||||||||||||||||||||||||
| config->region, NULL, NULL, | ||||||||||||||||||||||||||||||||
| flb_aws_client_generator(), | ||||||||||||||||||||||||||||||||
| NULL); | ||||||||||||||||||||||||||||||||
| if (temp_provider) { | ||||||||||||||||||||||||||||||||
| if (temp_provider->provider_vtable->init(temp_provider) == 0) { | ||||||||||||||||||||||||||||||||
| creds = temp_provider->provider_vtable->get_credentials(temp_provider); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| now = time(NULL); | ||||||||||||||||||||||||||||||||
| md_lifetime_ms = (now + 900) * 1000; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| err = rd_kafka_oauthbearer_set_token(rk, | ||||||||||||||||||||||||||||||||
| payload, | ||||||||||||||||||||||||||||||||
| md_lifetime_ms, | ||||||||||||||||||||||||||||||||
| creds ? creds->access_key_id : "unknown", | ||||||||||||||||||||||||||||||||
| creds->access_key_id, | ||||||||||||||||||||||||||||||||
| NULL, | ||||||||||||||||||||||||||||||||
| 0, | ||||||||||||||||||||||||||||||||
| errstr, | ||||||||||||||||||||||||||||||||
| sizeof(errstr)); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Destroy credentials immediately after use (standard pattern) */ | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| creds = NULL; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); | ||||||||||||||||||||||||||||||||
| rd_kafka_oauthbearer_set_token_failure(rk, errstr); | ||||||||||||||||||||||||||||||||
|
|
@@ -703,14 +688,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, | |||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] OAuth bearer token successfully set"); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Clean up everything immediately - no memory leaks possible! */ | ||||||||||||||||||||||||||||||||
| if (creds) { | ||||||||||||||||||||||||||||||||
| flb_aws_credentials_destroy(creds); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if (temp_provider) { | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(temp_provider); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Clean up - payload only (creds already destroyed) */ | ||||||||||||||||||||||||||||||||
| if (payload) { | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(payload); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
@@ -771,6 +749,59 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con | |||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] extracted region: %s", ctx->region); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Create TLS instance for AWS credentials (STS) - CRITICAL FIX */ | ||||||||||||||||||||||||||||||||
| ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, | ||||||||||||||||||||||||||||||||
| FLB_TRUE, | ||||||||||||||||||||||||||||||||
| FLB_LOG_DEBUG, | ||||||||||||||||||||||||||||||||
| NULL, /* vhost */ | ||||||||||||||||||||||||||||||||
| NULL, /* ca_path */ | ||||||||||||||||||||||||||||||||
| NULL, /* ca_file */ | ||||||||||||||||||||||||||||||||
| NULL, /* crt_file */ | ||||||||||||||||||||||||||||||||
| NULL, /* key_file */ | ||||||||||||||||||||||||||||||||
| NULL); /* key_passwd */ | ||||||||||||||||||||||||||||||||
| if (!ctx->cred_tls) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to create TLS instance for AWS credentials"); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->region); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->cluster_arn); | ||||||||||||||||||||||||||||||||
| flb_free(ctx); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] TLS instance created for AWS credentials"); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Create AWS provider once - will be reused for credential refresh */ | ||||||||||||||||||||||||||||||||
| ctx->provider = flb_standard_chain_provider_create(config, | ||||||||||||||||||||||||||||||||
| ctx->cred_tls, | ||||||||||||||||||||||||||||||||
| ctx->region, | ||||||||||||||||||||||||||||||||
| NULL, /* sts_endpoint */ | ||||||||||||||||||||||||||||||||
| NULL, /* proxy */ | ||||||||||||||||||||||||||||||||
| flb_aws_client_generator(), | ||||||||||||||||||||||||||||||||
| NULL); /* profile */ | ||||||||||||||||||||||||||||||||
| if (!ctx->provider) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to create AWS credentials provider"); | ||||||||||||||||||||||||||||||||
| flb_tls_destroy(ctx->cred_tls); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->region); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->cluster_arn); | ||||||||||||||||||||||||||||||||
| flb_free(ctx); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Initialize provider in sync mode (required before event loop is available) */ | ||||||||||||||||||||||||||||||||
| ctx->provider->provider_vtable->sync(ctx->provider); | ||||||||||||||||||||||||||||||||
| if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { | ||||||||||||||||||||||||||||||||
| flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(ctx->provider); | ||||||||||||||||||||||||||||||||
| flb_tls_destroy(ctx->cred_tls); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->region); | ||||||||||||||||||||||||||||||||
| flb_sds_destroy(ctx->cluster_arn); | ||||||||||||||||||||||||||||||||
| flb_free(ctx); | ||||||||||||||||||||||||||||||||
| return NULL; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| /* Switch back to async mode */ | ||||||||||||||||||||||||||||||||
| ctx->provider->provider_vtable->async(ctx->provider); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully"); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Set the callback and opaque */ | ||||||||||||||||||||||||||||||||
| rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); | ||||||||||||||||||||||||||||||||
| flb_kafka_opaque_set(opaque, NULL, ctx); | ||||||||||||||||||||||||||||||||
|
|
@@ -781,7 +812,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con | |||||||||||||||||||||||||||||||
| return ctx; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Simple destroy - just config cleanup, no AWS provider to leak! */ | ||||||||||||||||||||||||||||||||
| /* Destroy MSK IAM config */ | ||||||||||||||||||||||||||||||||
| void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) | ||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||
| if (!ctx) { | ||||||||||||||||||||||||||||||||
|
|
@@ -790,12 +821,24 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) | |||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| flb_info("[aws_msk_iam] destroying MSK IAM config"); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* NO AWS provider to destroy! */ | ||||||||||||||||||||||||||||||||
| /* Destroy AWS provider (provider manages its own credential caching) */ | ||||||||||||||||||||||||||||||||
| if (ctx->provider) { | ||||||||||||||||||||||||||||||||
| flb_aws_provider_destroy(ctx->provider); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /* Clean up TLS instance */ | ||||||||||||||||||||||||||||||||
| if (ctx->cred_tls) { | ||||||||||||||||||||||||||||||||
| flb_tls_destroy(ctx->cred_tls); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
| /* Destroy AWS provider (provider manages its own credential caching) */ | |
| if (ctx->provider) { | |
| flb_aws_provider_destroy(ctx->provider); | |
| } | |
| /* Clean up TLS instance */ | |
| if (ctx->cred_tls) { | |
| flb_tls_destroy(ctx->cred_tls); | |
| } | |
| /* Destroy AWS provider (provider manages its own credential caching) */ | |
| if (ctx->provider) { | |
| flb_aws_provider_destroy(ctx->provider); | |
| } | |
| /* Note: TLS instance is owned and destroyed by the provider */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a double-free issue. The current code is correct.
After thorough analysis of the codebase, I can confirm that there is no double-free risk in the current implementation. Here's why:
TLS Ownership Model
There are two different credential provider creation patterns in the codebase with different ownership semantics:
-
flb_standard_chain_provider_create(used by MSK IAM):- Does NOT take ownership of the TLS instance
- The TLS is only passed as a parameter to sub-providers (e.g., EKS provider) for their use
- The
provider->cred_tlsfield is never set in this path - The caller remains responsible for managing the TLS lifecycle
-
flb_managed_chain_provider_create(used elsewhere):- Takes ownership of the TLS instance
- Stores it in
provider->cred_tls flb_aws_provider_destroyautomatically destroys it
Code Evidence
In src/aws/flb_aws_credentials.c, the flb_aws_provider_destroy function only destroys the TLS if it's present:
void flb_aws_provider_destroy(struct flb_aws_provider *provider)
{
if (provider) {
// ...
/* free managed dependencies */
if (provider->cred_tls) {
flb_tls_destroy(provider->cred_tls);
}
// ...
}
}Since flb_standard_chain_provider_create never assigns the TLS to provider->cred_tls, calling flb_aws_provider_destroy will not attempt to destroy the TLS instance.
Conclusion
The MSK IAM module correctly manages the TLS instance it creates. The explicit call to flb_tls_destroy(ctx->cred_tls) in flb_aws_msk_iam_destroy is necessary and correct because the provider does not take ownership of the TLS in this code path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the implementation of flb_standard_chain_provider_create
echo "=== Searching for flb_standard_chain_provider_create implementation ==="
rg -n -A 30 "flb_standard_chain_provider_create\s*\(" --type c
echo ""
echo "=== Searching for flb_managed_chain_provider_create implementation ==="
rg -n -A 30 "flb_managed_chain_provider_create\s*\(" --type c
echo ""
echo "=== Examining flb_aws_provider_destroy for TLS cleanup ==="
rg -n -B 5 -A 10 "if \(provider->cred_tls\)" --type c
Length of output: 45861
🏁 Script executed:
#!/bin/bash
# Find the implementation of standard_chain_create to verify TLS ownership
echo "=== Searching for standard_chain_create implementation ==="
rg -n -B 5 -A 50 "^static.*standard_chain_create\s*\(" --type c src/aws/
echo ""
echo "=== Looking for assignments to provider->cred_tls in standard_chain_create context ==="
rg -n "provider->cred_tls\s*=" --type c src/aws/
Length of output: 8230
🏁 Script executed:
#!/bin/bash
# Get more context around the cred_tls assignments
echo "=== Context around provider->cred_tls assignments (lines 460-490) ==="
rg -n -C 15 "aws_provider->cred_tls\s*=" --type c src/aws/flb_aws_credentials.c
echo ""
echo "=== Verify standard_chain_create does not set cred_tls ==="
rg -n -A 100 "^static struct flb_aws_provider \*standard_chain_create" --type c src/aws/flb_aws_credentials.c | head -120
Length of output: 6596
@kalavt, thank you for the thorough analysis! After examining the codebase, I can confirm that you are absolutely correct—there is no double-free risk in this code.
Verification Results
The code evidence clearly shows two distinct ownership patterns:
1. flb_standard_chain_provider_create (used by MSK IAM):
- Does NOT set
provider->cred_tls - The TLS instance is only passed to sub-providers (like EKS provider) for their use
- The caller retains ownership and is responsible for cleanup
2. flb_managed_chain_provider_create (used elsewhere):
- DOES set
aws_provider->cred_tls = cred_tls;(line 471) - Takes ownership of the TLS instance
flb_aws_provider_destroywill automatically clean it up
Why This Code is Safe
In flb_aws_provider_destroy (lines 782-784):
if (provider->cred_tls) {
flb_tls_destroy(provider->cred_tls);
}Since flb_standard_chain_provider_create never sets provider->cred_tls, the condition evaluates to false, and the TLS is not destroyed by the provider. The explicit call to flb_tls_destroy(ctx->cred_tls) in MSK IAM's destroy function is necessary and correct.
The error-handling paths in MSK IAM's initialization (lines 775, 786) also demonstrate the correct ownership model by explicitly destroying ctx->cred_tls after provider cleanup.
Your analysis is spot-on, and the current implementation properly follows the ownership semantics of the credential provider system. No changes are needed.
🧬 Code Graph Analysis Results
-
File: src/aws/flb_aws_credentials.c
- Snippet: flb_aws_provider_destroy function (complete)
-
Summary/Context:
- Description: Destroys an AWS provider and its managed TLS credential (cred_tls) if present.
- Parameters: struct flb_aws_provider *provider
- Return: void
- Behavior:
- If provider is non-NULL, destroys provider implementation, mutex, and any managed dependencies.
- If provider->cred_tls is non-NULL, calls flb_tls_destroy(provider->cred_tls).
- If provider->sts_tls is non-NULL, calls flb_tls_destroy(provider->sts_tls).
- Frees the provider structure.
-
Code:
void flb_aws_provider_destroy(struct flb_aws_provider *provider)
{
if (provider) {
if (provider->implementation) {
provider->provider_vtable->destroy(provider);
}pthread_mutex_destroy(&provider->lock); /* free managed dependencies */ if (provider->base_aws_provider) { flb_aws_provider_destroy(provider->base_aws_provider); } if (provider->cred_tls) { flb_tls_destroy(provider->cred_tls); } if (provider->sts_tls) { flb_tls_destroy(provider->sts_tls); } flb_free(provider); }}
-
- Snippet: flb_aws_provider_destroy function (complete)
-
File: src/aws/flb_aws_credentials.c
- Snippet: flb_standard_chain_provider_create function (complete)
-
Summary/Context:
- Description: Creates a standard AWS credentials provider chain. Accepts an optional TLS instance and region, and may wrap the base provider with additional layers (e.g., EKS Pod execution role for Fargate). The function may create a base provider via standard_chain_create and, in the EKS Pod Fargate path, wraps it with an STS provider.
- Parameters:
- config: struct flb_config*
- tls: struct flb_tls* (TLS instance; ownership semantics depend on caller path)
- region: char* (region string)
- sts_endpoint: char* (STS endpoint)
- proxy: char* (proxy)
- generator: struct flb_aws_client_generator*
- profile: char* (profile)
- Return: struct flb_aws_provider*
- Exceptions/Errors: Returns NULL on allocation/creation failure; ensures proper cleanup on error paths.
- Important implementation details:
- If EKS Pod Execution Role environment variable is set, it creates a temporary base provider with TLS but does not transfer TLS ownership to the base provider; instead, it wraps with an STS pod provider.
- In non-EKS-Fargate path, it creates a standard chain provider using the provided TLS and region.
-
Code:
struct flb_aws_provider *flb_standard_chain_provider_create(struct flb_config
*config,
struct flb_tls *tls,
char *region,
char *sts_endpoint,
char *proxy,
struct
flb_aws_client_generator
*generator,
char *profile)
{
struct flb_aws_provider *provider;
struct flb_aws_provider *tmp_provider;
char *eks_pod_role = NULL;
char *session_name;eks_pod_role = getenv(EKS_POD_EXECUTION_ROLE); if (eks_pod_role && strlen(eks_pod_role) > 0) { /* * eks fargate * standard chain will be base provider used to * assume the EKS_POD_EXECUTION_ROLE */ flb_debug("[aws_credentials] Using EKS_POD_EXECUTION_ROLE=%s", eks_pod_role); tmp_provider = standard_chain_create(config, tls, region, sts_endpoint, proxy, generator, FLB_FALSE, profile); if (!tmp_provider) { return NULL; } session_name = flb_sts_session_name(); if (!session_name) { flb_error("Failed to generate random STS session name"); flb_aws_provider_destroy(tmp_provider); return NULL; } provider = flb_sts_provider_create(config, tls, tmp_provider, NULL, eks_pod_role, session_name, region, sts_endpoint, NULL, generator); if (!provider) { flb_error("Failed to create EKS Fargate Credential Provider"); flb_aws_provider_destroy(tmp_provider); return NULL; } /* session name can freed after provider is created */ flb_free(session_name); session_name = NULL; return provider; } /* standard case- not in EKS Fargate */ provider = standard_chain_create(config, tls, region, sts_endpoint, proxy, generator, FLB_TRUE, profile); return provider;}
-
- Snippet: flb_standard_chain_provider_create function (complete)
-
File: src/aws/flb_aws_msk_iam.c
-
Snippet: MSK IAM destroy logic (summary)
-
Summary/Context:
- Description: Cleans up MSK IAM context. Destroys the AWS provider (which manages credential caching) and then destroys the TLS instance used for AWS credentials if present.
- Parameters: struct flb_aws_msk_iam *ctx
- Return: void
- Important implementation details:
- Calls flb_aws_provider_destroy(ctx->provider) to clean up the provider (which may destroy cred_tls if owned by the provider).
- Destroys ctx->cred_tls with flb_tls_destroy if it exists.
-
Code (excerpt from the file):
void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)
{
if (!ctx) {
return;
}flb_info("[aws_msk_iam] destroying MSK IAM config"); /* Destroy AWS provider (provider manages its own credential caching) */ if (ctx->provider) { flb_aws_provider_destroy(ctx->provider); } /* Clean up TLS instance */ if (ctx->cred_tls) { flb_tls_destroy(ctx->cred_tls); } /* Clean up other resources */ if (ctx->region) { flb_sds_destroy(ctx->region); } if (ctx->cluster_arn) { flb_sds_destroy(ctx->cluster_arn); } flb_free(ctx); flb_info("[aws_msk_iam] MSK IAM config destroyed");}
-
-
Snippet: MSK IAM provider/provider lifecycle and TLS ownership notes (from file)
- Context:
- The code creates a TLS instance for AWS credentials (STS) and passes that TLS handle to the standard credential provider creation path. In this path, the provider does not take ownership of the TLS; the TLS is owned by the MSK IAM module and is explicitly destroyed in its destroy function.
- This aligns with the described ownership model where flb_standard_chain_provider_create does not assign the TLS to provider->cred_tls, whereas flb_managed_chain_provider_create would take ownership and be able to destroy it via flb_aws_provider_destroy.
- Context:
-
Uh oh!
There was an error while loading. Please reload this page.