
[ISSUE #xxxx] Fix thread-safety issue in ConsumerManager.topicGroupTable #10179

Open
cherryMJY wants to merge 3 commits into apache:develop from cherryMJY:develop

Conversation

@cherryMJY

Brief Description
The topicGroupTable in ConsumerManager uses plain HashSet values to store consumer groups, which are not thread-safe. When multiple consumers concurrently register with the same topic via heartbeat requests, HashSet.add() may silently lose entries due to race conditions.

Concurrent scenarios that trigger this bug:

- Multiple consumers start up simultaneously and send heartbeat requests
- Network reconnect triggers batch consumer re-registration
- Proxy syncs consumer info to broker concurrently

Impact:

- Consumer groups may not be recorded in topicGroupTable, causing message routing failures
- In extreme cases, HashSet's internal structure may be corrupted, leading to infinite loops during iteration

Solution:
Replace HashSet with ConcurrentHashMap.newKeySet(), which provides thread-safe add/remove operations via CAS plus per-bin synchronization. This pattern is already used in other RocketMQ components:

- LiteSubscriptionRegistryImpl.liteTopic2Group
- TopicList.topicList
- LiteSubscription.liteTopicSet

Modified files:

broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Replace HashSet with ConcurrentHashMap.newKeySet() to prevent data loss
when multiple consumers concurrently register with the same topic.

HashSet is not thread-safe and may lose entries under concurrent add()
operations. ConcurrentHashMap.newKeySet() provides thread-safe mutations
and is already used in other RocketMQ components.
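A minimal sketch of the fixed structure follows. The class, field, and method names here are assumed for illustration and are not the actual ConsumerManager code:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the fixed topic-to-groups table.
public class TopicGroupTable {
    private final Map<String, Set<String>> topicGroupTable = new ConcurrentHashMap<>();

    public void registerGroup(String topic, String group) {
        // computeIfAbsent is atomic, and ConcurrentHashMap.newKeySet() returns a
        // thread-safe Set, so concurrent registrations cannot lose entries.
        topicGroupTable
            .computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet())
            .add(group);
    }

    public Set<String> getGroups(String topic) {
        return topicGroupTable.getOrDefault(topic, Collections.emptySet());
    }
}
```

With this shape, any number of heartbeat threads can call registerGroup() on the same topic without dropping entries, because both the map-level get-or-create and the set-level add are thread-safe.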
Senrian added a commit to Senrian/rocketmq that referenced this pull request Mar 20, 2026
…fe topicGroupTable

The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores
plain HashSet values, which are NOT thread-safe. When multiple consumers
concurrently register with the same topic via heartbeat requests,
HashSet.add() may lose entries due to race conditions.

This fix replaces:
  - new HashSet<>() with ConcurrentHashMap.newKeySet()
  - topicGroupTable.get() + null check + putIfAbsent() pattern with
    computeIfAbsent() for atomic get-or-create

This pattern is already used in other RocketMQ components:
  - LiteSubscriptionRegistryImpl.liteTopic2Group
  - TopicList.topicList  
  - LiteSubscription.liteTopicSet

Fixes thread-safety issue reported in apache#10179
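The replacement the commit message describes can be sketched side by side. Both helpers below are illustrative, not the actual code; note that even a correctly written putIfAbsent() fallback still leaves the HashSet value itself unsynchronized:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical before/after sketch of the get-or-create pattern.
public class GetOrCreateSketch {

    // Old pattern: get() + null check + putIfAbsent(). The map-level race is
    // handled by falling back to the winner's set, but the plain HashSet value
    // is unsynchronized, so concurrent add() calls can still drop entries.
    static Set<String> legacyGetOrCreate(Map<String, Set<String>> table, String topic) {
        Set<String> groups = table.get(topic);
        if (groups == null) {
            Set<String> created = new HashSet<>();            // not thread-safe
            Set<String> prev = table.putIfAbsent(topic, created);
            groups = (prev != null) ? prev : created;
        }
        return groups;
    }

    // Fixed pattern: one atomic get-or-create, with a thread-safe set value.
    static Set<String> fixedGetOrCreate(Map<String, Set<String>> table, String topic) {
        return table.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet());
    }
}
```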
lizhimins previously approved these changes Mar 21, 2026

codecov-commenter commented Mar 21, 2026

Codecov Report

❌ Patch coverage is 95.45455% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 48.89%. Comparing base (e100743) to head (4e2c2ef).

Files with missing lines | Patch % | Lines
...saction/queue/TransactionalMessageServiceImpl.java | 93.75% | 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10179      +/-   ##
=============================================
- Coverage      48.93%   48.89%   -0.04%     
+ Complexity     13384    13365      -19     
=============================================
  Files           1373     1373              
  Lines          99924    99936      +12     
  Branches       12908    12907       -1     
=============================================
- Hits           48898    48866      -32     
- Misses         45098    45133      +35     
- Partials        5928     5937       +9     


…e check when putBackHalfMsgQueue fails

Problem: When putBackHalfMsgQueue() fails during the transaction message check, the code executes continue without updating the offset, causing a busy loop that blocks all transaction message processing for up to 60 seconds.

Solution: Add a limited retry mechanism (PUT_BACK_RETRY_TIMES = 3) with exponential backoff. After retries are exhausted, skip the message and update the offset so processing continues. Add a detailed error log for troubleshooting.

Impact: Before the fix, each failed message blocked processing for up to 60 seconds with the CPU spinning; after, a failed message costs roughly 300 ms before processing continues.
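The bounded retry can be sketched as below. PUT_BACK_RETRY_TIMES = 3 comes from the commit message; the initial backoff value, the helper name, and the BooleanSupplier standing in for putBackHalfMsgQueue() are all assumptions for illustration:

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded retry with exponential backoff.
public class PutBackRetry {
    static final int PUT_BACK_RETRY_TIMES = 3;
    static final long INITIAL_BACKOFF_MS = 50; // assumed: 50 + 100 ms of backoff, worst case

    // putBack stands in for the real putBackHalfMsgQueue() call; it returns
    // true on success. The method returns false once retries are exhausted.
    static boolean putBackWithRetry(BooleanSupplier putBack) {
        long backoffMs = INITIAL_BACKOFF_MS;
        for (int attempt = 1; attempt <= PUT_BACK_RETRY_TIMES; attempt++) {
            if (putBack.getAsBoolean()) {
                return true;
            }
            if (attempt < PUT_BACK_RETRY_TIMES) {
                try {
                    Thread.sleep(backoffMs); // exponential backoff between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                backoffMs *= 2;
            }
        }
        // Retries exhausted: the caller logs the failure, skips this message,
        // and advances the offset instead of spinning on it.
        return false;
    }
}
```

The key design point is that failure is now terminal per message: instead of re-entering the loop forever, the caller advances the offset after a bounded, short delay.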