9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
@@ -38,6 +38,15 @@ jobs:
      package_name: '@message-queue-toolkit/kafka'
      node_version: ${{ matrix.node-version }}

  kafkajs:
    strategy:
      matrix:
        node-version: [22.x, 24.x]
    uses: ./.github/workflows/ci.common.yml
    with:
      package_name: '@message-queue-toolkit/kafkajs'
      node_version: ${{ matrix.node-version }}

  automerge:
    needs: [ general ]
    runs-on: ubuntu-latest
5 changes: 5 additions & 0 deletions .github/workflows/publish.yml
@@ -137,6 +137,10 @@ jobs:
  - 'packages/kafka/lib/**'
  - 'packages/kafka/package.json'
  - 'packages/kafka/README.md'
pkg_kafkajs:
  - 'packages/kafkajs/lib/**'
  - 'packages/kafkajs/package.json'
  - 'packages/kafkajs/README.md'
pkg_gcp_pubsub:
  - 'packages/gcp-pubsub/lib/**'
  - 'packages/gcp-pubsub/package.json'
@@ -177,6 +181,7 @@ jobs:
["pkg_sqs"]="sqs"
["pkg_sns"]="sns"
["pkg_kafka"]="kafka"
["pkg_kafkajs"]="kafkajs"
["pkg_gcp_pubsub"]="gcp-pubsub"
["pkg_gcs_payload_store"]="gcs-payload-store"
["pkg_s3_payload_store"]="s3-payload-store"
238 changes: 236 additions & 2 deletions packages/kafka/README.md
@@ -1,21 +1,126 @@
# Kafka

This library provides utilities for implementing Kafka consumers and publishers using the [@platformatic/kafka](https://github.com/platformatic/kafka) client library.
It follows the same patterns as the other message broker implementations,
but Kafka's characteristics require some specific adaptations in the publisher and consumer definitions.

> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.

## Installation

```bash
npm install @message-queue-toolkit/kafka @platformatic/kafka
```

## Publishers

Use `AbstractKafkaPublisher` as a base class for publisher implementation.

```typescript
import { AbstractKafkaPublisher } from '@message-queue-toolkit/kafka'
import type { KafkaDependencies, TopicConfig } from '@message-queue-toolkit/kafka'
import { z } from 'zod'

const MY_MESSAGE_SCHEMA = z.object({
  id: z.string(),
  type: z.literal('my.event'),
  payload: z.object({
    userId: z.string(),
  }),
})

const MY_TOPICS_CONFIG = [
  { topic: 'my-topic', schema: MY_MESSAGE_SCHEMA },
] as const satisfies TopicConfig[]

export class MyPublisher extends AbstractKafkaPublisher<typeof MY_TOPICS_CONFIG> {
  constructor(dependencies: KafkaDependencies) {
    super(dependencies, {
      kafka: {
        bootstrapBrokers: ['localhost:9092'],
        clientId: 'my-app',
      },
      topicsConfig: MY_TOPICS_CONFIG,
      autocreateTopics: true,
    })
  }
}

// Usage
const publisher = new MyPublisher(dependencies)
await publisher.init()
await publisher.publish('my-topic', {
  id: '123',
  type: 'my.event',
  payload: { userId: 'user-1' },
})
await publisher.close()
```

See [test publisher](test/publisher/PermissionPublisher.ts) for an example implementation.

## Consumers

Use `AbstractKafkaConsumer` as a base class for consumer implementation.

```typescript
import {
  AbstractKafkaConsumer,
  KafkaHandlerConfig,
  KafkaHandlerRoutingBuilder,
} from '@message-queue-toolkit/kafka'
import type { KafkaConsumerDependencies, TopicConfig } from '@message-queue-toolkit/kafka'

type MyExecutionContext = {
  userService: UserService
}

export class MyConsumer extends AbstractKafkaConsumer<
  typeof MY_TOPICS_CONFIG,
  MyExecutionContext,
  false
> {
  constructor(
    dependencies: KafkaConsumerDependencies,
    executionContext: MyExecutionContext,
  ) {
    super(
      dependencies,
      {
        kafka: {
          bootstrapBrokers: ['localhost:9092'],
          clientId: 'my-app',
        },
        groupId: 'my-consumer-group',
        batchProcessingEnabled: false,
        handlers: new KafkaHandlerRoutingBuilder<
          typeof MY_TOPICS_CONFIG,
          MyExecutionContext,
          false
        >()
          .addConfig(
            'my-topic',
            new KafkaHandlerConfig(MY_MESSAGE_SCHEMA, async (message, context) => {
              // Handle the message
              console.log('Received:', message.value)
              await context.userService.processEvent(message.value.payload.userId)
            }),
          )
          .build(),
      },
      executionContext,
    )
  }
}

// Usage
const consumer = new MyConsumer(dependencies, { userService })
await consumer.init()
// The consumer is now running and processing messages
// ...
await consumer.close()
```

See [test consumer](test/consumer/PermissionConsumer.ts) for an example implementation.

## Batch Processing
@@ -24,6 +129,52 @@

Kafka supports batch processing for improved throughput. To enable it, set `batchProcessingEnabled` to `true`.

When batch processing is enabled, message handlers receive an array of messages instead of a single message.

```typescript
export class MyBatchConsumer extends AbstractKafkaConsumer<
  typeof MY_TOPICS_CONFIG,
  MyExecutionContext,
  true // Enable batch processing
> {
  constructor(
    dependencies: KafkaConsumerDependencies,
    executionContext: MyExecutionContext,
  ) {
    super(
      dependencies,
      {
        kafka: {
          bootstrapBrokers: ['localhost:9092'],
          clientId: 'my-app',
        },
        groupId: 'my-batch-consumer-group',
        batchProcessingEnabled: true,
        batchProcessingOptions: {
          batchSize: 100,
          timeoutMilliseconds: 5000,
        },
        handlers: new KafkaHandlerRoutingBuilder<
          typeof MY_TOPICS_CONFIG,
          MyExecutionContext,
          true
        >()
          .addConfig(
            'my-topic',
            new KafkaHandlerConfig(MY_MESSAGE_SCHEMA, async (messages, context) => {
              // Handle a batch of messages
              console.log(`Processing batch of ${messages.length} messages`)
              for (const message of messages) {
                await context.userService.processEvent(message.value.payload.userId)
              }
            }),
          )
          .build(),
      },
      executionContext,
    )
  }
}
```

### Configuration Options

- `batchSize` - Maximum number of messages per batch
@@ -38,3 +189,86 @@

Messages are buffered per topic-partition combination. Batches are processed when the batch reaches the configured size or the batch timeout elapses.
After successful batch processing, the offset of the last message in the batch is committed.

See [test batch consumer](test/consumer/PermissionBatchConsumer.ts) for an example implementation.

## Configuration

### KafkaConfig

```typescript
type KafkaConfig = {
  bootstrapBrokers: string[] // List of Kafka broker addresses
  clientId: string // Client identifier
  ssl?: boolean | TLSConfig // SSL configuration
  sasl?: SASLOptions // SASL authentication
  connectTimeout?: number // Connection timeout in ms
}
```

### Publisher Options

- `kafka` - Kafka connection configuration
- `topicsConfig` - Array of topic configurations with schemas
- `autocreateTopics` - Whether to auto-create topics (default: false)

### Consumer Options

- `kafka` - Kafka connection configuration
- `groupId` - Consumer group ID (required)
- `handlers` - Handler routing configuration built with `KafkaHandlerRoutingBuilder`
- `batchProcessingEnabled` - Enable batch processing (default: false)
- `batchProcessingOptions` - Batch configuration (required if batch processing enabled)
- `autocreateTopics` - Whether to auto-create topics (default: false)
- `fromBeginning` - Start consuming from beginning of topic (default: false)
- `sessionTimeout` - Session timeout in ms
- `rebalanceTimeout` - Rebalance timeout in ms
- `heartbeatInterval` - Heartbeat interval in ms
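Putting the fields above together, a consumer configuration might look like the following sketch. The timeout and interval values are illustrative examples, not library defaults; the `handlers` field (built with `KafkaHandlerRoutingBuilder`, see below) is omitted here to keep the fragment self-contained.

```typescript
// Illustrative consumer options using the fields listed above (values are examples only)
const consumerOptions = {
  kafka: {
    bootstrapBrokers: ['localhost:9092'],
    clientId: 'my-app',
  },
  groupId: 'my-consumer-group',
  // handlers: built with KafkaHandlerRoutingBuilder (see Handler Routing)
  batchProcessingEnabled: false,
  autocreateTopics: false,
  fromBeginning: false,
  sessionTimeout: 30_000, // ms
  rebalanceTimeout: 60_000, // ms
  heartbeatInterval: 3_000, // ms
}
```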

## Error Handling and Retries

The consumer implements an in-memory retry mechanism with exponential backoff:
- Failed messages are retried up to 3 times
- Backoff delay: 2^(retry-1) seconds between retries
- After all retries are exhausted, the message is logged as an error
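The backoff schedule described above can be sketched as follows. The helper name and constant are illustrative, not part of the library's API; they only express the `2^(retry-1)` seconds rule.

```typescript
// Sketch of the documented retry schedule (names are illustrative, not the library's API)
const MAX_RETRIES = 3

// Delay before the given retry attempt: 2^(retry - 1) seconds, in milliseconds
function backoffDelayMs(retry: number): number {
  return 2 ** (retry - 1) * 1000
}

// Delays for retries 1..3: [1000, 2000, 4000] ms
const delays = Array.from({ length: MAX_RETRIES }, (_, i) => backoffDelayMs(i + 1))
console.log(delays)
```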

## Message Format

Messages are deserialized and passed to handlers with the following structure:

```typescript
type DeserializedMessage<MessageValue> = {
topic: string
partition: number
key: string | null
value: MessageValue
headers: Record<string, string | undefined>
offset: string
timestamp: string
}
```
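A handler can read the metadata fields alongside the validated payload. The sketch below repeats the type so it is self-contained; the `describeMessage` helper and the sample message are illustrative, not part of the library.

```typescript
// The message shape from above, repeated for a self-contained sketch
type DeserializedMessage<MessageValue> = {
  topic: string
  partition: number
  key: string | null
  value: MessageValue
  headers: Record<string, string | undefined>
  offset: string
  timestamp: string
}

// Illustrative helper: summarize a message using its metadata and payload
function describeMessage(message: DeserializedMessage<{ id: string }>): string {
  return `${message.topic}[${message.partition}]@${message.offset} id=${message.value.id}`
}

const example: DeserializedMessage<{ id: string }> = {
  topic: 'my-topic',
  partition: 0,
  key: null,
  value: { id: '123' },
  headers: {},
  offset: '42',
  timestamp: '1700000000000',
}
console.log(describeMessage(example)) // "my-topic[0]@42 id=123"
```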

## Handler Routing

The `KafkaHandlerRoutingBuilder` provides a type-safe way to configure message handlers:

```typescript
const handlers = new KafkaHandlerRoutingBuilder<TopicsConfig, ExecutionContext, BatchEnabled>()
  .addConfig('topic-1', new KafkaHandlerConfig(SCHEMA_1, handler1))
  .addConfig('topic-2', new KafkaHandlerConfig(SCHEMA_2, handler2))
  .build()
```

Each handler config requires:
- A Zod schema for message validation
- A handler function that receives the validated message(s) and execution context

## Testing

Use the `handlerSpy` for testing message processing:

```typescript
// Wait for a specific message to be processed
const result = await consumer.handlerSpy.waitForMessageWithId('message-123', 'consumed')

// Check if a message was processed without waiting
const check = consumer.handlerSpy.checkForMessage({ type: 'my.event' })
```