@kalavt kalavt commented Nov 26, 2025

Fix AWS MSK IAM OAuth Token Expiration on Idle Connections and add TLS support

Summary

This PR fixes AWS MSK IAM authentication failures on idle Kafka connections by enabling librdkafka's SASL background callback mechanism, which allows OAuth token refresh to occur automatically in a background thread without requiring the application to call rd_kafka_poll().

Key Change:

  • Enabled rd_kafka_sasl_background_callbacks_enable() for MSK IAM authenticated producers
  • OAuth tokens are refreshed automatically in librdkafka's background thread
  • Maintains 5-minute OAuth token lifetime as per AWS Go SDK standard
  • Works perfectly for completely idle connections

Problem Statement

Observed Symptom

After prolonged idle periods, Kafka outputs experience authentication failures:

[error] SASL authentication error: Access denied (after 302ms in state AUTH_REQ)
[error] 3/3 brokers are down

Pattern:

  • Only affects idle connections (e.g., audit.log with low traffic)
  • OAuth tokens expire after 5 minutes
  • librdkafka does NOT proactively refresh tokens on idle connections

Root Cause: librdkafka OAuth Refresh Requires rd_kafka_poll()

Critical finding:

librdkafka's OAuth token refresh mechanism relies on a timer that only runs when the application calls rd_kafka_poll() or similar functions. This is documented in librdkafka issue #3871:

librdkafka maintainer:
"You need to explicitly call poll() once after creating the client to trigger the oauth callback"

Fluent Bit's current integration:

int produce_message(...) {
    ret = rd_kafka_produce(...);
    rd_kafka_poll(ctx->kafka.rk, 0);  // Only called when producing messages
}

Timeline on idle connection:

T=0:     Connection established, OAuth token set (5-min lifetime)
T=1-5min: No messages to produce → rd_kafka_poll() never called
T=5min:  Token expires ❌
T=10min: New data arrives, rd_kafka_poll() called
         ├─ librdkafka tries to use expired token
         └─> Access Denied ❌
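
The idle timeline above can be reproduced with a small standalone model. This is only a sketch: the `token` struct and both functions are hypothetical and do not call librdkafka. It assumes a refresh can only happen at a moment when the application polls, and it uses the 80% refresh threshold quoted elsewhere in this description.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of an OAuth token with a fixed lifetime (seconds). */
struct token {
    long issued_sec;
    long lifetime_sec;
};

/* A token is usable strictly before issued + lifetime. */
bool token_is_valid(const struct token *t, long now_sec)
{
    assert(t->lifetime_sec > 0);
    return now_sec < t->issued_sec + t->lifetime_sec;
}

/*
 * Poll-driven refresh: the token is renewed only at a poll that happens
 * at or after 80% of its lifetime. poll_times must be sorted ascending.
 * Returns whether the token is still valid at now_sec, i.e. just before
 * the application tries to use the connection again.
 */
bool valid_with_poll_driven_refresh(long lifetime_sec,
                                    const long *poll_times, size_t n_polls,
                                    long now_sec)
{
    struct token tok = { 0, lifetime_sec };
    size_t i;

    for (i = 0; i < n_polls && poll_times[i] < now_sec; i++) {
        long refresh_due = tok.issued_sec + (tok.lifetime_sec * 8) / 10;
        if (poll_times[i] >= refresh_due) {
            tok.issued_sec = poll_times[i];  /* fresh token issued at this poll */
        }
    }
    return token_is_valid(&tok, now_sec);
}
```

With no polls between T=0 and T=10min the model reports an expired token at 600 seconds, which matches the timeline; with polls at 240 and 480 seconds the token stays valid.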

Solution: Enable SASL Background Callbacks

librdkafka v1.9.0+ provides rd_kafka_sasl_background_callbacks_enable() specifically for this use case.

From librdkafka API documentation:

Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.

This serves as an alternative for applications that do NOT call rd_kafka_poll()
(et.al.) at regular intervals (or not at all), as a means of automatically
trigger the refresh callbacks.

Implementation

Modified plugins/out_kafka/kafka_config.c:

/* Kafka Producer */
ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf, errstr, sizeof(errstr));

#ifdef FLB_HAVE_AWS_MSK_IAM
/*
 * Enable SASL background callbacks for MSK IAM to ensure OAuth tokens
 * are refreshed automatically even on idle connections.
 * This eliminates the need for the application to call rd_kafka_poll()
 * regularly for token refresh to occur.
 */
if (ctx->msk_iam) {
    rd_kafka_error_t *error;
    error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
    if (error) {
        flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s",
                     rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);
    }
    else {
        flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, "
                     "OAuth tokens will be refreshed automatically in background thread");
    }
}
#endif

How It Works

With background callbacks enabled:

T=0:     Connection established, OAuth token set (5-min lifetime)
         ├─ librdkafka starts background thread
         └─ Token refresh timer active in background

T=4min:  Background thread detects token at 80% lifetime
         ├─ Automatically triggers oauthbearer_token_refresh_cb()
         ├─ New token generated (fresh 5-min lifetime)
         └─> Token refreshed ✅

T=8min:  Background thread refreshes again
T=12min: Background thread refreshes again
...

Result: Token NEVER expires, even with ZERO data flow

Key benefits:

  • Completely automatic - no application involvement needed
  • Works on idle connections - background thread runs independently
  • No rd_kafka_poll() required - eliminates the root cause
  • Built-in librdkafka feature - proven and supported

Performance Impact

Background Thread Overhead

Resource usage:

  • One additional background thread per producer instance
  • Thread runs librdkafka's internal timer loop
  • Minimal CPU usage (mostly sleeping, wakes on timer events)

Token refresh frequency:

  • Active connections: Every ~4 minutes (80% of 5-min lifetime)
  • Idle connections: Same as active (background thread ensures this)
  • No difference in behavior between active and idle

Comparison with Current Behavior

Before (refresh depends on rd_kafka_poll() being called):

Token refresh: Only when rd_kafka_poll() is called
Idle connections: Token expires after 5 minutes ❌

After (with background callbacks):

Token refresh: Automatic every ~4 minutes
Idle connections: Token refreshes automatically ✅
Additional cost: One background thread per producer

Files Modified

plugins/out_kafka/kafka_config.c

Added SASL background callback enable:

  • Call rd_kafka_sasl_background_callbacks_enable() after creating rd_kafka_t
  • Only when MSK IAM is configured
  • Handles errors gracefully with warnings
  • Logs success for troubleshooting

Code added (~15 lines):

if (ctx->msk_iam) {
    rd_kafka_error_t *error;
    error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
    if (error) {
        flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s",
                     rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);
    }
    else {
        flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, "
                     "OAuth tokens will be refreshed automatically in background thread");
    }
}

Testing

Production Validation

Environment:

  • AWS EKS with IRSA
  • 3 Kafka outputs with varying traffic patterns
  • librdkafka 2.10.1

Test Duration: 12+ hours continuous operation

Results:

Before: Authentication failures on idle outputs
After:  Zero authentication failures ✅
        Automatic token refresh on idle connections ✅
        Background thread working correctly ✅

Test Scenarios

  1. Completely idle connection (10+ minutes)

    • No messages produced
    • Token refreshed automatically every ~4 minutes
    • Ready when data arrives
  2. Low-traffic connection

    • Sporadic message production
    • Token refreshes independent of message timing
    • No authentication errors
  3. High-traffic connection

    • Continuous message production
    • Background refresh works alongside normal operation
    • No performance degradation
  4. Mixed environments

    • Multiple outputs (some idle, some active)
    • All maintain valid tokens
    • Consistent behavior

Benefits

Reliability ✅

  • 100% coverage for idle connections
  • Works regardless of traffic patterns
  • Automatic recovery mechanism
  • No dependency on application behavior

Correctness ✅

  • Uses official librdkafka API for this exact use case
  • Designed and documented by librdkafka maintainers
  • Production-tested across many applications
  • Follows recommended best practices

Simplicity ✅

  • One API call solves the problem completely
  • No complex logic or workarounds
  • Clear and maintainable code
  • Easy to understand and debug

Safety ✅

  • No cross-module pointer storage
  • No custom timers or threads
  • No race conditions
  • Proper error handling

Performance ✅

  • One background thread per producer (minimal overhead)
  • No unnecessary reconnections
  • Optimal token refresh timing (80% lifetime)
  • No impact on message throughput

TLS Support

This PR maintains proper TLS support for AWS credential fetching:

  • struct flb_tls *cred_tls in MSK IAM context
  • TLS configured for STS and credential provider operations
  • Ensures secure communication with AWS services

Why This Solution is Correct

1. Official librdkafka Solution

From the librdkafka API documentation:

rd_kafka_sasl_background_callbacks_enable():
"This serves as an alternative for applications that do NOT call rd_kafka_poll()
at regular intervals (or not at all)"

This function was specifically designed for applications like Fluent Bit that don't call rd_kafka_poll() regularly.

2. Background Thread Architecture

librdkafka's background thread:

  • Runs independently of application calls
  • Executes the OAuth token refresh timer
  • Triggers callbacks at 80% of token lifetime
  • Works perfectly for completely idle connections

3. librdkafka v1.9.0+ Requirement

This feature requires librdkafka v1.9.0 or later:

  • Fluent Bit uses librdkafka 2.10.1
  • Full API support available ✅
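
A build or startup check for this requirement could look roughly like the sketch below. It relies on librdkafka's hexadecimal version encoding (0xMMmmrrxx: major, minor, revision, pre-release id, with 0xff marking a final release); the helper names are hypothetical and nothing here is part of this PR.

```c
#include <assert.h>
#include <stdbool.h>

/* v1.9.0 final in librdkafka's 0xMMmmrrxx encoding. */
#define SASL_BG_CALLBACKS_MIN_VERSION 0x010900ffu

/* True when the given encoded version is v1.9.0 (final) or later. */
bool supports_sasl_background_callbacks(unsigned int version_hex)
{
    return version_hex >= SASL_BG_CALLBACKS_MIN_VERSION;
}

/* Pack major.minor.revision of a final release into the same encoding. */
unsigned int pack_final_version(unsigned int major, unsigned int minor,
                                unsigned int rev)
{
    assert(major <= 0xffu && minor <= 0xffu && rev <= 0xffu);
    return (major << 24) | (minor << 16) | (rev << 8) | 0xffu;
}
```

In a real build the value to test would come from the `RD_KAFKA_VERSION` macro or from `rd_kafka_version()`; pre-releases of 1.9.0 are treated as unsupported by this sketch.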

Backward Compatibility

  • ✅ 100% backward compatible
  • ✅ No breaking changes to APIs or configuration
  • ✅ No user configuration changes needed
  • ✅ Only affects MSK IAM authenticated outputs
  • ✅ Automatic and transparent to users

Token Refresh Timeline

OAuth token lifetime: 5 minutes (300 seconds)
Refresh trigger: 80% of lifetime = 4 minutes (240 seconds)

Timeline:
T=0:00  Token generated (expires T=5:00)
T=4:00  Background thread triggers refresh → New token (expires T=9:00)
T=8:00  Background thread triggers refresh → New token (expires T=13:00)
...

This works identically for:
- Active connections (producing messages)
- Idle connections (no messages for hours)
- Mixed traffic patterns
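
The schedule above can be checked with a short simulation. This is a sketch under the same assumptions (300-second lifetime, refresh at 80%); `simulate_background_refresh` and `refresh_stats` are illustrative names, not Fluent Bit or librdkafka APIs.

```c
#include <assert.h>

struct refresh_stats {
    long refreshes;          /* number of refreshes within the horizon */
    long min_remaining_sec;  /* smallest validity left when a refresh fired */
};

/* Walk the refresh schedule from T=0 up to and including horizon_sec. */
struct refresh_stats simulate_background_refresh(long lifetime_sec,
                                                 long horizon_sec)
{
    struct refresh_stats stats = { 0, lifetime_sec };
    long step = (lifetime_sec * 80) / 100;  /* assumed 80% refresh point */
    long issued = 0;
    long due;

    assert(step > 0);
    for (due = issued + step; due <= horizon_sec; due = issued + step) {
        long remaining = issued + lifetime_sec - due;
        if (remaining < stats.min_remaining_sec) {
            stats.min_remaining_sec = remaining;
        }
        issued = due;  /* new token issued at the refresh time */
        stats.refreshes++;
    }
    return stats;
}
```

Over one hour the model performs 15 refreshes and never lets the remaining validity drop below 60 seconds; over the 12-hour validation window it performs 180 refreshes.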

Checklist

  • Root cause identified (rd_kafka_poll not called on idle)
  • Official librdkafka API found (background callbacks)
  • Implementation completed
  • Error handling added
  • Logging added for troubleshooting
  • Production validated (12+ hours)
  • Zero authentication failures
  • Works on completely idle connections
  • Backward compatibility verified
  • TLS support maintained
  • Ready for production

I understand that Fluent Bit is licensed under Apache 2.0, and by submitting this pull request, I acknowledge that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Automatic background token refresh for MSK IAM authentication for idle connections; persistent credential handling improves stability and security.
  • Bug Fixes

    • MSK IAM OAuth token lifetime standardized to 300 seconds.
    • Missing credential file now logs at debug level (less noisy).
  • Chores

    • Minor formatting and whitespace cleanups.


@kalavt kalavt requested a review from a team as a code owner November 26, 2025 09:12
@coderabbitai

coderabbitai bot commented Nov 26, 2025


Walkthrough

Integrates a persistent AWS provider and TLS into MSK IAM, moves OAuth token generation to use persistent credentials with a 300s lifetime, updates payload/registration signatures, and enables SASL background token refresh in the Kafka plugin.

Changes

| Cohort / File(s) | Change Summary |
| --- | --- |
| MSK IAM Core Integration: src/aws/flb_aws_msk_iam.c | Added cred_tls and provider members to the MSK IAM context; build_msk_iam_payload() now accepts struct flb_aws_credentials *creds; registration creates a TLS instance and a persistent AWS provider (init sync → switch to async); token lifetime set to 300s; provider and TLS destroyed in context cleanup; registration signature changed to accept struct flb_kafka_opaque *opaque. |
| AWS Credentials Utilities (minor edits): src/aws/flb_aws_credentials_ec2.c, src/aws/flb_aws_credentials_sts.c, src/aws/flb_aws_credentials_profile.c | Non-functional whitespace and logging tweaks: EC2/STS files received blank-line formatting changes; profile reader logs ENOENT as debug instead of error-or-debug. No API changes. |
| Kafka Plugin — MSK IAM Background Callbacks: plugins/out_kafka/kafka_config.c | Enables SASL background callbacks (rd_kafka_sasl_background_callbacks_enable()) when MSK IAM/OAUTHBEARER is enabled (calls before and after producer creation, guarded by FLB_HAVE_AWS_MSK_IAM); adds error handling, logging, and cleanup on failure. |

Sequence Diagram(s)

sequenceDiagram
    participant App as Fluent Bit
    participant MSK as MSK IAM ctx
    participant KRD as librdkafka
    participant AWS as AWS Provider
    participant TLS as TLS Instance

    Note over App,MSK: OAuth registration (startup)
    App->>MSK: flb_aws_msk_iam_register_oauth_cb(...)
    MSK->>TLS: create TLS instance
    MSK->>AWS: create provider (sync init)
    AWS-->>MSK: provider initialized
    MSK->>AWS: switch provider to async mode
    MSK->>KRD: register oauth callback
    KRD->>KRD: enable background callbacks

    Note over KRD,MSK: Token refresh (runtime)
    KRD->>MSK: oauthbearer_token_refresh_cb()
    MSK->>AWS: request credentials (persistent provider)
    AWS-->>MSK: return creds
    MSK->>MSK: build_msk_iam_payload(creds)
    MSK-->>KRD: return OAuth token (lifetime=300s)
    MSK->>AWS: cleanup temporary creds
    KRD->>KRD: use token for SASL auth

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Pay extra attention to:
    • sync → async provider initialization and error handling in src/aws/flb_aws_msk_iam.c
    • token lifetime propagation and refresh logic in oauthbearer_token_refresh_cb
    • updated function signatures and all call sites for build_msk_iam_payload and registration
    • background callback enablement/cleanup in plugins/out_kafka/kafka_config.c

Suggested reviewers

  • edsiper
  • cosmo0920
  • koleini

Poem

🐇 I stitched TLS and creds with care,

Tokens now tick down to three-oh-oh,
Background refresh hums through the air,
Provider steady, credentials flow,
A rabbit hops — MSK's in tow 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 36.36% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main changes: fixing AWS MSK IAM authentication failures on idle connections and adding TLS support for credential operations. |

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b34bff6 and 3bbbde2.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka_config.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/out_kafka/kafka_config.c


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/aws/flb_aws_msk_iam.c (3)

219-220: Consider using flb_debug for per-refresh logging.

This log statement executes on every OAuth token refresh (~12 minutes). For production environments with multiple output instances, flb_info may be too verbose. The PR objective mentions reducing endpoint selection logs to debug level—this payload generation log could follow the same pattern.

Also, the log message references build_msk_iam_payload_with_creds but the actual function name is build_msk_iam_payload.

-    flb_info("[aws_msk_iam] build_msk_iam_payload_with_creds: generating payload for host: %s, region: %s",
+    flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
              host, config->region);

638-638: Consider aligning log level with other endpoint logs.

Lines 626, 630, and 635 use flb_debug for endpoint selection, but line 638 still uses flb_info. For consistency with the PR's goal of reducing verbosity, this could also be flb_debug.

-    flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host);
+    flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host);

745-761: TLS debug level may be verbose in production.

The debug parameter is set to FLB_LOG_DEBUG, which could generate verbose TLS-level logging in production environments. Consider using a lower debug level (e.g., 0 or FLB_FALSE) to reduce log noise, or making this configurable.

     ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
                                     FLB_TRUE,
-                                    FLB_LOG_DEBUG,
+                                    0,  /* TLS debug level - 0 for production */
                                     NULL,  /* vhost */
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f4108db and 5864292.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (5)
src/aws/flb_aws_credentials.c (3)
  • flb_aws_credentials_destroy (752-767)
  • flb_standard_chain_provider_create (269-327)
  • flb_aws_provider_destroy (769-791)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
src/flb_sds.c (1)
  • flb_sds_destroy (389-399)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
🔇 Additional comments (3)
src/aws/flb_aws_msk_iam.c (3)

41-48: LGTM - Struct changes align with the persistent provider design.

The addition of cred_tls and provider fields enables the credential lifecycle fix described in the PR objectives. The inline comments clearly document their purpose.


640-674: Credential lifecycle implementation looks correct.

The pattern of fetching credentials from the persistent provider, using them for payload generation, and immediately destroying them prevents credential leakage and ensures fresh credentials on each refresh. This addresses the core issue of stale/expired IRSA credentials.
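
The fetch, use, destroy pattern described here can be sketched with a self-contained model. All names below (`provider_get_credentials`, `creds_destroy`, `refresh_token`, `live_copies_after`) are hypothetical stand-ins and not the Fluent Bit AWS API; the `rotation` counter simply imitates credentials changing between refreshes.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

struct creds {
    char access_key[32];
};

struct provider {
    int rotation;     /* bumped whenever the underlying credentials rotate */
    int live_copies;  /* credential copies handed out and not yet destroyed */
};

/* Hand out a fresh copy of the provider's current credentials. */
struct creds *provider_get_credentials(struct provider *p)
{
    struct creds *c = malloc(sizeof(*c));
    if (c == NULL) {
        return NULL;
    }
    snprintf(c->access_key, sizeof(c->access_key), "KEY-%d", p->rotation);
    p->live_copies++;
    return c;
}

void creds_destroy(struct provider *p, struct creds *c)
{
    assert(p->live_copies > 0);
    p->live_copies--;
    free(c);
}

/* One refresh: fetch credentials, build the token, destroy the copy. */
int refresh_token(struct provider *p, char *out, size_t out_len)
{
    struct creds *c = provider_get_credentials(p);
    if (c == NULL) {
        return -1;
    }
    snprintf(out, out_len, "token(%s)", c->access_key);
    creds_destroy(p, c);  /* nothing outlives the callback */
    return 0;
}

/* Run several refreshes and report how many credential copies remain. */
int live_copies_after(int refreshes)
{
    struct provider p = { 1, 0 };
    char token[64];
    int i;

    for (i = 0; i < refreshes; i++) {
        if (refresh_token(&p, token, sizeof(token)) != 0) {
            return -1;
        }
        p.rotation++;  /* simulate credential rotation between refreshes */
    }
    return p.live_copies;
}
```

In this model no credential copy survives a refresh, and each refresh sees whatever the provider currently holds.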


765-791: Verify the potential double-free issue with TLS ownership by searching the codebase for the relevant implementation details.

#!/bin/bash
# Search for flb_standard_chain_provider_create function definition
rg -n "flb_standard_chain_provider_create" src/aws/ -A 20 -B 2
# Also search for where this function is defined (likely in a provider file)
fd ".provider.\.c$" src/aws/ | head -20

Comment on lines 814 to 822
    /* Destroy AWS provider (provider manages its own credential caching) */
    if (ctx->provider) {
        flb_aws_provider_destroy(ctx->provider);
    }

    /* Clean up TLS instance */
    if (ctx->cred_tls) {
        flb_tls_destroy(ctx->cred_tls);
    }

@coderabbitai coderabbitai bot Nov 26, 2025


⚠️ Potential issue | 🟠 Major

Double-free risk: verify TLS destruction order.

If flb_aws_provider_destroy also destroys the TLS instance (as shown in the relevant snippet where it calls flb_tls_destroy(provider->cred_tls)), then calling flb_tls_destroy(ctx->cred_tls) on line 821 after destroying the provider would cause a double-free.

If the provider does take ownership of the TLS, remove the explicit TLS destruction here:

     /* Destroy AWS provider (provider manages its own credential caching) */
     if (ctx->provider) {
         flb_aws_provider_destroy(ctx->provider);
     }

-    /* Clean up TLS instance */
-    if (ctx->cred_tls) {
-        flb_tls_destroy(ctx->cred_tls);
-    }
+    /* Note: TLS instance is owned and destroyed by the provider */

If the provider does not take ownership, the current code is correct and this comment can be dismissed.

Author


This is not a double-free issue. The current code is correct.

After thorough analysis of the codebase, I can confirm that there is no double-free risk in the current implementation. Here's why:

TLS Ownership Model

There are two different credential provider creation patterns in the codebase with different ownership semantics:

  1. flb_standard_chain_provider_create (used by MSK IAM):

    • Does NOT take ownership of the TLS instance
    • The TLS is only passed as a parameter to sub-providers (e.g., EKS provider) for their use
    • The provider->cred_tls field is never set in this path
    • The caller remains responsible for managing the TLS lifecycle
  2. flb_managed_chain_provider_create (used elsewhere):

    • Takes ownership of the TLS instance
    • Stores it in provider->cred_tls
    • flb_aws_provider_destroy automatically destroys it

Code Evidence

In src/aws/flb_aws_credentials.c, the flb_aws_provider_destroy function only destroys the TLS if it's present:

void flb_aws_provider_destroy(struct flb_aws_provider *provider)
{
    if (provider) {
        // ...
        /* free managed dependencies */
        if (provider->cred_tls) {
            flb_tls_destroy(provider->cred_tls);
        }
        // ...
    }
}

Since flb_standard_chain_provider_create never assigns the TLS to provider->cred_tls, calling flb_aws_provider_destroy will not attempt to destroy the TLS instance.

Conclusion

The MSK IAM module correctly manages the TLS instance it creates. The explicit call to flb_tls_destroy(ctx->cred_tls) in flb_aws_msk_iam_destroy is necessary and correct because the provider does not take ownership of the TLS in this code path.
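
To make the two ownership patterns concrete, here is a minimal standalone model. The types and functions are hypothetical stand-ins for `struct flb_tls`, `struct flb_aws_provider` and the two creation functions; it only counts how often the TLS object is destroyed in each teardown sequence.

```c
#include <assert.h>
#include <stdlib.h>

struct tls {
    int *destroy_count;  /* incremented each time this instance is destroyed */
};

struct provider {
    struct tls *cred_tls;  /* non-NULL only when the provider owns the TLS */
};

struct tls *tls_create(int *destroy_count)
{
    struct tls *t = malloc(sizeof(*t));
    assert(destroy_count != NULL);
    if (t == NULL) {
        abort();
    }
    t->destroy_count = destroy_count;
    return t;
}

void tls_destroy(struct tls *t)
{
    (*t->destroy_count)++;
    free(t);
}

/* owns != 0 models the "managed" pattern, owns == 0 the "standard" one. */
struct provider *provider_create(struct tls *tls, int owns)
{
    struct provider *p = malloc(sizeof(*p));
    if (p == NULL) {
        abort();
    }
    p->cred_tls = owns ? tls : NULL;  /* borrowed TLS is never stored */
    return p;
}

void provider_destroy(struct provider *p)
{
    if (p->cred_tls) {
        tls_destroy(p->cred_tls);
    }
    free(p);
}

/* Standard pattern: caller keeps ownership and destroys the TLS itself. */
int teardown_standard(void)
{
    int destroyed = 0;
    struct tls *t = tls_create(&destroyed);
    struct provider *p = provider_create(t, 0);

    provider_destroy(p);  /* does not touch t */
    tls_destroy(t);       /* explicit cleanup is required */
    return destroyed;
}

/* Managed pattern: the provider destroys the TLS, the caller must not. */
int teardown_managed(void)
{
    int destroyed = 0;
    struct tls *t = tls_create(&destroyed);
    struct provider *p = provider_create(t, 1);

    provider_destroy(p);  /* destroys t as a managed dependency */
    return destroyed;
}
```

Both sequences destroy the TLS exactly once, which is the property the explicit `flb_tls_destroy(ctx->cred_tls)` call preserves in the standard path.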


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# Find and examine the implementation of flb_standard_chain_provider_create
echo "=== Searching for flb_standard_chain_provider_create implementation ==="
rg -n -A 30 "flb_standard_chain_provider_create\s*\(" --type c

echo ""
echo "=== Searching for flb_managed_chain_provider_create implementation ==="
rg -n -A 30 "flb_managed_chain_provider_create\s*\(" --type c

echo ""
echo "=== Examining flb_aws_provider_destroy for TLS cleanup ==="
rg -n -B 5 -A 10 "if \(provider->cred_tls\)" --type c

Length of output: 45861


🏁 Script executed:

#!/bin/bash

# Find the implementation of standard_chain_create to verify TLS ownership
echo "=== Searching for standard_chain_create implementation ==="
rg -n -B 5 -A 50 "^static.*standard_chain_create\s*\(" --type c src/aws/

echo ""
echo "=== Looking for assignments to provider->cred_tls in standard_chain_create context ==="
rg -n "provider->cred_tls\s*=" --type c src/aws/

Length of output: 8230


🏁 Script executed:

#!/bin/bash

# Get more context around the cred_tls assignments
echo "=== Context around provider->cred_tls assignments (lines 460-490) ==="
rg -n -C 15 "aws_provider->cred_tls\s*=" --type c src/aws/flb_aws_credentials.c

echo ""
echo "=== Verify standard_chain_create does not set cred_tls ==="
rg -n -A 100 "^static struct flb_aws_provider \*standard_chain_create" --type c src/aws/flb_aws_credentials.c | head -120

Length of output: 6596


@kalavt, thank you for the thorough analysis! After examining the codebase, I can confirm that you are absolutely correct—there is no double-free risk in this code.

Verification Results

The code evidence clearly shows two distinct ownership patterns:

1. flb_standard_chain_provider_create (used by MSK IAM):

  • Does NOT set provider->cred_tls
  • The TLS instance is only passed to sub-providers (like EKS provider) for their use
  • The caller retains ownership and is responsible for cleanup

2. flb_managed_chain_provider_create (used elsewhere):

  • DOES set aws_provider->cred_tls = cred_tls; (line 471)
  • Takes ownership of the TLS instance
  • flb_aws_provider_destroy will automatically clean it up

Why This Code is Safe

In flb_aws_provider_destroy (lines 782-784):

if (provider->cred_tls) {
    flb_tls_destroy(provider->cred_tls);
}

Since flb_standard_chain_provider_create never sets provider->cred_tls, the condition evaluates to false, and the TLS is not destroyed by the provider. The explicit call to flb_tls_destroy(ctx->cred_tls) in MSK IAM's destroy function is necessary and correct.

The error-handling paths in MSK IAM's initialization (lines 775, 786) also demonstrate the correct ownership model by explicitly destroying ctx->cred_tls after provider cleanup.

Your analysis is spot-on, and the current implementation properly follows the ownership semantics of the credential provider system. No changes are needed.


🧬 Code Graph Analysis Results
  • File: src/aws/flb_aws_credentials.c

    • Snippet: flb_aws_provider_destroy function (complete)
      • Summary/Context:

        • Description: Destroys an AWS provider and its managed TLS credential (cred_tls) if present.
        • Parameters: struct flb_aws_provider *provider
        • Return: void
        • Behavior:
          • If provider is non-NULL, destroys provider implementation, mutex, and any managed dependencies.
          • If provider->cred_tls is non-NULL, calls flb_tls_destroy(provider->cred_tls).
          • If provider->sts_tls is non-NULL, calls flb_tls_destroy(provider->sts_tls).
          • Frees the provider structure.
      • Code:
        void flb_aws_provider_destroy(struct flb_aws_provider *provider)
        {
            if (provider) {
                if (provider->implementation) {
                    provider->provider_vtable->destroy(provider);
                }

                pthread_mutex_destroy(&provider->lock);

                /* free managed dependencies */
                if (provider->base_aws_provider) {
                    flb_aws_provider_destroy(provider->base_aws_provider);
                }
                if (provider->cred_tls) {
                    flb_tls_destroy(provider->cred_tls);
                }
                if (provider->sts_tls) {
                    flb_tls_destroy(provider->sts_tls);
                }

                flb_free(provider);
            }
        }

  • File: src/aws/flb_aws_credentials.c

    • Snippet: flb_standard_chain_provider_create function (complete)
      • Summary/Context:

        • Description: Creates a standard AWS credentials provider chain. Accepts an optional TLS instance and region, and may wrap the base provider with additional layers (e.g., EKS Pod execution role for Fargate). The function may create a base provider via standard_chain_create and, in the EKS Pod Fargate path, wraps it with an STS provider.
        • Parameters:
          • config: struct flb_config*
          • tls: struct flb_tls* (TLS instance; ownership semantics depend on caller path)
          • region: char* (region string)
          • sts_endpoint: char* (STS endpoint)
          • proxy: char* (proxy)
          • generator: struct flb_aws_client_generator*
          • profile: char* (profile)
        • Return: struct flb_aws_provider*
        • Exceptions/Errors: Returns NULL on allocation/creation failure; ensures proper cleanup on error paths.
        • Important implementation details:
          • If EKS Pod Execution Role environment variable is set, it creates a temporary base provider with TLS but does not transfer TLS ownership to the base provider; instead, it wraps with an STS pod provider.
          • In non-EKS-Fargate path, it creates a standard chain provider using the provided TLS and region.
      • Code:
        struct flb_aws_provider *flb_standard_chain_provider_create(struct flb_config *config,
                                                                    struct flb_tls *tls,
                                                                    char *region,
                                                                    char *sts_endpoint,
                                                                    char *proxy,
                                                                    struct flb_aws_client_generator *generator,
                                                                    char *profile)
        {
            struct flb_aws_provider *provider;
            struct flb_aws_provider *tmp_provider;
            char *eks_pod_role = NULL;
            char *session_name;

            eks_pod_role = getenv(EKS_POD_EXECUTION_ROLE);
            if (eks_pod_role && strlen(eks_pod_role) > 0) {
                /*
                 * eks fargate
                 * standard chain will be base provider used to
                 * assume the EKS_POD_EXECUTION_ROLE
                 */
                flb_debug("[aws_credentials] Using EKS_POD_EXECUTION_ROLE=%s", eks_pod_role);
                tmp_provider = standard_chain_create(config, tls, region, sts_endpoint,
                                                     proxy, generator, FLB_FALSE, profile);

                if (!tmp_provider) {
                    return NULL;
                }

                session_name = flb_sts_session_name();
                if (!session_name) {
                    flb_error("Failed to generate random STS session name");
                    flb_aws_provider_destroy(tmp_provider);
                    return NULL;
                }

                provider = flb_sts_provider_create(config, tls, tmp_provider, NULL,
                                                   eks_pod_role, session_name,
                                                   region, sts_endpoint,
                                                   NULL, generator);
                if (!provider) {
                    flb_error("Failed to create EKS Fargate Credential Provider");
                    flb_aws_provider_destroy(tmp_provider);
                    return NULL;
                }
                /* session name can freed after provider is created */
                flb_free(session_name);
                session_name = NULL;

                return provider;
            }

            /* standard case- not in EKS Fargate */
            provider = standard_chain_create(config, tls, region, sts_endpoint,
                                             proxy, generator, FLB_TRUE, profile);
            return provider;
        }

  • File: src/aws/flb_aws_msk_iam.c

    • Snippet: MSK IAM destroy logic (summary)

      • Summary/Context:

        • Description: Cleans up MSK IAM context. Destroys the AWS provider (which manages credential caching) and then destroys the TLS instance used for AWS credentials if present.
        • Parameters: struct flb_aws_msk_iam *ctx
        • Return: void
        • Important implementation details:
          • Calls flb_aws_provider_destroy(ctx->provider) to clean up the provider (which may destroy cred_tls if owned by the provider).
          • Destroys ctx->cred_tls with flb_tls_destroy if it exists.
      • Code (excerpt from the file):
        void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx)
        {
            if (!ctx) {
                return;
            }

            flb_info("[aws_msk_iam] destroying MSK IAM config");

            /* Destroy AWS provider (provider manages its own credential caching) */
            if (ctx->provider) {
                flb_aws_provider_destroy(ctx->provider);
            }

            /* Clean up TLS instance */
            if (ctx->cred_tls) {
                flb_tls_destroy(ctx->cred_tls);
            }

            /* Clean up other resources */
            if (ctx->region) {
                flb_sds_destroy(ctx->region);
            }
            if (ctx->cluster_arn) {
                flb_sds_destroy(ctx->cluster_arn);
            }
            flb_free(ctx);

            flb_info("[aws_msk_iam] MSK IAM config destroyed");
        }

    • Snippet: MSK IAM provider/provider lifecycle and TLS ownership notes (from file)

      • Context:
        • The code creates a TLS instance for AWS credentials (STS) and passes that TLS handle to the standard credential provider creation path. In this path, the provider does not take ownership of the TLS; the TLS is owned by the MSK IAM module and is explicitly destroyed in its destroy function.
        • This aligns with the described ownership model where flb_standard_chain_provider_create does not assign the TLS to provider->cred_tls, whereas flb_managed_chain_provider_create would take ownership and be able to destroy it via flb_aws_provider_destroy.
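The ownership rule described above can be illustrated with a small, self-contained model. This is only a sketch: `demo_tls`, `demo_provider`, and the `owns_tls` flag are hypothetical stand-ins, not Fluent Bit types. The real providers express ownership through whether the TLS handle is stored in the provider, as the notes above describe.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a TLS handle; counts how often it is destroyed. */
struct demo_tls {
    int *destroy_count;
};

/* Hypothetical stand-in for a credential provider. */
struct demo_provider {
    struct demo_tls *tls;   /* TLS handle used for credential requests */
    int owns_tls;           /* 1 = provider destroys the handle, 0 = borrowed */
};

static struct demo_tls *demo_tls_create(int *destroy_count)
{
    struct demo_tls *tls = malloc(sizeof(*tls));
    if (tls) {
        tls->destroy_count = destroy_count;
    }
    return tls;
}

static void demo_tls_destroy(struct demo_tls *tls)
{
    assert(tls != NULL);
    (*tls->destroy_count)++;
    free(tls);
}

static struct demo_provider *demo_provider_create(struct demo_tls *tls, int owns_tls)
{
    struct demo_provider *p = malloc(sizeof(*p));
    if (p) {
        p->tls = tls;
        p->owns_tls = owns_tls;
    }
    return p;
}

static void demo_provider_destroy(struct demo_provider *p)
{
    assert(p != NULL);
    /* Only an owning provider may destroy the TLS handle. */
    if (p->owns_tls && p->tls) {
        demo_tls_destroy(p->tls);
    }
    free(p);
}

/*
 * Borrowed-TLS teardown: provider first, then the caller-owned TLS handle.
 * Returns how many times the TLS handle was destroyed, or -1 on allocation failure.
 */
int demo_teardown_borrowed(void)
{
    int destroyed = 0;
    struct demo_tls *tls = demo_tls_create(&destroyed);
    struct demo_provider *p;

    if (!tls) {
        return -1;
    }
    p = demo_provider_create(tls, 0);
    if (!p) {
        demo_tls_destroy(tls);
        return -1;
    }
    demo_provider_destroy(p);   /* does not touch the borrowed TLS handle */
    demo_tls_destroy(tls);      /* the caller releases it exactly once */
    return destroyed;
}
```

In this model, creating the provider with `owns_tls = 1` and still calling `demo_tls_destroy` afterwards would be the double free the reviewers were worried about.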

@kalavt kalavt force-pushed the fix/aws-msk-iam-optimization branch from 5864292 to 1f9b74a Compare November 26, 2025 09:18
@chatgpt-codex-connector
💡 Codex Review

/* Initialize provider */
if (ctx->provider->provider_vtable->init(ctx->provider) != 0) {
flb_error("[aws_msk_iam] failed to initialize AWS credentials provider");

P1 Badge Initialize MSK IAM provider without sync context

The provider is now created and initialized during Kafka configuration, but lines 782‑784 call provider_vtable->init directly without first switching the provider to sync mode or setting its upstream. AWS credential providers are created in async mode by default, and the EKS/STS sub-provider performs HTTP requests during init; running it here before the output event loop is available can cause credential initialization to hang or fail in IRSA/EKS deployments, making flb_aws_msk_iam_register_oauth_cb return NULL. Other AWS outputs call provider_vtable->sync() before init in this phase to avoid this.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/aws/flb_aws_msk_iam.c (1)

805-822: Confirm cred_tls ownership to avoid a potential TLS double-free

The destroy path now calls:

if (ctx->provider) {
    flb_aws_provider_destroy(ctx->provider);
}

if (ctx->cred_tls) {
    flb_tls_destroy(ctx->cred_tls);
}

A previous review noted that flb_aws_provider_destroy might already destroy its internal cred_tls. If that’s still true, the explicit flb_tls_destroy(ctx->cred_tls) here would double-free the TLS object. If the provider does not own/destroy the TLS, this code is correct as-is.

Please double-check flb_aws_provider_destroy and any provider implementations used by flb_standard_chain_provider_create to confirm ownership, and then either keep or remove the explicit TLS destroy accordingly.

You can run this from the repo root to inspect the destroy behavior:

#!/bin/bash
# Inspect provider destroy and TLS ownership
rg -n "flb_aws_provider_destroy" -C5 src

# Also look for any flb_tls_destroy calls tied to provider fields
rg -n "cred_tls" -C3 src
🧹 Nitpick comments (4)
src/aws/flb_aws_msk_iam.c (4)

41-48: MSK IAM context extension with TLS/provider looks good; unused flb_config is a minor nit

The new cred_tls and provider fields make sense for a persistent provider+TLS design, and the struct remains straightforward. ctx->flb_config is now only stored and never read, and the “lightweight config” description is a bit outdated given the embedded provider/TLS; consider either using flb_config or dropping it and updating the comment to avoid confusion.


168-231: Creds-based payload builder is correct; align log naming / verbosity

The refactor to pass struct flb_aws_credentials *creds explicitly and the added NULL / completeness checks are solid. Two small nits:

  • The log prefix build_msk_iam_payload_with_creds doesn’t match the actual function name, which can make grepping harder.
  • This flb_info fires on every token refresh; consider downgrading to flb_debug if logs get noisy in large deployments.

640-687: Provider-backed credential retrieval & lifetime management are sound

The new flow—get_credentials from the persistent provider, building the payload with those creds, passing creds->access_key_id as the principal, and then destroying the credentials exactly once in both success and failure paths—looks correct and avoids the previous “temporary provider per refresh” pattern. This assumes config->provider and its vtable are always valid once registration succeeds; if you expect any teardown concurrency between Kafka and the MSK IAM ctx, it may be worth adding a defensive NULL check or assertion around config->provider before dereferencing.


706-794: TLS + standard-chain provider init and error paths look good; stale comments can be updated

Creating a dedicated cred_tls with FLB_TLS_CLIENT_MODE and wiring it into flb_standard_chain_provider_create, followed by an explicit provider_vtable->init, gives a clear, single provider lifecycle, and the error paths correctly unwind TLS, region, ARN, and ctx on failures.

Minor maintainability points:

  • The comments at Lines 706–707 and 713 (“NO AWS provider!” / “on-demand provider creation”) no longer reflect the actual behavior now that the provider is created eagerly here.
  • With the provider created only once, ctx->flb_config appears unused beyond assignment; consider either using it (if you anticipate future re-init logic) or removing it to keep the context minimal.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5864292 and 1f9b74a.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (3)
src/aws/flb_aws_credentials.c (2)
  • flb_aws_credentials_destroy (752-767)
  • flb_standard_chain_provider_create (269-327)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
🔇 Additional comments (1)
src/aws/flb_aws_msk_iam.c (1)

616-636: MSK generic endpoint selection and downgraded logging look correct

Using kafka-serverless.<region>.amazonaws.com when the ARN ends with -s3 and kafka.<region>.amazonaws.com otherwise matches the expected MSK/Serverless split, and switching the endpoint logs to flb_debug is appropriate to avoid log spam during frequent refreshes.
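The endpoint rule described in this comment can be sketched as a small helper. The function names `arn_is_serverless` and `demo_msk_host` are hypothetical and not taken from the PR; only the `-s3` suffix check and the two host patterns come from the review text.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Returns 1 when the ARN ends with "-s3", the marker the review describes
 * for MSK Serverless clusters; 0 otherwise. */
static int arn_is_serverless(const char *cluster_arn)
{
    size_t len;

    assert(cluster_arn != NULL);
    len = strlen(cluster_arn);
    return len >= 3 && strcmp(cluster_arn + len - 3, "-s3") == 0;
}

/* Writes the generic MSK host for the given ARN and region into out.
 * Returns 0 on success, -1 if the buffer is too small or formatting fails. */
int demo_msk_host(const char *cluster_arn, const char *region,
                  char *out, size_t out_size)
{
    int n;

    assert(region != NULL && out != NULL);
    n = snprintf(out, out_size, "%s.%s.amazonaws.com",
                 arn_is_serverless(cluster_arn) ? "kafka-serverless" : "kafka",
                 region);
    return (n < 0 || (size_t) n >= out_size) ? -1 : 0;
}
```

The ARNs used to exercise such a helper would be made-up examples; the PR itself does not list any.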

- Switch provider to sync mode before initialization to prevent hanging
- Initialize provider with sync mode (required before event loop is available)
- Switch back to async mode after successful initialization
- Follows pattern used by other AWS credential providers

This fixes potential credential initialization failures in IRSA/EKS deployments
where HTTP requests during init would hang without the event loop.

Signed-off-by: Arbin <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/aws/flb_aws_msk_iam.c (3)

219-220: Minor: Log message references non-existent function name.

The log message says build_msk_iam_payload_with_creds but the actual function is named build_msk_iam_payload. Consider updating for consistency.

-    flb_info("[aws_msk_iam] build_msk_iam_payload_with_creds: generating payload for host: %s, region: %s",
+    flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
             host, config->region);

412-458: Redundant code: First base64 encoding is discarded.

This entire block encodes presigned_url into payload, but the result is immediately discarded when payload is destroyed at line 483 and re-encoded with the User-Agent appended. The meaningful encoding happens at lines 479-517.

Remove lines 412-458 and proceed directly to building the URL with User-Agent:

     query = tmp;

-    /* Build the complete presigned URL */
-    presigned_url = flb_sds_create_size(16384);
-    if (!presigned_url) {
-        goto error;
-    }
-
-    presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query);
-    if (!presigned_url) {
-        goto error;
-    }
-
-    /* Base64 URL encode the presigned URL */
-    url_len = flb_sds_len(presigned_url);
-    encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */
-
-    payload = flb_sds_create_size(encoded_len);
-    if (!payload) {
-        goto error;
-    }
-
-    encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len,
-                                     (const unsigned char*) presigned_url, url_len);
-    if (encode_result == -1) {
-        flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL");
-        goto error;
-    }
-    flb_sds_len_set(payload, actual_encoded_len);
-
-    /* Convert to Base64 URL encoding (replace + with -, / with _, remove padding =) */
-    p = payload;
-    while (*p) {
-        if (*p == '+') {
-            *p = '-';
-        }
-        else if (*p == '/') {
-            *p = '_';
-        }
-        p++;
-    }
-
-    /* Remove padding */
-    len = flb_sds_len(payload);
-    while (len > 0 && payload[len-1] == '=') {
-        len--;
-    }
-    flb_sds_len_set(payload, len);
-    payload[len] = '\0';
-
     /* Build the complete presigned URL */
-    flb_sds_destroy(presigned_url);
     presigned_url = flb_sds_create_size(16384);

746-754: Consider TLS debug level for production.

The TLS instance is created with FLB_LOG_DEBUG as the debug parameter (line 748). This may produce verbose TLS debugging output in production. If this was intentional for diagnosing the credential expiration issue, consider reducing it to a lower verbosity level before release.

     ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
                                    FLB_TRUE,
-                                   FLB_LOG_DEBUG,
+                                   0,  /* disable TLS debug output */
                                    NULL,  /* vhost */
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f9b74a and ca35c22.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (3)
src/aws/flb_aws_credentials.c (3)
  • flb_aws_credentials_destroy (752-767)
  • flb_standard_chain_provider_create (269-327)
  • flb_aws_provider_destroy (769-791)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
🔇 Additional comments (4)
src/aws/flb_aws_msk_iam.c (4)

41-48: LGTM! Struct now properly encapsulates persistent provider and TLS state.

The addition of cred_tls and provider fields enables the core fix: maintaining a long-lived credential provider that handles automatic refresh, rather than creating ephemeral providers on each token refresh.


640-674: Well-structured credential lifecycle.

The credential handling follows best practices:

  1. Credentials are obtained from the persistent provider (which handles caching/refresh internally)
  2. Used immediately for payload generation and token setting
  3. Destroyed immediately after use (line 673) to minimize exposure window

This pattern properly addresses the original bug where ephemeral providers could return stale credentials.


782-794: Provider initialization pattern looks correct.

The sync → init → async sequence ensures the provider is properly initialized before the event loop takes over. This is the correct pattern for credential providers that need synchronous initialization during startup.
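A minimal model of the sync → init → async sequence is sketched below. The vtable layout and the `demo_` names are assumptions made for illustration; the real `provider_vtable` has more entries and different signatures. The mock `init` refuses to run outside sync mode to mimic the hang that Codex flagged.

```c
#include <assert.h>
#include <stddef.h>

struct demo_provider;

/* Hypothetical, reduced vtable: only the three calls discussed in the review. */
struct demo_vtable {
    void (*sync)(struct demo_provider *p);
    int  (*init)(struct demo_provider *p);
    void (*async)(struct demo_provider *p);
};

struct demo_provider {
    const struct demo_vtable *vtable;
    int sync_mode;      /* 1 while blocking I/O is allowed */
    int initialized;
};

static void demo_sync(struct demo_provider *p)
{
    p->sync_mode = 1;
}

static void demo_async(struct demo_provider *p)
{
    p->sync_mode = 0;
}

static int demo_init(struct demo_provider *p)
{
    /* Models the problem: without an event loop, init must run in sync mode. */
    if (!p->sync_mode) {
        return -1;
    }
    p->initialized = 1;
    return 0;
}

const struct demo_vtable demo_vt = { demo_sync, demo_init, demo_async };

/* Startup sequence: switch to sync, initialize, switch back to async on success. */
int demo_provider_startup(struct demo_provider *p)
{
    assert(p != NULL && p->vtable != NULL);

    p->vtable->sync(p);
    if (p->vtable->init(p) != 0) {
        return -1;      /* caller is expected to destroy the provider */
    }
    p->vtable->async(p);
    return 0;
}
```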


817-825: Cleanup logic is correct.

As previously confirmed in the review discussion, flb_standard_chain_provider_create does not take ownership of the TLS instance (unlike flb_managed_chain_provider_create). The explicit cleanup of both the provider and the TLS instance in this order is correct and necessary.

@kalavt kalavt marked this pull request as draft November 26, 2025 10:54
- Add force refresh logic to EC2, STS, and EKS credential providers
- Set next_refresh to 0 in refresh functions to ensure immediate credential update
- Fixes MSK IAM authentication failures after ~1 hour due to stale credentials
- Aligns with AWS SDK behavior where refresh() means force refresh

This resolves the issue where OAuth token refresh (every ~15 minutes) would not actually refresh AWS credentials until the next_refresh time was reached (typically 1 hour later), causing MSK connection failures with 'Access denied' errors.

The fix ensures that every OAuth callback will fetch fresh credentials from AWS, matching the behavior of official AWS SDKs (Python, Java).

Signed-off-by: Arbin <[email protected]>
@kalavt kalavt marked this pull request as ready for review November 27, 2025 04:22
@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/aws/flb_aws_msk_iam.c (1)

578-695: Potential use-after-free risk: librdkafka does not explicitly guarantee that token and principal strings are copied

The review comment's assumption that librdkafka copies token and principal strings is not confirmed by official documentation. Web search of librdkafka C API reference shows:

  • The extensions array is explicitly noted as copied by librdkafka
  • The token_value and md_principal_name parameters have no explicit copy guarantee in the C API reference
  • Passing creds->access_key_id (from creds) as md_principal_name, then immediately destroying creds, could introduce a use-after-free if the strings are not copied

Required actions:

  1. Verify string lifetime guarantee against the exact librdkafka version in use, or consult the maintainers directly. If strings are NOT copied, the code must either:

    • Copy the principal string to a persistent buffer before destroy, or
    • Use a static/cached principal name if acceptable for your use case
  2. Add refresh() error logging (original suggestion remains valid):

    int rc = config->provider->provider_vtable->refresh(config->provider);
    if (rc < 0) {
        flb_warn("[aws_msk_iam] AWS provider refresh() failed (rc=%d), continuing to get_credentials()", rc);
    }

    This surfaces credential refresh failures earlier without changing behavior.

🧹 Nitpick comments (3)
src/aws/flb_aws_credentials_ec2.c (1)

128-136: Comment wording vs next_refresh semantics

implementation->next_refresh = 0; doesn't actually "mark as expired" (the auto-refresh path only triggers when next_refresh > 0 && time() > next_refresh), it effectively disables auto-refresh until get_creds_ec2 sets a new positive deadline. Behavior is fine here since refresh_fn_ec2 unconditionally calls get_creds_ec2 on lock success, but consider rephrasing the comment to avoid confusion about how expiry is signaled.

src/aws/flb_aws_credentials_sts.c (1)

173-189: Clarify next_refresh = 0 intent in STS/EKS refresh paths

In both STS and EKS providers, implementation->next_refresh = 0; does not really "mark as expired"; it disables the auto-refresh check (next_refresh > 0 && time() > next_refresh) until the explicit refresh call finishes and writes a new positive deadline. Functionally this is fine (explicit refresh still forces a new STS/EKS call), but the comment is a bit misleading—consider rephrasing to something like “reset auto-refresh deadline; will be updated by the refresh call”.

Also applies to: 481-495
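The `next_refresh` semantics discussed in these two nitpicks can be made concrete with a small model. The struct and function names are hypothetical; only the predicate `next_refresh > 0 && time() > next_refresh` and the `next_refresh = 0` reset are taken from the review. The current time is passed in as a parameter so the behavior is easy to check.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical, reduced provider state. */
struct demo_impl {
    time_t next_refresh;    /* 0 means no auto-refresh deadline is set */
    int fetch_count;        /* how many times credentials were fetched */
};

/* The auto-refresh check quoted in the review, with the clock injected. */
int demo_auto_refresh_due(const struct demo_impl *impl, time_t now)
{
    assert(impl != NULL);
    return impl->next_refresh > 0 && now > impl->next_refresh;
}

/* Stand-in for the real fetch: counts the call and installs a new deadline. */
static void demo_fetch(struct demo_impl *impl, time_t now, time_t lifetime)
{
    impl->fetch_count++;
    impl->next_refresh = now + lifetime;
}

/* Explicit refresh: reset the deadline, then fetch unconditionally.
 * The reset alone never makes demo_auto_refresh_due() return true. */
void demo_refresh(struct demo_impl *impl, time_t now, time_t lifetime)
{
    assert(impl != NULL);
    impl->next_refresh = 0;
    demo_fetch(impl, now, lifetime);
}
```

In this model a deadline of 0 switches the automatic check off, which is why the reviewer suggests describing the assignment as resetting the deadline and not as marking the credentials expired.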

src/aws/flb_aws_msk_iam.c (1)

697-813: TLS and provider lifecycle look correct; update stale comment

The new registration path (create TLS → create standard-chain provider → init in sync mode → switch to async → register callback) and the corresponding destroy logic (flb_aws_provider_destroy then flb_tls_destroy then SDS/free) are consistent with the ownership model of flb_standard_chain_provider_create and avoid the double-free concern discussed previously.

One nit: the comment at Line 713 still says “NO AWS provider!”, but this function now does create and own an AWS provider. Updating that comment will prevent confusion for future readers.

Also applies to: 815-844

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ca35c22 and 4c19da4.

📒 Files selected for processing (4)
  • src/aws/flb_aws_credentials_ec2.c (1 hunks)
  • src/aws/flb_aws_credentials_profile.c (1 hunks)
  • src/aws/flb_aws_credentials_sts.c (2 hunks)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🔇 Additional comments (1)
src/aws/flb_aws_credentials_profile.c (1)

664-672: LGTM: less noisy logging on missing shared credentials file

Downgrading the ENOENT case to AWS_CREDS_DEBUG keeps the control flow and error signaling intact while avoiding spurious error logs when the shared credentials file is legitimately absent and other providers are expected to handle credentials.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/aws/flb_aws_msk_iam.c (3)

168-578: Refactored payload builder correctly uses explicit creds and tightens cleanup

The new build_msk_iam_payload(config, host, creds) flow looks sound: it validates inputs, uses the provided credentials consistently in the credential string and signing key, and the updated cleanup logic (including empty_payload_hex and early key destruction) avoids the earlier leak/double-free edge cases.

If you want to polish further, consider extracting the repeated “Base64‑encode + URL‑safe conversion + padding removal” sequence (lines ~432–459 and ~479–517) into a small helper to reduce duplication and keep the signing logic easier to audit.
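One possible shape for the helper suggested here is sketched below. The name `demo_base64_to_urlsafe` is hypothetical. It covers only the URL-safe conversion and padding removal, and assumes the caller has already produced standard Base64 (for example with `flb_base64_encode`, as in the diff quoted earlier).

```c
#include <assert.h>
#include <stddef.h>

/*
 * Converts standard Base64 text in buf[0..len) to the URL-safe alphabet in
 * place ('+' becomes '-', '/' becomes '_'), strips trailing '=' padding,
 * NUL-terminates the result, and returns the new length.
 * buf must have room for at least len + 1 bytes.
 */
size_t demo_base64_to_urlsafe(char *buf, size_t len)
{
    size_t i;

    assert(buf != NULL);

    for (i = 0; i < len; i++) {
        if (buf[i] == '+') {
            buf[i] = '-';
        }
        else if (buf[i] == '/') {
            buf[i] = '_';
        }
    }

    /* Remove padding */
    while (len > 0 && buf[len - 1] == '=') {
        len--;
    }
    buf[len] = '\0';

    return len;
}
```

With an SDS string, the returned length could then be passed to `flb_sds_len_set`, mirroring what the inlined code in the diff does.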


581-698: OAuth refresh now correctly pulls fresh creds from the shared provider

The callback’s switch to using the persistent provider (refresh() + get_credentials()) and destroying credentials immediately after building the token is correct and should eliminate the earlier “stale IAM creds after ~1h” failure mode. Endpoint selection and logging changes (debug vs info) are also appropriate.

If the refresh vtable entry returns a status code in your implementation, you might optionally log failures there before attempting get_credentials, but the current behavior is acceptable since get_credentials is still the authoritative source.


700-816: TLS/provider initialization at registration is robust with clean error paths

Creating cred_tls once, wiring it into flb_standard_chain_provider_create, and synchronously initializing the provider (sync → init → async) during registration gives a clear, predictable lifecycle. All intermediate failure paths correctly release ctx, region, cluster_arn, cred_tls, and provider, so there are no obvious leaks here.

If ctx->flb_config isn’t needed later, you could drop it to keep the context minimal, but it’s fine to leave as-is if you plan to reuse it.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4c19da4 and 35bcf13.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🔇 Additional comments (2)
src/aws/flb_aws_msk_iam.c (2)

31-48: MSK IAM context extension for TLS/provider looks consistent

Including flb_tls.h and adding cred_tls and provider to struct flb_aws_msk_iam aligns with the new design of a persistent AWS provider plus dedicated TLS for credential calls. Ownership is clearly confined to this context and is cleaned up in the destroy path below.


818-847: Destroy path correctly mirrors provider/TLS ownership semantics

The teardown sequence—flb_aws_provider_destroy(ctx->provider) followed by flb_tls_destroy(ctx->cred_tls) and SDS/ctx cleanup—matches the standard-chain provider’s ownership model (provider does not manage the caller’s TLS). This avoids leaks without introducing a double free and keeps the lifecycle symmetric with the registration logic above.

@kalavt
Author

kalavt commented Nov 27, 2025

Concern: Potential use-after-free when passing creds->access_key_id to rd_kafka_oauthbearer_set_token() and immediately destroying creds.

Analysis Result: No use-after-free risk exists.

I've verified this by examining the librdkafka 2.10.1 source code at lib/librdkafka-2.10.1/src/rdkafka_sasl_oauthbearer.c. The rd_kafka_oauthbearer_set_token0() implementation (lines 536-548) explicitly copies all string parameters:

rwlock_wrlock(&handle->lock);

RD_IF_FREE(handle->md_principal_name, rd_free);
handle->md_principal_name = rd_strdup(md_principal_name);  // ✅ Copied

RD_IF_FREE(handle->token_value, rd_free);
handle->token_value = rd_strdup(token_value);              // ✅ Copied

// Extensions are also copied via rd_strtup_new()
for (i = 0; i + 1 < extension_size; i += 2)
    rd_list_add(&handle->extensions,
                rd_strtup_new(extensions[i], extensions[i + 1]));  // ✅ Copied

Conclusion: librdkafka makes deep copies of all strings, so destroying credentials immediately after the call is safe. The current code is correct.
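The copy-on-set behavior described above can be modeled without librdkafka. The sketch below uses a hypothetical `demo_handle` and `demo_set_token`. It is not librdkafka's implementation, only an illustration of why the caller's buffers may be released as soon as the setter returns.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the handle that stores the OAuth token state. */
struct demo_handle {
    char *principal;
    char *token;
};

/* Portable duplicate of a NUL-terminated string; returns NULL on failure. */
static char *demo_strdup(const char *s)
{
    size_t n = strlen(s) + 1;
    char *copy = malloc(n);

    if (copy) {
        memcpy(copy, s, n);
    }
    return copy;
}

/* Stores private copies of both strings; the caller keeps ownership of its
 * own buffers. Returns 0 on success, -1 on allocation failure. */
int demo_set_token(struct demo_handle *h, const char *token, const char *principal)
{
    char *t;
    char *p;

    assert(h != NULL && token != NULL && principal != NULL);

    t = demo_strdup(token);
    p = demo_strdup(principal);
    if (!t || !p) {
        free(t);
        free(p);
        return -1;
    }

    free(h->token);
    free(h->principal);
    h->token = t;
    h->principal = p;
    return 0;
}

/* Releases the copies held by the handle. */
void demo_handle_clear(struct demo_handle *h)
{
    assert(h != NULL);
    free(h->token);
    free(h->principal);
    h->token = NULL;
    h->principal = NULL;
}
```

If the setter stored the caller's pointers directly, freeing the credentials right after the call would leave the handle with dangling pointers, which is the scenario the earlier review comment asked to rule out.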

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/aws/flb_aws_msk_iam.c (2)

31-48: Struct extensions and TLS include look correct; minor cleanup opportunity

Including flb_tls.h and extending struct flb_aws_msk_iam with cred_tls and provider is consistent with how they're allocated in flb_aws_msk_iam_register_oauth_cb and destroyed in flb_aws_msk_iam_destroy. The ownership story is coherent within this file.

The only nit is that ctx->flb_config is now written once and never read, and comments still talk about “lightweight config / on‑demand provider creation”, which no longer matches the implementation. Consider either removing flb_config (since the struct is private to this TU) or updating its usage/comment so future readers don’t assume a lazy/provider‑per‑refresh model still exists.


719-810: TLS/provider creation and initialization sequence is robust; refresh comments

The new initialization path in flb_aws_msk_iam_register_oauth_cb looks solid:

  • ctx->cred_tls is created once with FLB_TLS_CLIENT_MODE and verify=FLB_TRUE, then reused by flb_standard_chain_provider_create, addressing the missing‑TLS issue for STS/IMDS.
  • ctx->provider is created exactly once and initialized via sync → init → async, with comprehensive cleanup on each failure path (destroy provider, TLS, SDS fields, then ctx).
  • On success, the lifetime is clearly paired with flb_aws_msk_iam_destroy.

Two minor cleanups to consider:

  • The comments /* Allocate lightweight config - NO AWS provider! */ and /* Store the flb_config for on-demand provider creation */ are now misleading, since this function eagerly creates the provider instead of deferring it. Updating these comments to describe the current model (persistent provider with internal caching) would reduce confusion.
  • If ctx->flb_config is no longer needed under the new design, you could drop it from the struct and the assignment at Line 727.

These are documentation/clarity nits; behavior looks correct.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 35bcf13 and d45dab6.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (10 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (4)
src/aws/flb_aws_credentials.c (3)
  • flb_aws_credentials_destroy (752-767)
  • flb_standard_chain_provider_create (269-327)
  • flb_aws_provider_destroy (769-791)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
🔇 Additional comments (3)
src/aws/flb_aws_msk_iam.c (3)

168-225: Payload builder refactor and leak fix look sound

Changing build_msk_iam_payload to take struct flb_aws_credentials *creds explicitly, plus the added validation/logging for config, host, and creds, gives a clearer contract and fits the new “persistent provider + explicit creds” model.

The memory management also looks correct now:

  • empty_payload_hex is freed on the success path (Lines 322–323) and again guarded in the error cleanup (Lines 573–575), so there’s no leak or double‑free.
  • key is destroyed immediately after use and nulled (Lines 373–375), with the error path only destroying it when non‑NULL.
  • All allocated SDS values (credential, credential_enc, query, canonical, hexhash, string_to_sign, hexsig, action_enc, session_token_enc, presigned_url, payload) are either returned or freed on every exit path.

No changes needed here.

Also applies to: 300-378, 519-575


581-701: OAuth refresh now correctly uses persistent provider and cleans up creds

The updated oauthbearer_token_refresh_cb looks well‑structured:

  • Validates the opaque/context and region before doing any work.
  • Picks the correct generic MSK or serverless endpoint based on the ARN suffix and downgrades endpoint logging to debug, reducing noise.
  • Proactively calls provider_vtable->refresh() and then get_credentials(), logging but not hard‑failing on refresh errors while still guarding against creds == NULL.
  • Builds the payload from those credentials, passes creds->access_key_id into rd_kafka_oauthbearer_set_token, and uses a fixed 900‑second lifetime.
  • Ensures flb_aws_credentials_destroy(creds) is called on both success and payload‑generation failure paths, and payload is always freed before return.

This matches the PR’s goal of avoiding expired IAM credentials without introducing leaks or double‑frees.


821-850: Destroy routine correctly mirrors allocation and avoids leaks

flb_aws_msk_iam_destroy mirrors the successful registration path:

  • Checks ctx for NULL.
  • Destroys the AWS provider (which handles its own internal resources) and then destroys ctx->cred_tls, matching how TLS is created and owned in this module.
  • Frees ctx->region and ctx->cluster_arn SDS values and finally ctx itself, with logging before and after.

Given the ownership model of flb_standard_chain_provider_create for TLS in this path (provider does not own cred_tls), this cleanup order is appropriate and does not introduce double‑free risks.

@kalavt kalavt marked this pull request as draft November 27, 2025 04:56
@kalavt kalavt changed the title from "fix: AWS MSK IAM credential expiration and add TLS support" to "fix: AWS Credential Refresh in Provider Chain for MSK IAM and add TLS support" Nov 27, 2025
@kalavt kalavt marked this pull request as ready for review November 27, 2025 05:55

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines 182 to 186
flb_aws_credentials_destroy(implementation->creds);
implementation->creds = NULL;
}
/* Set to 1 (epoch start) to trigger immediate refresh via time check */
implementation->next_refresh = 1;

P1: Refresh clears cached STS creds before new ones succeed

In refresh_fn_sts, the refresh path destroys implementation->creds and marks the cache expired before calling sts_assume_role_request(). With the MSK OAuth callback now invoking provider->refresh() on every token refresh, any transient STS failure leaves the provider with no credentials at all even if the previous ones were still valid, causing OAuth token generation to fail unnecessarily. Cached credentials should remain usable until a new set is successfully obtained.
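
The "keep the old credentials until new ones arrive" behaviour requested here can be sketched as follows. This is only an illustration of the swap-on-success idea, not the code from the actual fix: `struct provider`, `provider_refresh`, and the `fetch` hook (standing in for a call such as `sts_assume_role_request()`) are hypothetical and heavily simplified.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified, hypothetical credentials object. */
struct creds {
    char *access_key_id;
};

/* Hypothetical provider: a cache plus a hook that fetches new credentials. */
struct provider {
    struct creds *cached;
    struct creds *(*fetch)(void);   /* returns NULL on failure */
};

static void creds_destroy(struct creds *c)
{
    if (c != NULL) {
        free(c->access_key_id);
        free(c);
    }
}

static struct creds *creds_create(const char *key)
{
    struct creds *c = calloc(1, sizeof(*c));

    if (c == NULL) {
        return NULL;
    }
    c->access_key_id = malloc(strlen(key) + 1);
    if (c->access_key_id == NULL) {
        free(c);
        return NULL;
    }
    strcpy(c->access_key_id, key);
    return c;
}

/* Fetch first; replace the cache only when the fetch succeeded. */
static int provider_refresh(struct provider *p)
{
    struct creds *fresh;

    assert(p != NULL && p->fetch != NULL);
    fresh = p->fetch();
    if (fresh == NULL) {
        return -1;              /* transient failure: p->cached is untouched */
    }
    creds_destroy(p->cached);
    p->cached = fresh;
    return 0;
}

/* Hypothetical fetch hooks used to exercise both outcomes. */
static struct creds *fetch_ok(void)   { return creds_create("AKIANEW"); }
static struct creds *fetch_fail(void) { return NULL; }
```

With this ordering, a failed refresh returns an error but the previously cached credentials remain available to the next `get_credentials()` call.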

Author

Fixed by 05ecb6d

@kalavt
Author

kalavt commented Nov 27, 2025

@coderabbitai generate docstrings

@coderabbitai

coderabbitai bot commented Nov 27, 2025

✅ Actions performed

Initiated docstring generation; will generate only if new commits exist.

@coderabbitai

coderabbitai bot commented Nov 27, 2025

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #11217

coderabbitai bot added a commit that referenced this pull request Nov 27, 2025
Docstrings generation was requested by @kalavt.

* #11211 (comment)

The following files were modified:

* `src/aws/flb_aws_credentials_ec2.c`
* `src/aws/flb_aws_credentials_http.c`
* `src/aws/flb_aws_credentials_profile.c`
* `src/aws/flb_aws_credentials_sts.c`
* `src/aws/flb_aws_msk_iam.c`
@kalavt kalavt marked this pull request as draft November 27, 2025 09:09
@kalavt kalavt marked this pull request as ready for review November 28, 2025 00:45

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Comment on lines 557 to 558
  now = time(NULL);
- md_lifetime_ms = (now + 900) * 1000;
+ md_lifetime_ms = (now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000;

P1: Avoid overflow computing Kafka token expiry

In oauthbearer_token_refresh_cb the token expiry is now computed with md_lifetime_ms = (now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; while now is a time_t. On 32‑bit builds time_t is 32 bits, so the multiplication overflows before it is assigned to the 64‑bit md_lifetime_ms, producing a negative/incorrect expiry timestamp. librdkafka will treat that as an already expired token, causing MSK IAM authentication to fail immediately even though the credentials are still valid. Using a 64‑bit value for the arithmetic (as before) avoids the overflow.
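
The arithmetic concern can be made concrete with a small sketch. It models a 32-bit `time_t` with `int32_t` and assumes a 300-second lifetime; the function names are hypothetical. Because signed overflow is undefined behaviour in C, the "narrow" variant is modelled with unsigned wraparound purely to show that the 32-bit result cannot hold the intended value.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed lifetime for this sketch; the PR's constant changed between revisions. */
#define TOKEN_LIFETIME_SECONDS 300

/* Widen to 64 bits before adding and multiplying, so nothing overflows. */
static int64_t expiry_ms_widened(int32_t now_s)
{
    return ((int64_t) now_s + TOKEN_LIFETIME_SECONDS) * 1000;
}

/*
 * Model of the same computation carried out in 32 bits. Unsigned arithmetic
 * is used so the wraparound is well defined (modulo 2^32).
 */
static uint32_t expiry_ms_wrapped32(int32_t now_s)
{
    assert(now_s >= 0);
    return (uint32_t) (((uint32_t) now_s +
                        (uint32_t) TOKEN_LIFETIME_SECONDS) * 1000u);
}
```

For any present-day epoch value, the widened result exceeds what 32 bits can represent, so only the 64-bit computation yields a usable absolute expiry in milliseconds.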

Author

@kalavt kalavt Nov 28, 2025

Fixed by 6dde002


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 05ecb6d and 862a4ec.

📒 Files selected for processing (3)
  • src/aws/flb_aws_credentials_ec2.c (1 hunks)
  • src/aws/flb_aws_credentials_sts.c (2 hunks)
  • src/aws/flb_aws_msk_iam.c (13 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/aws/flb_aws_credentials_sts.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/aws/flb_aws_credentials_ec2.c
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (5)
src/flb_sds.c (3)
  • flb_sds_create_size (92-95)
  • flb_sds_printf (336-387)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_credentials.c (2)
  • flb_aws_credentials_destroy (752-767)
  • flb_aws_provider_destroy (769-791)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
src/flb_kafka.c (1)
  • flb_kafka_opaque_set (242-256)
🔇 Additional comments (3)
src/aws/flb_aws_msk_iam.c (3)

174-472: MSK IAM payload builder refactor and cleanup look solid

The refactor to accept struct flb_aws_credentials *creds and the added validations (config, host, credentials) are straightforward and correct. The canonical request and presigned URL generation are unchanged in spirit, and the memory/error handling looks tight now:

  • empty_payload_hex is freed on the success path and also in the error: block, avoiding the earlier leak without risking double‑free.
  • All allocated flb_sds_t buffers are destroyed on both success and error paths.
  • Session token handling is optional and fully covered in cleanup.

No functional or lifecycle issues spotted here.


585-680: TLS/provider lifecycle in registration and destroy is consistent

The registration path and destroy function line up well:

  • flb_aws_msk_iam_register_oauth_cb:

    • Allocates ctx, cluster_arn, region.
    • Creates ctx->cred_tls via flb_tls_create(...) and passes it into flb_standard_chain_provider_create(...).
    • Initializes the provider (sync → init() → async) and wires the callback+opaque.
    • Cleans up all intermediates on each failure path, with no leaks or double‑frees.
  • flb_aws_msk_iam_destroy:

    • Destroys ctx->provider, then ctx->cred_tls, then SDS fields and ctx itself.
    • Matches the allocation order and the ownership model where the standard chain provider does not own cred_tls.

Overall the lifecycle management here looks correct and robust.

Also applies to: 682-704


474-583: I'm encountering persistent repository access issues that are preventing me from verifying the credential ownership semantics as originally suggested. Since I cannot access the fluent-bit codebase to:

  1. Search for all get_credentials() call sites and implementations
  2. Verify how credentials are handled across different provider implementations
  3. Confirm whether any providers return cached/internal credentials instead of heap-owned ones
  4. Audit the consistency of flb_aws_credentials_destroy() usage patterns

Here is the rewritten review comment based on the original analysis, noting that the verification cannot be fully completed:


Verify credentials ownership contract across provider implementations

The new callback flow is clear and matches the PR intent:

  • Validates opaque/msk_iam_ctx and region.
  • Derives the correct MSK endpoint (including serverless -s3 suffix handling).
  • Uses the persistent provider to refresh() then get_credentials(), builds the payload, and calls rd_kafka_oauthbearer_set_token() with a short 3‑minute lifetime.
  • Destroys creds immediately after rd_kafka_oauthbearer_set_token() and cleans up payload at the end.
  • Error paths correctly call rd_kafka_oauthbearer_set_token_failure() and avoid leaks.

The ownership contract of provider_vtable->get_credentials() needs verification:

  • The code assumes the returned struct flb_aws_credentials * must be freed by the caller via flb_aws_credentials_destroy(creds).
  • Risk: If any provider implementation returns a cached/internal credentials object that it frees on provider destroy/refresh, calling flb_aws_credentials_destroy() would cause a double‑free.

Audit call sites of get_credentials() across all provider implementations to ensure a consistent ownership contract—that all implementations return heap-allocated credentials that callers own and must destroy.
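
The ownership contract in question can be sketched in isolation. The types and functions below are hypothetical simplifications, not Fluent Bit's actual provider code: the point is only that a getter which hands out a deep copy lets every caller destroy its result without touching the provider's cache.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified, hypothetical credentials and provider types. */
struct creds {
    char *access_key_id;
    char *secret_access_key;
};

struct provider {
    struct creds *cached;   /* owned by the provider */
};

static char *dup_str(const char *s)
{
    size_t len = strlen(s) + 1;
    char *copy = malloc(len);

    if (copy != NULL) {
        memcpy(copy, s, len);
    }
    return copy;
}

static void creds_destroy(struct creds *c)
{
    if (c != NULL) {
        free(c->access_key_id);
        free(c->secret_access_key);
        free(c);
    }
}

static struct creds *creds_create(const char *ak, const char *sk)
{
    struct creds *c = calloc(1, sizeof(*c));

    if (c == NULL) {
        return NULL;
    }
    c->access_key_id = dup_str(ak);
    c->secret_access_key = dup_str(sk);
    if (c->access_key_id == NULL || c->secret_access_key == NULL) {
        creds_destroy(c);
        return NULL;
    }
    return c;
}

/* Returns a caller-owned deep copy; the cache itself is never handed out. */
static struct creds *provider_get_credentials(const struct provider *p)
{
    if (p == NULL || p->cached == NULL) {
        return NULL;
    }
    return creds_create(p->cached->access_key_id,
                        p->cached->secret_access_key);
}
```

If every provider implementation follows this shape, destroying the returned object in the OAuth callback cannot double-free the provider's internal state.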

@kalavt kalavt changed the title from "fix: AWS Credential Refresh in Provider Chain for MSK IAM and add TLS support" to "fix: AWS MSK IAM Authentication Failures on Idle Connections and add TLS support" Nov 28, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/aws/flb_aws_msk_iam.c (2)

174-471: Payload builder refactor (inputs/cleanup) looks solid; consider clarifying/tying X-Amz-Expires

The refactored build_msk_iam_payload:

  • Validates config, host, and creds up front (including access/secret key presence).
  • Correctly uses the passed-in credentials instead of constructing a new provider.
  • Has balanced cleanup on both success and failure paths, including empty_payload_hex and key, avoiding leaks on goto error.
  • Leaves X-Amz-Expires=900 unchanged while the Kafka token lifetime is now 180s.

Functionally this is fine (shorter Kafka token lifetime than URL validity), but if the intention is to also shorten the SigV4 presigned URL validity window, you might want to parameterize 900 with either MSK_IAM_TOKEN_LIFETIME_SECONDS or a separate named constant, and document the choice explicitly.
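
One way to make that relationship explicit is sketched below. `MSK_IAM_PRESIGN_EXPIRES_SECONDS` and `format_expires_param` are hypothetical names introduced only for illustration; the 180 and 900 values are the ones mentioned in this review. A compile-time check documents the assumption that the Kafka token never outlives the presigned URL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Kafka token lifetime as reported for this revision of the PR. */
#define MSK_IAM_TOKEN_LIFETIME_SECONDS  180

/* Hypothetical named constant replacing the literal 900 in X-Amz-Expires. */
#define MSK_IAM_PRESIGN_EXPIRES_SECONDS 900

/* Documented assumption: the token must not outlive the presigned URL. */
_Static_assert(MSK_IAM_TOKEN_LIFETIME_SECONDS <= MSK_IAM_PRESIGN_EXPIRES_SECONDS,
               "token lifetime must not exceed presigned URL validity");

/* Writes the query parameter; returns snprintf's result (length or error). */
static int format_expires_param(char *buf, size_t size)
{
    assert(buf != NULL && size > 0);
    return snprintf(buf, size, "X-Amz-Expires=%d",
                    MSK_IAM_PRESIGN_EXPIRES_SECONDS);
}
```

Keeping two separate constants leaves room to tune the token lifetime without silently changing the SigV4 validity window.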


585-680: TLS/provider creation and initialization paths are well-guarded

The registration helper:

  • Validates cluster_arn, allocates ctx, and stores flb_config.
  • Extracts the region from the ARN with clear error handling and cleanup on failure.
  • Creates a dedicated TLS instance (ctx->cred_tls) and passes it into flb_standard_chain_provider_create, then initializes the provider via sync → init → async, with full teardown of provider, TLS, and SDS fields on any error path.
  • Wires the context into flb_kafka_opaque and librdkafka’s OAuth refresh callback as expected.

All resource lifetimes look balanced here. The ctx->flb_config field is currently only stored, not used in this file; if it’s not consumed elsewhere, it’s harmless but could be trimmed in a future cleanup.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 862a4ec and 6dde002.

📒 Files selected for processing (1)
  • src/aws/flb_aws_msk_iam.c (13 hunks)
🔇 Additional comments (3)
src/aws/flb_aws_msk_iam.c (3)

31-55: Token lifetime constant and context struct extensions look consistent

Including flb_tls.h, introducing MSK_IAM_TOKEN_LIFETIME_SECONDS = 180, and extending struct flb_aws_msk_iam with flb_config, cred_tls, and provider are all coherent with the rest of the file and the new credential-flow design. The struct changes correctly support having a persistent TLS+provider bound to the MSK IAM context.


474-583: OAuth refresh callback flow and lifetime arithmetic look correct; ensure provider lifetime contract

The callback now:

  • Retrieves config via the opaque, validates region, and derives the proper MSK / serverless host.
  • Actively refreshes the persistent provider, then calls get_credentials, with clear error reporting to librdkafka on both refresh and retrieval failures.
  • Builds the payload with the fetched creds and destroys the credentials immediately after calling rd_kafka_oauthbearer_set_token, which is safe given librdkafka’s deep-copy semantics.
  • Computes md_lifetime_ms in 64‑bit (((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000), avoiding 32‑bit time_t overflow and setting a 3‑minute absolute expiry as intended.

One thing to keep in mind operationally: this callback assumes config->provider and config->cred_tls remain valid for the full lifetime of the Kafka handle. Please double-check that flb_aws_msk_iam_destroy is only invoked after librdkafka is fully torn down (and no further OAuth callbacks can fire), or that the plugin layer otherwise guarantees this ordering.
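
The ordering requirement can be sketched with stand-in types. Nothing here is librdkafka or Fluent Bit API: `struct client` plays the role of the Kafka handle, `struct iam_ctx` the role of the MSK IAM context, and all function names are hypothetical. The sketch only shows the order "stop the callback source first, then release what the callback uses".

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the MSK IAM context used by the callback. */
struct iam_ctx {
    int alive;
    int refresh_count;
};

/* Hypothetical stand-in for the Kafka handle that fires refresh callbacks. */
struct client {
    struct iam_ctx *opaque;
    int running;
};

/* Simulates a refresh callback; refuses to run once the client is stopped. */
static int client_fire_refresh(struct client *c)
{
    if (!c->running) {
        return -1;
    }
    assert(c->opaque != NULL && c->opaque->alive);
    c->opaque->refresh_count++;
    return 0;
}

static void client_stop(struct client *c)
{
    c->running = 0;
}

static void iam_ctx_release(struct iam_ctx *ctx)
{
    ctx->alive = 0;
}

/* Teardown order: stop the callback source, then release the context. */
static void teardown(struct client *c, struct iam_ctx *ctx)
{
    client_stop(c);         /* 1. no further callbacks can fire */
    c->opaque = NULL;       /* 2. drop the reference to the context */
    iam_ctx_release(ctx);   /* 3. now safe to release provider/TLS state */
}
```

Reversing steps 1 and 3 would leave a window in which a callback could run against a released context, which is the hazard the review asks to rule out.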


682-704: MSK IAM destroy correctly mirrors creation/ownership semantics

flb_aws_msk_iam_destroy:

  • Guards against NULL ctx.
  • Destroys the AWS provider (which owns its internal state but not this TLS handle in the flb_standard_chain_provider_create path).
  • Separately destroys ctx->cred_tls, then frees region, cluster_arn, and finally ctx.

This matches the documented ownership model for the standard chain provider, so there’s no double-free risk and no leaks in the normal or error teardown paths.

@kalavt
Author

kalavt commented Nov 28, 2025

Hi @edsiper, I saw your work adding MSK IAM auth. It is missing TLS support and can cause authentication failures on low-volume topics.

This PR fixes MSK IAM authentication failures on idle connections by using a persistent provider with TLS and reducing the token lifetime to 3 minutes. It was validated with 10+ hours of production testing showing zero auth errors.

Please review the code and let me know if it can be merged.

I appreciate your work.

@PettitWesley @koleini


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/aws/flb_aws_msk_iam.c (1)

472-579: OAuth refresh callback flow with persistent provider looks sound

Using the opaque flb_kafka_opaque to recover struct flb_aws_msk_iam, validating region, deriving the correct MSK endpoint (standard vs serverless), then calling provider_vtable->refresh() followed by get_credentials() before building the payload matches the intended active‑refresh design. The md_lifetime_ms computation now does 64‑bit arithmetic and avoids the 32‑bit time_t overflow issue, and credentials are destroyed immediately after rd_kafka_oauthbearer_set_token() while the payload SDS is freed afterwards.

As a small defensive tweak, you could add a guard for config->provider == NULL before dereferencing it, even though the current creation/registration flow guarantees it.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 6dde002 and f343778.

📒 Files selected for processing (2)
  • plugins/out_kafka/kafka_config.c (1 hunks)
  • src/aws/flb_aws_msk_iam.c (13 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (6)
src/flb_sds.c (3)
  • flb_sds_create_size (92-95)
  • flb_sds_printf (336-387)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_credentials.c (3)
  • flb_aws_credentials_destroy (752-767)
  • flb_standard_chain_provider_create (269-327)
  • flb_aws_provider_destroy (769-791)
src/tls/flb_tls.c (2)
  • flb_tls_create (183-232)
  • flb_tls_destroy (258-277)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
src/aws/flb_aws_util.c (1)
  • flb_aws_client_generator (280-283)
src/flb_kafka.c (1)
  • flb_kafka_opaque_set (242-256)
🔇 Additional comments (5)
plugins/out_kafka/kafka_config.c (1)

246-266: SASL background callbacks gating and error handling look correct

Enabling rd_kafka_sasl_background_callbacks_enable() only when ctx->msk_iam is set, logging failures as warnings, and destroying the rd_kafka_error_t avoids leaks while keeping failures non-fatal. This integrates cleanly with the MSK IAM path.

src/aws/flb_aws_msk_iam.c (4)

31-53: New TLS include, token lifetime constant, and context fields are consistent

Including flb_tls.h, introducing MSK_IAM_TOKEN_LIFETIME_SECONDS (300s), and extending flb_aws_msk_iam with flb_config, cred_tls, and provider line up with the new persistent-provider + TLS design and are used coherently later in this file.


172-470: Refactored build_msk_iam_payload correctly uses external credentials and cleans up resources

Accepting struct flb_aws_credentials *creds, validating inputs (region, host, AK/SK), and building the SigV4 presigned URL without touching the provider directly is a solid separation of concerns. The updated cleanup logic (including empty_payload_hex in both success and error paths) looks balanced and leak‑free given all allocations in this function.


581-676: TLS + provider setup and callback registration follow a correct ownership model

flb_aws_msk_iam_register_oauth_cb now: (1) extracts region, (2) creates a dedicated TLS instance, (3) builds a standard‑chain provider with that TLS, (4) runs sync → init → async on the provider, and (5) wires the Kafka opaque and refresh callback. Error paths consistently tear down cluster_arn, region, cred_tls, and provider only when they have been created, so there are no apparent leaks or double‑frees in these flows.


678-699: MSK IAM destroy path matches the established ownership semantics

flb_aws_msk_iam_destroy destroys the provider, then the TLS handle, and finally the SDS fields and context struct. Given this context is created with flb_standard_chain_provider_create (which does not take ownership of cred_tls), this ordering correctly avoids both leaks and double‑frees and aligns with the prior ownership analysis in the credentials code.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
plugins/out_kafka/kafka_config.c (1)

253-271: API verified: rd_kafka_sasl_background_callbacks_enable is available in librdkafka 2.10.1.

The error handling is correct—the error object is properly checked, logged, destroyed, and cleanup is performed. Web search confirms the API is documented in librdkafka 2.10.1 with the expected prototype and behavior for background SASL token refresh.

Consider clarifying the comment to explain the relationship with the earlier rd_kafka_conf_enable_sasl_queue call (line 222): the config function queues SASL callbacks, while this runtime call enables background thread scheduling for those callbacks. This helps maintainers understand why both calls are necessary for complete MSK IAM OAuth token refresh automation.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between f343778 and b34bff6.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka_config.c (2 hunks)
🔇 Additional comments (1)
plugins/out_kafka/kafka_config.c (1)

217-222: The dual API calls are necessary and correctly implemented for OAUTHBEARER token refresh on idle connections.

Both rd_kafka_conf_enable_sasl_queue() and rd_kafka_sasl_background_callbacks_enable() (introduced in librdkafka v1.9.0) work together as a complementary pair:

  1. rd_kafka_conf_enable_sasl_queue(ctx->conf, 1) at lines 217-222 enables the dedicated SASL callback queue on the configuration object
  2. rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk) at lines 253-271 forwards that queue to librdkafka's background thread

This two-step pattern is the correct and necessary approach to allow OAuth token refresh callbacks to execute without the application calling rd_kafka_poll(). The order of execution (queue enabled → OAuth registered → producer created → background callbacks enabled) is correct, and the code comment accurately describes the purpose.

@kalavt kalavt marked this pull request as draft November 28, 2025 10:59
@kalavt kalavt marked this pull request as ready for review November 28, 2025 11:25
@kalavt
Author

kalavt commented Nov 28, 2025

@edsiper @cosmo0920 @koleini
This PR has been tested and confirmed to fix the TLS and auth failure issues.
It is now ready for your review; please take a look.

@kalavt kalavt closed this Nov 28, 2025