Skip to content

[ISSUE #10076] Make orderly resetOffset wait on consume lock while preserving timeout semantics#10175

Merged
lizhimins merged 1 commit intoapache:developfrom
LystranG:improve-resetoffset-wait-time-1
Mar 31, 2026
Merged

[ISSUE #10076] Make orderly resetOffset wait on consume lock while preserving timeout semantics#10175
lizhimins merged 1 commit intoapache:developfrom
LystranG:improve-resetoffset-wait-time-1

Conversation

@LystranG
Copy link
Copy Markdown
Contributor

@LystranG LystranG commented Mar 19, 2026

Which Issue(s) This PR Fixes

Brief Description

This change mainly optimizes the execution of resetOffset on the client side for orderly consumption to reduce the waiting time (during this period, the consumer is in a suspended state and should be resumed as soon as possible).

For orderly consumption, resetOffset no longer enforces a mandatory 10-second wait. Instead, it reuses the writeLock of the ProcessQueue. If the lock is successfully acquired, it indicates that no other thread is currently holding the ProcessQueue, and subsequent operations can be safely executed. Otherwise, it waits for the lock to be released (up to a maximum of 10 seconds, which is consistent with the original logic).

For concurrent consumption, the logic currently remains the same as before, i.e., waiting for 10 seconds. The current idea is to define a counter variable in the ProcessQueue to track the number of threads currently holding it. However, this approach would be intrusive to the codebase and require modifications to ConsumeMessageConcurrentlyService.

How Did You Test This Change?

image

@LystranG LystranG marked this pull request as draft March 19, 2026 08:38
@LystranG LystranG marked this pull request as ready for review March 19, 2026 08:40
if (topic.equals(mq.getTopic()) && offset != null) {
try {
ProcessQueue pq = processQueueTable.get(mq);
waitResetOffsetReady(consumer, pq);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The purpose of acquiring a write lock is to wait for the currently ongoing sequential consumption process to complete.

@lizhimins lizhimins merged commit 3b12a25 into apache:develop Mar 31, 2026
10 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Consumer may wait shorter time after receive reset offset request

2 participants