This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c5be5c8f2e [ISSUE #9304] Resolve cold data read control issue in 
DefaultMessageStore (#9305)
c5be5c8f2e is described below

commit c5be5c8f2e5714593d03896a3aff57dbb2a7812a
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 13:38:46 2025 +0800

    [ISSUE #9304] Resolve cold data read control issue in DefaultMessageStore 
(#9305)
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b061aa7a0d..75000b25d2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -64,6 +64,9 @@ import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.util.LibC;
 import org.rocksdb.RocksDBException;
 
@@ -2442,16 +2445,15 @@ public class CommitLog implements Swappable {
                 return false;
             }
             try {
-                ConsumeQueue consumeQueue = (ConsumeQueue) 
defaultMessageStore.findConsumeQueue(topic, queueId);
+                ConsumeQueueInterface consumeQueue = 
defaultMessageStore.findConsumeQueue(topic, queueId);
                 if (null == consumeQueue) {
                     return false;
                 }
-                SelectMappedBufferResult bufferConsumeQueue = 
consumeQueue.getIndexBuffer(offset);
-                if (null == bufferConsumeQueue || null == 
bufferConsumeQueue.getByteBuffer()) {
+                ReferredIterator<CqUnit> bufferConsumeQueue = 
consumeQueue.iterateFrom(offset, 1);
+                if (null == bufferConsumeQueue || 
!bufferConsumeQueue.hasNext()) {
                     return false;
                 }
-                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
-                return 
defaultMessageStore.checkInColdAreaByCommitOffset(offsetPy, getMaxOffset());
+                return 
defaultMessageStore.checkInColdAreaByCommitOffset(bufferConsumeQueue.next().getPos(),
 getMaxOffset());
             } catch (Exception e) {
                 log.error("isMsgInColdArea group: {}, topic: {}, queueId: {}, 
offset: {}",
                     group, topic, queueId, offset, e);

Reply via email to