This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d556460d86 fix: clean pull offset in #removeOffset (#9490)
d556460d86 is described below

commit d556460d86d82742bf662b7a7020710289d260d2
Author: hqbfz <[email protected]>
AuthorDate: Mon Jun 23 18:41:23 2025 +0800

    fix: clean pull offset in #removeOffset (#9490)
    
    Co-authored-by: hqbfzwang <[email protected]>
---
 .../broker/config/v2/ConsumerOffsetManagerV2.java  | 55 +++++++++++++++-------
 1 file changed, 38 insertions(+), 17 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 1821c801cb..28214baf1c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -118,33 +118,54 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
             + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */;
 
         // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 
byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
-        ByteBuf beginKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
-        beginKey.writeByte(TablePrefix.TABLE.getValue());
-        beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
-        beginKey.writeByte(RecordPrefix.DATA.getValue());
-        beginKey.writeShort(groupBytes.length);
-        beginKey.writeBytes(groupBytes);
-        beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+        ByteBuf consumerOffsetBeginKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        consumerOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
+        consumerOffsetBeginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        consumerOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
+        consumerOffsetBeginKey.writeShort(groupBytes.length);
+        consumerOffsetBeginKey.writeBytes(groupBytes);
+        consumerOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+        ByteBuf consumerOffsetEndKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        consumerOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
+        consumerOffsetEndKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        consumerOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
+        consumerOffsetEndKey.writeShort(groupBytes.length);
+        consumerOffsetEndKey.writeBytes(groupBytes);
+        consumerOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
 
-        ByteBuf endKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
-        endKey.writeByte(TablePrefix.TABLE.getValue());
-        endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
-        endKey.writeByte(RecordPrefix.DATA.getValue());
-        endKey.writeShort(groupBytes.length);
-        endKey.writeBytes(groupBytes);
-        endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+        // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 
byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+        ByteBuf pullOffsetBeginKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        pullOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
+        pullOffsetBeginKey.writeShort(TableId.PULL_OFFSET.getValue());
+        pullOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
+        pullOffsetBeginKey.writeShort(groupBytes.length);
+        pullOffsetBeginKey.writeBytes(groupBytes);
+        pullOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+        ByteBuf pullOffsetEndKey = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        pullOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
+        pullOffsetEndKey.writeShort(TableId.PULL_OFFSET.getValue());
+        pullOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
+        pullOffsetEndKey.writeShort(groupBytes.length);
+        pullOffsetEndKey.writeBytes(groupBytes);
+        pullOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
         try (WriteBatch writeBatch = new WriteBatch()) {
             // TODO: we have to make a copy here as WriteBatch lacks 
ByteBuffer API here
-            writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), 
ConfigHelper.readBytes(endKey));
+            
writeBatch.deleteRange(ConfigHelper.readBytes(consumerOffsetBeginKey), 
ConfigHelper.readBytes(consumerOffsetEndKey));
+            writeBatch.deleteRange(ConfigHelper.readBytes(pullOffsetBeginKey), 
ConfigHelper.readBytes(pullOffsetEndKey));
             MessageStore messageStore = brokerController.getMessageStore();
             long stateMachineVersion = messageStore != null ? 
messageStore.getStateMachineVersion() : 0;
             ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, 
dataVersion, stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to consumer offsets by group={}", group, e);
         } finally {
-            beginKey.release();
-            endKey.release();
+            consumerOffsetBeginKey.release();
+            consumerOffsetEndKey.release();
+            pullOffsetBeginKey.release();
+            pullOffsetEndKey.release();
         }
     }
 

Reply via email to