Skip to content

Initial implementation for in_amqp plugin#11427

Open
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp
Open

Initial implementation for in_amqp plugin#11427
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp

Conversation

@matwey
Copy link

@matwey matwey commented Feb 4, 2026

Here is initial implementation for input AMQP plugin. I guess it is stable enough to obtain some review. The plugin was tested with RabbitMQ 4.2.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change:
    fluent-bit -i amqp -pqueue=flb.in_amqp -o stdout
  • [ N/A ] Debug log output from testing the change
  • [ N/A ] Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Added an AMQP input plugin to consume messages from queues with optional parsing, metadata emission, SSL/TLS support, pause/resume, and configurable reconnect/retry behavior; plugin is now registered for builds.
  • Chores
    • Build system now checks for the AMQP client library and will disable AMQP support if the required library is not found.

@coderabbitai
Copy link

coderabbitai bot commented Feb 4, 2026

Note

Currently processing new changes in this PR. This may take a few minutes, please wait...

 ________________________________________________________
< Code review is my cardio: https://youtu.be/HSmKiws-4NU >
 --------------------------------------------------------
  \
   \   (\__/)
       (•ㅅ•)
       /   づ

✏️ Tip: You can disable in-progress messages and the fortune message in your review settings.

Tip

CodeRabbit can suggest fixes for GitHub Check annotations.

Configure the reviews.tools.github-checks setting to adjust the time to wait for GitHub Checks to complete.

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a complete plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.

Changes

Cohort / File(s) Summary
Top-level build
CMakeLists.txt
Added find_package(rabbitmq-c) and a guard that disables FLB_IN_AMQP with a STATUS message if rabbitmq-c is not found.
Plugin registry
plugins/CMakeLists.txt
Registered the new input plugin via REGISTER_IN_PLUGIN("in_amqp").
Plugin build
plugins/in_amqp/CMakeLists.txt
New CMake for in_amqp: declares in_amqp.c and registers the plugin with FLB_PLUGIN(in_amqp ... rabbitmq::rabbitmq).
Plugin header
plugins/in_amqp/in_amqp.h
Adds struct flb_amqp_connection and struct flb_amqp types (URI, queue, parser, reconnect config, encoder, parser pointer, runtime connection fields).
Plugin implementation
plugins/in_amqp/in_amqp.c
New AMQP input implementation: connection/socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission, retry/reconnect scheduling, collectors, and lifecycle callbacks (init/collect/pause/resume/exit).

Sequence Diagrams

sequenceDiagram
    participant FLB as Fluent Bit
    participant Plugin as in_amqp Plugin
    participant AMQP as AMQP Broker
    participant Parser as Parser
    participant Encoder as Encoder

    FLB->>Plugin: in_amqp_init()
    Plugin->>Plugin: parse config (uri, queue, parser, retries)
    Plugin->>Parser: init parser (if configured)
    Plugin->>Encoder: init encoder
    Plugin->>AMQP: open socket / login / open channel / basic_consume
    Plugin->>FLB: register collect callback
Loading
sequenceDiagram
    participant Plugin as in_amqp Plugin
    participant AMQP as AMQP Broker
    participant Parser as Parser
    participant Encoder as Encoder
    participant FLB as Fluent Bit

    Plugin->>AMQP: amqp_consume_message() (poll)
    alt delivery received
        AMQP->>Plugin: envelope (body, headers, routing_key, props)
        Plugin->>Parser: parse body (optional)
        Parser-->>Plugin: parsed record or error
        Plugin->>Plugin: attach metadata (routing_key, headers, props, timestamp)
        Plugin->>Encoder: encode event
        Encoder-->>Plugin: msgpack buffer
        Plugin->>FLB: emit record
    else connection/error
        AMQP-->>Plugin: error / timeout
        Plugin->>Plugin: schedule reconnect via retry collector
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped into queues where routing keys gleam,
Opened channels, sipped bytes from the stream,
I parse, I pack, if errors begin,
I retry, reconnect — then emit again,
A rabbitly plugin, ready to beam.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and directly summarizes the main change: adding initial implementation of the AMQP input plugin, which is confirmed by all file changes across CMakeLists.txt, plugin registration, and new in_amqp source files.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ead056ea6c

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 579-583: The cast in in_amqp_exit improperly dereferences the
unused parameter using "(void) *config;", which can cause undefined behavior if
config is NULL; change that to the standard unused-parameter idiom "(void)
config;" inside the in_amqp_exit function so you do not dereference config while
keeping the variable marked as intentionally unused.
- Around line 446-516: The ctx struct allocated in in_amqp_init is created with
flb_malloc so its members (notably ctx->encoder) are uninitialized and later
in_amqp_config_destroy calls flb_log_event_encoder_destroy which can free
invalid pointers; change the allocation in in_amqp_init to use flb_calloc (or
allocate with flb_malloc followed by memset to zero) so struct flb_amqp fields
(including ctx->encoder) are zero-initialized before any early teardown; keep
all other initialization logic the same.
- Around line 170-176: The flb_plg_trace call logs AMQP body using "%s" which
assumes NUL-termination and can read past the buffer; change the trace to use a
length-limited format (use "%.*s" with body->len) so it prints at most body->len
bytes from body->bytes. Update the flb_plg_trace invocation(s) in the parsing
block around flb_parser_do (referencing ctx->parser, flb_parser_do, body->bytes,
body->len, and ctx->ins) to pass (int)body->len and body->bytes accordingly and
keep the flb_plg_error call as-is.
- Around line 182-218: The code reads optional fields from
amqp_basic_properties_t (properties->timestamp, content_type, content_encoding,
correlation_id, reply_to, headers) without checking the properties->_flags
bitmask; update the logic around flb_time_set(&out_time, properties->timestamp,
0) and each call to in_amqp_append_metadata_bytes/in_amqp_append_metadata_entry
so you first test the appropriate flag (AMQP_BASIC_TIMESTAMP_FLAG,
AMQP_BASIC_CONTENT_TYPE_FLAG, AMQP_BASIC_CONTENT_ENCODING_FLAG,
AMQP_BASIC_CORRELATION_ID_FLAG, AMQP_BASIC_REPLY_TO_FLAG,
AMQP_BASIC_HEADERS_FLAG) before accessing the field; only call flb_time_set or
in_amqp_append_metadata_* when the corresponding flag is set, otherwise skip to
the fallback (flb_time_get) or omit that metadata entry to avoid reading
uninitialized memory (refer to symbols: properties, properties->_flags,
AMQP_BASIC_*_FLAG, out_time, flb_time_set, flb_time_get,
in_amqp_append_metadata_bytes, in_amqp_append_metadata_entry, headers,
ctx->encoder).
- Around line 350-386: The AMQP_RESPONSE_LIBRARY_EXCEPTION branch currently
handles only UNEXPECTED_STATE, CONNECTION_CLOSED and TIMEOUT; other
ret.library_error values from amqp_consume_message() should be handled to avoid
tight CPU loops—add an else clause after the existing checks that logs the
unexpected ret.library_error via flb_plg_warn (including the numeric error),
tears down the connection with in_amqp_connection_destroy(ctx) and either return
a non-zero error or call in_amqp_consumer_start(in, config, in_context) to
trigger a reconnect/backoff; reference the amqp_consume_message() return
handling, ret.library_error, AMQP_RESPONSE_LIBRARY_EXCEPTION, flb_plg_warn,
in_amqp_connection_destroy and in_amqp_consumer_start when applying the change.
- Around line 398-423: When reconnecting, the old socket collector
(ctx->coll_id) is left registered and must be removed before creating a new one;
modify the reconnect path (in the code that calls in_amqp_connection_init and
then sets ctx->coll_id) to check if ctx->coll_id >= 0 and call
flb_input_collector_destroy(ctx->coll_id, in) (or the appropriate
collector-remove API used elsewhere) to delete the stale collector, then clear
ctx->coll_id and proceed to create the new collector with
flb_input_set_collector_socket (refer to ctx->coll_id, in_amqp_collect,
amqp_socket_get_sockfd and in_amqp_connection_init).

Comment on lines 182 to 218
if (flb_time_to_nanosec(&out_time) == 0L) {
flb_time_set(&out_time, properties->timestamp, 0);
}

if (flb_time_to_nanosec(&out_time) == 0L) {
flb_time_get(&out_time);
}

ret = flb_log_event_encoder_begin_record(&ctx->encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_timestamp(&ctx->encoder, &out_time);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = in_amqp_append_metadata_bytes(&ctx->encoder, "routing_key", envelope->routing_key);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_type", properties->content_type);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_encoding", properties->content_encoding);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = in_amqp_append_metadata_bytes(&ctx->encoder, "correlation_id", properties->correlation_id);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = in_amqp_append_metadata_bytes(&ctx->encoder, "reply_to", properties->reply_to);
}

for (i = 0; i < headers->num_entries && ret == FLB_EVENT_ENCODER_SUCCESS; ++i) {
ret = in_amqp_append_metadata_entry(&ctx->encoder, &headers->entries[i]);
}
Copy link

@coderabbitai coderabbitai bot Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n plugins/in_amqp/in_amqp.c | sed -n '170,225p'

Repository: fluent/fluent-bit

Length of output: 2412


🏁 Script executed:

rg -n "AMQP_BASIC.*FLAG|_flags" plugins/in_amqp/ -A 2 -B 2

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

head -50 plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 1738


🏁 Script executed:

rg -n "properties\->_flags|amqp_basic_properties" plugins/in_amqp/ -A 2 -B 2

Repository: fluent/fluent-bit

Length of output: 416


🏁 Script executed:

rg -n "amqp_basic_properties_t|amqp_message_t" plugins/in_amqp/ -B 3 -A 3

Repository: fluent/fluent-bit

Length of output: 641


🏁 Script executed:

rg -n "amqp_bytes_t.*NULL|properties\->|envelope\->" plugins/in_amqp/in_amqp.c | head -30

Repository: fluent/fluent-bit

Length of output: 761


🏁 Script executed:

rg -n "amqp_consume_message|amqp_get" plugins/in_amqp/ -B 2 -A 5

Repository: fluent/fluent-bit

Length of output: 1647


🏁 Script executed:

cat plugins/in_amqp/in_amqp.h

Repository: fluent/fluent-bit

Length of output: 1548


🌐 Web query:

rabbitmq-c library amqp_basic_properties_t _flags optional fields

💡 Result:

In rabbitmq-c, amqp_basic_properties_t uses the _flags bitmask (amqp_flags_t _flags;) to indicate which optional AMQP Basic properties are present; you should only read/write a field if its corresponding AMQP_BASIC_*_FLAG bit is set. [1][2]

