This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d556460d86 fix: clean pull offset in #removeOffset (#9490)
d556460d86 is described below
commit d556460d86d82742bf662b7a7020710289d260d2
Author: hqbfz <[email protected]>
AuthorDate: Mon Jun 23 18:41:23 2025 +0800
fix: clean pull offset in #removeOffset (#9490)
Co-authored-by: hqbfzwang <[email protected]>
---
.../broker/config/v2/ConsumerOffsetManagerV2.java | 55 +++++++++++++++-------
1 file changed, 38 insertions(+), 17 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 1821c801cb..28214baf1c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -118,33 +118,54 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
+ Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */;
// [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1
byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
- ByteBuf beginKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
- beginKey.writeByte(TablePrefix.TABLE.getValue());
- beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
- beginKey.writeByte(RecordPrefix.DATA.getValue());
- beginKey.writeShort(groupBytes.length);
- beginKey.writeBytes(groupBytes);
- beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+ ByteBuf consumerOffsetBeginKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+ consumerOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
+ consumerOffsetBeginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+ consumerOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
+ consumerOffsetBeginKey.writeShort(groupBytes.length);
+ consumerOffsetBeginKey.writeBytes(groupBytes);
+ consumerOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+ ByteBuf consumerOffsetEndKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+ consumerOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
+ consumerOffsetEndKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+ consumerOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
+ consumerOffsetEndKey.writeShort(groupBytes.length);
+ consumerOffsetEndKey.writeBytes(groupBytes);
+ consumerOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
- ByteBuf endKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
- endKey.writeByte(TablePrefix.TABLE.getValue());
- endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
- endKey.writeByte(RecordPrefix.DATA.getValue());
- endKey.writeShort(groupBytes.length);
- endKey.writeBytes(groupBytes);
- endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+ // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1
byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+ ByteBuf pullOffsetBeginKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+ pullOffsetBeginKey.writeByte(TablePrefix.TABLE.getValue());
+ pullOffsetBeginKey.writeShort(TableId.PULL_OFFSET.getValue());
+ pullOffsetBeginKey.writeByte(RecordPrefix.DATA.getValue());
+ pullOffsetBeginKey.writeShort(groupBytes.length);
+ pullOffsetBeginKey.writeBytes(groupBytes);
+ pullOffsetBeginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+ ByteBuf pullOffsetEndKey =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+ pullOffsetEndKey.writeByte(TablePrefix.TABLE.getValue());
+ pullOffsetEndKey.writeShort(TableId.PULL_OFFSET.getValue());
+ pullOffsetEndKey.writeByte(RecordPrefix.DATA.getValue());
+ pullOffsetEndKey.writeShort(groupBytes.length);
+ pullOffsetEndKey.writeBytes(groupBytes);
+ pullOffsetEndKey.writeByte(AbstractRocksDBStorage.CTRL_2);
try (WriteBatch writeBatch = new WriteBatch()) {
// TODO: we have to make a copy here as WriteBatch lacks
ByteBuffer API here
- writeBatch.deleteRange(ConfigHelper.readBytes(beginKey),
ConfigHelper.readBytes(endKey));
+
writeBatch.deleteRange(ConfigHelper.readBytes(consumerOffsetBeginKey),
ConfigHelper.readBytes(consumerOffsetEndKey));
+ writeBatch.deleteRange(ConfigHelper.readBytes(pullOffsetBeginKey),
ConfigHelper.readBytes(pullOffsetEndKey));
MessageStore messageStore = brokerController.getMessageStore();
long stateMachineVersion = messageStore != null ?
messageStore.getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET,
dataVersion, stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to consumer offsets by group={}", group, e);
} finally {
- beginKey.release();
- endKey.release();
+ consumerOffsetBeginKey.release();
+ consumerOffsetEndKey.release();
+ pullOffsetBeginKey.release();
+ pullOffsetEndKey.release();
}
}