Skip to content

in_amqp: Add documentation for in_amqp plugin#2387

Open
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp
Open

in_amqp: Add documentation for in_amqp plugin#2387
matwey wants to merge 1 commit intofluent:masterfrom
matwey:in_amqp

Conversation

@matwey
Copy link

@matwey matwey commented Feb 25, 2026

Reference: fluent/fluent-bit#11427

Summary by CodeRabbit

  • Documentation
    • Added comprehensive AMQP input plugin documentation: configuration options (connection URI, queue, parser selection, reconnect behavior, thread buffer settings), how messages map to records (body as content; properties and headers as metadata), optional JSON parsing example, connection retry guidance, CLI and config usage examples, note on required client library, and a link to this doc from the Data Pipeline Inputs section.

@matwey matwey requested review from a team and eschabell as code owners February 25, 2026 13:10
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 25, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 846fd91 and aaecb8e.

📒 Files selected for processing (2)
  • SUMMARY.md
  • pipeline/inputs/amqp.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • SUMMARY.md

📝 Walkthrough

Walkthrough

Adds a new AMQP input documentation page and updates the table of contents; documents configuration options, message-to-record mapping (body → record content, properties → metadata, headers → nested metadata), parsing examples, reconnect/thread buffer settings, and a rabbitmq-c requirement.

Changes

Cohort / File(s) Summary
Documentation: AMQP input & TOC
pipeline/inputs/amqp.md, SUMMARY.md
Adds pipeline/inputs/amqp.md describing the AMQP input plugin (configuration keys: uri, queue, parser, reconnect.*, thread ring buffer options), message transformation rules (body → record content, properties → metadata, headers → nested metadata), CLI/config examples, connection management notes, and a rabbitmq-c requirement; updates SUMMARY.md to include the new doc.

Estimated code review effort

🎯 1 (Trivial) | ⏱️ ~3 minutes

Poem

🐇 I hopped through queues at break of day,
Wrote how headers, props, and bodies play,
A tiny guide for messages sent,
For Fluent Bit to parse and invent,
📜 Rabbit-approved, in clear display.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: adding documentation for the in_amqp plugin, which is exactly what the changeset contains.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
pipeline/inputs/amqp.md (1)

9-15: Missing thread.ring_buffer.* configuration parameters.

The configuration table omits the ring buffer settings common to Fluent Bit input plugins: thread.ring_buffer.capacity, thread.ring_buffer.window, and thread.ring_buffer.retry_limit. Based on learnings, in Fluent Bit input plugin documentation under pipeline/inputs, use the thread. prefix for ring buffer settings (e.g., thread.ring_buffer.capacity, thread.ring_buffer.window, thread.ring_buffer.retry_limit).

📝 Suggested table rows to add
 | `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` |