amqp_basic_properties_t optional fields and their _flags bits:

  • AMQP_BASIC_CONTENT_TYPE_FLAGcontent_type [1][2]
  • AMQP_BASIC_CONTENT_ENCODING_FLAGcontent_encoding [1][2]
  • AMQP_BASIC_HEADERS_FLAGheaders [1][2]
  • AMQP_BASIC_DELIVERY_MODE_FLAGdelivery_mode [1][2]
  • AMQP_BASIC_PRIORITY_FLAGpriority [1][2]
  • AMQP_BASIC_CORRELATION_ID_FLAGcorrelation_id [1][2]
  • AMQP_BASIC_REPLY_TO_FLAGreply_to [1][2]
  • AMQP_BASIC_EXPIRATION_FLAGexpiration [1][2]
  • AMQP_BASIC_MESSAGE_ID_FLAGmessage_id [1][2]
  • AMQP_BASIC_TIMESTAMP_FLAGtimestamp [1][2]
  • AMQP_BASIC_TYPE_FLAGtype [1][2]
  • AMQP_BASIC_USER_ID_FLAGuser_id [1][2]
  • AMQP_BASIC_APP_ID_FLAGapp_id [1][2]
  • AMQP_BASIC_CLUSTER_ID_FLAGcluster_id [1][2]

Typical usage

  • Publishing: set props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | ...; and then fill only those fields.
  • Consuming: check if (p->_flags & AMQP_BASIC_..._FLAG) before accessing each field.

Sources:
[1] rabbitmq-c amqp_framing.h (0.8.0) flag definitions (AMQP_BASIC_*_FLAG)
[2] rabbitmq-c amqp_framing.h (0.8.0) amqp_basic_properties_t struct layout


🏁 Script executed:

rg -n "properties\->" plugins/in_amqp/in_amqp.c | head -20

Repository: fluent/fluent-bit

Length of output: 605


🏁 Script executed:

rg -n "in_amqp_append_metadata" plugins/in_amqp/in_amqp.c -B 5 -A 10

Repository: fluent/fluent-bit

Length of output: 2503


Guard optional AMQP properties with _flags checks before accessing them.

The rabbitmq-c library's amqp_basic_properties_t struct uses a _flags bitmask to indicate which optional properties are present in a message. Accessing optional fields without checking their corresponding flag bits risks reading uninitialized or garbage data when those properties are omitted. The current code at lines 183, 201, 205, 209, 213, and 216-217 accesses timestamp, content_type, content_encoding, correlation_id, reply_to, and headers without these guards. While in_amqp_append_metadata_bytes checks for null pointers, this provides only partial protection and does not prevent reading from uninitialized memory.

Add flag checks before each optional property access:

  • properties->timestamp → check AMQP_BASIC_TIMESTAMP_FLAG
  • properties->content_type → check AMQP_BASIC_CONTENT_TYPE_FLAG
  • properties->content_encoding → check AMQP_BASIC_CONTENT_ENCODING_FLAG
  • properties->correlation_id → check AMQP_BASIC_CORRELATION_ID_FLAG
  • properties->reply_to → check AMQP_BASIC_REPLY_TO_FLAG
  • properties->headers → check AMQP_BASIC_HEADERS_FLAG
🔧 Suggested fix
-    if (flb_time_to_nanosec(&out_time) == 0L) {
-        flb_time_set(&out_time, properties->timestamp, 0);
-    }
+    if (flb_time_to_nanosec(&out_time) == 0L &&
+        (properties->_flags & AMQP_BASIC_TIMESTAMP_FLAG)) {
+        flb_time_set(&out_time, properties->timestamp, 0);
+    }

-    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_type", properties->content_type);
-    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS &&
+        (properties->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG)) {
+        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_type", properties->content_type);
+    }

-    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_encoding", properties->content_encoding);
-    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS &&
+        (properties->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG)) {
+        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "content_encoding", properties->content_encoding);
+    }

-    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "correlation_id", properties->correlation_id);
-    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS &&
+        (properties->_flags & AMQP_BASIC_CORRELATION_ID_FLAG)) {
+        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "correlation_id", properties->correlation_id);
+    }

-    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "reply_to", properties->reply_to);
-    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS &&
+        (properties->_flags & AMQP_BASIC_REPLY_TO_FLAG)) {
+        ret = in_amqp_append_metadata_bytes(&ctx->encoder, "reply_to", properties->reply_to);
+    }

-    for (i = 0; i < headers->num_entries && ret == FLB_EVENT_ENCODER_SUCCESS; ++i) {
-        ret = in_amqp_append_metadata_entry(&ctx->encoder, &headers->entries[i]);
-    }
+    if (properties->_flags & AMQP_BASIC_HEADERS_FLAG) {
+        for (i = 0; i < headers->num_entries && ret == FLB_EVENT_ENCODER_SUCCESS; ++i) {
+            ret = in_amqp_append_metadata_entry(&ctx->encoder, &headers->entries[i]);
+        }
+    }
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 182 - 218, The code reads optional
fields from amqp_basic_properties_t (properties->timestamp, content_type,
content_encoding, correlation_id, reply_to, headers) without checking the
properties->_flags bitmask; update the logic around flb_time_set(&out_time,
properties->timestamp, 0) and each call to
in_amqp_append_metadata_bytes/in_amqp_append_metadata_entry so you first test
the appropriate flag (AMQP_BASIC_TIMESTAMP_FLAG, AMQP_BASIC_CONTENT_TYPE_FLAG,
AMQP_BASIC_CONTENT_ENCODING_FLAG, AMQP_BASIC_CORRELATION_ID_FLAG,
AMQP_BASIC_REPLY_TO_FLAG, AMQP_BASIC_HEADERS_FLAG) before accessing the field;
only call flb_time_set or in_amqp_append_metadata_* when the corresponding flag
is set, otherwise skip to the fallback (flb_time_get) or omit that metadata
entry to avoid reading uninitialized memory (refer to symbols: properties,
properties->_flags, AMQP_BASIC_*_FLAG, out_time, flb_time_set, flb_time_get,
in_amqp_append_metadata_bytes, in_amqp_append_metadata_entry, headers,
ctx->encoder).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai resume

Comment on lines +350 to +321
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}

if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);

return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n plugins/in_amqp/in_amqp.c | sed -n '340,400p'

Repository: fluent/fluent-bit

Length of output: 3159


🏁 Script executed:

cat -n plugins/in_amqp/in_amqp.c | sed -n '320,390p'

Repository: fluent/fluent-bit

Length of output: 3378


🏁 Script executed:

rg "amqp_error_string" plugins/in_amqp/

Repository: fluent/fluent-bit

Length of output: 406


🏁 Script executed:

rg -B 3 -A 3 "amqp_error_string2" plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 775


🏁 Script executed:

cat -n plugins/in_amqp/in_amqp.c | sed -n '328,395p'

Repository: fluent/fluent-bit

Length of output: 3479


🏁 Script executed:

rg -B 2 -A 2 "amqp_consume_message" plugins/in_amqp/

Repository: fluent/fluent-bit

Length of output: 355


🏁 Script executed:

# Check how the AMQP library is defined/included
rg "#include.*amqp" plugins/in_amqp/in_amqp.c | head -20

Repository: fluent/fluent-bit

Length of output: 81


🏁 Script executed:

cat plugins/in_amqp/in_amqp.h | grep -A 20 "#include"

Repository: fluent/fluent-bit

Length of output: 637


🏁 Script executed:

# Look at how other AMQP_STATUS values are handled in the codebase
rg "AMQP_STATUS_" plugins/in_amqp/in_amqp.c | head -20

Repository: fluent/fluent-bit

Length of output: 498


🏁 Script executed:

rg "AMQP_RESPONSE_" plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 484


🏁 Script executed:

# Check if there's any else clause or other handling after line 385
cat -n plugins/in_amqp/in_amqp.c | sed -n '358,390p'

Repository: fluent/fluent-bit

Length of output: 1914


Handle other AMQP library errors to prevent tight loops.
Unhandled AMQP_RESPONSE_LIBRARY_EXCEPTION values cause the non-blocking amqp_consume_message() call to spin in a tight loop without reconnection or error logging. Add an else clause to handle remaining library errors:

Suggested fix
            } else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
                return 0;
+            } else {
+                flb_plg_error(in, "AMQP consume error: %s", amqp_error_string2(ret.library_error));
+                in_amqp_connection_destroy(ctx);
+                return in_amqp_consumer_start(in, config, in_context);
             }
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);
return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
}
}
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);
return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
} else {
flb_plg_error(in, "AMQP consume error: %s", amqp_error_string2(ret.library_error));
in_amqp_connection_destroy(ctx);
return in_amqp_consumer_start(in, config, in_context);
}
}
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 350 - 386, The
AMQP_RESPONSE_LIBRARY_EXCEPTION branch currently handles only UNEXPECTED_STATE,
CONNECTION_CLOSED and TIMEOUT; other ret.library_error values from
amqp_consume_message() should be handled to avoid tight CPU loops—add an else
clause after the existing checks that logs the unexpected ret.library_error via
flb_plg_warn (including the numeric error), tears down the connection with
in_amqp_connection_destroy(ctx) and either return a non-zero error or call
in_amqp_consumer_start(in, config, in_context) to trigger a reconnect/backoff;
reference the amqp_consume_message() return handling, ret.library_error,
AMQP_RESPONSE_LIBRARY_EXCEPTION, flb_plg_warn, in_amqp_connection_destroy and
in_amqp_consumer_start when applying the change.

@matwey matwey force-pushed the in_amqp branch 2 times, most recently from 3a5b411 to d3d939e Compare February 4, 2026 13:19
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 421-433: When collector creation or start fails, the code must
clean up the partially-created collector and the open AMQP connection to avoid
leaving the plugin stuck: if flb_input_set_collector_socket succeeded but
flb_input_collector_start failed, call flb_input_collector_remove(ctx->coll_id)
(or the equivalent collector-remove API) and set ctx->coll_id = -1; in both
failure branches close the underlying socket (c->sock) and tear down the AMQP
connection using the library cleanup functions (e.g., amqp_socket_close(c->sock)
and the appropriate amqp connection destroy/close function), clear any
connection pointers in ctx, and then return the error.
- Around line 346-386: in in_amqp_collect(), when amqp_consume_message() returns
AMQP_RESPONSE_SERVER_EXCEPTION or AMQP_RESPONSE_NONE, call
in_amqp_log_reply_error(&ret, in) to record the error, then call
in_amqp_connection_destroy(ctx) and restart the consumer by returning
in_amqp_consumer_start(in, config, in_context); this mirrors the existing
library-exception handling flow and prevents a busy spin; reference the
amqp_consume_message return handling, in_amqp_log_reply_error(),
in_amqp_connection_destroy(), and in_amqp_consumer_start().
- Around line 592-595: The default AMQP URI "amqp://" is missing a HOST and will
cause amqp_parse_url to fail; update the default for the config map entry that
sets struct flb_amqp->uri (the FLB_CONFIG_MAP_STR for "uri") to a parseable
value (either NULL to allow amqp_default_connection_info or a concrete
"amqp://localhost") and ensure any code that reads flb_amqp->uri and calls
amqp_parse_url handles NULL appropriately by falling back to
amqp_default_connection_info; locate the config map entry (FLB_CONFIG_MAP_STR,
"uri", "amqp://") and change the default to NULL or "amqp://localhost" and
adjust initialization paths that call amqp_parse_url to accommodate the new
default.

