out_kafka: Add dynamic/static headers support #8583
DIFRIN wants to merge 3 commits into fluent:master from
Conversation
Force-pushed 627834f to 3d7ad58 (Compare)
|
Can you sort the DCO failure? This cannot be merged without it |
Force-pushed 8482d45 to 1f1ea15 (Compare)
|
@patrick-stephens DCO added to the commits |
|
@patrick-stephens @edsiper this is the message of the failed check; it's not linked to the PR content |
|
Possibly the test failure is related to #9023 |
|
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
|
@patrick-stephens Hello Pat, my team and I are strongly interested in using this new feature. Why is it still on hold? Thank you for your answer. |
You can see here that it is failing a couple of checks, so it cannot be merged until they succeed. It likely needs rebasing first and then updating as required. I would encourage you to pick up the PR and contribute the updates if you require this feature. |
|
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
|
@mitou70 and /or @DIFRIN can you please resolve the conflicts here? If you do we can get @patrick-stephens to merge and I can merge the corresponding docs PR fluent/fluent-bit-docs#1341 |
|
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
Force-pushed c5ed387 to d3a01ac (Compare)
|
Note: Reviews paused. This branch is under active development; to avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info — Configuration used: defaults | Review profile: CHILL | Plan: Pro | 📒 Files selected for processing (2)
📝 Walkthrough: Adds Kafka message header support.
Sequence Diagram
sequenceDiagram
participant Config as Configuration
participant Producer as produce_message()
participant Builder as kafka_build_message_headers()
participant MsgPack as Msgpack Map
participant Kafka as librdkafka
Config->>Producer: ctx->headers present
Producer->>Builder: build headers from config + msgpack map
Builder->>Builder: allocate rd_kafka_headers_t
alt static header
Builder->>Builder: append static header value
else dynamic header
Builder->>MsgPack: kafka_msgpack_get_field(map, key)
MsgPack-->>Builder: return value or not found
alt found & string
Builder->>Builder: append dynamic header value
else missing/invalid
Builder-->>Producer: skip header (log warning)
end
end
Builder-->>Producer: return headers or NULL on failure
Producer->>Kafka: rd_kafka_producev(..., RD_KAFKA_V_HEADERS, headers, ...)
alt queue full (retry)
Kafka-->>Producer: queue full (headers remain owned by caller)
else success
Kafka-->>Producer: success (librdkafka owns headers)
else other error
Producer->>Builder: destroy headers to avoid leak
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
🧹 Nitpick comments (2)
plugins/out_kafka/kafka.c (2)
98-115: Add defensive check for map type. This function assumes `map` is a `MSGPACK_OBJECT_MAP`, but accessing `map->via.map.size` on a non-map type is undefined behavior. Although callers likely pass valid maps, a defensive check improves robustness.
Proposed fix:
 static int kafka_msgpack_get_field(msgpack_object *map, const char *key,
                                    size_t key_len, msgpack_object **val)
 {
     size_t i;
+    if (map->type != MSGPACK_OBJECT_MAP) {
+        return -1;
+    }
+
     for (i = 0; i < map->via.map.size; i++) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_kafka/kafka.c` around lines 98 - 115, The function kafka_msgpack_get_field assumes the incoming msgpack_object *map is of type MSGPACK_OBJECT_MAP; add a defensive check at the start of kafka_msgpack_get_field to verify map->type == MSGPACK_OBJECT_MAP (and return -1 if not or if map is NULL) before accessing map->via.map.size or map->via.map.ptr, so the function avoids undefined behavior on non-map inputs.
165-167: Consider checking the `rd_kafka_header_add` return value.
`rd_kafka_header_add` can return error codes (e.g., allocation failures). While unlikely, checking the return value and handling errors would improve robustness.
Example check:
-    rd_kafka_header_add(headers,
-                        hkey->str, flb_sds_len(hkey->str),
-                        field->via.str.ptr, field->via.str.size);
+    if (rd_kafka_header_add(headers,
+                            hkey->str, flb_sds_len(hkey->str),
+                            field->via.str.ptr, field->via.str.size) != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        flb_plg_warn(ctx->ins, "failed to add header '%s'", hkey->str);
+    }
Also applies to: 178-180
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_kafka/kafka.c` around lines 165 - 167, Check the return value of rd_kafka_header_add calls used when adding header entries (the call using headers, hkey->str / flb_sds_len(hkey->str), field->via.str.ptr / field->via.str.size and the other similar call later) and handle non-zero error codes: capture the return, log an error via the plugin logger with context (which header key/value failed and the rd_kafka error code/message), and take an appropriate action (e.g., skip the header or abort producing that message) to avoid silent failures; ensure you call rd_kafka_err2str or similar to include readable error text in the log.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 98-115: The function kafka_msgpack_get_field assumes the incoming
msgpack_object *map is of type MSGPACK_OBJECT_MAP; add a defensive check at the
start of kafka_msgpack_get_field to verify map->type == MSGPACK_OBJECT_MAP (and
return -1 if not or if map is NULL) before accessing map->via.map.size or
map->via.map.ptr, so the function avoids undefined behavior on non-map inputs.
- Around line 165-167: Check the return value of rd_kafka_header_add calls used
when adding header entries (the call using headers, hkey->str /
flb_sds_len(hkey->str), field->via.str.ptr / field->via.str.size and the other
similar call later) and handle non-zero error codes: capture the return, log an
error via the plugin logger with context (which header key/value failed and the
rd_kafka error code/message), and take an appropriate action (e.g., skip the
header or abort producing that message) to avoid silent failures; ensure you
call rd_kafka_err2str or similar to include readable error text in the log.
|
@eschabell / @patrick-stephens the rebase is done |
Actionable comments posted: 1
🧹 Nitpick comments (2)
tests/runtime/out_kafka.c (2)
55-58: Header test intent is documented but not actually asserted. At lines 55-58, 95-99, 136-139, and 176-178, comments claim specific behaviors (static value usage, `$field` resolution, missing-field warning/skip), but the tests only validate startup and push flow. Consider adding explicit assertions tied to those outcomes so regressions in header logic are detectable.
Also applies to: 95-99, 136-139, 176-178
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/runtime/out_kafka.c` around lines 55 - 58, The comments in tests/runtime/out_kafka.c describe expected header behaviors (static literal values, $field resolution, and missing-field warning/skip) but the test only verifies startup/push flow; update the test to explicitly assert kafka_build_message_headers() outputs: (1) that static headers are present verbatim (no '$' prefix), (2) that headers configured with a $prefix are resolved to the correct event field values, and (3) that headers referencing missing fields are either omitted and/or emit the expected warning; add assertions checking header keys/values and, where applicable, expect/log behavior so regressions in header logic are detected.
59-92: Consider extracting common Kafka test bootstrap/teardown into a helper.These four test functions repeat nearly identical context/input/output setup and lifecycle code. A shared helper would reduce drift and make per-test intent clearer.
Also applies to: 100-133, 140-173, 179-214
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/runtime/out_kafka.c` around lines 59 - 92, Multiple Kafka tests repeat identical bootstrap/teardown (flb_create, flb_input, flb_output, flb_output_set calls, flb_start, flb_stop, flb_destroy and flb_lib_push); extract that into a reusable helper (e.g., kafka_test_setup) that creates the context, registers the lib input and kafka output, applies common options (match, format, topics, brokers, queue_full_retries and any "header" entries passed in), calls flb_start and returns the necessary handles (ctx, in_ffd, out_ffd or a small struct). Replace flb_test_header_static and the other similar tests to call this helper, do the per-test flb_lib_push/sleep, then call a corresponding kafka_test_teardown (or use returned ctx to call flb_stop and flb_destroy). Reference functions: flb_create, flb_input, flb_output, flb_output_set, flb_start, flb_lib_push, flb_stop, flb_destroy in the implementation and call sites.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/runtime/out_kafka.c`:
- Around line 25-35: The test ignores return values from flb_input_set,
flb_output_set, and flb_lib_push which can create false-positive passes; update
each call (e.g., flb_input_set(ctx, in_ffd, ...), flb_output_set(ctx, out_ffd,
...), and flb_lib_push(...)) to assert the expected return (use TEST_CHECK or
equivalent) immediately after the call — for setters expect non-negative/zero
success codes and for flb_lib_push check for success — so failures in setup or
push fail the test instead of being ignored.
---
Nitpick comments:
In `@tests/runtime/out_kafka.c`:
- Around line 55-58: The comments in tests/runtime/out_kafka.c describe expected
header behaviors (static literal values, $field resolution, and missing-field
warning/skip) but the test only verifies startup/push flow; update the test to
explicitly assert kafka_build_message_headers() outputs: (1) that static headers
are present verbatim (no '$' prefix), (2) that headers configured with a $prefix
are resolved to the correct event field values, and (3) that headers referencing
missing fields are either omitted and/or emit the expected warning; add
assertions checking header keys/values and, where applicable, expect/log
behavior so regressions in header logic are detected.
- Around line 59-92: Multiple Kafka tests repeat identical bootstrap/teardown
(flb_create, flb_input, flb_output, flb_output_set calls, flb_start, flb_stop,
flb_destroy and flb_lib_push); extract that into a reusable helper (e.g.,
kafka_test_setup) that creates the context, registers the lib input and kafka
output, applies common options (match, format, topics, brokers,
queue_full_retries and any "header" entries passed in), calls flb_start and
returns the necessary handles (ctx, in_ffd, out_ffd or a small struct). Replace
flb_test_header_static and the other similar tests to call this helper, do the
per-test flb_lib_push/sleep, then call a corresponding kafka_test_teardown (or
use returned ctx to call flb_stop and flb_destroy). Reference functions:
flb_create, flb_input, flb_output, flb_output_set, flb_start, flb_lib_push,
flb_stop, flb_destroy in the implementation and call sites.
Force-pushed e6534ed to 3b101c1 (Compare)
Add a new 'header' configuration option to the Kafka output plugin, allowing users to attach custom headers to every produced message.

Header values starting with '$' are treated as dynamic references: the named field is looked up in the log record and its string value is used as the header value. Any other value is used verbatim (static header).

Example:
    header X-Env      production
    header X-TraceId  $trace_id

Multiple 'header' entries may be provided. If a referenced field is not found in the record or its value is not a string, the header is skipped and a warning is logged.

The produce call was migrated from rd_kafka_produce() to the variadic rd_kafka_producev(), which is required to pass the headers object via RD_KAFKA_V_HEADERS(). On produce failure, rdkafka does not take ownership of the headers object, so it is freed explicitly; on queue-full retries the same object is reused safely.

Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
Add four runtime test cases covering the message header feature added
in out_kafka. All tests use an unreachable broker (127.0.0.1:111)
with queue_full_retries=1; no real Kafka instance is required.
- header_static: two verbatim static headers; exercises the static
branch of kafka_build_message_headers()
- header_dynamic: value prefixed '$' resolved from the log record;
exercises the dynamic field-lookup branch
- header_dynamic_missing_field: referenced field absent from the
record; verifies the warning-and-skip path is clean under Valgrind
- header_mixed: one static + one dynamic header in the same message;
exercises both branches together
Tested with Valgrind 3.22.0: 0 errors, 0 leaks.
Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
Force-pushed 3b101c1 to 3244898 (Compare)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 174-177: The rd_kafka_header_add() calls (the five places where
headers are built using headers, hkey->str and field->via.str.ptr/size)
currently ignore their return value; capture each call's rd_kafka_resp_err_t
result, check for RD_KAFKA_RESP_ERR_NO_ERROR, and handle failures explicitly
(e.g., log an error including the returned error code and the header key via
your logger and decide whether to continue or abort message creation). Update
each occurrence to use a local rd_kafka_resp_err_t err =
rd_kafka_header_add(...); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { /* log err
and header name; handle RD_KAFKA_RESP_ERR__READ_ONLY specially if needed */ } so
we don't silently drop headers.
In `@tests/runtime/out_kafka.c`:
- Around line 42-79: kafka_test_setup currently continues after critical
failures (null ctx or invalid ffd) which leads to invalid-handle calls; change
it to fail fast by checking results of flb_create, flb_input, flb_output,
flb_input_set, flb_output_set and flb_start and returning an error immediately
on failure (and cleaning up any partially initialized resources such as calling
flb_destroy on kctx->ctx if allocated) instead of only using TEST_CHECK
assertions; apply the same early-return hardening to the other setup path
referenced around the 82-86 block so any invalid ffd or NULL context aborts
setup rather than proceeding.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.h
- tests/runtime/out_kafka.c
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_kafka/kafka_config.h
Force-pushed 3244898 to beed528 (Compare)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/out_kafka/kafka.c (1)
605-650: ⚠️ Potential issue | 🔴 Critical: Non-queue-full produce failures are acknowledged as success.
At line 605, when `produce_err != RD_KAFKA_RESP_ERR_NO_ERROR` and it is not queue-full (line 615), the code logs and frees resources but later returns `FLB_OK` (line 673). This can silently drop records instead of retrying.
Suggested fix:
 int produce_message(struct flb_time *tm, msgpack_object *map,
                     struct flb_out_kafka *ctx, struct flb_config *config)
 {
     int i;
     int size;
+    int produce_result = FLB_OK;
     int queue_full_retries = 0;
 @@
     if (produce_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
 @@
         if (produce_err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
 @@
             goto retry;
         }
 @@
         if (kafka_headers) {
             rd_kafka_headers_destroy(kafka_headers);
         }
+        produce_result = FLB_RETRY;
     }
 @@
-    return FLB_OK;
+    return produce_result;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_kafka/kafka.c` around lines 605 - 650, When produce_err != RD_KAFKA_RESP_ERR_NO_ERROR and it's not the queue-full case (i.e., not RD_KAFKA_RESP_ERR__QUEUE_FULL), the code only frees kafka_headers and later returns FLB_OK which silently drops records; change the control flow so non-queue-full failures propagate an error to the caller instead of returning FLB_OK — after rd_kafka_headers_destroy(kafka_headers) (if kafka_headers) return an appropriate failure code (not FLB_OK) or set a retry indicator so the caller retries; ensure you do not alter the queue-full handling that uses ctx->blocked, queue_full_retries and goto retry, but make sure produce_err is returned/propagated for all other error paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 605-650: When produce_err != RD_KAFKA_RESP_ERR_NO_ERROR and it's
not the queue-full case (i.e., not RD_KAFKA_RESP_ERR__QUEUE_FULL), the code only
frees kafka_headers and later returns FLB_OK which silently drops records;
change the control flow so non-queue-full failures propagate an error to the
caller instead of returning FLB_OK — after
rd_kafka_headers_destroy(kafka_headers) (if kafka_headers) return an appropriate
failure code (not FLB_OK) or set a retry indicator so the caller retries; ensure
you do not alter the queue-full handling that uses ctx->blocked,
queue_full_retries and goto retry, but make sure produce_err is
returned/propagated for all other error paths.
- Added char num_buf[64]; int num_len; locals before the loop
- Split the old single if (found && is_string) / else warn into six branches:
a. key not found → warn "not found in record, skipping"
b. MSGPACK_OBJECT_STR → verbatim (unchanged)
c. MSGPACK_OBJECT_POSITIVE_INTEGER → snprintf with PRIu64
d. MSGPACK_OBJECT_NEGATIVE_INTEGER → snprintf with PRId64
e. MSGPACK_OBJECT_FLOAT32 / FLOAT64 → snprintf with %g
f. anything else (bool/array/map/nil) → warn "unsupported type, skipping"
- Added a helper for tests and updated the block comments to mention integer/float conversion and the updated warning wording
Signed-off-by: difrin <frindi.abderrahmane.ied@gmail.com>
Force-pushed beed528 to c4e9e4c (Compare)
What
Add a new header configuration option to the out_kafka plugin that
attaches custom Kafka message headers to every produced message.
Two header modes are supported:
- Static: the configured value is used verbatim.
- Dynamic: values starting with '$' are resolved against the matching log record field. If the field does not exist or is not a string, a warning is emitted and that header is silently skipped; the message is still produced.
Memory ownership
rd_kafka_producev() with RD_KAFKA_V_HEADERS() takes ownership of
the rd_kafka_headers_t object on success only. The plugin calls
rd_kafka_headers_destroy() on every failure path.
Testing
Example configuration file for the change
[INPUT]
Name dummy
Tag app.logs
Dummy {"level": "info", "host": "web-01", "trace_id": "abc-123"}
[OUTPUT]
Name kafka
Match *
Brokers 192.168.1.3:9092
Topics app-events
Header X-Environment production
Header X-Source fluent-bit
Header X-Host $host
Header X-TraceId $trace_id
YAML equivalent:
pipeline:
inputs:
- name: dummy
tag: app.logs
dummy: '{"level":"info","host":"web-01","trace_id":"abc-123"}'
outputs:
- name: kafka
match: '*'
brokers: 192.168.1.3:9092
topics: app-events
header: 'X-Environment production'
header: 'X-Source fluent-bit'
header: 'X-Host $host'
header: 'X-TraceId $trace_id'
Debug log output
All five runtime tests passed. Key lines annotated below.
Test raw_format...
[2026/03/01 07:44:47] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
[2026/03/01 07:44:47] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
Connect to ipv4#127.0.0.1:111 failed: Connection refused
[2026/03/01 07:44:54] [ warn] [output:kafka:kafka.0] Failed to force flush:
Local: Timed out
[ OK ]
Test header_static... <-- two static headers: "env production", "team platform"
[2026/03/01 07:44:54] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
[2026/03/01 07:44:54] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
Connect to ipv4#127.0.0.1:111 failed: Connection refused
[2026/03/01 07:45:02] [ warn] [output:kafka:kafka.0] Failed to force flush:
Local: Timed out
[ OK ]
Test header_dynamic... <-- "source $key_0" resolved to "val_0"
[2026/03/01 07:45:02] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
[2026/03/01 07:45:02] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
Connect to ipv4#127.0.0.1:111 failed: Connection refused
[2026/03/01 07:45:10] [ warn] [output:kafka:kafka.0] Failed to force flush:
Local: Timed out
[ OK ]
Test header_dynamic_missing_field... <-- "$no_such_field" absent: warn + skip
[2026/03/01 07:45:10] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
[2026/03/01 07:45:10] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
Connect to ipv4#127.0.0.1:111 failed: Connection refused
[2026/03/01 07:45:11] [ warn] [output:kafka:kafka.0] header 'missing': field
'$no_such_field' not found in record or not a string, skipping <-- expected
[2026/03/01 07:45:18] [ warn] [output:kafka:kafka.0] Failed to force flush:
Local: Timed out
[ OK ]
Test header_mixed... <-- "app myapp" (static) + "source $key_1" (dynamic)
[2026/03/01 07:45:18] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:111' topics='test'
[2026/03/01 07:45:18] [error] [output:kafka:kafka.0] 127.0.0.1:111/bootstrap:
Connect to ipv4#127.0.0.1:111 failed: Connection refused
[2026/03/01 07:45:26] [ warn] [output:kafka:kafka.0] Failed to force flush:
Local: Timed out
[ OK ]
SUCCESS: All unit tests have passed.
Valgrind output
Command:
valgrind --tool=memcheck --leak-check=full
--show-leak-kinds=definite,indirect
--error-exitcode=1
--suppressions=valgrind.supp
build/bin/flb-rt-out_kafka
Output:
==26539== Memcheck, a memory error detector
==26539== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==26539== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==26539== Command: build/bin/flb-rt-out_kafka
Test raw_format... [ OK ]
Test header_static... [ OK ]
Test header_dynamic... [ OK ]
Test header_dynamic_missing_field... [ OK ]
Test header_mixed... [ OK ]
SUCCESS: All unit tests have passed.
==26539== HEAP SUMMARY:
==26539== in use at exit: 0 bytes in 0 blocks
==26539== total heap usage: 9,995 allocs, 9,995 frees, 6,314,321 bytes allocated
==26539==
==26539== All heap blocks were freed -- no leaks are possible
==26539==
==26539== For lists of detected and suppressed errors, rerun with: -s
==26539== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Documentation
The companion docs PR (fluent/fluent-bit-docs#1341) updates the parameter table and adds a new "Message headers" section with static + dynamic examples in both .conf and .yaml formats.
Backporting: [N/A] This is a new feature targeting master only; backport decisions are deferred to maintainers.