+| `thread.ring_buffer.capacity` | Set the capacity of the ring buffer | _default_ |
+| `thread.ring_buffer.window` | Set the window size of the ring buffer | _default_ |
+| `thread.ring_buffer.retry_limit` | Set the retry limit for the ring buffer | _default_ |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 9 - 15, Add the missing Fluent Bit ring
buffer settings to the AMQP input configuration table: include rows for
thread.ring_buffer.capacity, thread.ring_buffer.window, and
thread.ring_buffer.retry_limit with appropriate descriptions and defaults (use
typical Fluent Bit defaults or project-consistent values); update any
documentation references for the amqp input parser/queue/reconnect entries if
needed and ensure the keys use the thread. prefix exactly
(thread.ring_buffer.capacity, thread.ring_buffer.window,
thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the
plugin implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pipeline/inputs/amqp.md`:
- Around line 138-140: Summary: Clarify that message requeue on connection loss
depends on acknowledgment mode and timing. Update the bullet that currently
reads "Messages consumed but not yet processed will be requeued by the broker
(assuming proper acknowledgment settings)" to explicitly state this only applies
when the plugin uses manual acknowledgment (not auto-ack), and only if the
connection drops before the client sends basic.ack or basic.nack; reference the
reconnect behavior controlled by reconnect.retry_limits and
reconnect.retry_interval. Edit the text to mention "manual (explicit) ACK mode"
vs "auto-ack", note broker requeue happens on client disconnect prior to
ack/nack, and recommend users ensure the plugin is configured for manual ack if
they require this requeue guarantee.
- Around line 95-110: The YAML places the parsers list under pipeline:, which is
invalid for Fluent Bit; move the "parsers:" block out to top-level (sibling to
"pipeline:"), remove the nested "parsers" under pipeline, and ensure the parser
name ("json") referenced in pipeline.inputs (the amqp input) matches the
top-level parsers entry; verify the "pipeline:", "inputs:", and "outputs:"
structure remains unchanged.
- Around line 9-15: Move the parsers: block out from under pipeline: so parsers:
is a top-level YAML key (alongside pipeline:) and ensure the example uses
parser: json under the amqp input (references: parsers:, pipeline:, inputs:,
parser:). Also update the configuration table to include the two missing
threading parameters thread.ring_buffer.capacity (default 1024) and
thread.ring_buffer.window (default 5) with their descriptions so the AMQP input
docs match the other input plugins.

---

Nitpick comments:
In `@pipeline/inputs/amqp.md`:
- Around line 9-15: Add the missing Fluent Bit ring buffer settings to the AMQP
input configuration table: include rows for thread.ring_buffer.capacity,
thread.ring_buffer.window, and thread.ring_buffer.retry_limit with appropriate
descriptions and defaults (use typical Fluent Bit defaults or project-consistent
values); update any documentation references for the amqp input
parser/queue/reconnect entries if needed and ensure the keys use the thread.
prefix exactly (thread.ring_buffer.capacity, thread.ring_buffer.window,
thread.ring_buffer.retry_limit) so they match other pipeline/inputs docs and the
plugin implementation.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a18664 and 2b4ec4b.

📒 Files selected for processing (1)
  • pipeline/inputs/amqp.md

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (3)
pipeline/inputs/amqp.md (3)

95-110: ⚠️ Potential issue | 🟠 Major

Fix YAML example structure: parsers: must be top-level.

In Line 103, parsers: is nested under pipeline:. Move it to top-level (sibling of pipeline:), and keep parser: json on the input.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 95 - 110, The YAML example nests
"parsers:" under "pipeline:" which is incorrect; move the "parsers:" block to be
a top-level sibling of "pipeline:" (so it sits alongside "pipeline:" and
"outputs:"), leaving the input's "parser: json" reference inside the
"pipeline.inputs" entry unchanged; update the example so the keys "pipeline:",
"parsers:", and "outputs:" are all top-level keys and "parser: json" remains
under the amqp input.

9-15: ⚠️ Potential issue | 🟠 Major

Add missing threaded ring buffer parameters to the config table.

Please document thread.ring_buffer.capacity and thread.ring_buffer.window (and thread.ring_buffer.retry_limit if supported by this input) so AMQP matches other input plugin docs.

Based on learnings: In Fluent Bit input plugin documentation under pipeline/inputs, use the thread. prefix for ring buffer settings (for example thread.ring_buffer.capacity, thread.ring_buffer.window, thread.ring_buffer.retry_limit).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 9 - 15, The AMQP input docs are missing
the threaded ring buffer settings; update the config table to include entries
for thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported)
thread.ring_buffer.retry_limit with short descriptions and defaults consistent
with other input docs (e.g., capacity = default buffer size, window = default
window size, retry_limit = default retry count); reference the existing keys
`uri`, `queue`, `parser`, `reconnect.retry_limits`, and
`reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the
same format and naming convention used elsewhere in pipeline/inputs
documentation.

138-140: ⚠️ Potential issue | 🟡 Minor

Clarify requeue behavior for ACK mode and timing.

Line 140 currently implies a broad guarantee. Please state this only applies in manual ACK mode and only if disconnect happens before ack/nack is sent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 138 - 140, The sentence about requeued
messages is too broad; update the docs around the third bullet so it explicitly
states that message requeueing by the broker applies only when the consumer is
using manual ACK mode and the disconnect occurs before the consumer has sent an
ack or nack for the message (i.e., unacknowledged messages at disconnect).
Reference the existing wording about "Messages consumed but not yet processed
will be requeued by the broker" and change it to mention "manual ACK mode" and
"disconnect before ack/nack is sent" to make the scope precise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@pipeline/inputs/amqp.md`:
- Around line 95-110: The YAML example nests "parsers:" under "pipeline:" which
is incorrect; move the "parsers:" block to be a top-level sibling of "pipeline:"
(so it sits alongside "pipeline:" and "outputs:"), leaving the input's "parser:
json" reference inside the "pipeline.inputs" entry unchanged; update the example
so the keys "pipeline:", "parsers:", and "outputs:" are all top-level keys and
"parser: json" remains under the amqp input.
- Around line 9-15: The AMQP input docs are missing the threaded ring buffer
settings; update the config table to include entries for
thread.ring_buffer.capacity, thread.ring_buffer.window and (if supported)
thread.ring_buffer.retry_limit with short descriptions and defaults consistent
with other input docs (e.g., capacity = default buffer size, window = default
window size, retry_limit = default retry count); reference the existing keys
`uri`, `queue`, `parser`, `reconnect.retry_limits`, and
`reconnect.retry_interval` so the new `thread.ring_buffer.*` rows follow the
same format and naming convention used elsewhere in pipeline/inputs
documentation.
- Around line 138-140: The sentence about requeued messages is too broad; update
the docs around the third bullet so it explicitly states that message requeueing
by the broker applies only when the consumer is using manual ACK mode and the
disconnect occurs before the consumer has sent an ack or nack for the message
(i.e., unacknowledged messages at disconnect). Reference the existing wording
about "Messages consumed but not yet processed will be requeued by the broker"
and change it to mention "manual ACK mode" and "disconnect before ack/nack is
sent" to make the scope precise.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b4ec4b and 0641155.

📒 Files selected for processing (2)
  • SUMMARY.md
  • pipeline/inputs/amqp.md

@eschabell eschabell self-assigned this Feb 26, 2026
@eschabell
Copy link
Collaborator

@matwey before I review I would like to ask that you address all the comments from coderabbitai bot, please?

@eschabell eschabell added waiting-for-user Waiting for user/contributors feedback or requested changes waiting-on-code-merge waiting-on-review Waiting on a review from mainteners labels Feb 26, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
pipeline/inputs/amqp.md (1)

16-18: ⚠️ Potential issue | 🟡 Minor

Use concrete defaults instead of _default_ placeholders.

Please replace _default_ with actual values (or explicitly state “implementation-defined” if truly unspecified), so operators can configure reliably.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/inputs/amqp.md` around lines 16 - 18, Replace the `_default_`
placeholders for thread.ring_buffer.capacity, thread.ring_buffer.window, and
thread.ring_buffer.retry_limit with concrete defaults (or explicitly
"implementation-defined" if the implementation truly decides them at runtime);
update the table lines for the keys thread.ring_buffer.capacity,
thread.ring_buffer.window, and thread.ring_buffer.retry_limit to show numeric
defaults (e.g., capacity: 1024, window: 256, retry_limit: 3) or the phrase
"implementation-defined", and include units or semantics (items, entries,
attempts) so operators know what to configure; ensure the chosen values match
the actual implementation/config constants and adjust any duplicate docs
accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pipeline/inputs/amqp.md`:
- Line 139: Edit the sentence that mentions reconnect.retry_limits and
reconnect.retry_interval so it reads "the plugin will automatically attempt to
reconnect…" (i.e., add the definite article "the" before "plugin") to improve
clarity; locate the sentence containing the property names
reconnect.retry_limits and reconnect.retry_interval and update it accordingly.

---

Duplicate comments:
In `@pipeline/inputs/amqp.md`:
- Around line 16-18: Replace the `_default_` placeholders for
thread.ring_buffer.capacity, thread.ring_buffer.window, and
thread.ring_buffer.retry_limit with concrete defaults (or explicitly
"implementation-defined" if the implementation truly decides them at runtime);
update the table lines for the keys thread.ring_buffer.capacity,
thread.ring_buffer.window, and thread.ring_buffer.retry_limit to show numeric
defaults (e.g., capacity: 1024, window: 256, retry_limit: 3) or the phrase
"implementation-defined", and include units or semantics (items, entries,
attempts) so operators know what to configure; ensure the chosen values match
the actual implementation/config constants and adjust any duplicate docs
accordingly.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0641155 and c155d00.

📒 Files selected for processing (2)
  • SUMMARY.md
  • pipeline/inputs/amqp.md

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pipeline/inputs/amqp.md`:
- Around line 16-18: Update the table to replace the `_default_` placeholders
for `thread.ring_buffer.capacity` and `thread.ring_buffer.window` with concrete
defaults `1024` and `5` respectively (matching other input plugins like
tail/opentelemetry/prometheus-textfile), and investigate the
`thread.ring_buffer.retry_limit` entry: check the AMQP input plugin docs/source
for that parameter and either add the correct documented default if it exists or
remove the `thread.ring_buffer.retry_limit` row from the AMQP docs if it is not
a valid/configurable option for the plugin.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c155d00 and 846fd91.

📒 Files selected for processing (1)
  • pipeline/inputs/amqp.md

@matwey
Copy link
Author

matwey commented Mar 2, 2026

@coderabbitai review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 2, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@matwey
Copy link
Author

matwey commented Mar 2, 2026

@coderabbitai resume

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 2, 2026

✅ Actions performed

Reviews resumed.

@matwey
Copy link
Author

matwey commented Mar 2, 2026

@matwey before I review I would like to ask that you address all the comments from coderabbitai bot, please?

Done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

waiting-for-user Waiting for user/contributors feedback or requested changes waiting-on-code-merge waiting-on-review Waiting on a review from mainteners

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants