85 changes: 69 additions & 16 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
@@ -1,4 +1,5 @@
 import { randomUUID } from 'node:crypto'
+import { pipeline } from 'node:stream/promises'
 import { setTimeout } from 'node:timers/promises'
 import {
   InternalError,
@@ -190,19 +191,19 @@ export abstract class AbstractKafkaConsumer<
       })

       this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
-      this.consumerStream.on('error', (error) => this.handlerError(error))
-
       if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
         this.messageBatchStream = new KafkaMessageBatchStream<
           DeserializedMessage<SupportedMessageValues<TopicsConfig>>
-        >(
-          (batch) =>
-            this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
-          this.options.batchProcessingOptions,
-        )
-        this.consumerStream.pipe(this.messageBatchStream)
-      } else {
-        this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
+        >({
+          batchSize: this.options.batchProcessingOptions.batchSize,
+          timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
+        })
+
+        // Use pipeline for better error handling and backpressure management
+        pipeline(this.consumerStream, this.messageBatchStream).catch((error) => {
+          this.logger.error('Stream pipeline failed')
+          this.handlerError(error)
+        })
       }
     } catch (error) {
       throw new InternalError({
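A note on the hunk above: `KafkaMessageBatchStream` is now constructed from a plain options object rather than a flush callback plus options, since batch delivery now flows through the stream itself. The stream's implementation is not part of this diff; purely as a hedged sketch, a batch-collecting `Transform` honoring `batchSize` and `timeoutMilliseconds` could look roughly like this (class name, generics, and flushing details are assumptions, not the package's actual code):

```ts
import { Transform, type TransformCallback } from 'node:stream'

interface BatchStreamOptions {
  batchSize: number
  timeoutMilliseconds: number
}

// Sketch: collects incoming messages into arrays and emits a batch when it
// is full or when the timeout elapses, whichever comes first.
class MessageBatchStream<T> extends Transform {
  private batch: T[] = []
  private timer?: NodeJS.Timeout

  constructor(private readonly options: BatchStreamOptions) {
    super({ objectMode: true })
  }

  override _transform(message: T, _encoding: string, callback: TransformCallback): void {
    this.batch.push(message)
    if (this.batch.length >= this.options.batchSize) {
      this.flushBatch()
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flushBatch(), this.options.timeoutMilliseconds)
    }
    callback()
  }

  override _flush(callback: TransformCallback): void {
    // Emit any trailing partial batch when the upstream ends.
    this.flushBatch()
    callback()
  }

  private flushBatch(): void {
    if (this.timer) {
      clearTimeout(this.timer)
      this.timer = undefined
    }
    if (this.batch.length > 0) {
      this.push(this.batch)
      this.batch = []
    }
  }
}
```

Flushing in `_flush` matters in a sketch like this so that a partial batch is not lost when the consumer stream ends.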
@@ -211,6 +212,26 @@
         cause: error,
       })
     }
+
+    if (this.options.batchProcessingEnabled && this.messageBatchStream) {
+      this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
+    } else {
+      this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
+      this.consumerStream.on('error', (error) => this.handlerError(error))
+    }
+
+    // To be removed
+    // Add logging for stream lifecycle events to diagnose backpressure issues
+    this.consumerStream.on('pause', () => this.logger.debug('Consumer stream PAUSED'))
+    this.consumerStream.on('resume', () => this.logger.debug('Consumer stream RESUMED'))
+    this.consumerStream.on('close', () => this.logger.warn('Consumer stream CLOSE'))
+    this.consumerStream.on('end', () => this.logger.warn('Consumer stream END'))
+    this.consumerStream.on('error', () => this.logger.error('Consumer stream ERROR'))
+    this.messageBatchStream?.on('pause', () => this.logger.debug('Message batch stream PAUSED'))
+    this.messageBatchStream?.on('resume', () => this.logger.debug('Message batch stream RESUMED'))
+    this.messageBatchStream?.on('close', () => this.logger.warn('Message batch stream CLOSE'))
+    this.messageBatchStream?.on('end', () => this.logger.warn('Message batch stream END'))
+    this.messageBatchStream?.on('error', () => this.logger.warn('Message batch stream ERROR'))
+  }

   private async handleSyncStream(
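On the `.pipe()` to `pipeline` change above: the switch is more than stylistic. `.pipe()` does not forward source errors to the destination and does not clean up on failure, whereas `pipeline` from `node:stream/promises` destroys both streams on an error from either side and surfaces it as a single rejected promise. A minimal self-contained illustration (the stream contents are made up):

```ts
import { PassThrough, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

const source = new Readable({ objectMode: true, read() {} })
const sink = new PassThrough({ objectMode: true })

// One rejection covers failures in either stream, and both are destroyed.
// With source.pipe(sink), the error below would only fire on `source`
// and the sink would be left open.
pipeline(source, sink).catch((error) => {
  console.error('Stream pipeline failed', error)
})

sink.on('data', (message) => console.log('received', message))

source.push({ topic: 'orders', partition: 0 })
source.destroy(new Error('broker connection lost'))
```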
@@ -223,6 +244,40 @@
       )
     }
   }
+  private async handleSyncStreamBatch(
+    stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
+  ): Promise<void> {
+    const getTopicPartitionKey = (topic: string, partition: number): string =>
+      `${topic}:${partition}`
+    const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => {
+      const [topic, partition] = key.split(':')
+      /* v8 ignore start */
+      if (!topic || !partition) throw new Error('Invalid topic-partition key format')
+      /* v8 ignore stop */
+
+      return { topic, partition: Number.parseInt(partition, 10) }
+    }
+
+    for await (const messageBatch of stream) {
+      const messagesByTopicPartition: Record<
+        string,
+        DeserializedMessage<SupportedMessageValues<TopicsConfig>>[]
+      > = {}
+      for (const message of messageBatch) {
+        const key = getTopicPartitionKey(message.topic, message.partition)
+        if (!messagesByTopicPartition[key]) {
+          messagesByTopicPartition[key] = []
+        }
+        messagesByTopicPartition[key].push(message)
+      }
+
+      await Promise.allSettled(
+        Object.entries(messagesByTopicPartition).map(([key, messages]) =>
+          this.consume(splitTopicPartitionKey(key).topic, messages),
+        ),
+      )
+    }
+  }

   async close(): Promise<void> {
     if (!this.consumerStream && !this.messageBatchStream) {
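The batch handler above fans each stream batch out by topic-partition: messages within one partition reach `consume` in arrival order, distinct partitions are processed concurrently, and `Promise.allSettled` keeps one failing group from rejecting the rest. A standalone sketch of the same grouping idea (the message shape and `handleGroup` are simplified stand-ins, not the library's types):

```ts
interface ConsumedMessage {
  topic: string
  partition: number
  value: string
}

// Group a mixed batch so each topic-partition keeps its arrival order.
function groupByTopicPartition(batch: ConsumedMessage[]): Map<string, ConsumedMessage[]> {
  const groups = new Map<string, ConsumedMessage[]>()
  for (const message of batch) {
    const key = `${message.topic}:${message.partition}`
    const group = groups.get(key) ?? []
    group.push(message)
    groups.set(key, group)
  }
  return groups
}

// Stand-in for this.consume(topic, messages).
async function handleGroup(messages: ConsumedMessage[]): Promise<void> {
  const { topic, partition } = messages[0]
  console.log(`processing ${messages.length} message(s) from ${topic}:${partition}`)
}

const batch: ConsumedMessage[] = [
  { topic: 'orders', partition: 0, value: 'a' },
  { topic: 'orders', partition: 1, value: 'b' },
  { topic: 'orders', partition: 0, value: 'c' },
]

// Partitions run concurrently; allSettled records a rejected group instead
// of rethrowing it, so sibling partitions still finish.
await Promise.allSettled([...groupByTopicPartition(batch).values()].map(handleGroup))
```

One consequence worth flagging: because rejections from `consume` are settled rather than rethrown, per-batch failures have to be fully handled inside `consume` itself or they pass silently.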
@@ -442,8 +497,9 @@ export abstract class AbstractKafkaConsumer<
       this.logger.debug(logDetails, 'Message committed successfully')
     } catch (error) {
       this.logger.debug(logDetails, 'Message commit failed')
-      if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
-      throw error
+      if (error instanceof ResponseError) {
+        return this.handleResponseErrorOnCommit(error)
+      }
     }
   }

@@ -455,7 +511,7 @@
       error.apiCode &&
       commitErrorCodesToIgnore.has(error.apiCode)
     ) {
-      this.logger.error(
+      this.logger.warn(
         {
           apiCode: error.apiCode,
           apiId: error.apiId,
@@ -465,9 +521,6 @@
         },
         `Failed to commit message: ${error.message}`,
       )
-    } else {
-      // If error is not recognized, rethrow it
-      throw responseError
     }
   }
 }
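Taken together, the last three hunks change commit-failure semantics: a failed commit is logged at debug level, recognized `ResponseError` api codes are downgraded from error to warn, and with both `throw` sites removed, no commit error propagates out of the consumer anymore. A condensed sketch of the resulting control flow (the error class and the ignore set's contents here are hypothetical stand-ins, not the library's actual definitions):

```ts
// Hypothetical stand-in for the client's ResponseError.
class ResponseError extends Error {
  constructor(readonly apiCode: string) {
    super(`commit failed with ${apiCode}`)
  }
}

// Assumed contents: codes that are expected during consumer group rebalances.
const commitErrorCodesToIgnore = new Set(['REBALANCE_IN_PROGRESS', 'ILLEGAL_GENERATION'])

async function commitTolerantly(commit: () => Promise<void>): Promise<void> {
  try {
    await commit()
  } catch (error) {
    if (error instanceof ResponseError && commitErrorCodesToIgnore.has(error.apiCode)) {
      console.warn(`Failed to commit message: ${error.message}`) // expected, e.g. mid-rebalance
      return
    }
    // After this change, unrecognized errors are also swallowed (debug log
    // only), so a commit failure can no longer crash the consume loop, at
    // the cost of masking genuinely unexpected commit problems.
  }
}
```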