Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions src/cloudevents/core/bindings/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
# under the License.

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Final

from dateutil.parser import isoparse

from cloudevents.core.base import BaseCloudEvent, EventFactory
from cloudevents.core.bindings.common import (
CONTENT_TYPE_HEADER,
DATACONTENTTYPE_ATTR,
decode_header_value,
encode_header_value,
TIME_ATTR,
get_event_factory_for_version,
)
from cloudevents.core.formats.base import Format
Expand Down Expand Up @@ -108,15 +110,17 @@ def to_binary(
if attr_value is None:
continue

# Skip partitionkey - it goes in the message key, not headers
if attr_name == PARTITIONKEY_ATTR:
continue

if attr_name == DATACONTENTTYPE_ATTR:
headers[CONTENT_TYPE_HEADER] = str(attr_value).encode("utf-8")
else:
header_name = f"{CE_PREFIX}{attr_name}"
headers[header_name] = encode_header_value(attr_value).encode("utf-8")
if isinstance(attr_value, datetime):
s = attr_value.isoformat()
if s.endswith("+00:00"):
s = s[:-6] + "Z"
headers[header_name] = s.encode("utf-8")
else:
headers[header_name] = str(attr_value).encode("utf-8")

data = event.get_data()
datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
Expand Down Expand Up @@ -167,7 +171,10 @@ def from_binary(

if normalized_name.startswith(CE_PREFIX):
attr_name = normalized_name[len(CE_PREFIX) :]
attributes[attr_name] = decode_header_value(attr_name, header_value)
if attr_name == TIME_ATTR:
attributes[attr_name] = isoparse(header_value)
else:
attributes[attr_name] = header_value
elif normalized_name == CONTENT_TYPE_HEADER:
attributes[DATACONTENTTYPE_ATTR] = header_value

Expand Down
30 changes: 13 additions & 17 deletions tests/test_core/test_bindings/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ def test_to_binary_required_attributes() -> None:
assert "ce_type" in message.headers
assert message.headers["ce_type"] == b"com.example.test"
assert "ce_source" in message.headers
assert (
message.headers["ce_source"] == b"%2Ftest"
) # Forward slash is percent-encoded
assert message.headers["ce_source"] == b"/test"
assert "ce_id" in message.headers
assert message.headers["ce_id"] == b"test-id-123"
assert "ce_specversion" in message.headers
Expand All @@ -111,8 +109,7 @@ def test_to_binary_with_optional_attributes() -> None:
message = to_binary(event, JSONFormat())

assert message.headers["ce_subject"] == b"test-subject"
# All special characters including : and / are percent-encoded
assert message.headers["ce_dataschema"] == b"https%3A%2F%2Fexample.com%2Fschema"
assert message.headers["ce_dataschema"] == b"https://example.com/schema"


def test_to_binary_with_extensions() -> None:
Expand Down Expand Up @@ -171,8 +168,7 @@ def test_to_binary_datetime_encoding() -> None:
message = to_binary(event, JSONFormat())

assert "ce_time" in message.headers
# Should be ISO 8601 with Z suffix, percent-encoded
assert b"2023-01-15T10%3A30%3A45Z" in message.headers["ce_time"]
assert message.headers["ce_time"] == b"2023-01-15T10:30:45Z"


def test_to_binary_special_characters() -> None:
Expand All @@ -181,7 +177,7 @@ def test_to_binary_special_characters() -> None:
message = to_binary(event, JSONFormat())

assert "ce_subject" in message.headers
assert b"%" in message.headers["ce_subject"] # Percent encoding present
assert message.headers["ce_subject"] == b'Hello World! "quotes" & special'


def test_to_binary_datacontenttype_mapping() -> None:
Expand All @@ -195,12 +191,12 @@ def test_to_binary_datacontenttype_mapping() -> None:


def test_to_binary_partitionkey_in_key() -> None:
"""Test that partitionkey extension attribute becomes message key"""
"""Test that partitionkey becomes message key and is still included in headers"""
event = create_event({"partitionkey": "user-123"})
message = to_binary(event, JSONFormat())

assert message.key == "user-123"
assert "ce_partitionkey" not in message.headers
assert message.headers["ce_partitionkey"] == b"user-123"


def test_to_binary_custom_key_mapper() -> None:
Expand Down Expand Up @@ -228,7 +224,7 @@ def test_from_binary_required_attributes() -> None:
message = KafkaMessage(
headers={
"ce_type": b"com.example.test",
"ce_source": b"%2Ftest",
"ce_source": b"/test",
"ce_id": b"test-123",
"ce_specversion": b"1.0",
},
Expand All @@ -238,7 +234,7 @@ def test_from_binary_required_attributes() -> None:
event = from_binary(message, JSONFormat(), CloudEvent)

assert event.get_type() == "com.example.test"
assert event.get_source() == "/test" # Percent-decoded
assert event.get_source() == "/test"
assert event.get_id() == "test-123"
assert event.get_specversion() == "1.0"

Expand All @@ -252,15 +248,15 @@ def test_from_binary_with_optional_attributes() -> None:
"ce_id": b"123",
"ce_specversion": b"1.0",
"ce_subject": b"test-subject",
"ce_dataschema": b"https%3A%2F%2Fexample.com%2Fschema",
"ce_dataschema": b"https://example.com/schema",
},
key=None,
value=b"",
)
event = from_binary(message, JSONFormat(), CloudEvent)

assert event.get_subject() == "test-subject"
assert event.get_dataschema() == "https://example.com/schema" # Percent-decoded
assert event.get_dataschema() == "https://example.com/schema"


def test_from_binary_with_extensions() -> None:
Expand Down Expand Up @@ -310,7 +306,7 @@ def test_from_binary_datetime_parsing() -> None:
"ce_source": b"/test",
"ce_id": b"123",
"ce_specversion": b"1.0",
"ce_time": b"2023-01-15T10%3A30%3A45Z",
"ce_time": b"2023-01-15T10:30:45Z",
},
key=None,
value=b"",
Expand Down Expand Up @@ -658,7 +654,7 @@ def test_from_binary_with_defaults() -> None:
message = KafkaMessage(
headers={
"ce_type": b"com.example.test",
"ce_source": b"%2Ftest",
"ce_source": b"/test",
"ce_id": b"123",
"ce_specversion": b"1.0",
"content-type": b"application/json",
Expand Down Expand Up @@ -700,7 +696,7 @@ def test_from_kafka_with_defaults_binary() -> None:
message = KafkaMessage(
headers={
"ce_type": b"com.example.test",
"ce_source": b"%2Ftest",
"ce_source": b"/test",
"ce_id": b"123",
"ce_specversion": b"1.0",
},
Expand Down