Comment on lines +346 to +321
if (ret.reply_type == AMQP_RESPONSE_NORMAL) {
in_amqp_handle_envelope(ctx, &envelope);

amqp_destroy_envelope(&envelope);
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}

if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);

return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "in_amqp.c" -type f

Repository: fluent/fluent-bit

Length of output: 88


🏁 Script executed:

wc -l plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 90


🏁 Script executed:

sed -n '340,390p' plugins/in_amqp/in_amqp.c | cat -n

Repository: fluent/fluent-bit

Length of output: 2867


🏁 Script executed:

grep -n "amqp_consume_message\|reply_type\|AMQP_RESPONSE" plugins/in_amqp/in_amqp.c | head -50

Repository: fluent/fluent-bit

Length of output: 611


🌐 Web query:

rabbitmq-c amqp_consume_message return type AMQP_RESPONSE constants documentation

💡 Result:

In rabbitmq-c, amqp_consume_message(...) returns an amqp_rpc_reply_t struct (declared in amqp.h). (alanxz.github.io)

To interpret the result, check ret.reply_type (type amqp_response_type_enum). The documented response-type constants are: (alanxz.github.io)

  • AMQP_RESPONSE_NORMAL — success
  • AMQP_RESPONSE_SERVER_EXCEPTION — broker returned an exception (inspect ret.reply)
  • AMQP_RESPONSE_LIBRARY_EXCEPTION — client library error (inspect ret.library_error)

For the “unexpected frame before basic.deliver” case: older docs mention AMQP_STATUS_UNEXPECTED_FRAME, but that was a documentation error; the actual status used is AMQP_STATUS_UNEXPECTED_STATE (fixed in later releases). (stackoverflow.com)

Citations:


🏁 Script executed:

sed -n '40,70p' plugins/in_amqp/in_amqp.c | cat -n

Repository: fluent/fluent-bit

Length of output: 1636


🏁 Script executed:

sed -n '330,400p' plugins/in_amqp/in_amqp.c | cat -n

Repository: fluent/fluent-bit

Length of output: 3469


🏁 Script executed:

rg -A 2 -B 2 "switch.*reply_type" plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 243


Handle AMQP_RESPONSE_SERVER_EXCEPTION and AMQP_RESPONSE_NONE in in_amqp_collect().

Currently only AMQP_RESPONSE_NORMAL and AMQP_RESPONSE_LIBRARY_EXCEPTION are handled. When amqp_consume_message() returns AMQP_RESPONSE_SERVER_EXCEPTION or AMQP_RESPONSE_NONE, the loop continues without logging or reconnection, potentially spinning indefinitely. The codebase already has in_amqp_log_reply_error() to handle these cases; use it to log the error, destroy the connection, and restart the consumer.

Suggested fix
        if (ret.reply_type == AMQP_RESPONSE_NORMAL) {
            in_amqp_handle_envelope(ctx, &envelope);
            amqp_destroy_envelope(&envelope);
        } else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
            /* existing handling */
            ...
+        } else if (ret.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION ||
+                   ret.reply_type == AMQP_RESPONSE_NONE) {
+            in_amqp_log_reply_error(in, ret, "AMQP consume error");
+            in_amqp_connection_destroy(ctx);
+            return in_amqp_consumer_start(in, config, in_context);
        }
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 346 - 386, in in_amqp_collect(), when
amqp_consume_message() returns AMQP_RESPONSE_SERVER_EXCEPTION or
AMQP_RESPONSE_NONE, call in_amqp_log_reply_error(&ret, in) to record the error,
then call in_amqp_connection_destroy(ctx) and restart the consumer by returning
in_amqp_consumer_start(in, config, in_context); this mirrors the existing
library-exception handling flow and prevents a busy spin; reference the
amqp_consume_message return handling, in_amqp_log_reply_error(),
in_amqp_connection_destroy(), and in_amqp_consumer_start().

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 139-141: Fix the typo in the comment inside the switch handling
AMQP field kinds: change the comment "fallthrought" to the correct "fallthrough"
where the case AMQP_FIELD_KIND_UTF8 falls into case AMQP_FIELD_KIND_BYTES so the
intent is clear; update the comment text near the AMQP_FIELD_KIND_UTF8 /
AMQP_FIELD_KIND_BYTES case labels to the correct spelling.
- Around line 413-451: in in_amqp_consumer_start: remove the direct call to
in_amqp_collect (the mutual recursion risk) and instead finish setup after
resetting ctx->retry and pausing the retry collector; rely on the socket/IO
collector you registered earlier to invoke in_amqp_collect naturally.
Specifically, stop calling in_amqp_collect(in, config, ctx) here, keep the
flb_input_collector_pause(ctx->retry_coll_id, in) and return 0 so the existing
socket collector will handle pending messages; ensure no other code path assumes
in_amqp_consumer_start will synchronously process the queue.
- Around line 276-327: The loop in in_amqp_collect can recurse unbounded by
calling in_amqp_consumer_start on AMQP_STATUS_CONNECTION_CLOSED; change the
behavior so that when ret.library_error == AMQP_STATUS_CONNECTION_CLOSED you
call in_amqp_connection_destroy(ctx) and break out of the for(;;) loop (or
return an error code) instead of calling in_amqp_consumer_start to avoid
recursion, letting the outer retry/timer logic handle reconnects; also add
catch-all branches: handle other ret.reply_type values
(AMQP_RESPONSE_SERVER_EXCEPTION, AMQP_RESPONSE_NONE) by logging and
breaking/returning appropriately, and add an else for ret.library_error to log
the unexpected error and break/return so the loop does not spin indefinitely.
- Around line 566-572: in in_amqp_pause ensure you only call
flb_input_collector_pause for the connection collector when a connection exists:
check ctx->conn.coll_id != -1 before invoking
flb_input_collector_pause(ctx->conn.coll_id, ctx->ins); still pause
ctx->retry_coll_id unconditionally as before. This mirrors the guard used in
in_amqp_resume and prevents attempting to pause a non-existent collector.
- Around line 585-593: Add a NULL guard in in_amqp_exit: check whether the
incoming data pointer is NULL before casting to struct flb_amqp *ctx and calling
in_amqp_config_destroy; if data is NULL simply return 0. This ensures
in_amqp_config_destroy (which dereferences ctx->encoder, ctx->retry_coll_id,
ctx->ins) is only called when ctx is non-NULL and avoids null pointer
dereference when the framework invokes cb_exit with a NULL context.

Comment on lines +276 to +325
for (;;) {
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;

amqp_maybe_release_buffers(c->conn);
ret = amqp_consume_message(c->conn, &envelope, &tv, 0);

if (ret.reply_type == AMQP_RESPONSE_NORMAL) {
in_amqp_handle_envelope(ctx, &envelope);

amqp_destroy_envelope(&envelope);
} else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
/*
* If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
* ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame
* other than AMQP_BASIC_DELIVER_METHOD was received, the caller
* should call amqp_simple_wait_frame() to read this frame and
* take appropriate action.
*/
if (ret.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
if (amqp_simple_wait_frame(c->conn, &frame) != AMQP_STATUS_OK) {
break;
}

if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server channel error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *)frame.payload.method.decoded;
flb_plg_warn(in, "AMQP server connection error %hu, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes);
break;
}
default:
flb_plg_warn(in, "An unexpected AMQP method id 0x%08X", frame.payload.method.id);
}
}
} else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
in_amqp_connection_destroy(ctx);

return in_amqp_consumer_start(in, config, in_context);
} else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
return 0;
}
}
}

return 0;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Unbounded recursion between in_amqp_collect and in_amqp_consumer_start.

When AMQP_STATUS_CONNECTION_CLOSED is detected at line 317, the code calls in_amqp_consumer_start, which on success calls in_amqp_collect again (line 443). If the new connection also drops immediately, this chain recurses indefinitely, risking a stack overflow.

Additionally, unhandled reply_type values (AMQP_RESPONSE_SERVER_EXCEPTION, AMQP_RESPONSE_NONE) and unhandled library_error values (anything other than UNEXPECTED_STATE, CONNECTION_CLOSED, TIMEOUT) cause the for(;;) loop to spin without breaking or reconnecting.

Consider:

  1. Breaking out of in_amqp_collect on connection failure instead of recursing — let the retry timer handle reconnection.
  2. Adding a catch-all else for unhandled library_error values and an else for unhandled reply_type values.
Proposed fix (break instead of recurse; add catch-all handlers)
             } else if (ret.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
                 in_amqp_connection_destroy(ctx);
-
-                return in_amqp_consumer_start(in, config, in_context);
+                /* Let the retry timer handle reconnection */
+                if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                    flb_input_collector_resume(ctx->retry_coll_id, in);
+                }
+                return 0;
             } else if (ret.library_error == AMQP_STATUS_TIMEOUT) {
                 return 0;
+            } else {
+                flb_plg_error(in, "AMQP consume error: %s", amqp_error_string2(ret.library_error));
+                in_amqp_connection_destroy(ctx);
+                if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                    flb_input_collector_resume(ctx->retry_coll_id, in);
+                }
+                return 0;
             }
+        } else {
+            in_amqp_log_reply_error(in, ret, "AMQP consume error");
+            in_amqp_connection_destroy(ctx);
+            if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
+                flb_input_collector_resume(ctx->retry_coll_id, in);
+            }
+            return 0;
         }
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 276 - 327, The loop in
in_amqp_collect can recurse unbounded by calling in_amqp_consumer_start on
AMQP_STATUS_CONNECTION_CLOSED; change the behavior so that when
ret.library_error == AMQP_STATUS_CONNECTION_CLOSED you call
in_amqp_connection_destroy(ctx) and break out of the for(;;) loop (or return an
error code) instead of calling in_amqp_consumer_start to avoid recursion,
letting the outer retry/timer logic handle reconnects; also add catch-all
branches: handle other ret.reply_type values (AMQP_RESPONSE_SERVER_EXCEPTION,
AMQP_RESPONSE_NONE) by logging and breaking/returning appropriately, and add an
else for ret.library_error to log the unexpected error and break/return so the
loop does not spin indefinitely.

Comment on lines 413 to 452
static int in_amqp_consumer_start(struct flb_input_instance *in, struct flb_config *config, void *in_context)
{
struct flb_amqp* ctx = in_context;
struct flb_amqp_connection* c = &ctx->conn;
int ret;

if (c->conn) {
return 0; // Already connected
}

ret = in_amqp_connection_init(ctx, config);
if (ret < 0) {
if (++ctx->retry > ctx->reconnect_retry_limits) {
flb_plg_error(in, "Failed to reconnect after %d attempts", ctx->retry);

return -1;
}

if (!flb_input_collector_running(ctx->retry_coll_id, in)) {
flb_input_collector_resume(ctx->retry_coll_id, in);
}

return 0;
}

ctx->retry = 0;

flb_input_collector_pause(ctx->retry_coll_id, in);

/* Read messages currently pending in the queue */
ret = in_amqp_collect(in, config, ctx);
if (ret < 0) {
flb_plg_error(in, "Failed to process pending messages");

return -1;
}

return 0;
}
Copy link

@coderabbitai coderabbitai bot Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Recursive in_amqp_collect call inside in_amqp_consumer_start is risky.

