This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ec4a21c476 [ISSUE #9454] Make sure topic can be deleted when using
RocksDB CQ (#9457)
ec4a21c476 is described below
commit ec4a21c47660d26c1a8c393e95af9025a54a3fb7
Author: qianye <[email protected]>
AuthorDate: Mon Jun 9 13:58:53 2025 +0800
[ISSUE #9454] Make sure topic can be deleted when using RocksDB CQ (#9457)
---
.../common/config/AbstractRocksDBStorage.java | 103 +++++++++++++++++++++
.../queue/RocksDBConsumeQueueOffsetTable.java | 28 +++++-
.../store/queue/RocksDBConsumeQueueStore.java | 17 ++++
.../store/rocksdb/ConsumeQueueRocksDBStorage.java | 1 -
4 files changed, 146 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 6c0bce5929..c47825e855 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -47,6 +49,8 @@ import org.rocksdb.Priority;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.Statistics;
import org.rocksdb.Status;
import org.rocksdb.WriteBatch;
@@ -311,6 +315,105 @@ public abstract class AbstractRocksDBStorage {
}
}
+ public void iterate(ColumnFamilyHandle columnFamilyHandle, final byte[]
prefix, BiConsumer<byte[], byte[]> callback)
+ throws RocksDBException {
+
+ if (ArrayUtils.isEmpty(prefix)) {
+ throw new RocksDBException("Prefix is not allowed to be null");
+ }
+
+ iterate(columnFamilyHandle, prefix, null, null, callback);
+ }
+
+ public void iterate(ColumnFamilyHandle columnFamilyHandle, byte[] prefix,
+ final byte[] start, final byte[] end, BiConsumer<byte[], byte[]>
callback) throws RocksDBException {
+
+ if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
+ throw new RocksDBException("To determine lower boundary, prefix
and start may not be null at the same "
+ + "time.");
+ }
+
+ if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(end)) {
+ throw new RocksDBException("To determine upper boundary, prefix
and end may not be null at the same time.");
+ }
+
+ if (columnFamilyHandle == null) {
+ return;
+ }
+
+ ReadOptions readOptions = null;
+ Slice startSlice = null;
+ Slice endSlice = null;
+ Slice prefixSlice = null;
+ RocksIterator iterator = null;
+ try {
+ readOptions = new ReadOptions();
+ readOptions.setTotalOrderSeek(true);
+ readOptions.setReadaheadSize(4L * 1024 * 1024);
+ boolean hasStart = !ArrayUtils.isEmpty(start);
+ boolean hasPrefix = !ArrayUtils.isEmpty(prefix);
+
+ if (hasStart) {
+ startSlice = new Slice(start);
+ readOptions.setIterateLowerBound(startSlice);
+ }
+
+ if (!ArrayUtils.isEmpty(end)) {
+ endSlice = new Slice(end);
+ readOptions.setIterateUpperBound(endSlice);
+ }
+
+ if (!hasStart && hasPrefix) {
+ prefixSlice = new Slice(prefix);
+ readOptions.setIterateLowerBound(prefixSlice);
+ }
+
+ iterator = db.newIterator(columnFamilyHandle, readOptions);
+ if (hasStart) {
+ iterator.seek(start);
+ } else if (hasPrefix) {
+ iterator.seek(prefix);
+ }
+
+ while (iterator.isValid()) {
+ byte[] key = iterator.key();
+ if (hasPrefix && !checkPrefix(key, prefix)) {
+ break;
+ }
+ callback.accept(iterator.key(), iterator.value());
+ iterator.next();
+ }
+ } finally {
+ if (startSlice != null) {
+ startSlice.close();
+ }
+ if (endSlice != null) {
+ endSlice.close();
+ }
+ if (prefixSlice != null) {
+ prefixSlice.close();
+ }
+ if (readOptions != null) {
+ readOptions.close();
+ }
+ if (iterator != null) {
+ iterator.close();
+ }
+ }
+ }
+
+ private boolean checkPrefix(byte[] key, byte[] upperBound) {
+ if (key.length < upperBound.length) {
+ return false;
+ }
+ for (int i = 0; i < upperBound.length; i++) {
+ if (key[i] > upperBound[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected void manualCompactionDefaultCfRange(CompactRangeOptions
compactRangeOptions) {
if (!hold()) {
return;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index 821cb23baa..da898cf78b 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.store.queue;
+import static org.apache.rocketmq.common.config.AbstractRocksDBStorage.CTRL_1;
+
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -44,8 +46,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
-import static org.apache.rocketmq.common.config.AbstractRocksDBStorage.CTRL_1;
-
public class RocksDBConsumeQueueOffsetTable {
// Logger obtained for LoggerName.STORE_LOGGER_NAME.
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Logger obtained for LoggerName.STORE_ERROR_LOGGER_NAME.
private static final Logger ERROR_LOG = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
@@ -87,6 +87,14 @@ public class RocksDBConsumeQueueOffsetTable {
public static final int OFFSET_KEY_LENGTH_WITHOUT_TOPIC_BYTES = 4 + 1 + 1 + 3 + 1 + 4; // topic length (4), CTRL_1, CTRL_1, max/min tag (3), CTRL_1, queue id (4)
private static final int OFFSET_VALUE_LENGTH = 8 + 8; // NOTE(review): appears to be two 8-byte fields per offset value — confirm which
+ /**
+ *
┌─────────────────────────┬───────────┬───────────┬───────────┬───────────┐
+ * │ Topic Bytes Array Size │ CTRL_1 │ CTRL_1 │ Max(Min) │ CTRL_1
│
+ * │ (4 Bytes) │ (1 Bytes) │ (1 Bytes) │ (3 Bytes) │ (1
Bytes) │
+ *
├─────────────────────────┴───────────┴───────────┴───────────┴───────────┤
+ */
+ public static final int OFFSET_KEY_LENGTH_WITHOUT_TOPIC_QUEUE_ID_BYTES = 4
+ 1 + 1 + 3 + 1;
+
/**
* We use a new system topic='CHECKPOINT_TOPIC' to record the maxPhyOffset
built by CQ dispatch thread.
*
@@ -139,6 +147,22 @@ public class RocksDBConsumeQueueOffsetTable {
loadMaxConsumeQueueOffsets();
}
+ public Set<Integer> scanAllQueueIdInTopic(String topic) throws
RocksDBException {
+ Set<Integer> queueIdSet = new HashSet<>();
+ byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+ ByteBuffer byteBuffer =
ByteBuffer.allocate(OFFSET_KEY_LENGTH_WITHOUT_TOPIC_QUEUE_ID_BYTES +
topicBytes.length);
+
byteBuffer.putInt(topicBytes.length).put(CTRL_1).put(topicBytes).put(CTRL_1).put(MAX_BYTES).put(CTRL_1);
+ byteBuffer.flip();
+ byte[] prefix = byteBuffer.array();
+ rocksDBStorage.iterate(offsetCFH, prefix, (keyBytes, unused) -> {
+ ByteBuffer keyBuffer = ByteBuffer.wrap(keyBytes);
+ keyBuffer.position(prefix.length);
+ int queueId = keyBuffer.getInt();
+ queueIdSet.add(queueId);
+ });
+ return queueIdSet;
+ }
+
private void loadMaxConsumeQueueOffsets() {
Function<OffsetEntry, Boolean> predicate = entry -> entry.type ==
OffsetEntryType.MAXIMUM;
Consumer<OffsetEntry> fn = entry -> {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 2176a2fa6a..9e72b0e565 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -403,6 +403,23 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
}
}
+ /**
+ * ConsumerQueueTable, as an in-memory data structure, uses lazy loading
mechanism in RocksDBConsumeQueueStore.
+ * This means that when the broker restarts, it may not be able to
retrieve all ConsumerQueues from the table.
+ * Therefore, before deleting a topic, we need to attempt to build all
ConsumerQueues under that topic to ensure
+ * the completeness of the deletion operation.
+ */
+ @Override
+ public boolean deleteTopic(String topic) {
+ try {
+ Set<Integer> queueIds =
rocksDBConsumeQueueOffsetTable.scanAllQueueIdInTopic(topic);
+ queueIds.forEach(queueId -> findOrCreateConsumeQueue(topic,
queueId));
+ } catch (RocksDBException e) {
+ ERROR_LOG.error("Failed to scan queueIds for topic. topic={}",
topic, e);
+ }
+ return super.deleteTopic(topic);
+ }
+
@Override
public void flush() throws StoreException {
try (FlushOptions flushOptions = new FlushOptions()) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index b343a5b4b5..b04aeab6bd 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.rocksdb;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.store.MessageStore;