This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ae92a189d37 HDDS-13415. Support DeleteRange operation as part of the rocksdb batch write (#8774)
ae92a189d37 is described below

commit ae92a189d37b98071b78b075163757c7b4b79aec
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sun Aug 24 14:56:00 2025 -0400

    HDDS-13415. Support DeleteRange operation as part of the rocksdb batch write (#8774)
---
 .../ozone/container/metadata/DatanodeTable.java    |   5 +
 .../hadoop/hdds/utils/db/RDBBatchOperation.java    | 434 ++++++++++++++++++---
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |   9 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  10 +
 .../org/apache/hadoop/hdds/utils/db/Table.java     |   8 +
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   5 +
 .../hadoop/hdds/utils/db/InMemoryTestTable.java    |   5 +
 .../hdds/utils/db/TestRDBBatchOperation.java       | 129 ++++++
 .../hadoop/hdds/utils/db/TestRDBTableStore.java    |  84 +++-
 9 files changed, 619 insertions(+), 70 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 2621b1f7d85..8cbaec82f43 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -72,6 +72,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
     table.deleteWithBatch(batch, key);
   }
 
+  @Override
+  public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
+    table.deleteRangeWithBatch(batch, beginKey, endKey);
+  }
+
   @Override
   public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type) {
     throw new UnsupportedOperationException("Iterating tables directly is not" +
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 8b9fa7295c7..f7b025ed98f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -20,12 +20,22 @@
 import static org.apache.hadoop.hdds.StringUtils.bytes2String;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
@@ -50,7 +60,7 @@ public class RDBBatchOperation implements BatchOperation {
 
   private final OpCache opCache = new OpCache();
 
-  private enum Op { DELETE }
+  private enum Op { DELETE, PUT, DELETE_RANGE }
 
   private static void debug(Supplier<String> message) {
     if (LOG.isTraceEnabled()) {
@@ -67,11 +67,11 @@ private static String countSize2String(int count, long size) {
   }
 
   /**
-   * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#ops}.
+   * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}.
    * To implement {@link #equals(Object)} and {@link #hashCode()}
    * based on the contents of the bytes.
    */
-  static final class Bytes {
+  static final class Bytes implements Comparable<Bytes> {
     private final byte[] array;
     private final CodecBuffer buffer;
     /** Cache the hash value. */
@@ -89,10 +99,6 @@ static final class Bytes {
       this.hash = ByteBuffer.wrap(array).hashCode();
     }
 
-    byte[] array() {
-      return array;
-    }
-
     ByteBuffer asReadOnlyByteBuffer() {
       return buffer.asReadOnlyByteBuffer();
     }
@@ -125,6 +131,195 @@ public String toString() {
       return array != null ? bytes2String(array)
           : bytes2String(asReadOnlyByteBuffer());
     }
+
+    // This method mimics the ByteWiseComparator in RocksDB.
+    @Override
+    public int compareTo(RDBBatchOperation.Bytes that) {
+      final ByteBuffer thisBuf = this.array != null ?
+          ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
+      final ByteBuffer thatBuf = that.array != null ?
+          ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
+
+      for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) {
+        int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i));
+        if (cmp != 0) {
+          return cmp;
+        }
+      }
+      return thisBuf.remaining() - thatBuf.remaining();
+    }
+  }
+
+  private abstract class Operation implements Closeable {
+    private Bytes keyBytes;
+
+    private Operation(Bytes keyBytes) {
+      this.keyBytes = keyBytes;
+    }
+
+    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+
+    abstract int keyLen();
+
+    abstract int valLen();
+
+    Bytes getKey() {
+      return keyBytes;
+    }
+
+    int totalLength() {
+      return keyLen() + valLen();
+    }
+
+    abstract Op getOpType();
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * Delete operation to be applied to a {@link ColumnFamily} batch.
+   */
+  private final class DeleteOperation extends Operation {
+    private final byte[] key;
+
+    private DeleteOperation(byte[] key, Bytes keyBytes) {
+      super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
+      this.key = Objects.requireNonNull(key, "key == null");
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchDelete(batch, this.key);
+    }
+
+    @Override
+    public int keyLen() {
+      return key.length;
+    }
+
+    @Override
+    public int valLen() {
+      return 0;
+    }
+
+    @Override
+    public Op getOpType() {
+      return Op.DELETE;
+    }
+  }
+
+  /**
+   * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
+   */
+  private final class CodecBufferPutOperation extends Operation {
+    private final CodecBuffer key;
+    private final CodecBuffer value;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
+      super(keyBytes);
+      this.key = key;
+      this.value = value;
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
+    }
+
+    @Override
+    public int keyLen() {
+      return key.readableBytes();
+    }
+
+    @Override
+    public int valLen() {
+      return value.readableBytes();
+    }
+
+    @Override
+    public Op getOpType() {
+      return Op.PUT;
+    }
+
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        key.release();
+        value.release();
+      }
+      super.close();
+    }
+  }
+
+  /**
+   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
+   */
+  private final class ByteArrayPutOperation extends Operation {
+    private final byte[] key;
+    private final byte[] value;
+
+    private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) {
+      super(Objects.requireNonNull(keyBytes));
+      this.key = Objects.requireNonNull(key, "key == null");
+      this.value = Objects.requireNonNull(value, "value == null");
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchPut(batch, key, value);
+    }
+
+    @Override
+    public int keyLen() {
+      return key.length;
+    }
+
+    @Override
+    public int valLen() {
+      return value.length;
+    }
+
+    @Override
+    public Op getOpType() {
+      return Op.PUT;
+    }
+  }
+
+  /**
+   * Delete range operation to be applied to a {@link ColumnFamily} batch.
+   */
+  private final class DeleteRangeOperation extends Operation {
+    private final byte[] startKey;
+    private final byte[] endKey;
+
+    private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
+      super(null);
+      this.startKey = Objects.requireNonNull(startKey, "startKey == null");
+      this.endKey = Objects.requireNonNull(endKey, "endKey == null");
+    }
+
+    @Override
+    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+      family.batchDeleteRange(batch, startKey, endKey);
+    }
+
+    @Override
+    public int keyLen() {
+      return startKey.length + endKey.length;
+    }
+
+    @Override
+    public int valLen() {
+      return 0;
+    }
+
+    @Override
+    public Op getOpType() {
+      return Op.DELETE_RANGE;
+    }
   }
 
   /** Cache and deduplicate db ops (put/delete). */
@@ -136,12 +331,40 @@ private class OpCache {
     private class FamilyCache {
       private final ColumnFamily family;
       /**
-       * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes}
-       * and the dbValue type is {@link Object}.
-       * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
-       * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
+       * Maps each operation key ({@link Bytes}) to the opIndex of the latest operation recorded
+       * for that key in {@code batchOps}. A new put or delete on the same key removes the
+       * previously recorded operation and stores the new one under a fresh index, so the
+       * surviving operations keep their insertion order.
+       */
+      private final Map<Bytes, Integer> opsKeys = new HashMap<>();
+      /**
+       * Maps a monotonically increasing operation index (opIndex) to the recorded {@code Operation}
+       * (put, delete or delete-range). The index preserves the order in which operations were added,
+       * so prepareBatchWrite() can replay them into the write batch in the same order.
        */
-      private final Map<Bytes, Object> ops = new HashMap<>();
+      private final Map<Integer, Operation> batchOps = new HashMap<>();
       private boolean isCommit;
 
       private long batchSize;
@@ -149,31 +372,105 @@ private class FamilyCache {
       private int discardedCount;
       private int putCount;
       private int delCount;
+      private int delRangeCount;
+      private AtomicInteger opIndex;
 
       FamilyCache(ColumnFamily family) {
         this.family = family;
+        this.opIndex = new AtomicInteger(0);
       }
 
-      /** Prepare batch write for the entire family. */
+      /**
+       * Prepares the batch write for the entire family.
+       *
+       * Recorded operations are replayed in opIndex (insertion) order. A put or delete whose key
+       * falls within the [startKey, endKey) range of a following delete-range operation is
+       * discarded, since the range delete would supersede it; the delete-range operations
+       * themselves are applied in their original order. A summary of the batch is logged at
+       * trace level.
+       *
+       * @throws RocksDatabaseException if an operation cannot be applied to the write batch.
+       */
       void prepareBatchWrite() throws RocksDatabaseException {
         Preconditions.checkState(!isCommit, "%s is already committed.", this);
         isCommit = true;
-        for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
-          final Bytes key = op.getKey();
-          final Object value = op.getValue();
-          if (value instanceof byte[]) {
-            family.batchPut(writeBatch, key.array(), (byte[]) value);
-          } else if (value instanceof CodecBuffer) {
-            family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
-                ((CodecBuffer) value).asReadOnlyByteBuffer());
-          } else if (value == Op.DELETE) {
-            family.batchDelete(writeBatch, key.array());
-          } else {
-            throw new IllegalStateException("Unexpected value: " + value
-                + ", class=" + value.getClass().getSimpleName());
+        // Sort entries by opIndex and flush the operations to the batch in that order.
+        List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
+            .map(Map.Entry::getValue).collect(Collectors.toList());
+        List<List<Integer>> deleteRangeIndices = new ArrayList<>();
+        int index = 0;
+        int prevIndex = -2;
+        for (Operation op : ops) {
+          if (Op.DELETE_RANGE == op.getOpType()) {
+            if (index - prevIndex > 1) {
+              deleteRangeIndices.add(new ArrayList<>());
+            }
+            List<Integer> continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1);
+            continuousIndices.add(index);
+            prevIndex = index;
           }
+          index++;
+        }
+        // This is to apply the last batch of entries after the last DeleteRangeOperation.
+        deleteRangeIndices.add(Collections.emptyList());
+        int startIndex = 0;
+        for (List<Integer> continuousDeleteRangeIndices : deleteRangeIndices) {
+          List<DeleteRangeOperation> deleteRangeOps = continuousDeleteRangeIndices.stream()
+              .map(i -> (DeleteRangeOperation)ops.get(i))
+              .collect(Collectors.toList());
+          List<Pair<Bytes, Bytes>> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream()
+              .map(i -> (DeleteRangeOperation)ops.get(i))
+              .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey)))
+              .collect(Collectors.toList());
+          int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0);
+
+          for (int i = startIndex; i < firstOpIndex; i++) {
+            Operation op = ops.get(i);
+            Bytes key = op.getKey();
+            // Compare the key with the startKey and endKey of each delete range operation. Add the
+            // operation to the batch only if its key does not fall within a [startKey, endKey) range.
+            boolean keyInRange = false;
+            Pair<Bytes, Bytes> deleteRange = null;
+            for (Pair<Bytes, Bytes> deleteRangeOp : deleteRangeOpsRanges) {
+              if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) {
+                keyInRange = true;
+                deleteRange = deleteRangeOp;
+                break;
+              }
+            }
+            if (!keyInRange) {
+              op.apply(family, writeBatch);
+            } else {
+              Pair<Bytes, Bytes> finalDeleteRange = deleteRange;
+              debug(() -> String.format("Discarding Operation with Key: %s as 
it falls within the range of [%s, %s)",
+                  bytes2String(key.asReadOnlyByteBuffer()),
+                  
bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()),
+                  
bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer())));
+              discardedCount++;
+              discardedSize += op.totalLength();
+            }
+          }
+          for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) {
+            // Apply the delete range operation to the batch.
+            deleteRangeOp.apply(family, writeBatch);
+          }
+          // Update the startIndex to start from the next operation after the delete range operation.
+          startIndex = firstOpIndex + continuousDeleteRangeIndices.size();
         }
-
         debug(this::summary);
       }
 
@@ -186,48 +483,38 @@ void clear() {
         final boolean warn = !isCommit && batchSize > 0;
         String details = warn ? summary() : null;
 
-        for (Object value : ops.values()) {
-          if (value instanceof CodecBuffer) {
-            ((CodecBuffer) value).release(); // the key will also be released
-          }
-        }
-        ops.clear();
+        IOUtils.close(LOG, batchOps.values());
+        batchOps.clear();
 
         if (warn) {
           LOG.warn("discarding changes {}", details);
         }
       }
 
-      void putOrDelete(Bytes key, int keyLen, Object val, int valLen) {
-        Preconditions.checkState(!isCommit, "%s is already committed.", this);
-        batchSize += keyLen + valLen;
+      private void deleteIfExist(Bytes key, boolean removeFromIndexMap) {
         // remove previous first in order to call release()
-        final Object previous = ops.remove(key);
-        if (previous != null) {
-          final boolean isPut = previous != Op.DELETE;
-          final int preLen;
-          if (!isPut) {
-            preLen = 0;
-          } else if (previous instanceof CodecBuffer) {
-            final CodecBuffer previousValue = (CodecBuffer) previous;
-            preLen = previousValue.readableBytes();
-            previousValue.release(); // key will also be released
-          } else if (previous instanceof byte[]) {
-            preLen = ((byte[]) previous).length;
-          } else {
-            throw new IllegalStateException("Unexpected previous: " + previous
-                + ", class=" + previous.getClass().getSimpleName());
-          }
-          discardedSize += keyLen + preLen;
+        if (opsKeys.containsKey(key)) {
+          int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key);
+          final Operation previous = batchOps.remove(previousIndex);
+          previous.close();
+          discardedSize += previous.totalLength();
           discardedCount++;
-          debug(() -> String.format("%s overwriting a previous %s", this,
-              isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del"));
+          debug(() -> String.format("%s overwriting a previous %s[valLen => 
%s]", this, previous.getOpType(),
+              previous.valLen()));
         }
-        final Object overwritten = ops.put(key, val);
-        Preconditions.checkState(overwritten == null);
+      }
 
+      void overWriteOpIfExist(Bytes key, Operation operation) {
+        Preconditions.checkState(!isCommit, "%s is already committed.", this);
+        deleteIfExist(key, true);
+        batchSize += operation.totalLength();
+        int newIndex = opIndex.getAndIncrement();
+        final Integer overwritten = opsKeys.put(key, newIndex);
+        batchOps.put(newIndex, operation);
+        Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten));
         debug(() -> String.format("%s %s, %s; key=%s", this,
-            valLen == 0 ? delString(keyLen) : putString(keyLen, valLen),
+            Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(),
+                operation.valLen()),
             batchSizeDiscardedString(), key));
       }
 
@@ -235,19 +522,25 @@ void put(CodecBuffer key, CodecBuffer value) {
         putCount++;
 
         // always release the key with the value
-        value.getReleaseFuture().thenAccept(v -> key.release());
-        putOrDelete(new Bytes(key), key.readableBytes(),
-            value, value.readableBytes());
+        Bytes keyBytes = new Bytes(key);
+        overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes));
       }
 
       void put(byte[] key, byte[] value) {
         putCount++;
-        putOrDelete(new Bytes(key), key.length, value, value.length);
+        Bytes keyBytes = new Bytes(key);
+        overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes));
       }
 
       void delete(byte[] key) {
         delCount++;
-        putOrDelete(new Bytes(key), key.length, Op.DELETE, 0);
+        Bytes keyBytes = new Bytes(key);
+        overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes));
+      }
+
+      void deleteRange(byte[] startKey, byte[] endKey) {
+        delRangeCount++;
+        batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
       }
 
       String putString(int keySize, int valueSize) {
@@ -287,6 +580,11 @@ void delete(ColumnFamily family, byte[] key) {
           .delete(key);
     }
 
+    void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
+      name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
+          .deleteRange(startKey, endKey);
+    }
+
     /** Prepare batch write for the entire cache. */
     UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
       for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -308,6 +606,7 @@ String getCommitString() {
       int opSize = 0;
       int discardedCount = 0;
       int discardedSize = 0;
+      int delRangeCount = 0;
 
       for (FamilyCache f : name2cache.values()) {
         putCount += f.putCount;
@@ -315,12 +614,13 @@ String getCommitString() {
         opSize += f.batchSize;
         discardedCount += f.discardedCount;
         discardedSize += f.discardedSize;
+        delRangeCount += f.delRangeCount;
       }
 
       final int opCount = putCount + delCount;
       return String.format(
-          "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s",
-          putCount, delCount,
+          "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, 
committed: %s",
+          putCount, delCount, delRangeCount,
           countSize2String(opCount, opSize),
           countSize2String(discardedCount, discardedSize),
           countSize2String(opCount - discardedCount, opSize - discardedSize));
@@ -374,4 +674,8 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
   public void put(ColumnFamily family, byte[] key, byte[] value) {
     opCache.put(family, key, value);
   }
+
+  public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
+    opCache.deleteRange(family, startKey, endKey);
+  }
 }
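
For reference, a standalone sketch (illustrative class and method names, not part of this patch) of the bytewise [startKey, endKey) check that FamilyCache#prepareBatchWrite uses above to decide whether a buffered put or delete is superseded by a pending delete range:

    // Illustrative sketch only; DeleteRangeCheck and its methods are assumptions for this example.
    import java.nio.charset.StandardCharsets;

    final class DeleteRangeCheck {
      // Unsigned lexicographic comparison, mirroring Bytes#compareTo / RocksDB's ByteWiseComparator.
      static int compare(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
          int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
          if (cmp != 0) {
            return cmp;
          }
        }
        return a.length - b.length;
      }

      // A buffered put/delete is discarded when its key falls in [startKey, endKey).
      static boolean coveredByRange(byte[] key, byte[] startKey, byte[] endKey) {
        return compare(key, startKey) >= 0 && compare(key, endKey) < 0;
      }

      public static void main(String[] args) {
        byte[] start = "key01".getBytes(StandardCharsets.UTF_8);
        byte[] end = "key03".getBytes(StandardCharsets.UTF_8);
        System.out.println(coveredByRange("key02".getBytes(StandardCharsets.UTF_8), start, end)); // true: discarded
        System.out.println(coveredByRange("key03".getBytes(StandardCharsets.UTF_8), start, end)); // false: endKey is exclusive
      }
    }
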
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 4ad625ed511..3e784bec10f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -203,6 +203,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {
 
   }
 
+  @Override
+  public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey);
+    } else {
+      throw new IllegalArgumentException("batch should be RDBBatchOperation");
+    }
+  }
+
   @Override
   public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, KeyValueIterator.Type type)
       throws RocksDatabaseException {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index cf0c84f375e..24f1971dccb 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -302,6 +302,16 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
       }
     }
 
+    public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey)
+        throws RocksDatabaseException {
+      try (UncheckedAutoCloseable ignored = acquire()) {
+        writeBatch.deleteRange(getHandle(), beginKey, endKey);
+      } catch (RocksDBException e) {
+        throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " +
+            bytes2String(endKey), e);
+      }
+    }
+
     public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
         throws RocksDatabaseException {
       if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 98ae6ff621b..7f5d74ad4ee 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
    */
   void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException;
 
+  /**
+   * Deletes a range of keys from the metadata store as part of a batch operation.
+   * @param batch Batch operation to perform the delete operation.
+   * @param beginKey start metadata key, inclusive.
+   * @param endKey end metadata key, exclusive.
+   */
+  void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException;
+
   /**
    * Deletes a range of keys from the metadata store.
    *
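
For context, a rough usage sketch of the new Table#deleteRangeWithBatch API. The store/table setup below is assumed (it mirrors the TestRDBTableStore test further down); class and variable names in the example are illustrative:

    // Illustrative sketch only; assumes an RDBStore and a raw byte[] table as in TestRDBTableStore.
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdds.utils.db.BatchOperation;
    import org.apache.hadoop.hdds.utils.db.RDBStore;
    import org.apache.hadoop.hdds.utils.db.Table;

    final class DeleteRangeBatchExample {
      static void deleteKeyRange(RDBStore rdbStore, Table<byte[], byte[]> table) throws Exception {
        byte[] begin = "key1".getBytes(StandardCharsets.UTF_8);  // inclusive
        byte[] end = "key3".getBytes(StandardCharsets.UTF_8);    // exclusive
        try (BatchOperation batch = rdbStore.initBatchOperation()) {
          // Keys in [begin, end) are removed when the batch commits; "key3" itself survives.
          table.deleteRangeWithBatch(batch, begin, end);
          rdbStore.commitBatchOperation(batch);
        }
      }
    }

Mixing puts, deletes and range deletes in a single batch is exercised by the orderOfBatchOperations test added below.
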
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 978e7168c20..cd02c91ecb3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -380,6 +380,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
     rawTable.deleteWithBatch(batch, encodeKey(key));
   }
 
+  @Override
+  public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
+    rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey));
+  }
+
   @Override
   public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException {
     rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
index f234364ade4..51baeb45177 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
@@ -79,6 +79,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public void deleteRange(KEY beginKey, KEY endKey) {
     throw new UnsupportedOperationException();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
new file mode 100644
index 00000000000..bbf53b9a960
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+
+/**
+ * Tests {@link RDBBatchOperation} with delete-range operations, using a MockedConstruction of
+ * ManagedWriteBatch to record the put, delete and deleteRange calls that reach the write batch.
+ *
+ * The test mocks ColumnFamilyHandle and RocksDatabase.ColumnFamily, issues a mix of put, delete
+ * and deleteRange operations, commits the batch, and asserts that only the expected operations
+ * were applied, in the expected order.
+ */
+public class TestRDBBatchOperation {
+  @Test
+  public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
+    final List<Pair<Pair<String, String>, Integer>> deleteKeyRangePairs = new ArrayList<>();
+    final List<Pair<Pair<String, String>, Integer>> putKeys = new ArrayList<>();
+    final List<Pair<String, Integer>> deleteKeys = new ArrayList<>();
+    AtomicInteger cnt = new AtomicInteger(0);
+    try (MockedConstruction<ManagedWriteBatch> mockedConstruction = Mockito.mockConstruction(ManagedWriteBatch.class,
+        (writeBatch, context) -> {
+          doAnswer(i -> {
+            deleteKeyRangePairs.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
+                bytes2String((byte[]) i.getArgument(2))), cnt.getAndIncrement()));
+            return null;
+          }).when(writeBatch).deleteRange(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class),
+              Mockito.any(byte[].class));
+          doAnswer(i -> {
+            putKeys.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
+                    bytes2String((byte[]) i.getArgument(2))),
+                cnt.getAndIncrement()));
+            return null;
+          }).when(writeBatch)
+              .put(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class), Mockito.any(byte[].class));
+          doAnswer(i -> {
+            deleteKeys.add(Pair.of(bytes2String((byte[]) i.getArgument(1)), cnt.getAndIncrement()));
+            return null;
+          }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class));
+
+        });
+         RDBBatchOperation batchOperation = new RDBBatchOperation()) {
+      ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
+      RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .put(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
+        return null;
+      }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
+
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
+        return null;
+      }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
+
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .delete(columnFamilyHandle, (byte[]) i.getArgument(1));
+        return null;
+      }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(byte[].class));
+
+      when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
+      when(columnFamily.getName()).thenReturn("test");
+      batchOperation.put(columnFamily, string2Bytes("key01"), 
string2Bytes("value01"));
+      batchOperation.put(columnFamily, string2Bytes("key02"), 
string2Bytes("value02"));
+      batchOperation.put(columnFamily, string2Bytes("key03"), 
string2Bytes("value03"));
+      batchOperation.put(columnFamily, string2Bytes("key03"), 
string2Bytes("value04"));
+      batchOperation.delete(columnFamily, string2Bytes("key05"));
+      batchOperation.deleteRange(columnFamily, string2Bytes("key01"), 
string2Bytes("key02"));
+      batchOperation.deleteRange(columnFamily, string2Bytes("key02"), 
string2Bytes("key03"));
+      batchOperation.put(columnFamily, string2Bytes("key04"), 
string2Bytes("value04"));
+      batchOperation.put(columnFamily, string2Bytes("key06"), 
string2Bytes("value05"));
+      batchOperation.deleteRange(columnFamily, string2Bytes("key06"), 
string2Bytes("key12"));
+      batchOperation.deleteRange(columnFamily, string2Bytes("key09"), 
string2Bytes("key10"));
+      RocksDatabase db = Mockito.mock(RocksDatabase.class);
+      doNothing().when(db).batchWrite(any());
+      batchOperation.commit(db);
+      assertEquals(deleteKeys, Collections.singletonList(Pair.of("key05", 1)));
+      assertEquals(deleteKeyRangePairs, asList(Pair.of(Pair.of("key01", "key02"), 2),
+          Pair.of(Pair.of("key02", "key03"), 3),
+          Pair.of(Pair.of("key06", "key12"), 5),
+          Pair.of(Pair.of("key09", "key10"), 6)));
+      assertEquals(putKeys, Arrays.asList(Pair.of(Pair.of("key03", "value04"), 0),
+          Pair.of(Pair.of("key04", "value04"), 4)));
+    }
+  }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 008878de1d3..bd8a00becf0 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.hdds.utils.db;
 
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -66,7 +67,7 @@ public class TestRDBTableStore {
   public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
   private static int count = 0;
   private final List<String> families =
-      Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+      Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
           "First", "Second", "Third",
           "Fourth", "Fifth",
           "Sixth", "Seventh",
@@ -635,21 +636,21 @@ public void testPrefixedRangeKVs() throws Exception {
 
     // test start with a middle key
     startKey = StringUtils.string2Bytes(
-        StringUtils.bytes2String(samplePrefix) + "3");
+        bytes2String(samplePrefix) + "3");
     rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix);
     assertEquals(2, rangeKVs.size());
 
     // test with a filter
-    final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1");
+    final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1");
     startKey = StringUtils.string2Bytes(
-        StringUtils.bytes2String(samplePrefix));
+        bytes2String(samplePrefix));
     rangeKVs = testTable.getRangeKVs(startKey, blockCount,
         samplePrefix, filter1);
     assertEquals(1, rangeKVs.size());
 
     // test start with a non-exist key
     startKey = StringUtils.string2Bytes(
-        StringUtils.bytes2String(samplePrefix) + 123);
+        bytes2String(samplePrefix) + 123);
     rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix);
     assertEquals(0, rangeKVs.size());
   }
@@ -775,4 +776,77 @@ private void populateTable(Table<String, String> table,
       }
     }
   }
+
+  @Test
+  public void batchDeleteWithRange() throws Exception {
+    final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
+    try (BatchOperation batch = rdbStore.initBatchOperation()) {
+
+      //given
+      String keyStr = RandomStringUtils.secure().next(10);
+      byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] value =
+          RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8);
+      testTable.put(startKey, value);
+      testTable.put(keyInRange1, value);
+      testTable.put(keyInRange2, value);
+      testTable.put(endKey, value);
+      assertNotNull(testTable.get(startKey));
+      assertNotNull(testTable.get(keyInRange1));
+      assertNotNull(testTable.get(keyInRange2));
+      assertNotNull(testTable.get(endKey));
+
+      //when
+      testTable.deleteRangeWithBatch(batch, startKey, endKey);
+      rdbStore.commitBatchOperation(batch);
+
+      //then
+      assertNull(testTable.get(startKey));
+      assertNull(testTable.get(keyInRange1));
+      assertNull(testTable.get(keyInRange2));
+      assertNotNull(testTable.get(endKey));
+    }
+  }
+
+  @Test
+  public void orderOfBatchOperations() throws Exception {
+    final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
+    try (BatchOperation batch = rdbStore.initBatchOperation()) {
+
+      //given
+      String keyStr = RandomStringUtils.secure().next(10);
+      byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
+      byte[] value1 = ("value1-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+      byte[] value2 = ("value2-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+      byte[] value3 = ("value3-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+
+      //when
+      testTable.putWithBatch(batch, startKey, value1);
+      testTable.putWithBatch(batch, keyInRange1, value1);
+      testTable.deleteWithBatch(batch, keyInRange1);
+      // ops map key should be <<startKey, endKey>, 1>
+      testTable.deleteRangeWithBatch(batch, startKey, endKey);
+      testTable.putWithBatch(batch, startKey, value2);
+      testTable.putWithBatch(batch, keyInRange1, value2);
+      // ops map key is <<startKey, keyInRange1>, 2>.
+      testTable.deleteRangeWithBatch(batch, startKey, keyInRange1);
+      testTable.putWithBatch(batch, endKey, value1);
+      testTable.putWithBatch(batch, endKey, value2);
+      // ops map key is <<startKey, endKey>, 3>.
+      testTable.deleteRangeWithBatch(batch, startKey, endKey);
+      testTable.putWithBatch(batch, startKey, value3);
+
+      rdbStore.commitBatchOperation(batch);
+
+      //then
+      assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey)));
+      assertNull(testTable.get(keyInRange1));
+      assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey)));
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