Line 443 calls in_amqp_collect directly after a successful reconnect. Since in_amqp_collect can itself call in_amqp_consumer_start (line 319), this creates mutual recursion. Even if bounded in practice, it's fragile. Consider removing the direct call and letting the newly registered socket collector trigger in_amqp_collect naturally.

Proposed fix
     ctx->retry = 0;

     flb_input_collector_pause(ctx->retry_coll_id, in);

-    /* Read messages currently pending in the queue */
-    ret = in_amqp_collect(in, config, ctx);
-    if (ret < 0) {
-        flb_plg_error(in, "Failed to process pending messages");
-
-        return -1;
-    }
-
     return 0;
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 413 - 451, in in_amqp_consumer_start:
remove the direct call to in_amqp_collect (the mutual recursion risk) and
instead finish setup after resetting ctx->retry and pausing the retry collector;
rely on the socket/IO collector you registered earlier to invoke in_amqp_collect
naturally. Specifically, stop calling in_amqp_collect(in, config, ctx) here,
keep the flb_input_collector_pause(ctx->retry_coll_id, in) and return 0 so the
existing socket collector will handle pending messages; ensure no other code
path assumes in_amqp_consumer_start will synchronously process the queue.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that some data is buffered inside the library, so socket doesn't trigger anything until new additional data arrived

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matwey, thank you for the clarification! That makes perfect sense—if rabbitmq-c buffers messages internally, the socket FD won't become readable again until new network data arrives, so relying solely on the socket collector would miss already-buffered messages after a reconnect.

Given this buffering behavior, the direct call to in_amqp_collect is indeed necessary to drain pending messages. To help future maintainers understand this design decision, consider adding a brief comment before that call explaining the buffering rationale:

/* 
 * Explicitly drain messages already buffered in rabbitmq-c.
 * The socket collector won't trigger until new data arrives,
 * but messages may already be buffered internally.
 */
ret = in_amqp_collect(in, config, ctx);

