From a4c2f7e34bd7fafdf228326be817c6989de02be2 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 14 Jan 2026 12:53:54 +0100 Subject: [PATCH 01/20] Removing backpressure to test --- packages/kafka/lib/AbstractKafkaConsumer.ts | 32 +++- .../lib/utils/KafkaMessageBatchStream.spec.ts | 169 +++++++----------- .../lib/utils/KafkaMessageBatchStream.ts | 47 ++--- 3 files changed, 104 insertions(+), 144 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index cc69a009..6bc3d11f 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -190,19 +190,14 @@ export abstract class AbstractKafkaConsumer< }) this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics }) - this.consumerStream.on('error', (error) => this.handlerError(error)) - if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { this.messageBatchStream = new KafkaMessageBatchStream< DeserializedMessage> - >( - (batch) => - this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)), - this.options.batchProcessingOptions, - ) + >({ + batchSize: this.options.batchProcessingOptions.batchSize, + timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, + }) this.consumerStream.pipe(this.messageBatchStream) - } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) } } catch (error) { throw new InternalError({ @@ -211,6 +206,15 @@ export abstract class AbstractKafkaConsumer< cause: error, }) } + + if (this.options.batchProcessingEnabled && this.messageBatchStream) { + this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) + } else { + this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + } + + this.consumerStream.on('error', (error) => this.handlerError(error)) + this.messageBatchStream?.on('error', 
(error) => this.handlerError(error)) } private async handleSyncStream( @@ -223,6 +227,16 @@ export abstract class AbstractKafkaConsumer< ) } } + private async handleSyncStreamBatch( + stream: KafkaMessageBatchStream>>, + ): Promise { + for await (const messageBatch of stream) { + await this.consume( + messageBatch.topic, + messageBatch.messages as DeserializedMessage>, + ) + } + } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index ccc88e03..de47c429 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -21,20 +21,18 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream( - (batch) => { - receivedBatches.push(batch) - // We expect 3 batches and the last message waiting in the stream - if (receivedBatches.length >= 3) { - resolvePromise() - } - return Promise.resolve() - }, - { - batchSize: 3, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream({ + batchSize: 3, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + // We expect 3 batches and the last message waiting in the stream + if (receivedBatches.length >= 3) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -63,16 +61,14 @@ describe('KafkaMessageBatchStream', () => { // When const receivedBatches: MessageBatch[] = [] - const batchStream = new KafkaMessageBatchStream( - (batch) => { - receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 1000, - timeoutMilliseconds: 100, - }, - ) // Setting big batch size to check timeout only + const batchStream = new 
KafkaMessageBatchStream({ + batchSize: 1000, + timeoutMilliseconds: 100, + }) // Setting big batch size to check timeout only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + }) for (const message of messages) { batchStream.write(message) @@ -120,27 +116,24 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - const key = `${batch.topic}:${batch.partition}` - if (!receivedBatchesByTopicPartition[key]) { - receivedBatchesByTopicPartition[key] = [] - } - receivedBatchesByTopicPartition[key]!.push(batch.messages) - - // We expect 5 batches and last message waiting in the stream - receivedMessagesCounter++ - if (receivedMessagesCounter >= 5) { - resolvePromise() - } - - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + const key = `${batch.topic}:${batch.partition}` + if (!receivedBatchesByTopicPartition[key]) { + receivedBatchesByTopicPartition[key] = [] + } + receivedBatchesByTopicPartition[key]!.push(batch.messages) + + // We expect 5 batches and last message waiting in the stream + receivedMessagesCounter++ + if (receivedMessagesCounter >= 5) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -199,23 +192,20 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - receivedBatches.push(batch) + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting 
big timeout to check batch size only - // We expect 4 batches (2 per partition) - receivedBatchesCounter++ - if (receivedBatchesCounter >= 4) { - resolvePromise() - } + batchStream.on('data', (batch) => { + receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + // We expect 4 batches (2 per partition) + receivedBatchesCounter++ + if (receivedBatchesCounter >= 4) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -232,7 +222,8 @@ describe('KafkaMessageBatchStream', () => { ]) }) - it('should handle backpressure correctly when timeout flush is slow', async () => { + // Skipping for beta + it.skip('should emit batches based on timeout with delayed writes', async () => { // Given const topic = 'test-topic' const messages = Array.from({ length: 6 }, (_, i) => ({ @@ -242,58 +233,28 @@ describe('KafkaMessageBatchStream', () => { partition: 0, })) - const batchStartTimes: number[] = [] // Track start times of batch processing - const batchEndTimes: number[] = [] // Track end times of batch processing const batchMessageCounts: number[] = [] // Track number of messages per batch - let maxConcurrentBatches = 0 // Track max concurrent batches - - let batchesProcessing = 0 - const batchStream = new KafkaMessageBatchStream( - async (batch) => { - batchStartTimes.push(Date.now()) - batchMessageCounts.push(batch.messages.length) - batchesProcessing++ - maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing) - - // Simulate batch processing (50ms per batch) - await setTimeout(50) + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1000, // Large batch size to never trigger size-based flushing + timeoutMilliseconds: 30, // Short timeout to trigger flush + }) - batchEndTimes.push(Date.now()) - batchesProcessing-- - }, - { - batchSize: 1000, // Large batch size to never trigger size-based flushing - 
timeoutMilliseconds: 10, // Short timeout to trigger flush after each message - }, - ) + batchStream.on('data', (batch) => { + batchMessageCounts.push(batch.messages.length) + }) // When: Write messages with 20ms delay between them - // Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation for (const message of messages) { batchStream.write(message) await setTimeout(20) } - // Then - // Wait until all 3 batches have been processed - await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20) - - // Backpressure causes messages to accumulate while previous batch processes: - // - Batch 1: Message 1 (flushed at 10ms timeout) - // - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms) - // - Batch 3: Messages 5-6 (accumulated during Batch 2 processing) - expect(batchMessageCounts).toEqual([1, 3, 2]) - - // Verify that batches never processed in parallel (backpressure working) - expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time - - // Verify that batches were processed sequentially (each starts after previous ends) - for (let i = 1; i < batchStartTimes.length; i++) { - const previousEndTime = batchEndTimes[i - 1] - const currentStartTime = batchStartTimes[i] - // The current batch must start after the previous batch finished - expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 
0) - } + // Then: Wait for batches to be emitted + await waitAndRetry(() => batchMessageCounts.length >= 6, 500, 20) + + // Each message triggers its own timeout flush since they arrive 20ms apart + expect(batchMessageCounts.length).toBeGreaterThanOrEqual(1) + expect(batchMessageCounts.reduce((a, b) => a + b, 0)).toBe(6) }) }) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index a6c43d0a..a11270f2 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -21,47 +21,29 @@ export type OnMessageBatchCallback = (batch: MessageBatch) = export class KafkaMessageBatchStream< TMessage extends MessageWithTopicAndPartition, > extends Transform { - private readonly onBatch: OnMessageBatchCallback private readonly batchSize: number private readonly timeout: number private readonly currentBatchPerTopicPartition: Record private readonly batchTimeoutPerTopicPartition: Record - - private readonly timeoutProcessingPromises: Map> = new Map() - - constructor( - onBatch: OnMessageBatchCallback, - options: { batchSize: number; timeoutMilliseconds: number }, - ) { + constructor(options: { batchSize: number; timeoutMilliseconds: number }) { super({ objectMode: true }) - this.onBatch = onBatch this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds this.currentBatchPerTopicPartition = {} this.batchTimeoutPerTopicPartition = {} } - override async _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { + override _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { const key = getTopicPartitionKey(message.topic, message.partition) - // Wait for all pending timeout flushes to complete to maintain backpressure - if (this.timeoutProcessingPromises.size > 0) { - // Capture a snapshot of current promises to avoid race conditions with new timeouts - const promiseEntries = 
Array.from(this.timeoutProcessingPromises.entries()) - // Wait for all to complete and then clean up from the map - await Promise.all( - promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k))), - ) - } - // Accumulate the message if (!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] this.currentBatchPerTopicPartition[key].push(message) // Check if the batch is complete by size if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { - await this.flushCurrentBatchMessages(message.topic, message.partition) + this.flushCurrentBatchMessages(message.topic, message.partition) callback() return } @@ -69,11 +51,7 @@ export class KafkaMessageBatchStream< // Start timeout for this partition if not already started if (!this.batchTimeoutPerTopicPartition[key]) { this.batchTimeoutPerTopicPartition[key] = setTimeout( - () => - this.timeoutProcessingPromises.set( - key, - this.flushCurrentBatchMessages(message.topic, message.partition), - ), + () => this.flushCurrentBatchMessages(message.topic, message.partition), this.timeout, ) } @@ -87,14 +65,14 @@ export class KafkaMessageBatchStream< callback() } - private async flushAllBatches() { + private flushAllBatches() { for (const key of Object.keys(this.currentBatchPerTopicPartition)) { const { topic, partition } = splitTopicPartitionKey(key) - await this.flushCurrentBatchMessages(topic, partition) + this.flushCurrentBatchMessages(topic, partition) } } - private async flushCurrentBatchMessages(topic: string, partition: number) { + private flushCurrentBatchMessages(topic: string, partition: number) { const key = getTopicPartitionKey(topic, partition) // Clear timeout @@ -104,11 +82,18 @@ export class KafkaMessageBatchStream< } const messages = this.currentBatchPerTopicPartition[key] ?? 
[] + if (!messages.length) return - // Push the batch downstream - await this.onBatch({ topic, partition, messages }) + this.push({ topic, partition, messages: messages }) this.currentBatchPerTopicPartition[key] = [] } + + override push( + chunk: { topic: string; partition: number; messages: TMessage[] }, + encoding?: BufferEncoding, + ): boolean { + return super.push(chunk, encoding) + } } const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` From 11ea818332cac4e8a6c96c1e3caf7ea58ba680a1 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 14 Jan 2026 12:55:02 +0100 Subject: [PATCH 02/20] beta 2 release prepare --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 3a9c000e..686e879a 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.0", + "version": "0.9.1-beta.2", "engines": { "node": ">= 22.14.0" }, From 28bba6955a977492078b8d636898d0cc5b28194e Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 15 Jan 2026 13:55:33 +0100 Subject: [PATCH 03/20] beta 3 --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 8 ++++---- packages/kafka/package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index a11270f2..7830476d 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -9,7 +9,6 @@ export type KafkaMessageBatchOptions = { } export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } -export type OnMessageBatchCallback = (batch: MessageBatch) => Promise /** * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. 
@@ -60,8 +59,9 @@ export class KafkaMessageBatchStream< } // Flush all remaining batches when stream is closing - override async _flush(callback: () => void) { - await this.flushAllBatches() + override _flush(callback: () => void) { + this.flushAllBatches() + this.push(null) // end of stream callback() } @@ -89,7 +89,7 @@ export class KafkaMessageBatchStream< } override push( - chunk: { topic: string; partition: number; messages: TMessage[] }, + chunk: { topic: string; partition: number; messages: TMessage[] } | null, encoding?: BufferEncoding, ): boolean { return super.push(chunk, encoding) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 686e879a..96a3d9a0 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.2", + "version": "0.9.1-beta.3", "engines": { "node": ">= 22.14.0" }, From 9577df442674396223eaa51d9928f4eb3c07cd69 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 10:41:56 +0100 Subject: [PATCH 04/20] Switch to a single batch and split later --- packages/kafka/lib/AbstractKafkaConsumer.ts | 30 ++- .../lib/utils/KafkaMessageBatchStream.spec.ts | 186 +----------------- .../lib/utils/KafkaMessageBatchStream.ts | 79 +++----- 3 files changed, 60 insertions(+), 235 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 6bc3d11f..c2da622c 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -230,10 +230,34 @@ export abstract class AbstractKafkaConsumer< private async handleSyncStreamBatch( stream: KafkaMessageBatchStream>>, ): Promise { + const getTopicPartitionKey = (topic: string, partition: number): string => + `${topic}:${partition}` + const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { + const [topic, partition] = key.split(':') + /* v8 ignore start */ + 
if (!topic || !partition) throw new Error('Invalid topic-partition key format') + /* v8 ignore stop */ + + return { topic, partition: Number.parseInt(partition, 10) } + } + for await (const messageBatch of stream) { - await this.consume( - messageBatch.topic, - messageBatch.messages as DeserializedMessage>, + const messagesByTopicPartition: Record< + string, + DeserializedMessage>[] + > = {} + for (const message of messageBatch) { + const key = getTopicPartitionKey(message.topic, message.partition) + if (!messagesByTopicPartition[key]) { + messagesByTopicPartition[key] = [] + } + messagesByTopicPartition[key].push(message) + } + + await Promise.all( + Object.entries(messagesByTopicPartition).map(([key, messages]) => + this.consume(splitTopicPartitionKey(key).topic, messages), + ), ) } } diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index de47c429..79561d6e 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,5 +1,4 @@ import { setTimeout } from 'node:timers/promises' -import { waitAndRetry } from '@lokalise/universal-ts-utils/node' import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' describe('KafkaMessageBatchStream', () => { @@ -42,9 +41,9 @@ describe('KafkaMessageBatchStream', () => { // Then expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1], messages[2]] }, - { topic, partition: 0, messages: [messages[3], messages[4], messages[5]] }, - { topic, partition: 0, messages: [messages[6], messages[7], messages[8]] }, + [messages[0], messages[1], messages[2]], + [messages[3], messages[4], messages[5]], + [messages[6], messages[7], messages[8]], ]) }) @@ -78,183 +77,6 @@ describe('KafkaMessageBatchStream', () => { await setTimeout(150) // Then - expect(receivedBatches).toEqual([{ topic, partition: 0, messages }]) - }) - - 
it('should support multiple topics and partitions', async () => { - // Given - const firstTopic = 'test-topic-1' - const secondTopic = 'test-topic-2' - const thirdTopic = 'test-topic-3' - const messages = [ - ...Array.from({ length: 4 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic: firstTopic, - partition: 0, - })), - ...Array.from({ length: 4 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic: secondTopic, - partition: 1, - })), - ...Array.from({ length: 3 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic: thirdTopic, - partition: 0, - })), - ] - - // When - const receivedBatchesByTopicPartition: Record = {} - let receivedMessagesCounter = 0 - - let resolvePromise: () => void - const dataFetchingPromise = new Promise((resolve) => { - resolvePromise = resolve - }) - - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ - batchSize: 2, - timeoutMilliseconds: 10000, - }) // Setting big timeout to check batch size only - - batchStream.on('data', (batch) => { - const key = `${batch.topic}:${batch.partition}` - if (!receivedBatchesByTopicPartition[key]) { - receivedBatchesByTopicPartition[key] = [] - } - receivedBatchesByTopicPartition[key]!.push(batch.messages) - - // We expect 5 batches and last message waiting in the stream - receivedMessagesCounter++ - if (receivedMessagesCounter >= 5) { - resolvePromise() - } - }) - - for (const message of messages) { - batchStream.write(message) - } - - await dataFetchingPromise - - // Then - expect(receivedBatchesByTopicPartition[`${firstTopic}:0`]).toEqual([ - [messages[0], messages[1]], - [messages[2], messages[3]], - ]) - expect(receivedBatchesByTopicPartition[`${secondTopic}:1`]).toEqual([ - [messages[4], messages[5]], - [messages[6], messages[7]], - ]) - expect(receivedBatchesByTopicPartition[`${thirdTopic}:0`]).toEqual([[messages[8], messages[9]]]) - }) - - it('should batch messages separately for different partitions of the same 
topic', async () => { - // Given - const topic = 'test-topic' - const messages = [ - ...Array.from({ length: 3 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic, - partition: 0, - })), - ...Array.from({ length: 3 }, (_, i) => ({ - id: i + 4, - content: `Message ${i + 4}`, - topic, - partition: 1, - })), - { - id: 7, - content: `Message 7`, - topic, - partition: 0, - }, - { - id: 8, - content: `Message 8`, - topic, - partition: 1, - }, - ] - - // When - const receivedBatches: any[] = [] - let receivedBatchesCounter = 0 - - let resolvePromise: () => void - const dataFetchingPromise = new Promise((resolve) => { - resolvePromise = resolve - }) - - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ - batchSize: 2, - timeoutMilliseconds: 10000, - }) // Setting big timeout to check batch size only - - batchStream.on('data', (batch) => { - receivedBatches.push(batch) - - // We expect 4 batches (2 per partition) - receivedBatchesCounter++ - if (receivedBatchesCounter >= 4) { - resolvePromise() - } - }) - - for (const message of messages) { - batchStream.write(message) - } - - await dataFetchingPromise - - // Then - expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1]] }, - { topic, partition: 1, messages: [messages[3], messages[4]] }, - { topic, partition: 0, messages: [messages[2], messages[6]] }, - { topic, partition: 1, messages: [messages[5], messages[7]] }, - ]) - }) - - // Skipping for beta - it.skip('should emit batches based on timeout with delayed writes', async () => { - // Given - const topic = 'test-topic' - const messages = Array.from({ length: 6 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic, - partition: 0, - })) - - const batchMessageCounts: number[] = [] // Track number of messages per batch - - const batchStream = new KafkaMessageBatchStream({ - batchSize: 1000, // Large batch size to never trigger size-based flushing - timeoutMilliseconds: 
30, // Short timeout to trigger flush - }) - - batchStream.on('data', (batch) => { - batchMessageCounts.push(batch.messages.length) - }) - - // When: Write messages with 20ms delay between them - for (const message of messages) { - batchStream.write(message) - await setTimeout(20) - } - - // Then: Wait for batches to be emitted - await waitAndRetry(() => batchMessageCounts.length >= 6, 500, 20) - - // Each message triggers its own timeout flush since they arrive 20ms apart - expect(batchMessageCounts.length).toBeGreaterThanOrEqual(1) - expect(batchMessageCounts.reduce((a, b) => a + b, 0)).toBe(6) + expect(receivedBatches).toEqual([messages]) }) }) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 7830476d..bf6c458d 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,4 +1,4 @@ -import { Transform } from 'node:stream' +import { Transform, type TransformCallback } from 'node:stream' // Topic and partition are required for the stream to work properly type MessageWithTopicAndPartition = { topic: string; partition: number } @@ -23,75 +23,54 @@ export class KafkaMessageBatchStream< private readonly batchSize: number private readonly timeout: number - private readonly currentBatchPerTopicPartition: Record - private readonly batchTimeoutPerTopicPartition: Record + private readonly messages: TMessage[] + private existingTimeout: NodeJS.Timeout | undefined + constructor(options: { batchSize: number; timeoutMilliseconds: number }) { super({ objectMode: true }) this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds - this.currentBatchPerTopicPartition = {} - this.batchTimeoutPerTopicPartition = {} - } - - override _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { - const key = getTopicPartitionKey(message.topic, message.partition) - // Accumulate the message - if 
(!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] - this.currentBatchPerTopicPartition[key].push(message) - - // Check if the batch is complete by size - if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { - this.flushCurrentBatchMessages(message.topic, message.partition) - callback() - return - } + this.messages = [] + } - // Start timeout for this partition if not already started - if (!this.batchTimeoutPerTopicPartition[key]) { - this.batchTimeoutPerTopicPartition[key] = setTimeout( - () => this.flushCurrentBatchMessages(message.topic, message.partition), - this.timeout, - ) + override _transform(message: TMessage, _encoding: BufferEncoding, callback: TransformCallback) { + try { + this.messages.push(message) + + // Check if the batch is complete by size + if (this.messages.length >= this.batchSize) { + this.flushMessages() + return + } else if (!this.existingTimeout) { + // Start timeout if not already started + this.existingTimeout = setTimeout(() => this.flushMessages(), this.timeout) + } + } finally { + callback(null) } - - callback() } // Flush all remaining batches when stream is closing override _flush(callback: () => void) { - this.flushAllBatches() + this.flushMessages() this.push(null) // end of stream callback() } - private flushAllBatches() { - for (const key of Object.keys(this.currentBatchPerTopicPartition)) { - const { topic, partition } = splitTopicPartitionKey(key) - this.flushCurrentBatchMessages(topic, partition) + private flushMessages() { + if (this.existingTimeout) { + clearTimeout(this.existingTimeout) + this.existingTimeout = undefined } - } - private flushCurrentBatchMessages(topic: string, partition: number) { - const key = getTopicPartitionKey(topic, partition) - - // Clear timeout - if (this.batchTimeoutPerTopicPartition[key]) { - clearTimeout(this.batchTimeoutPerTopicPartition[key]) - this.batchTimeoutPerTopicPartition[key] = undefined + const messages = 
this.messages.splice(0, this.messages.length)
    if (messages.length) {
      this.push(messages)
    }
  }

  override push(chunk: TMessage[] | null, encoding?: BufferEncoding): boolean {
    return super.push(chunk, encoding)
  }
}

From d18cab05bc7f86d19ca4a64a6ee1f6bc5172be76 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 10:52:27 +0100 Subject: [PATCH 05/20] Adding flush protection just in case --- .../kafka/lib/utils/KafkaMessageBatchStream.ts | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index bf6c458d..6089a9a7 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -25,6 +25,7 @@ export class KafkaMessageBatchStream< private readonly messages: TMessage[] private existingTimeout: NodeJS.Timeout | undefined + private isFlushing: boolean = false constructor(options: { batchSize: number; timeoutMilliseconds: number }) { super({ objectMode: true }) @@ -63,24 +64,17 @@ export class KafkaMessageBatchStream< clearTimeout(this.existingTimeout) this.existingTimeout = undefined } + if (this.isFlushing) return + this.isFlushing = true const messages = this.messages.splice(0, this.messages.length) if (messages.length) { this.push(messages) } + this.isFlushing = false } override push(chunk: TMessage[] | null, encoding?: BufferEncoding): boolean { return super.push(chunk, encoding) } } -
const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` -const splitTopicPartitionKey = (key: 
string): { topic: string; partition: number } => { - const [topic, partition] = key.split(':') - /* v8 ignore start */ - if (!topic || !partition) throw new Error('Invalid topic-partition key format') - /* v8 ignore stop */ - - return { topic, partition: Number.parseInt(partition, 10) } -} From 1f49769bcef46d90462aa64442ec077a776dd557 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 10:52:32 +0100 Subject: [PATCH 06/20] release prepare --- packages/kafka/package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 96a3d9a0..7775cce0 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.3", + "version": "0.9.1-beta.4", "engines": { "node": ">= 22.14.0" }, @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.22.0" + "@platformatic/kafka": "^1.23.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0", From d1e4bd3bfa48a2492a87c6b529069b220630e211 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 14:29:53 +0100 Subject: [PATCH 07/20] Using final instead of flush --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 6089a9a7..eb2a7a18 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -52,10 +52,8 @@ export class KafkaMessageBatchStream< } } - // Flush all remaining batches when stream is closing - override _flush(callback: () => void) { + override _final(callback: (error?: Error | null) => void) { this.flushMessages() - this.push(null) // end of stream callback() } From 
a4888bfe84af855fea58d1a939c3ba3148f75b0b Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 14:30:10 +0100 Subject: [PATCH 08/20] release prepare --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 7775cce0..b3d1a086 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.4", + "version": "0.9.1-beta.5", "engines": { "node": ">= 22.14.0" }, From 3b2b03b4e6dbbf352315611bf77c8fe840cfc8b3 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 17:51:04 +0100 Subject: [PATCH 09/20] Minor fixes --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index eb2a7a18..f3e9a1e7 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -39,13 +39,12 @@ export class KafkaMessageBatchStream< try { this.messages.push(message) - // Check if the batch is complete by size if (this.messages.length >= this.batchSize) { + // Check if the batch is complete by size this.flushMessages() - return - } else if (!this.existingTimeout) { + } else { // Start timeout if not already started - this.existingTimeout = setTimeout(() => this.flushMessages(), this.timeout) + this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout) } } finally { callback(null) @@ -66,9 +65,7 @@ export class KafkaMessageBatchStream< this.isFlushing = true const messages = this.messages.splice(0, this.messages.length) - if (messages.length) { - this.push(messages) - } + if (messages.length) this.push(messages) this.isFlushing = false } From 36f4fe07e85fbbe3f4f168b8a1410b7884462f21 Mon Sep 17 00:00:00 2001 From: 
CarlosGamero Date: Fri, 16 Jan 2026 18:45:58 +0100 Subject: [PATCH 10/20] Trying to go back to duplex --- .../lib/utils/KafkaMessageBatchStream.ts | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index f3e9a1e7..d393fb3e 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,7 +1,6 @@ -import { Transform, type TransformCallback } from 'node:stream' +import { Duplex } from 'node:stream' -// Topic and partition are required for the stream to work properly -type MessageWithTopicAndPartition = { topic: string; partition: number } +type CallbackFunction = (error?: Error | null) => void export type KafkaMessageBatchOptions = { batchSize: number @@ -17,14 +16,13 @@ export type MessageBatch = { topic: string; partition: number; message * When the downstream consumer is slow, the stream will automatically pause accepting new messages * until the consumer catches up, preventing memory leaks and OOM errors. 
*/ -export class KafkaMessageBatchStream< - TMessage extends MessageWithTopicAndPartition, -> extends Transform { +export class KafkaMessageBatchStream extends Duplex { private readonly batchSize: number private readonly timeout: number private readonly messages: TMessage[] private existingTimeout: NodeJS.Timeout | undefined + private pendingCallback: CallbackFunction | undefined private isFlushing: boolean = false constructor(options: { batchSize: number; timeoutMilliseconds: number }) { @@ -33,40 +31,64 @@ export class KafkaMessageBatchStream< this.timeout = options.timeoutMilliseconds this.messages = [] + + // Listen to 'drain' event to resume after backpressure + this.on('drain', () => { + if (!this.pendingCallback) return + + const cb = this.pendingCallback + this.pendingCallback = undefined + cb() + }) + } + + override _read() { + // No-op, as we push data when we have a full batch or timeout } - override _transform(message: TMessage, _encoding: BufferEncoding, callback: TransformCallback) { + override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { + // biome-ignore lint/correctness/noUnusedVariables: All good in this version + let canContinue = true + try { this.messages.push(message) if (this.messages.length >= this.batchSize) { - // Check if the batch is complete by size - this.flushMessages() + canContinue = this.flushMessages() } else { - // Start timeout if not already started + // If backpressure happens, we don't have a callback to hold + // The next _write will handle backpressure + // TODO: check if we can handle this. 
this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout) } } finally { - callback(null) + // TODO: to be enabled in next beta version + // if (!canContinue) this.pendingCallback = callback + // else callback() + callback() } } - override _final(callback: (error?: Error | null) => void) { + override _final(callback: CallbackFunction) { this.flushMessages() + this.push(null) // End readable side callback() } - private flushMessages() { + private flushMessages(): boolean { if (this.existingTimeout) { clearTimeout(this.existingTimeout) this.existingTimeout = undefined } - if (this.isFlushing) return + if (this.isFlushing) return true this.isFlushing = true const messages = this.messages.splice(0, this.messages.length) - if (messages.length) this.push(messages) + let canContinue = true + if (messages.length) canContinue = this.push(messages) this.isFlushing = false + + return canContinue } override push(chunk: TMessage[] | null, encoding?: BufferEncoding): boolean { From 29b7e75c6cbb1ad740fa447bf29528b4053d1dec Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 16 Jan 2026 20:04:53 +0100 Subject: [PATCH 11/20] beta 6 release prepare --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index b3d1a086..1b18e026 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.5", + "version": "0.9.1-beta.6", "engines": { "node": ">= 22.14.0" }, From ab8afa34841a10b92e1f318e44e6dd22214a9c94 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Sat, 17 Jan 2026 15:41:44 +0100 Subject: [PATCH 12/20] backpressure handling with _read and callback --- .../lib/utils/KafkaMessageBatchStream.ts | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts 
b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index d393fb3e..2e9e3861 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -31,23 +31,19 @@ export class KafkaMessageBatchStream extends Duplex { this.timeout = options.timeoutMilliseconds this.messages = [] - - // Listen to 'drain' event to resume after backpressure - this.on('drain', () => { - if (!this.pendingCallback) return - - const cb = this.pendingCallback - this.pendingCallback = undefined - cb() - }) } override _read() { - // No-op, as we push data when we have a full batch or timeout + // When _read is called, it means the downstream consumer is ready for more data + // This is when we should resume the writable side by calling the pending callback if it exists + if (!this.pendingCallback) return + + const cb = this.pendingCallback + this.pendingCallback = undefined + cb() } override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { - // biome-ignore lint/correctness/noUnusedVariables: All good in this version let canContinue = true try { @@ -58,14 +54,11 @@ export class KafkaMessageBatchStream extends Duplex { } else { // If backpressure happens, we don't have a callback to hold // The next _write will handle backpressure - // TODO: check if we can handle this. 
this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout) } } finally { - // TODO: to be enabled in next beta version - // if (!canContinue) this.pendingCallback = callback - // else callback() - callback() + if (!canContinue) this.pendingCallback = callback + else callback() } } @@ -76,10 +69,9 @@ export class KafkaMessageBatchStream extends Duplex { } private flushMessages(): boolean { - if (this.existingTimeout) { - clearTimeout(this.existingTimeout) - this.existingTimeout = undefined - } + clearTimeout(this.existingTimeout) + this.existingTimeout = undefined + if (this.isFlushing) return true this.isFlushing = true From 747ea9eacf28670b779094f489d71887001513f4 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Sat, 17 Jan 2026 15:41:51 +0100 Subject: [PATCH 13/20] beta 7 --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 1b18e026..30ff70fd 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.6", + "version": "0.9.1-beta.7", "engines": { "node": ">= 22.14.0" }, From ab7c7861cefa860f848148bae224b25c1e1b70bf Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 19 Jan 2026 10:47:38 +0100 Subject: [PATCH 14/20] Ignoring all commit errors --- packages/kafka/lib/AbstractKafkaConsumer.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index c2da622c..881ca444 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -480,8 +480,9 @@ export abstract class AbstractKafkaConsumer< this.logger.debug(logDetails, 'Message committed successfully') } catch (error) { this.logger.debug(logDetails, 'Message commit failed') - if (error instanceof ResponseError) return 
this.handleResponseErrorOnCommit(error) - throw error + if (error instanceof ResponseError) { + return this.handleResponseErrorOnCommit(error) + } } } @@ -493,7 +494,7 @@ export abstract class AbstractKafkaConsumer< error.apiCode && commitErrorCodesToIgnore.has(error.apiCode) ) { - this.logger.error( + this.logger.warn( { apiCode: error.apiCode, apiId: error.apiId, @@ -503,9 +504,6 @@ export abstract class AbstractKafkaConsumer< }, `Failed to commit message: ${error.message}`, ) - } else { - // If error is not recognized, rethrow it - throw responseError } } } From 9bfe742baa1c0d5fc72466359a478dcc88961419 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 19 Jan 2026 10:47:52 +0100 Subject: [PATCH 15/20] beta 8 release prepare --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 30ff70fd..6a5c0029 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.7", + "version": "0.9.1-beta.8", "engines": { "node": ">= 22.14.0" }, From d09aa66f2c8d0e25078062bbc19de75460dd2d46 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 20 Jan 2026 10:10:02 +0100 Subject: [PATCH 16/20] beta 9 --- packages/kafka/lib/AbstractKafkaConsumer.ts | 20 ++++++++++++++++++++ packages/kafka/package.json | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 881ca444..16dc25e3 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -198,6 +198,26 @@ export abstract class AbstractKafkaConsumer< timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, }) this.consumerStream.pipe(this.messageBatchStream) + + setInterval(() => { + try { + this.logger.info( + { + isPaused: this.consumerStream?.isPaused(), 
+ isActive: this.consumerStream?.isActive(), + isConnected: this.consumerStream?.isConnected(), + }, + 'Checking consumer stream status...', + ) + if (this.consumerStream?.isPaused()) { + this.logger.info('Consumer stream was paused. trying to resume') + this.consumerStream?.resume() + this.logger.info('Stream resumed successfully') + } + } finally { + // do nothing + } + }, 5000) } } catch (error) { throw new InternalError({ diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 6a5c0029..a361730d 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.8", + "version": "0.9.1-beta.9", "engines": { "node": ">= 22.14.0" }, From 1c3d361dd3b687660198c369256322c78bc818c5 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 20 Jan 2026 10:10:25 +0100 Subject: [PATCH 17/20] Removing interval test --- packages/kafka/lib/AbstractKafkaConsumer.ts | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 16dc25e3..881ca444 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -198,26 +198,6 @@ export abstract class AbstractKafkaConsumer< timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, }) this.consumerStream.pipe(this.messageBatchStream) - - setInterval(() => { - try { - this.logger.info( - { - isPaused: this.consumerStream?.isPaused(), - isActive: this.consumerStream?.isActive(), - isConnected: this.consumerStream?.isConnected(), - }, - 'Checking consumer stream status...', - ) - if (this.consumerStream?.isPaused()) { - this.logger.info('Consumer stream was paused. 
trying to resume') - this.consumerStream?.resume() - this.logger.info('Stream resumed successfully') - } - } finally { - // do nothing - } - }, 5000) } } catch (error) { throw new InternalError({ From eca96fdc8aae20944acf8975c69dce9395cea6ef Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 20 Jan 2026 10:55:25 +0100 Subject: [PATCH 18/20] beta 10 -> using pipeline + better logging to see where the issue is --- packages/kafka/lib/AbstractKafkaConsumer.ts | 26 +++++++++++++++++---- packages/kafka/package.json | 2 +- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 881ca444..a772c52e 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -1,4 +1,5 @@ import { randomUUID } from 'node:crypto' +import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { InternalError, @@ -197,7 +198,12 @@ export abstract class AbstractKafkaConsumer< batchSize: this.options.batchProcessingOptions.batchSize, timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, }) - this.consumerStream.pipe(this.messageBatchStream) + + // Use pipeline for better error handling and backpressure management + pipeline(this.consumerStream, this.messageBatchStream).catch((error) => { + this.logger.error('Stream pipeline failed') + this.handlerError(error) + }) } } catch (error) { throw new InternalError({ @@ -211,10 +217,22 @@ export abstract class AbstractKafkaConsumer< this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) } else { this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + this.consumerStream.on('error', (error) => this.handlerError(error)) } - this.consumerStream.on('error', (error) => this.handlerError(error)) - this.messageBatchStream?.on('error', (error) => this.handlerError(error)) 
+ // To be removed + // Add logging for stream lifecycle events to diagnose backpressure issues + this.consumerStream.on('pause', () => this.logger.warn('Consumer stream PAUSED')) + this.consumerStream.on('resume', () => this.logger.info('Consumer stream RESUMED')) + this.consumerStream.on('fetch', () => this.logger.info('Consumer stream FETCH')) + this.consumerStream.on('close', () => this.logger.info('Consumer stream CLOSE')) + this.consumerStream.on('end', () => this.logger.info('Consumer stream END')) + this.consumerStream.on('end', () => this.logger.info('Consumer stream END')) + this.messageBatchStream?.on('pause', () => this.logger.warn('Message batch stream PAUSED')) + this.messageBatchStream?.on('resume', () => this.logger.info('Message batch stream RESUMED')) + this.messageBatchStream?.on('drain', () => this.logger.info('Message batch stream DRAINED')) + this.messageBatchStream?.on('close', () => this.logger.info('Message batch stream CLOSE')) + this.messageBatchStream?.on('end', () => this.logger.info('Message batch stream END')) } private async handleSyncStream( @@ -254,7 +272,7 @@ export abstract class AbstractKafkaConsumer< messagesByTopicPartition[key].push(message) } - await Promise.all( + await Promise.allSettled( Object.entries(messagesByTopicPartition).map(([key, messages]) => this.consume(splitTopicPartitionKey(key).topic, messages), ), diff --git a/packages/kafka/package.json b/packages/kafka/package.json index a361730d..1a78cb9a 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.9", + "version": "0.9.1-beta.10", "engines": { "node": ">= 22.14.0" }, From 3daa8fd950e4b1bda21433cebcb15ab6e6053094 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 13:09:20 +0100 Subject: [PATCH 19/20] Adjusting logs --- packages/kafka/lib/AbstractKafkaConsumer.ts | 21 +++++++++---------- .../lib/utils/KafkaMessageBatchStream.ts | 12 
+---------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index a772c52e..5b8b7a8d 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -222,17 +222,16 @@ export abstract class AbstractKafkaConsumer< // To be removed // Add logging for stream lifecycle events to diagnose backpressure issues - this.consumerStream.on('pause', () => this.logger.warn('Consumer stream PAUSED')) - this.consumerStream.on('resume', () => this.logger.info('Consumer stream RESUMED')) - this.consumerStream.on('fetch', () => this.logger.info('Consumer stream FETCH')) - this.consumerStream.on('close', () => this.logger.info('Consumer stream CLOSE')) - this.consumerStream.on('end', () => this.logger.info('Consumer stream END')) - this.consumerStream.on('end', () => this.logger.info('Consumer stream END')) - this.messageBatchStream?.on('pause', () => this.logger.warn('Message batch stream PAUSED')) - this.messageBatchStream?.on('resume', () => this.logger.info('Message batch stream RESUMED')) - this.messageBatchStream?.on('drain', () => this.logger.info('Message batch stream DRAINED')) - this.messageBatchStream?.on('close', () => this.logger.info('Message batch stream CLOSE')) - this.messageBatchStream?.on('end', () => this.logger.info('Message batch stream END')) + this.consumerStream.on('pause', () => this.logger.debug('Consumer stream PAUSED')) + this.consumerStream.on('resume', () => this.logger.debug('Consumer stream RESUMED')) + this.consumerStream.on('close', () => this.logger.warn('Consumer stream CLOSE')) + this.consumerStream.on('end', () => this.logger.warn('Consumer stream END')) + this.consumerStream.on('error', () => this.logger.error('Consumer stream ERROR')) + this.messageBatchStream?.on('pause', () => this.logger.debug('Message batch stream PAUSED')) + this.messageBatchStream?.on('resume', () => 
this.logger.debug('Message batch stream RESUMED')) + this.messageBatchStream?.on('close', () => this.logger.warn('Message batch stream CLOSE')) + this.messageBatchStream?.on('end', () => this.logger.warn('Message batch stream END')) + this.messageBatchStream?.on('error', () => this.logger.warn('Message batch stream ERROR')) } private async handleSyncStream( diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 2e9e3861..da2711d5 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -7,8 +7,6 @@ export type KafkaMessageBatchOptions = { timeoutMilliseconds: number } -export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } - /** * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. * @@ -23,7 +21,6 @@ export class KafkaMessageBatchStream extends Duplex { private readonly messages: TMessage[] private existingTimeout: NodeJS.Timeout | undefined private pendingCallback: CallbackFunction | undefined - private isFlushing: boolean = false constructor(options: { batchSize: number; timeoutMilliseconds: number }) { super({ objectMode: true }) @@ -72,15 +69,8 @@ export class KafkaMessageBatchStream extends Duplex { clearTimeout(this.existingTimeout) this.existingTimeout = undefined - if (this.isFlushing) return true - this.isFlushing = true - const messages = this.messages.splice(0, this.messages.length) - let canContinue = true - if (messages.length) canContinue = this.push(messages) - this.isFlushing = false - - return canContinue + return this.push(messages) } override push(chunk: TMessage[] | null, encoding?: BufferEncoding): boolean { From 2c66c50f55b5a0f5cf6afecbda5e3c382819db7b Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 13:11:41 +0100 Subject: [PATCH 20/20] Lint fix + release prepare --- 
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts | 6 +++--- packages/kafka/package.json | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 79561d6e..6168132e 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,5 +1,5 @@ import { setTimeout } from 'node:timers/promises' -import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' +import { KafkaMessageBatchStream } from './KafkaMessageBatchStream.ts' describe('KafkaMessageBatchStream', () => { it('should batch messages based on batch size', async () => { @@ -13,7 +13,7 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[] = [] let resolvePromise: () => void const dataFetchingPromise = new Promise((resolve) => { @@ -58,7 +58,7 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[] = [] const batchStream = new KafkaMessageBatchStream({ batchSize: 1000, diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 1a78cb9a..5dd97c7f 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.9.1-beta.10", + "version": "0.9.1-beta.11", "engines": { "node": ">= 22.14.0" },