This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ec4a21c476 [ISSUE #9454] Make sure topic can be deleted when using 
RocksDB CQ (#9457)
ec4a21c476 is described below

commit ec4a21c47660d26c1a8c393e95af9025a54a3fb7
Author: qianye <[email protected]>
AuthorDate: Mon Jun 9 13:58:53 2025 +0800

    [ISSUE #9454] Make sure topic can be deleted when using RocksDB CQ (#9457)
---
 .../common/config/AbstractRocksDBStorage.java      | 103 +++++++++++++++++++++
 .../queue/RocksDBConsumeQueueOffsetTable.java      |  28 +++++-
 .../store/queue/RocksDBConsumeQueueStore.java      |  17 ++++
 .../store/rocksdb/ConsumeQueueRocksDBStorage.java  |   1 -
 4 files changed, 146 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 6c0bce5929..c47825e855 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -47,6 +49,8 @@ import org.rocksdb.Priority;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.Statistics;
 import org.rocksdb.Status;
 import org.rocksdb.WriteBatch;
@@ -311,6 +315,105 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
+    public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[] 
prefix, BiConsumer<byte[], byte[]> callback)
+        throws RocksDBException {
+
+        if (ArrayUtils.isEmpty(prefix)) {
+            throw new RocksDBException("Prefix is not allowed to be null");
+        }
+
+        iterate(columnFamilyHandle, prefix, null, null, callback);
+    }
+
+    public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
+        final byte[] start, final byte[] end, BiConsumer<byte[], byte[]> 
callback) throws RocksDBException {
+
+        if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
+            throw new RocksDBException("To determine lower boundary, prefix 
and start may not be null at the same "
+                + "time.");
+        }
+
+        if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(end)) {
+            throw new RocksDBException("To determine upper boundary, prefix 
and end may not be null at the same time.");
+        }
+
+        if (columnFamilyHandle == null) {
+            return;
+        }
+
+        ReadOptions readOptions = null;
+        Slice startSlice = null;
+        Slice endSlice = null;
+        Slice prefixSlice = null;
+        RocksIterator iterator = null;
+        try {
+            readOptions = new ReadOptions();
+            readOptions.setTotalOrderSeek(true);
+            readOptions.setReadaheadSize(4L * 1024 * 1024);
+            boolean hasStart = !ArrayUtils.isEmpty(start);
+            boolean hasPrefix = !ArrayUtils.isEmpty(prefix);
+
+            if (hasStart) {
+                startSlice = new Slice(start);
+                readOptions.setIterateLowerBound(startSlice);
+            }
+
+            if (!ArrayUtils.isEmpty(end)) {
+                endSlice = new Slice(end);
+                readOptions.setIterateUpperBound(endSlice);
+            }
+
+            if (!hasStart && hasPrefix) {
+                prefixSlice = new Slice(prefix);
+                readOptions.setIterateLowerBound(prefixSlice);
+            }
+
+            iterator = db.newIterator(columnFamilyHandle, readOptions);
+            if (hasStart) {
+                iterator.seek(start);
+            } else if (hasPrefix) {
+                iterator.seek(prefix);
+            }
+
+            while (iterator.isValid()) {
+                byte[] key = iterator.key();
+                if (hasPrefix && !checkPrefix(key, prefix)) {
+                    break;
+                }
+                callback.accept(iterator.key(), iterator.value());
+                iterator.next();
+            }
+        } finally {
+            if (startSlice != null) {
+                startSlice.close();
+            }
+            if (endSlice != null) {
+                endSlice.close();
+            }
+            if (prefixSlice != null) {
+                prefixSlice.close();
+            }
+            if (readOptions != null) {
+                readOptions.close();
+            }
+            if (iterator != null) {
+                iterator.close();
+            }
+        }
+    }
+
+    private boolean checkPrefix(byte[] key, byte[] upperBound) {
+        if (key.length < upperBound.length) {
+            return false;
+        }
+        for (int i = 0; i < upperBound.length; i++) {
+            if (key[i] > upperBound[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     protected void manualCompactionDefaultCfRange(CompactRangeOptions 
compactRangeOptions) {
         if (!hold()) {
             return;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index 821cb23baa..da898cf78b 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import static org.apache.rocketmq.common.config.AbstractRocksDBStorage.CTRL_1;
+
 import io.netty.util.internal.PlatformDependent;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -44,8 +46,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 
-import static org.apache.rocketmq.common.config.AbstractRocksDBStorage.CTRL_1;
-
 public class RocksDBConsumeQueueOffsetTable {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static final Logger ERROR_LOG = 
LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
@@ -87,6 +87,14 @@ public class RocksDBConsumeQueueOffsetTable {
     public static final int OFFSET_KEY_LENGTH_WITHOUT_TOPIC_BYTES = 4 + 1 + 1 
+ 3 + 1 + 4;
     private static final int OFFSET_VALUE_LENGTH = 8 + 8;
 
+    /**
+     * ┌─────────────────────────┬───────────┬───────────┬───────────┬───────────┐
+     * │ Topic Bytes Array Size  │  CTRL_1   │  CTRL_1   │  Max(Min) │  CTRL_1   │
+     * │        (4 Bytes)        │ (1 Bytes) │ (1 Bytes) │ (3 Bytes) │ (1 Bytes) │
+     * ├─────────────────────────┴───────────┴───────────┴───────────┴───────────┤
+     */
+    public static final int OFFSET_KEY_LENGTH_WITHOUT_TOPIC_QUEUE_ID_BYTES = 4 + 1 + 1 + 3 + 1;
+
     /**
      * We use a new system topic='CHECKPOINT_TOPIC' to record the maxPhyOffset 
built by CQ dispatch thread.
      *
@@ -139,6 +147,22 @@ public class RocksDBConsumeQueueOffsetTable {
         loadMaxConsumeQueueOffsets();
     }
 
+    public Set<Integer> scanAllQueueIdInTopic(String topic) throws 
RocksDBException {
+        Set<Integer> queueIdSet = new HashSet<>();
+        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+        ByteBuffer byteBuffer = 
ByteBuffer.allocate(OFFSET_KEY_LENGTH_WITHOUT_TOPIC_QUEUE_ID_BYTES + 
topicBytes.length);
+        
byteBuffer.putInt(topicBytes.length).put(CTRL_1).put(topicBytes).put(CTRL_1).put(MAX_BYTES).put(CTRL_1);
+        byteBuffer.flip();
+        byte[] prefix = byteBuffer.array();
+        rocksDBStorage.iterate(offsetCFH, prefix, (keyBytes, unused) -> {
+            ByteBuffer keyBuffer = ByteBuffer.wrap(keyBytes);
+            keyBuffer.position(prefix.length);
+            int queueId = keyBuffer.getInt();
+            queueIdSet.add(queueId);
+        });
+        return queueIdSet;
+    }
+
     private void loadMaxConsumeQueueOffsets() {
         Function<OffsetEntry, Boolean> predicate = entry -> entry.type == 
OffsetEntryType.MAXIMUM;
         Consumer<OffsetEntry> fn = entry -> {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 2176a2fa6a..9e72b0e565 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -403,6 +403,23 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
     }
 
+    /**
+     * ConsumerQueueTable, as an in-memory data structure, uses lazy loading 
mechanism in RocksDBConsumeQueueStore.
+     * This means that when the broker restarts, it may not be able to 
retrieve all ConsumerQueues from the table.
+     * Therefore, before deleting a topic, we need to attempt to build all 
ConsumerQueues under that topic to ensure
+     * the completeness of the deletion operation.
+     */
+    @Override
+    public boolean deleteTopic(String topic) {
+        try {
+            Set<Integer> queueIds = 
rocksDBConsumeQueueOffsetTable.scanAllQueueIdInTopic(topic);
+            queueIds.forEach(queueId -> findOrCreateConsumeQueue(topic, 
queueId));
+        } catch (RocksDBException e) {
+            ERROR_LOG.error("Failed to scan queueIds for topic. topic={}", 
topic, e);
+        }
+        return super.deleteTopic(topic);
+    }
+
     @Override
     public void flush() throws StoreException {
         try (FlushOptions flushOptions = new FlushOptions()) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index b343a5b4b5..b04aeab6bd 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.rocksdb;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
 import org.apache.rocketmq.store.MessageStore;

Reply via email to