✏️ Learnings added
Learnt from: matwey
Repo: fluent/fluent-bit PR: 11427
File: plugins/in_amqp/in_amqp.c:411-449
Timestamp: 2026-02-09T17:01:07.412Z
Learning: In the in_amqp plugin (plugins/in_amqp/in_amqp.c), the direct call to `in_amqp_collect` from `in_amqp_consumer_start` after reconnection is intentional. The rabbitmq-c library buffers messages internally, and the socket collector won't trigger until new network data arrives, so the explicit call is needed to drain already-buffered messages.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines 566 to 580
static void in_amqp_pause(void *data, struct flb_config *config)
{
struct flb_amqp *ctx = data;

flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
flb_input_collector_pause(ctx->retry_coll_id, ctx->ins);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find the implementation of flb_input_collector_pause
rg -n "flb_input_collector_pause" --type=c -B2 -A10 | head -60

Repository: fluent/fluent-bit

Length of output: 3380


🏁 Script executed:

# Search for the function definition specifically
rg -n "^[a-zA-Z_].*flb_input_collector_pause" --type=c -A15

Repository: fluent/fluent-bit

Length of output: 2416


🏁 Script executed:

# Check how coll_id is initialized in in_amqp.c
rg -n "coll_id" plugins/in_amqp/in_amqp.c -B2 -A2

Repository: fluent/fluent-bit

Length of output: 2584


🏁 Script executed:

# Check for collector pause implementations in different files
fd -e c -type f | xargs rg -l "input_collector_pause" | head -10

Repository: fluent/fluent-bit

Length of output: 552


🏁 Script executed:

rg -n "get_collector" src/flb_input.c -B2 -A10 | head -50

Repository: fluent/fluent-bit

Length of output: 1432


🏁 Script executed:

# Check what happens in in_amqp_resume for comparison
sed -n '574,583p' plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 340


🏁 Script executed:

# Look for how pause/resume are used elsewhere with potential -1 values
rg -n "flb_input_collector_(pause|resume)" plugins/ -B3 -A1 | grep -E "(coll_id|pause|resume)" | head -40

Repository: fluent/fluent-bit

Length of output: 4415


🏁 Script executed:

# Check the complete get_collector function to see what it does with -1
sed -n '1867,1879p' src/flb_input.c

Repository: fluent/fluent-bit

Length of output: 452


🏁 Script executed:

# Check if there are any constraints on what coll_id values are valid
rg -n "coll->id" src/flb_input.c -B2 -A2 | head -30

Repository: fluent/fluent-bit

Length of output: 710


🏁 Script executed:

# Verify what happens when pause is called with -1 in the actual usage pattern
rg -n "retry_coll_id\|coll_id" plugins/in_amqp/in_amqp.c | head -20

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

# Let me check what happens if pause fails - does the calling code check the return value?
sed -n '566,572p' plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 284


🏁 Script executed:

# Check if there are any other places in in_amqp.c that check pause/resume return values
rg -n "flb_input_collector_pause\|flb_input_collector_resume" plugins/in_amqp/in_amqp.c -A1

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

# Check the collector_id function to understand valid ID ranges
rg -n "collector_id" src/flb_input.c -B2 -A10 | head -30

Repository: fluent/fluent-bit

Length of output: 873


🏁 Script executed:

# Check the complete flb_input_collector_pause implementation
sed -n '2035,2075p' src/flb_input.c

Repository: fluent/fluent-bit

Length of output: 1164


🏁 Script executed:

# Verify the current state: does ctx->retry_coll_id stay -1 or get set?
sed -n '509,560p' plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 1534


🏁 Script executed:

# Let me trace through the logic: check when pause is actually called
rg -n "in_amqp_pause" plugins/in_amqp/in_amqp.c -B5 -A10

Repository: fluent/fluent-bit

Length of output: 775


🏁 Script executed:

# Check what the context is when in_amqp_pause is invoked
rg -n "\.pause" plugins/in_amqp/in_amqp.c -B3 -A3

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

# Check if retry_coll_id can be -1 at the pause point
sed -n '545,575p' plugins/in_amqp/in_amqp.c

Repository: fluent/fluent-bit

Length of output: 908


Guard flb_input_collector_pause against coll_id == -1 for ctx->conn.coll_id.

When no active connection exists, ctx->conn.coll_id is -1. Although flb_input_collector_pause safely returns -1 when passed an invalid collector ID, it is semantically incorrect to pause a non-existent collector. This creates asymmetry with in_amqp_resume, which explicitly guards ctx->conn.coll_id based on connection state.

Proposed fix
 static void in_amqp_pause(void *data, struct flb_config *config)
 {
     struct flb_amqp *ctx = data;

-    flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
+    if (ctx->conn.coll_id >= 0) {
+        flb_input_collector_pause(ctx->conn.coll_id, ctx->ins);
+    }
     flb_input_collector_pause(ctx->retry_coll_id, ctx->ins);
 }
🤖 Prompt for AI Agents
In `@plugins/in_amqp/in_amqp.c` around lines 566 - 572, in in_amqp_pause ensure
you only call flb_input_collector_pause for the connection collector when a
connection exists: check ctx->conn.coll_id != -1 before invoking
flb_input_collector_pause(ctx->conn.coll_id, ctx->ins); still pause
ctx->retry_coll_id unconditionally as before. This mirrors the guard used in
in_amqp_resume and prevents attempting to pause a non-existent collector.

@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{"name":"HttpError","status":500,"request":{"method":"PATCH","url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","headers":{"accept":"application/vnd.github.v3+json","user-agent":"octokit.js/0.0.0-development octokit-core.js/7.0.6 Node.js/24","authorization":"token [REDACTED]","content-type":"application/json; charset=utf-8"},"body":{"body":"<!-- This is an auto-generated comment: summarize by coderabbit.ai -->\n<!-- This is an auto-generated comment: failure by coderabbit.ai -->\n\n> [!CAUTION]\n> ## Review failed\n> \n> Failed to post review comments\n\n<!-- end of auto-generated comment: failure by coderabbit.ai -->\n\n<!-- This is an auto-generated comment: review paused by coderabbit.ai -->\n\n> [!NOTE]\n> ## Reviews paused\n> \n> It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the `reviews.auto_review.auto_pause_after_reviewed_commits` setting.\n> \n> Use the following commands to manage reviews:\n> - `@coderabbitai resume` to resume automatic reviews.\n> - `@coderabbitai review` to trigger a single review.\n> \n> Use the checkboxes below for quick actions:\n> - [ ] <!-- {\"checkboxId\": \"7f6cc2e2-2e4e-497a-8c31-c9e4573e93d1\"} --> ▶️ Resume reviews\n> - [ ] <!-- {\"checkboxId\": \"e9bb8d72-00e8-4f67-9cb2-caf3b22574fe\"} --> 🔍 Trigger review\n\n<!-- end of auto-generated comment: review paused by coderabbit.ai -->\n\n<!-- walkthrough_start -->\n\n<details>\n<summary>📝 Walkthrough</summary>\n\n## Walkthrough\n\nAdds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a full plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.\n\n## Changes\n\n|Cohort / File(s)|Summary|\n|---|---|\n|**Top-level build** <br> `CMakeLists.txt`|Added `find_package(rabbitmq-c)` and a guard that disables `FLB_IN_AMQP` with a STATUS message if rabbitmq-c is not found.|\n|**Plugin registry** <br> `plugins/CMakeLists.txt`|Registered the new input plugin via `REGISTER_IN_PLUGIN(\"in_amqp\")`.|\n|**Plugin build** <br> `plugins/in_amqp/CMakeLists.txt`|New CMake for `in_amqp`: declares `in_amqp.c` and registers the plugin with `FLB_PLUGIN(... rabbitmq::rabbitmq)`.|\n|**Plugin header** <br> `plugins/in_amqp/in_amqp.h`|Adds `struct flb_amqp_connection` and `struct flb_amqp` types containing connection, config, encoder, parser, and runtime fields.|\n|**Plugin implementation** <br> `plugins/in_amqp/in_amqp.c`|New AMQP input implementation: connection/socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission, retry/reconnect scheduling, collectors, and lifecycle callbacks (init/collect/pause/resume/exit).|\n\n## Sequence Diagrams\n\n```mermaid\nsequenceDiagram\n    participant FLB as Fluent Bit\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n\n    FLB->>Plugin: in_amqp_init()\n    Plugin->>Plugin: parse config (uri, queue, parser, retries)\n    Plugin->>Parser: init parser (if configured)\n    Plugin->>Encoder: init encoder\n    Plugin->>AMQP: open socket / login / open channel / basic_consume\n    Plugin->>FLB: register collect callback\n```\n\n```mermaid\nsequenceDiagram\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n    participant FLB as Fluent Bit\n\n    Plugin->>AMQP: amqp_consume_message() (poll)\n    alt delivery received\n        AMQP->>Plugin: envelope (body, headers, routing_key, props)\n        Plugin->>Parser: parse body (optional)\n        Parser-->>Plugin: parsed record / raw body\n        Plugin->>Plugin: attach metadata (routing_key, headers, props)\n        Plugin->>Encoder: encode event\n        Encoder-->>Plugin: msgpack buffer\n        Plugin->>FLB: emit record\n    else connection/error\n        AMQP-->>Plugin: error / timeout\n        Plugin->>Plugin: schedule reconnect via retry collector\n    end\n```\n\n## Estimated code review effort\n\n🎯 4 (Complex) | ⏱️ ~45 minutes\n\n## Poem\n\n> 🐰 I hopped into queues where headers gleam,  \n> Opened channels, nibbled bytes from the stream,  \n> I parse and pack, then retry with a grin,  \n> Reconnect, emit — let the messages in,  \n> A floppy‑eared patch, now ready to spin.\n\n</details>\n\n<!-- walkthrough_end -->\n\n\n<!-- pre_merge_checks_walkthrough_start -->\n\n<details>\n<summary>🚥 Pre-merge checks | ✅ 2 | ❌ 1</summary>\n\n<details>\n<summary>❌ Failed checks (1 warning)</summary>\n\n|     Check name     | Status     | Explanation                                                                           | Resolution                                                                         |\n| :----------------: | :--------- | :------------------------------------------------------------------------------------ | :--------------------------------------------------------------------------------- |\n| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |\n\n</details>\n<details>\n<summary>✅ Passed checks (2 passed)</summary>\n\n|     Check name    | Status   | Explanation                                                                                                                                                                                             |\n| :---------------: | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |\n| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.                                                                                                                                             |\n|    Title check    | ✅ Passed | The title 'Initial implementation for in_amqp plugin' directly and clearly describes the main change: adding a new AMQP input plugin to Fluent Bit, which is the primary objective of the pull request. |\n\n</details>\n\n<sub>✏️ Tip: You can configure your own custom pre-merge checks in the settings.</sub>\n\n</details>\n\n<!-- pre_merge_checks_walkthrough_end -->\n\n<!-- finishing_touch_checkbox_start -->\n\n<details>\n<summary>✨ Finishing touches</summary>\n\n- [ ] <!-- {\"checkboxId\": \"7962f53c-55bc-4827-bfbf-6a18da830691\"} --> 📝 Generate docstrings\n<details>\n<summary>🧪 Generate unit tests (beta)</summary>\n\n- [ ] <!-- {\"checkboxId\": \"f47ac10b-58cc-4372-a567-0e02b2c3d479\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Create PR with unit tests\n- [ ] <!-- {\"checkboxId\": \"07f1e7d6-8a8e-4e23-9900-8731c2c87f58\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Post copyable unit tests in a comment\n\n</details>\n\n</details>\n\n<!-- finishing_touch_checkbox_end -->\n\n<!-- tips_start -->\n\n---\n\nThanks for using [CodeRabbit](https://coderabbit.ai?utm_source=oss&utm_medium=github&utm_campaign=fluent/fluent-bit&utm_content=11427)! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.\n\n<details>\n<summary>❤️ Share</summary>\n\n- [X](https://twitter.com/intent/tweet?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A&url=https%3A//coderabbit.ai)\n- [Mastodon](https://mastodon.social/share?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A%20https%3A%2F%2Fcoderabbit.ai)\n- [Reddit](https://www.reddit.com/submit?title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&text=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code.%20Check%20it%20out%3A%20https%3A//coderabbit.ai)\n- [LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fcoderabbit.ai&mini=true&title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&summary=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code)\n\n</details>\n\n<sub>Comment `@coderabbitai help` to get the list of available commands and usage tips.</sub>\n\n<!-- tips_end -->\n\n<!-- internal state start -->\n\n\n<!-- DwQgtGAEAqAWCWBnSTIEMB26CuAXA9mAOYCmGJATmriQCaQDG+Ats2bgFyQAOFk+AIwBWJBrngA3EsgEBPRvlqU0AgfFwA6NPEgQAfACgjoCEYDEZyAAUASpETZWaCrKPR1AGxJcAkhnXwaB4ozNxebBi41PD4WABm+HzwGAD6aMwAjtw8HthEyZCQBgByjgKUXACMlQAsAEwA7IUGAKo2ADJcsLi43IgcAPQD+biw2AIaTMwDcbnsM3ORYGq4A9zYHh4D1fVNRS2IFZDM1ADuJPJFAMr42BQMJJACVBgMsFzJaZnZgEmEMM6kXBPF5vLgnRA0PjXKK4bD9fjcMjNGwkCTwEjnCjwuiIeCIigAGkg/joAiCiAA1vAidxqBR4AwKWAISRuLAyIgiQ8PEESD1kpU6gAOABszXaKhIHnhtHwDEQYAoJAy2HgSvoRQAwkrqHR0JxIHUAAx1EVgE3mmrQQUcACsIo4RpqAC0jAARaQMenccSxDgGKAAQVotGQmBQ/nEQRCYRIERhMSw+Di6CwgYAsgBFKwR9ZAsJ5AoACk+6SyAEpIAk+AAxRZAgBC6g0AesuXyWFOaGQNBZ9FO6lgkBsKhWWcgNQ0dRbUAAogAPdKxhQYOLwIh3aKxHgUfBopS0LgAA1m2HYy3Uuh0ZeyYG4KpIZ4AvLMJqXvrp8PZcLK8EeZ5AmqxLg9ICHgiSMOyjIeEgQLqHG/QGIUc6LqEXgrmuG5UL6GBcLwe7wAeLbIZAHpgUQkAePgFG3L0eBVruzCQL24gYEQXDFAMgbEbokAAGpBEQ9IYPQtF5vYsD4AObHEl+XhoBS8KcdxSG8W6cqOOwW5YEqKpqnQXDYK8UEUnQAHtAyHLeAoSj2OMzDqDQ9BGUofCBrSbyPNORo8JQO74CIYiQLpqpKvGiAtgYFiQAA8sIojiFIyAOE4LhuJ41l+AE0bwGhcZaThVYQe+WQ5IWWBFKUzDlBQVS1I0zRtJ0kDdL0/RDCMYwTFMCxnpEvXnisawbFsOwNfshy1ccZwXM0Nx3A8wKYKCEZfKVfzQACfJLcZYLdpCc0wnCXD4IiFUGCiaIYpQ2KhnilBEiStBktKVI0nSDJMiybIclyUq8vyGCCqK4qStKXCyvKirKqFepajqTn6lwxqmuadSWtadR2g6TqugYHqIF6eI4f6QYhmGWDJNlwS5bG8bafwKbhhm2a5vRBYdpAJapDelbVpAdZ9Y2zatlY7YFCxeoDqMw6juo46TtOrYLku6FMKu66boV+H7gZkAnvWF5AmA14fneD7Pq+GglbeX4Qr+uD/q2QGRKB4F8J50GwSgNDMIhJEq3lGGa9hiZ4buuu0DxUBkXklHUfweDiXEjHMdIrHsZAynR/xgnCaJSf0YgknSRRGBySQClKVxOfqQwmmRAzIX6YekBGZ7plR62FkPBghxcEwtkOAIDk9HqLl+e5aCeYaGg+fi/mBUCzdhewEVGPoxjgFAZCiUz4HEGQyiI1M8bh/w8ViJI0hPPIg/KKozbaLoYCGCYUBwKgqDhmgB+kOQ2E9Sn3YFwKgpw7KpXkHIGyD8VhaB0JvLepgDCanTApEgFkIQRVwPOTgBgABEhCoqWEDD4Q+ADdT0BSicFwjNIKYFIIgDekAWY5l4JQGGSB4LICILqXwfdKCI1wKcL8rDoY8kRuw5uuJeyQVEIpCMgE0GmUwbgbBuCc7BgPFWZItAUgeQpGgUgRYqCP1wJkMADByyqSgD4FMNZ2gNhSD4YoKRWEoGQDFYoqZ6CmJWBYhgHjZJAgSC5IktAkAqHQg4pxLi3FZhzGiNAAtHEpBilYaAPgvFFhic41x7iYo1hrJWaWQ5klXGgIGaALQrjHGkIgIxJBWzQBESgJQkQGTRjQCGAIsQwzcERM4RRhxaSAJgcFEg+Q+lEgEPgGW+JqwORkqMR4DS2CQF4RneO+RAn83cQ4AZiRNCi3GDBBgAwSDzm4EcvUgYrA+AmScZIyB26wAYXQAA3LJYkGJICXOuRQRGiBZDVXwNKfgSRIiUDiNPaQkVzAkI8JCbSPYvwrMgEoBgPJQ59Lof8m5ok+DrAEGcv5HTxDSGYcUWITSDAWXIMgN47zW4AGohQDHNAQohAZkEc2eQMVB6DVHqLwYQ/BxCWFkP/sfPU1DnDyGTPQtilLWxaL1OQcBNsyqcyVPkCEOKsDQOeYI5IFEbCzgAOI+AqbOGweSUhWHaC0K1xQiz4JtvgysBQ+V9wFcojBsERUaBgAgZAsob7lyBMwRQ8A4jyEwPIS5sFTXbIZBCv5FBdx8DeSJGCbEvmxA8Imq5mBQzMXZKtH1yBdWwVoaU8tjwNVkpArIeFErAxIuPomVFDaMWiGxSivFVyCXpuJaS9gAQVW2IwBIOUDNukHmPOaq1Nq7VxMdc6lxbqPXliPIotc6EjxVr9UKwNGgcGOyMPSm+TLlWssqByo0RhZwQlypQ8ZSorrgJIHEasBp0x0HgI4Ll4qeVgCMMem2J6VFnovf6MV7apVH0AVQxwNCFUplvYwowWiwzHCpicYIgrTLBywgzfm6KtU+tJqRftzgb7JMQLce4jwYIQn1oge4e71ZRCpjJI8NtJhHlbCiPVkIewVp9ZAet6KRxmPHFMa55BIiQCSfrXJG6XXczWtkfBAASAA3pxhgABffBwU5bmIyBwDgfj1CZF3QilhnaDU9vRZigdOFkCKvxYCvUEEx1ponRSphrZqV/OHX52gw0SVpo884Qd0aIlrjMk5tB/g4jpwFvAdCgYMBBFkAAL0oFe5IN6c2kFZTUR9pWGVKsq1wFl1XOUIbAxB8WvqoOCYYPBoh0VSHkJlahyBdCsMqp8HlcK6AqwjRYQktm+YOtFVrPWSATY4JYEgzzb4AxuvBtw5RWNohZBYseCcfLpB4xcliOQK+24c20DzRRbmWLsARJklcK47QBjQE1Ik5wgRIiIHLESNgiAGmkDJVIKiiJ/IPHB6aokp0cLRlGZNCMNAhLaSJKWv5UgVNxiQLibcan0WC3YGty8ZB74UGDX4V7Shq2CDhECSgWaJkAozkSJU6tbuFVx0qFt9g4yYHEPKHHIk+0RIYO+pgmwEqJGQPzcuFBCMIi7duAXfJaFUR2cGoCSpGBoFpGoGCIX/TOxuwlRMPilXkGCDBTLDATteC4OnKJSAhzuN59b2IRJdfJCR2dO3UoiQQmcECdWKUU0pxYNN9WmE7h6gtiQIk9bTuYGwNkbcbPFcAVnDOqUp1Hj4Xh7iNiJ0sBKBglIFwQeUebHkGjx46KwcQ8ePW5JCeQ56mbxQL5YEcv0FRBTnniQy3JAZym8QYOoihG57RU1KRTKyGu1CyIKRcCyERGvmgG/qcxrYtdzN/0cIpEItz1kRbN/4Al/Qdx7Juk3S+YT/MzhDj0HTIgIgVhp4UieDGjfKcOyFgMkn3kEt3lhHQEjishQAOIcH8qPBZuArMrQK2q2ABlELQNQMkoTgjn6C1FKAvLSKMD2tQFEG8FDkXrDmwFgTgVzLMmCpXH3ESMkJjjdESLMPgNQJyJAC0NADWOynIL2ESDPu7qEMDsxGihWiPgTq8IoJQPnvIe9hRLjngcTrhCuPqtgGIESAfkoHfjwKcp7njmvF8g9uhPoSmrnliLfNsh2KoZLkqIcIDBROHjQABL/liH5GwZMgaidD6ImKju/n5HCI0stgAWgXDvUqal8sAUiGQFEtAW3IcBJiXiEXwAQP5LQDoY8F/j/n/onHRMclACiD7mIAMILjrtRAyAPLEInqYuhFUfIDBKPBTPQL4RQBIOSF8s0QoPLmIBBITOyDkU9l8rSHCCQJUdIJpMLudmLj2OiPQFkVJo7sdqdgBEBAMQQLYbjmsc7qdntGwdoH3NNk8ptmCk9v0V4IMXwLjoxqyAljQBMkLnLjcTsV8gcpzsgBMeXkQNMdHjJNuFJr8VMc4ZpF8lTFGDBMVu0YwPJKcVniuDQLgnocoQ9DwBkYYW8QrliJsfUSHNpBDD+mVsgG0D4ESCnu9N4YSLbmPhgHzsLq4YgOMSEWSTYA8hBEoDChsGol8t0TBNgbIivMnmeGeLblCYEDCTfOAbGqRknl3FAFYMYYEncj4G7pFmotNk2skHmPoktvFgaqtDePqeVNJoOIdk7i7o8DLpsGSIyHCV3gSRRCcNwABFlNCfAIVvOpLhnhgFnlwEEFRDLrIrjpKUEF6TesBJcrgGiTToYUqBIrKiBDobCM4eMdCokH7PCcwUiTnpmorumkmrGRGFPjJGwNGrQrjuUYVDQM4LKKcBgOZNRA4XUX3LBBTjYQMF2BQBgLtquHJNRErhBDWYmGvilIEX7piViIjmSoPLOVrqOTnvjmopFP1i5oOlke5nRq5kOpzv5kSiqc2pOqFlAJgZJPQAunrEeNoUFK+OfhgHqVJjbKaR2HugUAeo8Eeh1ogP2Tpn+TeEJkYCRB6LxuDKpCRMUOkNZC+T6hBbxNANvtZLeSEh4AIA+U+R1vBVAO0HOiTNqvyl1ttlkJMNhZAFcOuPlmmY8EWIFqqfcstswOWP6IUKxbxFHpHm8hQAAFTEjQWQBPiQD4I3jipsUkQcX0I8V9qEzeiFSCX4LuK6nswdaiViVQAMDoVQkCXGnfAPnqDwXiXoXsIpAUBGTaXFAtDtDtAGXsXoU4lBSCUvn2W4A2XqXoWnjFwpBgQpiCUWVWWuWkYpCunaWQFBXG4BUaX6K/wIGOXEXcBRWTERXoXgnrKxU6YmUzFsBJUpDFnaUvnFlpaYBHbsY1g5aPB5YFbFYUAlBeKzhOYegMBcKxBGCFUZZZafl9qQhLLaS1blbMqNYigADMDQNWrWEA4GBgW2/5gm7wIGiGg2KGEC6Go2FWKqB2ySTadFhBT+kKkIMKi0FGFa7i5OKm62C2BFuEzCHoa4dWKFVYaF6VS5nYFpKWHgoYgZulT1KQ7hJAm+K4GAXyJpTGjIfIf13FwNFIgNn1Oa9uf1t6kJKmbx5+iptGN1N8d195N45pMsr171916FiAoYf1dw8AXy95hNiAf1KeKQ+WbAZND1FNf1feNN0FnxKZQUJpX1yQCQ/1D5CQCNy8ogVuYgGVLaKQrR6gLJGOEyNZotLgD5kIApbNpld5D1uuOVK5OV6JfA+hlAytqZ+NUVNJkA3Ffe+tqtmlj5eAD54erwjw3FzyAtLx8tSNhETtzR5tqF6FnNwthUvOkU9OuQjONGgdb2jwG49ZXAuScSrCKQAAEq2B6GdO0o1dIKApZgEgMIBbAJwYbENPeUpZoDnfdULEbAsOherbIbgFrTThoLAEYOFnEO3F5umuUG8miFySSZGN2ugIbleV3O2huS3VuRWoaYOj5pqQeUYbFoEsFuiKeTAEhegCGNeRjQ9T7Qyb7hgO+fEGVfrFNTeABd8HXcJiRKVVKHjUeBvXzomN9TCL9ZHjdh8keESFfbpRDaDUCODXKJDS/frJzTDVKHDTms/a/Wwdccjc/c0kvf3ceGvd7d8DvTooegfTtjNafbxOfW9fCAbATUTUCCTaA/rOTfg5ANTbTSQEQ7g99aQ8zRQ1Q3ddfVvXzV+Lziw1Q+A/SXznLbIOLblJLRwyplwwlDwwrZQErX/TeezV7eLdRBrewDXQoTrdrQw9I4beAabRkaoyrTI4XTbVEHbSbY7ZI5w9rrw67bQII4LS2towbZjZ9b7TbrzlAxKulsVUCKVblvlkWlVQ3bVfVaIE1VgDYBsCqsUP4xKi0NwEKaxiPsEFEEQL1YyqtayraINaNdyogu/GSnvDgAQAtbLiwGfMgUtfKnYTTpZvAi/G/NvAoKwOoMjZTZ+uiOcHouHoCtU0gjvN0kaPaJXCKAwLUEKEaJUA0LaLaGgGgDCiKKoA0MaAwAwCKLaLQDUCKA0HEAAJzVCdPZODVoC2gCA1DVAbOZaVCTNxDGiVBbO0BChxCDUjPDUqALMCBGi0AihGgkB1BoCDU7O1O0CDW0AbODUbMkAigkBCh0BjOmi2gy41CqBoA1ANAbNgt1A/oNBCgbMfPgs1C2i/MQDoAHPVBCi2g1DYGVwnODU1A1DfMMATNotoBCi0sDMMA1AkCDV1CDOIudMGDZOnwNOERNOogtN0Ba30D6BAA=== -->\n\n<!-- internal state end -->"},"request":{"retryCount":3,"retries":3,"retryAfter":16}},"response":{"url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","status":500,"headers":{"access-control-allow-origin":"*","access-control-expose-headers":"ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset","content-length":"0","content-security-policy":"default-src 'none'","content-type":"application/json; charset=utf-8","date":"Mon, 09 Feb 2026 16:12:32 GMT","referrer-policy":"origin-when-cross-origin, strict-origin-when-cross-origin","server":"github.com","strict-transport-security":"max-age=31536000; includeSubdomains; preload","vary":"Accept-Encoding, Accept, X-Requested-With","x-accepted-github-permissions":"issues=write; pull_requests=write","x-content-type-options":"nosniff","x-frame-options":"deny","x-github-api-version-selected":"2022-11-28","x-github-media-type":"github.v3; format=json","x-github-request-id":"404F:68365:10B97A:47D026:698A076F","x-ratelimit-limit":"15000","x-ratelimit-remaining":"14927","x-ratelimit-reset":"1770656672","x-ratelimit-resource":"core","x-ratelimit-used":"73","x-xss-protection":"0"},"data":""}}

1 similar comment
@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

Caution

Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted.

Error details
{"name":"HttpError","status":500,"request":{"method":"PATCH","url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","headers":{"accept":"application/vnd.github.v3+json","user-agent":"octokit.js/0.0.0-development octokit-core.js/7.0.6 Node.js/24","authorization":"token [REDACTED]","content-type":"application/json; charset=utf-8"},"body":{"body":"<!-- This is an auto-generated comment: summarize by coderabbit.ai -->\n<!-- This is an auto-generated comment: failure by coderabbit.ai -->\n\n> [!CAUTION]\n> ## Review failed\n> \n> Failed to post review comments\n\n<!-- end of auto-generated comment: failure by coderabbit.ai -->\n\n<!-- This is an auto-generated comment: review paused by coderabbit.ai -->\n\n> [!NOTE]\n> ## Reviews paused\n> \n> It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the `reviews.auto_review.auto_pause_after_reviewed_commits` setting.\n> \n> Use the following commands to manage reviews:\n> - `@coderabbitai resume` to resume automatic reviews.\n> - `@coderabbitai review` to trigger a single review.\n> \n> Use the checkboxes below for quick actions:\n> - [ ] <!-- {\"checkboxId\": \"7f6cc2e2-2e4e-497a-8c31-c9e4573e93d1\"} --> ▶️ Resume reviews\n> - [ ] <!-- {\"checkboxId\": \"e9bb8d72-00e8-4f67-9cb2-caf3b22574fe\"} --> 🔍 Trigger review\n\n<!-- end of auto-generated comment: review paused by coderabbit.ai -->\n\n<!-- walkthrough_start -->\n\n<details>\n<summary>📝 Walkthrough</summary>\n\n## Walkthrough\n\nAdds a new AMQP input plugin: build gating for rabbitmq-c, plugin registration and CMake entries, public header types for connection/plugin state, and a full plugin implementation handling AMQP connections, consumption, parsing, encoding, retry/reconnect, and lifecycle callbacks.\n\n## Changes\n\n|Cohort / File(s)|Summary|\n|---|---|\n|**Top-level build** <br> `CMakeLists.txt`|Added `find_package(rabbitmq-c)` and a guard that disables `FLB_IN_AMQP` with a STATUS message if rabbitmq-c is not found.|\n|**Plugin registry** <br> `plugins/CMakeLists.txt`|Registered the new input plugin via `REGISTER_IN_PLUGIN(\"in_amqp\")`.|\n|**Plugin build** <br> `plugins/in_amqp/CMakeLists.txt`|New CMake for `in_amqp`: declares `in_amqp.c` and registers the plugin with `FLB_PLUGIN(... rabbitmq::rabbitmq)`.|\n|**Plugin header** <br> `plugins/in_amqp/in_amqp.h`|Adds `struct flb_amqp_connection` and `struct flb_amqp` types containing connection, config, encoder, parser, and runtime fields.|\n|**Plugin implementation** <br> `plugins/in_amqp/in_amqp.c`|New AMQP input implementation: connection/socket/channel management, consumer loop, envelope metadata extraction, optional parser integration, encoding/emission, retry/reconnect scheduling, collectors, and lifecycle callbacks (init/collect/pause/resume/exit).|\n\n## Sequence Diagrams\n\n```mermaid\nsequenceDiagram\n    participant FLB as Fluent Bit\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n\n    FLB->>Plugin: in_amqp_init()\n    Plugin->>Plugin: parse config (uri, queue, parser, retries)\n    Plugin->>Parser: init parser (if configured)\n    Plugin->>Encoder: init encoder\n    Plugin->>AMQP: open socket / login / open channel / basic_consume\n    Plugin->>FLB: register collect callback\n```\n\n```mermaid\nsequenceDiagram\n    participant Plugin as in_amqp Plugin\n    participant AMQP as AMQP Broker\n    participant Parser as Parser\n    participant Encoder as Encoder\n    participant FLB as Fluent Bit\n\n    Plugin->>AMQP: amqp_consume_message() (poll)\n    alt delivery received\n        AMQP->>Plugin: envelope (body, headers, routing_key, props)\n        Plugin->>Parser: parse body (optional)\n        Parser-->>Plugin: parsed record / raw body\n        Plugin->>Plugin: attach metadata (routing_key, headers, props)\n        Plugin->>Encoder: encode event\n        Encoder-->>Plugin: msgpack buffer\n        Plugin->>FLB: emit record\n    else connection/error\n        AMQP-->>Plugin: error / timeout\n        Plugin->>Plugin: schedule reconnect via retry collector\n    end\n```\n\n## Estimated code review effort\n\n🎯 4 (Complex) | ⏱️ ~45 minutes\n\n## Poem\n\n> 🐰 I hopped into queues where headers gleam,  \n> Opened channels, nibbled bytes from the stream,  \n> I parse and pack, then retry with a grin,  \n> Reconnect, emit — let the messages in,  \n> A floppy‑eared patch, now ready to spin.\n\n</details>\n\n<!-- walkthrough_end -->\n\n\n<!-- pre_merge_checks_walkthrough_start -->\n\n<details>\n<summary>🚥 Pre-merge checks | ✅ 2 | ❌ 1</summary>\n\n<details>\n<summary>❌ Failed checks (1 warning)</summary>\n\n|     Check name     | Status     | Explanation                                                                           | Resolution                                                                         |\n| :----------------: | :--------- | :------------------------------------------------------------------------------------ | :--------------------------------------------------------------------------------- |\n| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |\n\n</details>\n<details>\n<summary>✅ Passed checks (2 passed)</summary>\n\n|     Check name    | Status   | Explanation                                                                                                                                                                                             |\n| :---------------: | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |\n| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.                                                                                                                                             |\n|    Title check    | ✅ Passed | The title 'Initial implementation for in_amqp plugin' directly and clearly describes the main change: adding a new AMQP input plugin to Fluent Bit, which is the primary objective of the pull request. |\n\n</details>\n\n<sub>✏️ Tip: You can configure your own custom pre-merge checks in the settings.</sub>\n\n</details>\n\n<!-- pre_merge_checks_walkthrough_end -->\n\n<!-- finishing_touch_checkbox_start -->\n\n<details>\n<summary>✨ Finishing touches</summary>\n\n- [ ] <!-- {\"checkboxId\": \"7962f53c-55bc-4827-bfbf-6a18da830691\"} --> 📝 Generate docstrings\n<details>\n<summary>🧪 Generate unit tests (beta)</summary>\n\n- [ ] <!-- {\"checkboxId\": \"f47ac10b-58cc-4372-a567-0e02b2c3d479\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Create PR with unit tests\n- [ ] <!-- {\"checkboxId\": \"07f1e7d6-8a8e-4e23-9900-8731c2c87f58\", \"radioGroupId\": \"utg-output-choice-group-unknown_comment_id\"} -->   Post copyable unit tests in a comment\n\n</details>\n\n</details>\n\n<!-- finishing_touch_checkbox_end -->\n\n<!-- tips_start -->\n\n---\n\nThanks for using [CodeRabbit](https://coderabbit.ai?utm_source=oss&utm_medium=github&utm_campaign=fluent/fluent-bit&utm_content=11427)! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.\n\n<details>\n<summary>❤️ Share</summary>\n\n- [X](https://twitter.com/intent/tweet?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A&url=https%3A//coderabbit.ai)\n- [Mastodon](https://mastodon.social/share?text=I%20just%20used%20%40coderabbitai%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20the%20proprietary%20code.%20Check%20it%20out%3A%20https%3A%2F%2Fcoderabbit.ai)\n- [Reddit](https://www.reddit.com/submit?title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&text=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code.%20Check%20it%20out%3A%20https%3A//coderabbit.ai)\n- [LinkedIn](https://www.linkedin.com/sharing/share-offsite/?url=https%3A%2F%2Fcoderabbit.ai&mini=true&title=Great%20tool%20for%20code%20review%20-%20CodeRabbit&summary=I%20just%20used%20CodeRabbit%20for%20my%20code%20review%2C%20and%20it%27s%20fantastic%21%20It%27s%20free%20for%20OSS%20and%20offers%20a%20free%20trial%20for%20proprietary%20code)\n\n</details>\n\n<sub>Comment `@coderabbitai help` to get the list of available commands and usage tips.</sub>\n\n<!-- tips_end -->\n\n<!-- internal state start -->\n\n\n<!-- DwQgtGAEAqAWCWBnSTIEMB26CuAXA9mAOYCmGJATmriQCaQDG+Ats2bgFyQAOFk+AIwBWJBrngA3EsgEBPRvlqU0AgfFwA6NPEgQAfACgjoCEYDEZyAAUASpETZWaCrKPR1AGxJcAkhnXwaB4ozNxebBi41PD4WABm+HzwGAD6aMwAjtw8HthEyZCQBgByjgKUXACMlQAsAEwA7IUGAKo2ADJcsLi43IgcAPQD+biw2AIaTMwDcbnsM3ORYGq4A9zYHh4D1fVNRS2IFZDM1ADuJPJFAMr42BQMJJACVBgMsFzJaZnZgEmEMM6kXBPF5vLgnRA0PjXKK4bD9fjcMjNGwkCTwEjnCjwuiIeCIigAGkg/joAiCiAA1vAidxqBR4AwKWAISRuLAyIgiQ8PEESD1kpU6gAOABszXaKhIHnhtHwDEQYAoJAy2HgSvoRQAwkrqHR0JxIHUAAx1EVgE3mmrQQUcACsIo4RpqAC0jAARaQMenccSxDgGKAAQVotGQmBQ/nEQRCYRIERhMSw+Di6CwgYAsgBFKwR9ZAsJ5AoACk+6SyAEpIAk+AAxRZAgBC6g0AesuXyWFOaGQNBZ9FO6lgkBsKhWWcgNQ0dRbUAAogAPdKxhQYOLwIh3aKxHgUfBopS0LgAA1m2HYy3Uuh0ZeyYG4KpIZ4AvLMJqXvrp8PZcLK8EeZ5AmqxLg9ICHgiSMOyjIeEgQLqHG/QGIUc6LqEXgrmuG5UL6GBcLwe7wAeLbIZAHpgUQkAePgFG3L0eBVruzCQL24gYEQXDFAMgbEbokAAGpBEQ9IYPQtF5vYsD4AObHEl+XhoBS8KcdxSG8W6cqOOwW5YEqKpqnQXDYK8UEUnQAHtAyHLeAoSj2OMzDqDQ9BGUofCBrSbyPNORo8JQO74CIYiQLpqpKvGiAtgYFiQAA8sIojiFIyAOE4LhuJ41l+AE0bwGhcZaThVYQe+WQ5IWWBFKUzDlBQVS1I0zRtJ0kDdL0/RDCMYwTFMCxnpEvXnisawbFsOwNfshy1ccZwXM0Nx3A8wKYKCEZfKVfzQACfJLcZYLdpCc0wnCXD4IiFUGCiaIYpQ2KhnilBEiStBktKVI0nSDJMiybIclyUq8vyGCCqK4qStKXCyvKirKqFepajqTn6lwxqmuadSWtadR2g6TqugYHqIF6eI4f6QYhmGWDJNlwS5bG8bafwKbhhm2a5vRBYdpAJapDelbVpAdZ9Y2zatlY7YFCxeoDqMw6juo46TtOrYLku6FMKu66boV+H7gZkAnvWF5AmA14fneD7Pq+GglbeX4Qr+uD/q2QGRKB4F8J50GwSgNDMIhJEq3lGGa9hiZ4buuu0DxUBkXklHUfweDiXEjHMdIrHsZAynR/xgnCaJSf0YgknSRRGBySQClKVxOfqQwmmRAzIX6YekBGZ7plR62FkPBghxcEwtkOAIDk9HqLl+e5aCeYaGg+fi/mBUCzdhewEVGPoxjgFAZCiUz4HEGQyiI1M8bh/w8ViJI0hPPIg/KKozbaLoYCGCYUBwKgqDhmgB+kOQ2E9Sn3YFwKgpw7KpXkHIGyD8VhaB0JvLepgDCanTApEgFkIQRVwPOTgBgABEhCoqWEDD4Q+ADdT0BSicFwjNIKYFIIgDekAWY5l4JQGGSB4LICILqXwfdKCI1wKcL8rDoY8kRuw5uuJeyQVEIpCMgE0GmUwbgbBuCc7BgPFWZItAUgeQpGgUgRYqCP1wJkMADByyqSgD4FMNZ2gNhSD4YoKRWEoGQDFYoqZ6CmJWBYhgHjZJAgSC5IktAkAqHQg4pxLi3FZhzGiNAAtHEpBilYaAPgvFFhic41x7iYo1hrJWaWQ5klXGgIGaALQrjHGkIgIxJBWzQBESgJQkQGTRjQCGAIsQwzcERM4RRhxaSAJgcFEg+Q+lEgEPgGW+JqwORkqMR4DS2CQF4RneO+RAn83cQ4AZiRNCi3GDBBgAwSDzm4EcvUgYrA+AmScZIyB26wAYXQAA3LJYkGJICXOuRQRGiBZDVXwNKfgSRIiUDiNPaQkVzAkI8JCbSPYvwrMgEoBgPJQ59Lof8m5ok+DrAEGcv5HTxDSGYcUWITSDAWXIMgN47zW4AGohQDHNAQohAZkEc2eQMVB6DVHqLwYQ/BxCWFkP/sfPU1DnDyGTPQtilLWxaL1OQcBNsyqcyVPkCEOKsDQOeYI5IFEbCzgAOI+AqbOGweSUhWHaC0K1xQiz4JtvgysBQ+V9wFcojBsERUaBgAgZAsob7lyBMwRQ8A4jyEwPIS5sFTXbIZBCv5FBdx8DeSJGCbEvmxA8Imq5mBQzMXZKtH1yBdWwVoaU8tjwNVkpArIeFErAxIuPomVFDaMWiGxSivFVyCXpuJaS9gAQVW2IwBIOUDNukHmPOaq1Nq7VxMdc6lxbqPXliPIotc6EjxVr9UKwNGgcGOyMPSm+TLlWssqByo0RhZwQlypQ8ZSorrgJIHEasBp0x0HgI4Ll4qeVgCMMem2J6VFnovf6MV7apVH0AVQxwNCFUplvYwowWiwzHCpicYIgrTLBywgzfm6KtU+tJqRftzgb7JMQLce4jwYIQn1oge4e71ZRCpjJI8NtJhHlbCiPVkIewVp9ZAet6KRxmPHFMa55BIiQCSfrXJG6XXczWtkfBAASAA3pxhgABffBwU5bmIyBwDgfj1CZF3QilhnaDU9vRZigdOFkCKvxYCvUEEx1ponRSphrZqV/OHX52gw0SVpo884Qd0aIlrjMk5tB/g4jpwFvAdCgYMBBFkAAL0oFe5IN6c2kFZTUR9pWGVKsq1wFl1XOUIbAxB8WvqoOCYYPBoh0VSHkJlahyBdCsMqp8HlcK6AqwjRYQktm+YOtFVrPWSATY4JYEgzzb4AxuvBtw5RWNohZBYseCcfLpB4xcliOQK+24c20DzRRbmWLsARJklcK47QBjQE1Ik5wgRIiIHLESNgiAGmkDJVIKiiJ/IPHB6aokp0cLRlGZNCMNAhLaSJKWv5UgVNxiQLibcan0WC3YGty8ZB74UGDX4V7Shq2CDhECSgWaJkAozkSJU6tbuFVx0qFt9g4yYHEPKHHIk+0RIYO+pgmwEqJGQPzcuFBCMIi7duAXfJaFUR2cGoCSpGBoFpGoGCIX/TOxuwlRMPilXkGCDBTLDATteC4OnKJSAhzuN59b2IRJdfJCR2dO3UoiQQmcECdWKUU0pxYNN9WmE7h6gtiQIk9bTuYGwNkbcbPFcAVnDOqUp1Hj4Xh7iNiJ0sBKBglIFwQeUebHkGjx46KwcQ8ePW5JCeQ56mbxQL5YEcv0FRBTnniQy3JAZym8QYOoihG57RU1KRTKyGu1CyIKRcCyERGvmgG/qcxrYtdzN/0cIpEItz1kRbN/4Al/Qdx7Juk3S+YT/MzhDj0HTIgIgVhp4UieDGjfKcOyFgMkn3kEt3lhHQEjishQAOIcH8qPBZuArMrQK2q2ABlELQNQMkoTgjn6C1FKAvLSKMD2tQFEG8FDkXrDmwFgTgVzLMmCpXH3ESMkJjjdESLMPgNQJyJAC0NADWOynIL2ESDPu7qEMDsxGihWiPgTq8IoJQPnvIe9hRLjngcTrhCuPqtgGIESAfkoHfjwKcp7njmvF8g9uhPoSmrnliLfNsh2KoZLkqIcIDBROHjQABL/liH5GwZMgaidD6ImKju/n5HCI0stgAWgXDvUqal8sAUiGQFEtAW3IcBJiXiEXwAQP5LQDoY8F/j/n/onHRMclACiD7mIAMILjrtRAyAPLEInqYuhFUfIDBKPBTPQL4RQBIOSF8s0QoPLmIBBITOyDkU9l8rSHCCQJUdIJpMLudmLj2OiPQFkVJo7sdqdgBEBAMQQLYbjmsc7qdntGwdoH3NNk8ptmCk9v0V4IMXwLjoxqyAljQBMkLnLjcTsV8gcpzsgBMeXkQNMdHjJNuFJr8VMc4ZpF8lTFGDBMVu0YwPJKcVniuDQLgnocoQ9DwBkYYW8QrliJsfUSHNpBDD+mVsgG0D4ESCnu9N4YSLbmPhgHzsLq4YgOMSEWSTYA8hBEoDChsGol8t0TBNgbIivMnmeGeLblCYEDCTfOAbGqRknl3FAFYMYYEncj4G7pFmotNk2skHmPoktvFgaqtDePqeVNJoOIdk7i7o8DLpsGSIyHCV3gSRRCcNwABFlNCfAIVvOpLhnhgFnlwEEFRDLrIrjpKUEF6TesBJcrgGiTToYUqBIrKiBDobCM4eMdCokH7PCcwUiTnpmorumkmrGRGFPjJGwNGrQrjuUYVDQM4LKKcBgOZNRA4XUX3LBBTjYQMF2BQBgLtquHJNRErhBDWYmGvilIEX7piViIjmSoPLOVrqOTnvjmopFP1i5oOlke5nRq5kOpzv5kSiqc2pOqFlAJgZJPQAunrEeNoUFK+OfhgHqVJjbKaR2HugUAeo8Eeh1ogP2Tpn+TeEJkYCRB6LxuDKpCRMUOkNZC+T6hBbxNANvtZLeSEh4AIA+U+R1vBVAO0HOiTNqvyl1ttlkJMNhZAFcOuPlmmY8EWIFqqfcstswOWP6IUKxbxFHpHm8hQAAFTEjQWQBPiQD4I3jipsUkQcX0I8V9qEzeiFSCX4LuK6nswdaiViVQAMDoVQkCXGnfAPnqDwXiXoXsIpAUBGTaXFAtDtDtAGXsXoU4lBSCUvn2W4A2XqXoWnjFwpBgQpiCUWVWWuWkYpCunaWQFBXG4BUaX6K/wIGOXEXcBRWTERXoXgnrKxU6YmUzFsBJUpDFnaUvnFlpaYBHbsY1g5aPB5YFbFYUAlBeKzhOYegMBcKxBGCFUZZZafl9qQhLLaS1blbMqNYigADMDQNWrWEA4GBgW2/5gm7wIGiGg2KGEC6Go2FWKqB2ySTadFhBT+kKkIMKi0FGFa7i5OKm62C2BFuEzCHoa4dWKFVYaF6VS5nYFpKWHgoYgZulT1KQ7hJAm+K4GAXyJpTGjIfIf13FwNFIgNn1Oa9uf1t6kJKmbx5+iptGN1N8d195N45pMsr171916FiAoYf1dw8AXy95hNiAf1KeKQ+WbAZND1FNf1feNN0FnxKZQUJpX1yQCQ/1D5CQCNy8ogVuYgGVLaKQrR6gLJGOEyNZotLgD5kIApbNpld5D1uuOVK5OV6JfA+hlAytqZ+NUVNJkA3Ffe+tqtmlj5eAD54erwjw3FzyAtLx8tSNhETtzR5tqF6FnNwthUvOkU9OuQjONGgdb2jwG49ZXAuScSrCKQAAEq2B6GdO0o1dIKApZgEgMIBbAJwYbENPeUpZoDnfdULEbAsOherbIbgFrTThoLAEYOFnEO3F5umuUG8miFySSZGN2ugIbleV3O2huS3VuRWoaYOj5pqQeUYbFoEsFuiKeTAEhegCGNeRjQ9T7Qyb7hgO+fEGVfrFNTeABd8HXcJiRKVVKHjUeBvXzomN9TCL9ZHjdh8keESFfbpRDaDUCODXKJDS/frJzTDVKHDTms/a/Wwdccjc/c0kvf3ceGvd7d8DvTooegfTtjNafbxOfW9fCAbATUTUCCTaA/rOTfg5ANTbTSQEQ7g99aQ8zRQ1Q3ddfVvXzV+Lziw1Q+A/SXznLbIOLblJLRwyplwwlDwwrZQErX/TeezV7eLdRBrewDXQoTrdrQw9I4beAabRkaoyrTI4XTbVEHbSbY7ZI5w9rrw67bQII4LS2towbZjZ9b7TbrzlAxKulsVUCKVblvlkWlVQ3bVfVaIE1VgDYBsCqsUP4xKi0NwEKaxiPsEFEEQL1YyqtayraINaNdyogu/GSnvDgAQAtbLiwGfMgUtfKnYTTpZvAi/G/NvAoKwOoMjZTZ+uiOcHouHoCtU0gjvN0kaPaJXCKAwLUEKEaJUA0LaLaGgGgDCiKKoA0MaAwAwCKLaLQDUCKA0HEAAJzVCdPZODVoC2gCA1DVAbOZaVCTNxDGiVBbO0BChxCDUjPDUqALMCBGi0AihGgkB1BoCDU7O1O0CDW0AbODUbMkAigkBCh0BjOmi2gy41CqBoA1ANAbNgt1A/oNBCgbMfPgs1C2i/MQDoAHPVBCi2g1DYGVwnODU1A1DfMMATNotoBCi0sDMMA1AkCDV1CDOIudMGDZOnwNOERNOogtN0Ba30D6BAA=== -->\n\n<!-- internal state end -->"},"request":{"retryCount":3,"retries":3,"retryAfter":16}},"response":{"url":"https://api.github.com/repos/fluent/fluent-bit/issues/comments/3847295303","status":500,"headers":{"access-control-allow-origin":"*","access-control-expose-headers":"ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset","content-length":"0","content-security-policy":"default-src 'none'","content-type":"application/json; charset=utf-8","date":"Mon, 09 Feb 2026 16:12:32 GMT","referrer-policy":"origin-when-cross-origin, strict-origin-when-cross-origin","server":"github.com","strict-transport-security":"max-age=31536000; includeSubdomains; preload","vary":"Accept-Encoding, Accept, X-Requested-With","x-accepted-github-permissions":"issues=write; pull_requests=write","x-content-type-options":"nosniff","x-frame-options":"deny","x-github-api-version-selected":"2022-11-28","x-github-media-type":"github.v3; format=json","x-github-request-id":"404F:68365:10B97A:47D026:698A076F","x-ratelimit-limit":"15000","x-ratelimit-remaining":"14927","x-ratelimit-reset":"1770656672","x-ratelimit-resource":"core","x-ratelimit-used":"73","x-xss-protection":"0"},"data":""}}

@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

✅ Actions performed

Reviews resumed.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@plugins/in_amqp/in_amqp.c`:
- Around line 451-459: in in_amqp_config_destroy, guard the call to
flb_input_collector_delete by checking ctx->retry_coll_id is valid (not -1)
before calling it; update the function handling for struct flb_amqp (used by
in_amqp_init and cleanup paths) to only call
flb_input_collector_delete(ctx->retry_coll_id, ctx->ins) when ctx->retry_coll_id
has been initialized to a non -1 value, leaving other cleanup
(flb_log_event_encoder_destroy, in_amqp_connection_destroy, flb_free) unchanged.
- Around line 327-339: in_amqp_connection_destroy currently calls
flb_input_collector_delete unconditionally which can be invoked with c->coll_id
== -1; update the function to check c->coll_id != -1 before calling
flb_input_collector_delete and only reset c->coll_id to -1 after successful
deletion; locate the logic in in_amqp_connection_destroy that references struct
flb_amqp_connection *c = &ctx->conn, the c->conn check, and the
flb_input_collector_delete(c->coll_id, ctx->ins) call to add the guard.

@matwey
Copy link
Author

matwey commented Feb 9, 2026

@coderabbitai full review

@coderabbitai
Copy link

coderabbitai bot commented Feb 9, 2026

✅ Actions performed

Full review triggered.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants