
out_kafka: Add dynamic/static headers support#8583

Open
DIFRIN wants to merge 3 commits into fluent:master from DIFRIN:output-kafka-add_headers

Conversation

@DIFRIN

@DIFRIN DIFRIN commented Mar 15, 2024

What

Add a new header configuration option to the out_kafka plugin that
attaches custom Kafka message headers to every produced message.

Two header modes are supported:

  • Static — the configured value is used verbatim.
  • Dynamic — a value prefixed with $ is resolved at runtime from the
    matching log record field. If the field does not exist or is not a
    string, a warning is emitted and that header is skipped; the
    message is still produced.

Memory ownership

rd_kafka_producev() with RD_KAFKA_V_HEADERS() takes ownership of
the rd_kafka_headers_t object on success only. The plugin calls
rd_kafka_headers_destroy() on every failure path:

  • Non–queue-full produce error
  • Queue-full retry limit exceeded (queue_full_retries)

Testing

  • Example configuration file for the change

    [INPUT]
    Name dummy
    Tag app.logs
    Dummy {"level": "info", "host": "web-01", "trace_id": "abc-123"}

    [OUTPUT]
    Name kafka
    Match *
    Brokers 192.168.1.3:9092
    Topics app-events

    # Static headers — value used verbatim
    header      X-Environment  production
    header      X-Source       fluent-bit
    
    # Dynamic headers — value resolved from the log record field
    header      X-Host         $host
    header      X-TraceId      $trace_id
    

    YAML equivalent:

    pipeline:
      inputs:
        - name: dummy
          tag: app.logs
          dummy: '{"level":"info","host":"web-01","trace_id":"abc-123"}'

      outputs:
        - name: kafka
          match: '*'
          brokers: 192.168.1.3:9092
          topics: app-events
          header: 'X-Environment production'
          header: 'X-Source fluent-bit'
          header: 'X-Host $host'
          header: 'X-TraceId $trace_id'


  • Debug log output

    All five runtime tests passed. Key lines annotated below.

    Test raw_format...
    [2026/03/01 07:44:47] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
    [2026/03/01 07:44:47] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
    Connect to ipv4#127.0.0.1:111 failed: Connection refused
    [2026/03/01 07:44:54] [ warn] [output:kafka:kafka.0] Failed to force flush:
    Local: Timed out
    [ OK ]

    Test header_static... <-- two static headers: "env production", "team platform"
    [2026/03/01 07:44:54] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
    [2026/03/01 07:44:54] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
    Connect to ipv4#127.0.0.1:111 failed: Connection refused
    [2026/03/01 07:45:02] [ warn] [output:kafka:kafka.0] Failed to force flush:
    Local: Timed out
    [ OK ]

    Test header_dynamic... <-- "source $key_0" resolved to "val_0"
    [2026/03/01 07:45:02] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
    [2026/03/01 07:45:02] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
    Connect to ipv4#127.0.0.1:111 failed: Connection refused
    [2026/03/01 07:45:10] [ warn] [output:kafka:kafka.0] Failed to force flush:
    Local: Timed out
    [ OK ]

    Test header_dynamic_missing_field... <-- "$no_such_field" absent: warn + skip
    [2026/03/01 07:45:10] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
    [2026/03/01 07:45:10] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
    Connect to ipv4#127.0.0.1:111 failed: Connection refused
    [2026/03/01 07:45:11] [ warn] [output:kafka:kafka.0] header 'missing': field
    '$no_such_field' not found in record or not a string, skipping <-- expected
    [2026/03/01 07:45:18] [ warn] [output:kafka:kafka.0] Failed to force flush:
    Local: Timed out
    [ OK ]

    Test header_mixed... <-- "app myapp" (static) + "source $key_1" (dynamic)
    [2026/03/01 07:45:18] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
    [2026/03/01 07:45:18] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
    Connect to ipv4#127.0.0.1:111 failed: Connection refused
    [2026/03/01 07:45:26] [ warn] [output:kafka:kafka.0] Failed to force flush:
    Local: Timed out
    [ OK ]

    SUCCESS: All unit tests have passed.


  • Valgrind output

    Command:
    valgrind --tool=memcheck --leak-check=full \
             --show-leak-kinds=definite,indirect \
             --error-exitcode=1 \
             --suppressions=valgrind.supp \
             build/bin/flb-rt-out_kafka

    Output:
    ==26539== Memcheck, a memory error detector
    ==26539== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
    ==26539== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
    ==26539== Command: build/bin/flb-rt-out_kafka

    Test raw_format... [ OK ]
    Test header_static... [ OK ]
    Test header_dynamic... [ OK ]
    Test header_dynamic_missing_field... [ OK ]
    Test header_mixed... [ OK ]
    SUCCESS: All unit tests have passed.

    ==26539== HEAP SUMMARY:
    ==26539== in use at exit: 0 bytes in 0 blocks
    ==26539== total heap usage: 9,995 allocs, 9,995 frees, 6,314,321 bytes allocated
    ==26539==
    ==26539== All heap blocks were freed -- no leaks are possible
    ==26539==
    ==26539== For lists of detected and suppressed errors, rerun with: -s
    ==26539== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)


  • Documentation

    • Documentation updated in fluent-bit-docs: added header row to
      the parameter table and a new "Message headers" section with static +
      dynamic examples in both .conf and .yaml formats.

  • [N/A] Backporting

    This is a new feature targeting master only; backport
    decisions are deferred to maintainers.

Summary by CodeRabbit

  • New Features

    • Kafka output plugin supports custom message headers on produced messages; headers may be static or resolved dynamically from message fields using $field_name syntax.
  • Reliability

    • Improved header lifecycle and error handling to prevent header leaks and ensure correct behavior across retry and failure scenarios.
  • Tests

    • Added tests for static, dynamic, mixed headers and missing-field handling.

@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch 2 times, most recently from 627834f to 3d7ad58 Compare March 15, 2024 20:00
@DIFRIN DIFRIN closed this Mar 19, 2024
@DIFRIN DIFRIN reopened this Mar 19, 2024
@patrick-stephens
Contributor

Can you sort the DCO failure? This cannot be merged without it

@DIFRIN
Author

DIFRIN commented Mar 28, 2024

@patrick-stephens DCO added to the commits

@DIFRIN
Author

DIFRIN commented Apr 2, 2024

@patrick-stephens @edsiper This is the message of the failed check:

The following tests FAILED: 46 - flb-rt-out_http (Failed) Error: Process completed with exit code 8.

It is not related to this PR's changes.

@patrick-stephens
Contributor

Possibly the test failure is related to #9023

@github-actions
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Dec 14, 2024
@github-actions github-actions bot removed the Stale label Mar 22, 2025
@mitou70

mitou70 commented May 20, 2025

@patrick-stephens Hello Pat, my team and I are strongly interested in using this new feature. Why is it still on hold? Thank you for your answer

@patrick-stephens
Contributor

@patrick-stephens Hello Pat, my team and I are strongly interested in using this new feature. Why is it still on hold? Thank you for your answer

You can see here that it is failing a couple of checks so cannot be merged until it succeeds. It likely needs rebasing first and then updating as required.

I would encourage you to pick up the PR and contribute the updates if you require it.

@github-actions
Contributor

github-actions bot commented Sep 6, 2025

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Sep 6, 2025
@eschabell
Contributor

@mitou70 and /or @DIFRIN can you please resolve the conflicts here? If you do we can get @patrick-stephens to merge and I can merge the corresponding docs PR fluent/fluent-bit-docs#1341

@github-actions
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Feb 18, 2026
@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch from c5ed387 to d3a01ac Compare February 28, 2026 18:32
@DIFRIN DIFRIN requested a review from cosmo0920 as a code owner February 28, 2026 18:32
@coderabbitai

coderabbitai bot commented Feb 28, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between beed528 and c4e9e4c.

📒 Files selected for processing (2)
  • plugins/out_kafka/kafka.c
  • tests/runtime/out_kafka.c

📝 Walkthrough

Walkthrough

Adds Kafka message header support: new headers config field and list in plugin state; helpers to extract msgpack fields; header-construction logic supporting static and $field dynamic values; integration into message production with ownership and cleanup handling; unit tests for header scenarios.

Changes

Configuration Structure — plugins/out_kafka/kafka_config.h
  Added struct mk_list *headers to struct flb_out_kafka to store configured message headers.

Header Building & Integration — plugins/out_kafka/kafka.c
  Added kafka_msgpack_get_field() and kafka_build_message_headers(); build rd_kafka_headers_t from static values and $field references during produce_message() when ctx->headers is set; switched to rd_kafka_producev() to pass headers; added ownership semantics, cleanup on errors/retries, and updated error handling; added header entry to config_map.

Tests — tests/runtime/out_kafka.c
  Introduced shared test scaffolding and four header tests (header_static, header_dynamic, header_dynamic_missing_field, header_mixed); updated setup/teardown helpers and TEST_LIST.

Sequence Diagram

sequenceDiagram
    participant Config as Configuration
    participant Producer as produce_message()
    participant Builder as kafka_build_message_headers()
    participant MsgPack as Msgpack Map
    participant Kafka as librdkafka

    Config->>Producer: ctx->headers present
    Producer->>Builder: build headers from config + msgpack map
    Builder->>Builder: allocate rd_kafka_headers_t
    alt static header
        Builder->>Builder: append static header value
    else dynamic header
        Builder->>MsgPack: kafka_msgpack_get_field(map, key)
        MsgPack-->>Builder: return value or not found
        alt found & string
            Builder->>Builder: append dynamic header value
        else missing/invalid
            Builder-->>Producer: skip header (log warning)
        end
    end
    Builder-->>Producer: return headers or NULL on failure
    Producer->>Kafka: rd_kafka_producev(..., RD_KAFKA_V_HEADERS, headers, ...)
    alt queue full (retry)
        Kafka-->>Producer: queue full (headers remain owned by caller)
    else success
        Kafka-->>Producer: success (librdkafka owns headers)
    else other error
        Producer->>Builder: destroy headers to avoid leak
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I sniffed through maps and chased a $-name,
tied static crumbs to dynamic frame.
Headers packed, no crumbs to spare—
off they hop to Kafka, light as air. ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 72.73%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title check — ✅ Passed: the title clearly and concisely summarizes the main change: adding dynamic/static headers support to the out_kafka plugin, which is the primary feature introduced across the modified files.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
plugins/out_kafka/kafka.c (2)

98-115: Add defensive check for map type.

This function assumes map is a MSGPACK_OBJECT_MAP, but accessing map->via.map.size on a non-map type is undefined behavior. Although callers likely pass valid maps, a defensive check improves robustness.

Proposed fix
 static int kafka_msgpack_get_field(msgpack_object *map,
                                    const char *key, size_t key_len,
                                    msgpack_object **val)
 {
     size_t i;
 
+    if (map->type != MSGPACK_OBJECT_MAP) {
+        return -1;
+    }
+
     for (i = 0; i < map->via.map.size; i++) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_kafka/kafka.c` around lines 98 - 115, The function
kafka_msgpack_get_field assumes the incoming msgpack_object *map is of type
MSGPACK_OBJECT_MAP; add a defensive check at the start of
kafka_msgpack_get_field to verify map->type == MSGPACK_OBJECT_MAP (and return -1
if not or if map is NULL) before accessing map->via.map.size or
map->via.map.ptr, so the function avoids undefined behavior on non-map inputs.

165-167: Consider checking rd_kafka_header_add return value.

rd_kafka_header_add can return error codes (e.g., allocation failures). While unlikely, checking the return value and handling errors would improve robustness.

Example check
-            rd_kafka_header_add(headers,
-                                hkey->str, flb_sds_len(hkey->str),
-                                field->via.str.ptr, field->via.str.size);
+            if (rd_kafka_header_add(headers,
+                                    hkey->str, flb_sds_len(hkey->str),
+                                    field->via.str.ptr, field->via.str.size) != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                flb_plg_warn(ctx->ins, "failed to add header '%s'", hkey->str);
+            }

Also applies to: 178-180

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_kafka/kafka.c` around lines 165 - 167, Check the return value of
rd_kafka_header_add calls used when adding header entries (the call using
headers, hkey->str / flb_sds_len(hkey->str), field->via.str.ptr /
field->via.str.size and the other similar call later) and handle non-zero error
codes: capture the return, log an error via the plugin logger with context
(which header key/value failed and the rd_kafka error code/message), and take an
appropriate action (e.g., skip the header or abort producing that message) to
avoid silent failures; ensure you call rd_kafka_err2str or similar to include
readable error text in the log.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 98-115: The function kafka_msgpack_get_field assumes the incoming
msgpack_object *map is of type MSGPACK_OBJECT_MAP; add a defensive check at the
start of kafka_msgpack_get_field to verify map->type == MSGPACK_OBJECT_MAP (and
return -1 if not or if map is NULL) before accessing map->via.map.size or
map->via.map.ptr, so the function avoids undefined behavior on non-map inputs.
- Around line 165-167: Check the return value of rd_kafka_header_add calls used
when adding header entries (the call using headers, hkey->str /
flb_sds_len(hkey->str), field->via.str.ptr / field->via.str.size and the other
similar call later) and handle non-zero error codes: capture the return, log an
error via the plugin logger with context (which header key/value failed and the
rd_kafka error code/message), and take an appropriate action (e.g., skip the
header or abort producing that message) to avoid silent failures; ensure you
call rd_kafka_err2str or similar to include readable error text in the log.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7db0c7 and d3a01ac.

📒 Files selected for processing (2)
  • plugins/out_kafka/kafka.c
  • plugins/out_kafka/kafka_config.h

@DIFRIN
Author

DIFRIN commented Feb 28, 2026

@eschabell / @patrick-stephens the rebase is done


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/runtime/out_kafka.c (2)

55-58: Header test intent is documented but not actually asserted.

At Line 55-58, Line 95-99, Line 136-139, and Line 176-178, comments claim specific behaviors (static value usage, $field resolution, missing-field warning/skip), but the tests only validate startup and push flow. Consider adding explicit assertions tied to those outcomes so regressions in header logic are detectable.

Also applies to: 95-99, 136-139, 176-178

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/runtime/out_kafka.c` around lines 55 - 58, The comments in
tests/runtime/out_kafka.c describe expected header behaviors (static literal
values, $field resolution, and missing-field warning/skip) but the test only
verifies startup/push flow; update the test to explicitly assert
kafka_build_message_headers() outputs: (1) that static headers are present
verbatim (no '$' prefix), (2) that headers configured with a $prefix are
resolved to the correct event field values, and (3) that headers referencing
missing fields are either omitted and/or emit the expected warning; add
assertions checking header keys/values and, where applicable, expect/log
behavior so regressions in header logic are detected.

59-92: Consider extracting common Kafka test bootstrap/teardown into a helper.

These four test functions repeat nearly identical context/input/output setup and lifecycle code. A shared helper would reduce drift and make per-test intent clearer.

Also applies to: 100-133, 140-173, 179-214

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/runtime/out_kafka.c` around lines 59 - 92, Multiple Kafka tests repeat
identical bootstrap/teardown (flb_create, flb_input, flb_output, flb_output_set
calls, flb_start, flb_stop, flb_destroy and flb_lib_push); extract that into a
reusable helper (e.g., kafka_test_setup) that creates the context, registers the
lib input and kafka output, applies common options (match, format, topics,
brokers, queue_full_retries and any "header" entries passed in), calls flb_start
and returns the necessary handles (ctx, in_ffd, out_ffd or a small struct).
Replace flb_test_header_static and the other similar tests to call this helper,
do the per-test flb_lib_push/sleep, then call a corresponding
kafka_test_teardown (or use returned ctx to call flb_stop and flb_destroy).
Reference functions: flb_create, flb_input, flb_output, flb_output_set,
flb_start, flb_lib_push, flb_stop, flb_destroy in the implementation and call
sites.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/runtime/out_kafka.c`:
- Around line 25-35: The test ignores return values from flb_input_set,
flb_output_set, and flb_lib_push which can create false-positive passes; update
each call (e.g., flb_input_set(ctx, in_ffd, ...), flb_output_set(ctx, out_ffd,
...), and flb_lib_push(...)) to assert the expected return (use TEST_CHECK or
equivalent) immediately after the call — for setters expect non-negative/zero
success codes and for flb_lib_push check for success — so failures in setup or
push fail the test instead of being ignored.

---

Nitpick comments:
In `@tests/runtime/out_kafka.c`:
- Around line 55-58: The comments in tests/runtime/out_kafka.c describe expected
header behaviors (static literal values, $field resolution, and missing-field
warning/skip) but the test only verifies startup/push flow; update the test to
explicitly assert kafka_build_message_headers() outputs: (1) that static headers
are present verbatim (no '$' prefix), (2) that headers configured with a $prefix
are resolved to the correct event field values, and (3) that headers referencing
missing fields are either omitted and/or emit the expected warning; add
assertions checking header keys/values and, where applicable, expect/log
behavior so regressions in header logic are detected.
- Around line 59-92: Multiple Kafka tests repeat identical bootstrap/teardown
(flb_create, flb_input, flb_output, flb_output_set calls, flb_start, flb_stop,
flb_destroy and flb_lib_push); extract that into a reusable helper (e.g.,
kafka_test_setup) that creates the context, registers the lib input and kafka
output, applies common options (match, format, topics, brokers,
queue_full_retries and any "header" entries passed in), calls flb_start and
returns the necessary handles (ctx, in_ffd, out_ffd or a small struct). Replace
flb_test_header_static and the other similar tests to call this helper, do the
per-test flb_lib_push/sleep, then call a corresponding kafka_test_teardown (or
use returned ctx to call flb_stop and flb_destroy). Reference functions:
flb_create, flb_input, flb_output, flb_output_set, flb_start, flb_lib_push,
flb_stop, flb_destroy in the implementation and call sites.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d3a01ac and a9f4cb5.

📒 Files selected for processing (1)
  • tests/runtime/out_kafka.c

@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch 2 times, most recently from e6534ed to 3b101c1 Compare March 1, 2026 19:02
DIFRIN added 2 commits March 1, 2026 20:04
Add a new 'header' configuration option to the Kafka output plugin,
allowing users to attach custom headers to every produced message.

Header values starting with '$' are treated as dynamic references: the
named field is looked up in the log record and its string value is used
as the header value. Any other value is used verbatim (static header).

Example:

  header X-Env      production
  header X-TraceId  $trace_id

Multiple 'header' entries may be provided. If a referenced field is not
found in the record or its value is not a string, the header is skipped
and a warning is logged.

The produce call was migrated from rd_kafka_produce() to the variadic
rd_kafka_producev(), which is required to pass the headers object via
RD_KAFKA_V_HEADERS(). On produce failure, rdkafka does not take
ownership of the headers object, so it is freed explicitly; on
queue-full retries the same object is reused safely.

Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
 Add four runtime test cases covering the message header feature added
  in out_kafka. All tests use an unreachable broker (127.0.0.1:111)
  with queue_full_retries=1; no real Kafka instance is required.

  - header_static: two verbatim static headers; exercises the static
    branch of kafka_build_message_headers()
  - header_dynamic: value prefixed '$' resolved from the log record;
    exercises the dynamic field-lookup branch
  - header_dynamic_missing_field: referenced field absent from the
    record; verifies the warning-and-skip path is clean under Valgrind
  - header_mixed: one static + one dynamic header in the same message;
    exercises both branches together

  Tested with Valgrind 3.22.0: 0 errors, 0 leaks.

Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch from 3b101c1 to 3244898 Compare March 1, 2026 19:05

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 174-177: The rd_kafka_header_add() calls (the five places where
headers are built using headers, hkey->str and field->via.str.ptr/size)
currently ignore their return value; capture each call's rd_kafka_resp_err_t
result, check for RD_KAFKA_RESP_ERR_NO_ERROR, and handle failures explicitly
(e.g., log an error including the returned error code and the header key via
your logger and decide whether to continue or abort message creation). Update
each occurrence to use a local rd_kafka_resp_err_t err =
rd_kafka_header_add(...); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { /* log err
and header name; handle RD_KAFKA_RESP_ERR__READ_ONLY specially if needed */ } so
we don't silently drop headers.

In `@tests/runtime/out_kafka.c`:
- Around line 42-79: kafka_test_setup currently continues after critical
failures (null ctx or invalid ffd) which leads to invalid-handle calls; change
it to fail fast by checking results of flb_create, flb_input, flb_output,
flb_input_set, flb_output_set and flb_start and returning an error immediately
on failure (and cleaning up any partially initialized resources such as calling
flb_destroy on kctx->ctx if allocated) instead of only using TEST_CHECK
assertions; apply the same early-return hardening to the other setup path
referenced around the 82-86 block so any invalid ffd or NULL context aborts
setup rather than proceeding.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a9f4cb5 and 3244898.

📒 Files selected for processing (3)
  • plugins/out_kafka/kafka.c
  • plugins/out_kafka/kafka_config.h
  • tests/runtime/out_kafka.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/out_kafka/kafka_config.h

@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch from 3244898 to beed528 Compare March 1, 2026 20:29

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/out_kafka/kafka.c (1)

605-650: ⚠️ Potential issue | 🔴 Critical

Non-queue-full produce failures are acknowledged as success.

At Line 605, when produce_err != RD_KAFKA_RESP_ERR_NO_ERROR and it is not queue-full (Line 615), the code logs and frees resources but later returns FLB_OK (Line 673). This can silently drop records instead of retrying.

Suggested fix
 int produce_message(struct flb_time *tm, msgpack_object *map,
                     struct flb_out_kafka *ctx, struct flb_config *config)
 {
     int i;
     int size;
+    int produce_result = FLB_OK;
     int queue_full_retries = 0;
@@
     if (produce_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
@@
         if (produce_err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
@@
             goto retry;
         }
@@
         if (kafka_headers) {
             rd_kafka_headers_destroy(kafka_headers);
         }
+        produce_result = FLB_RETRY;
     }
@@
-    return FLB_OK;
+    return produce_result;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_kafka/kafka.c` around lines 605 - 650, When produce_err !=
RD_KAFKA_RESP_ERR_NO_ERROR and it's not the queue-full case (i.e., not
RD_KAFKA_RESP_ERR__QUEUE_FULL), the code only frees kafka_headers and later
returns FLB_OK which silently drops records; change the control flow so
non-queue-full failures propagate an error to the caller instead of returning
FLB_OK — after rd_kafka_headers_destroy(kafka_headers) (if kafka_headers) return
an appropriate failure code (not FLB_OK) or set a retry indicator so the caller
retries; ensure you do not alter the queue-full handling that uses ctx->blocked,
queue_full_retries and goto retry, but make sure produce_err is
returned/propagated for all other error paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 605-650: When produce_err != RD_KAFKA_RESP_ERR_NO_ERROR and it's
not the queue-full case (i.e., not RD_KAFKA_RESP_ERR__QUEUE_FULL), the code only
frees kafka_headers and later returns FLB_OK which silently drops records;
change the control flow so non-queue-full failures propagate an error to the
caller instead of returning FLB_OK — after
rd_kafka_headers_destroy(kafka_headers) (if kafka_headers) return an appropriate
failure code (not FLB_OK) or set a retry indicator so the caller retries; ensure
you do not alter the queue-full handling that uses ctx->blocked,
queue_full_retries and goto retry, but make sure produce_err is
returned/propagated for all other error paths.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3244898 and beed528.

📒 Files selected for processing (2)
  • plugins/out_kafka/kafka.c
  • tests/runtime/out_kafka.c

 - Added char num_buf[64]; int num_len; locals before the loop
  - Split the old single if (found && is_string) / else warn into six branches:
    a. key not found → warn "not found in record, skipping"
    b. MSGPACK_OBJECT_STR → verbatim (unchanged)
    c. MSGPACK_OBJECT_POSITIVE_INTEGER → snprintf with PRIu64
    d. MSGPACK_OBJECT_NEGATIVE_INTEGER → snprintf with PRId64
    e. MSGPACK_OBJECT_FLOAT32 / FLOAT64 → snprintf with %g
    f. anything else (bool/array/map/nil) → warn "unsupported type, skipping"

 - Added a helper for tests and updated block comments to mention integer/float conversion and the updated warning wording

Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch from beed528 to c4e9e4c Compare March 1, 2026 20:56