Senrian opened a new pull request, #10180: URL: https://github.com/apache/rocketmq/pull/10180
## Brief Description

The `topicGroupTable` in `ConsumerManager` is a `ConcurrentHashMap`, but its values are plain `HashSet` instances, which are **NOT thread-safe**. When multiple consumers concurrently register with the same topic via heartbeat requests, `HashSet.add()` may lose entries due to race conditions.

## Concurrent Scenarios That Trigger This Bug

- Multiple consumers start up simultaneously and send heartbeat requests
- A network reconnect triggers batch consumer re-registration
- The proxy syncs consumer info to the broker concurrently

## Impact

- Consumer groups may not be recorded in `topicGroupTable`, causing message routing failures
- In extreme cases, the internal structure of the `HashSet` may be corrupted, leading to infinite loops during iteration

## Solution

Replace `HashSet` with `ConcurrentHashMap.newKeySet()`, which provides thread-safe add/remove operations using CAS and `synchronized` internally.

Additionally, this PR replaces the `topicGroupTable.get()` + null check + `putIfAbsent()` pattern with `computeIfAbsent()` for an atomic get-or-create, eliminating a potential TOCTOU race window.

This pattern is **already used** in other RocketMQ components:

- `LiteSubscriptionRegistryImpl.liteTopic2Group`
- `TopicList.topicList`
- `LiteSubscription.liteTopicSet`

## Modified Files

- `broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java`

## Testing

The fix uses `ConcurrentHashMap.newKeySet()`, which is a well-established thread-safe pattern in the JVM ecosystem. The change is minimal and non-breaking: it only changes the internal synchronization mechanism without altering any public API or behavior visible to callers.

---

Fixes #10179

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
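
For readers who want to see the shape of the change described under Solution, here is a minimal, self-contained sketch. It is not the actual `ConsumerManager` code: the class `TopicGroupRegistry`, its methods, the topic and group names, and the thread counts are all hypothetical, chosen only to illustrate `computeIfAbsent()` combined with `ConcurrentHashMap.newKeySet()`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the topic-to-groups bookkeeping; not RocketMQ source.
public class TopicGroupRegistry {
    private final ConcurrentMap<String, Set<String>> topicGroupTable = new ConcurrentHashMap<>();

    // Atomic get-or-create of the per-topic set, then a thread-safe add.
    public void register(String topic, String group) {
        topicGroupTable
            .computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
            .add(group);
    }

    public Set<String> groupsFor(String topic) {
        Set<String> groups = topicGroupTable.get(topic);
        return groups == null ? Collections.<String>emptySet() : groups;
    }

    // Registers threads * groupsPerThread distinct groups on one topic from
    // concurrently running workers and returns how many were recorded.
    public static int registerConcurrently(int threads, int groupsPerThread) {
        TopicGroupRegistry registry = new TopicGroupRegistry();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                try {
                    start.await(); // release all workers together to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int g = 0; g < groupsPerThread; g++) {
                    registry.register("TopicA", "group-" + id + "-" + g);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return registry.groupsFor("TopicA").size();
    }

    public static void main(String[] args) {
        // Every group name is distinct, so no entry should be lost.
        System.out.println(registerConcurrently(8, 1000)); // prints 8000
    }
}
```

Swapping `ConcurrentHashMap.newKeySet()` for `new HashSet<>()` in `register` would make the final count unreliable under the same load, which is the lost-entry symptom the PR describes. Because `computeIfAbsent()` runs the mapping function at most once per key, all threads share one set per topic.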
