This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ae92a189d37 HDDS-13415. Support DeleteRange operation as part of the rocksdb batch write (#8774)
ae92a189d37 is described below
commit ae92a189d37b98071b78b075163757c7b4b79aec
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sun Aug 24 14:56:00 2025 -0400
HDDS-13415. Support DeleteRange operation as part of the rocksdb batch write (#8774)
---
.../ozone/container/metadata/DatanodeTable.java | 5 +
.../hadoop/hdds/utils/db/RDBBatchOperation.java | 434 ++++++++++++++++++---
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 9 +
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 10 +
.../org/apache/hadoop/hdds/utils/db/Table.java | 8 +
.../apache/hadoop/hdds/utils/db/TypedTable.java | 5 +
.../hadoop/hdds/utils/db/InMemoryTestTable.java | 5 +
.../hdds/utils/db/TestRDBBatchOperation.java | 129 ++++++
.../hadoop/hdds/utils/db/TestRDBTableStore.java | 84 +++-
9 files changed, 619 insertions(+), 70 deletions(-)
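
Before the diff, a minimal usage sketch of the new batch API, assuming a DBStore handle named store and a Table<String, String> named table (placeholder names for this illustration, not identifiers from the patch); the surrounding calls already exist on Table and DBStore:

    try (BatchOperation batch = store.initBatchOperation()) {
      table.putWithBatch(batch, "key1", "v1");
      table.deleteWithBatch(batch, "key2");
      // New in this commit: queue a range delete; keys in [beginKey, endKey)
      // are removed when the batch is committed.
      table.deleteRangeWithBatch(batch, "key3", "key9");
      store.commitBatchOperation(batch);
    }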
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 2621b1f7d85..8cbaec82f43 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -72,6 +72,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
table.deleteWithBatch(batch, key);
}
+ @Override
+ public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
+ table.deleteRangeWithBatch(batch, beginKey, endKey);
+ }
+
@Override
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix,
KeyValueIterator.Type type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 8b9fa7295c7..f7b025ed98f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -20,12 +20,22 @@
import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.Closeable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
@@ -50,7 +60,7 @@ public class RDBBatchOperation implements BatchOperation {
private final OpCache opCache = new OpCache();
- private enum Op { DELETE }
+ private enum Op { DELETE, PUT, DELETE_RANGE }
private static void debug(Supplier<String> message) {
if (LOG.isTraceEnabled()) {
@@ -67,11 +77,11 @@ private static String countSize2String(int count, long size) {
}
/**
- * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#ops}.
+ * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}.
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
- static final class Bytes {
+ static final class Bytes implements Comparable<Bytes> {
private final byte[] array;
private final CodecBuffer buffer;
/** Cache the hash value. */
@@ -89,10 +99,6 @@ static final class Bytes {
this.hash = ByteBuffer.wrap(array).hashCode();
}
- byte[] array() {
- return array;
- }
-
ByteBuffer asReadOnlyByteBuffer() {
return buffer.asReadOnlyByteBuffer();
}
@@ -125,6 +131,195 @@ public String toString() {
return array != null ? bytes2String(array)
: bytes2String(asReadOnlyByteBuffer());
}
+
+ // This method mimics the ByteWiseComparator in RocksDB.
+ @Override
+ public int compareTo(RDBBatchOperation.Bytes that) {
+ final ByteBuffer thisBuf = this.array != null ? ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
+ final ByteBuffer thatBuf = that.array != null ? ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
+
+ for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) {
+ int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i));
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return thisBuf.remaining() - thatBuf.remaining();
+ }
+ }
+
+ private abstract class Operation implements Closeable {
+ private Bytes keyBytes;
+
+ private Operation(Bytes keyBytes) {
+ this.keyBytes = keyBytes;
+ }
+
+ abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+
+ abstract int keyLen();
+
+ abstract int valLen();
+
+ Bytes getKey() {
+ return keyBytes;
+ }
+
+ int totalLength() {
+ return keyLen() + valLen();
+ }
+
+ abstract Op getOpType();
+
+ @Override
+ public void close() {
+ }
+ }
+
+ /**
+ * Delete operation to be applied to a {@link ColumnFamily} batch.
+ */
+ private final class DeleteOperation extends Operation {
+ private final byte[] key;
+
+ private DeleteOperation(byte[] key, Bytes keyBytes) {
+ super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
+ this.key = Objects.requireNonNull(key, "key == null");
+ }
+
+ @Override
+ public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+ family.batchDelete(batch, this.key);
+ }
+
+ @Override
+ public int keyLen() {
+ return key.length;
+ }
+
+ @Override
+ public int valLen() {
+ return 0;
+ }
+
+ @Override
+ public Op getOpType() {
+ return Op.DELETE;
+ }
+ }
+
+ /**
+ * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
+ */
+ private final class CodecBufferPutOperation extends Operation {
+ private final CodecBuffer key;
+ private final CodecBuffer value;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
+ super(keyBytes);
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+ family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
+ }
+
+ @Override
+ public int keyLen() {
+ return key.readableBytes();
+ }
+
+ @Override
+ public int valLen() {
+ return value.readableBytes();
+ }
+
+ @Override
+ public Op getOpType() {
+ return Op.PUT;
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ key.release();
+ value.release();
+ }
+ super.close();
+ }
+ }
+
+ /**
+ * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
+ */
+ private final class ByteArrayPutOperation extends Operation {
+ private final byte[] key;
+ private final byte[] value;
+
+ private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) {
+ super(Objects.requireNonNull(keyBytes));
+ this.key = Objects.requireNonNull(key, "key == null");
+ this.value = Objects.requireNonNull(value, "value == null");
+ }
+
+ @Override
+ public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+ family.batchPut(batch, key, value);
+ }
+
+ @Override
+ public int keyLen() {
+ return key.length;
+ }
+
+ @Override
+ public int valLen() {
+ return value.length;
+ }
+
+ @Override
+ public Op getOpType() {
+ return Op.PUT;
+ }
+ }
+
+ /**
+ * Delete range operation to be applied to a {@link ColumnFamily} batch.
+ */
+ private final class DeleteRangeOperation extends Operation {
+ private final byte[] startKey;
+ private final byte[] endKey;
+
+ private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
+ super(null);
+ this.startKey = Objects.requireNonNull(startKey, "startKey == null");
+ this.endKey = Objects.requireNonNull(endKey, "endKey == null");
+ }
+
+ @Override
+ public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
+ family.batchDeleteRange(batch, startKey, endKey);
+ }
+
+ @Override
+ public int keyLen() {
+ return startKey.length + endKey.length;
+ }
+
+ @Override
+ public int valLen() {
+ return 0;
+ }
+
+ @Override
+ public Op getOpType() {
+ return Op.DELETE_RANGE;
+ }
}
/** Cache and deduplicate db ops (put/delete). */
@@ -136,12 +331,40 @@ private class OpCache {
private class FamilyCache {
private final ColumnFamily family;
/**
- * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes}
- * and the dbValue type is {@link Object}.
- * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
- * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
+ * Maps each unique operation key ({@link Bytes}) to the index of its latest operation in {@link #batchOps}.
+ *
+ * The integer index records insertion order and is used to sort, overwrite, or discard
+ * operations when the batch is flushed to RocksDB.
+ */
+ private final Map<Bytes, Integer> opsKeys = new HashMap<>();
+ /**
+ * Maps each operation index to its {@code Operation}.
+ *
+ * This is the primary container for the operations (put, delete, delete range) recorded for a
+ * batch write. The integer index preserves insertion order so that operations are applied to
+ * the RocksDB write batch in the order they were submitted.
*/
- private final Map<Bytes, Object> ops = new HashMap<>();
+ private final Map<Integer, Operation> batchOps = new HashMap<>();
private boolean isCommit;
private long batchSize;
@@ -149,31 +372,105 @@ private class FamilyCache {
private int discardedCount;
private int putCount;
private int delCount;
+ private int delRangeCount;
+ private AtomicInteger opIndex;
FamilyCache(ColumnFamily family) {
this.family = family;
+ this.opIndex = new AtomicInteger(0);
}
- /** Prepare batch write for the entire family. */
+ /**
+ * Prepares the write batch for this column family.
+ *
+ * Checks that the batch has not already been committed, then sorts the accumulated
+ * operations by {@code opIndex} so they are applied in insertion order. An operation whose
+ * key falls within the [startKey, endKey) range of the {@link DeleteRangeOperation}s that
+ * immediately follow it is discarded, since the range delete would remove it anyway; the
+ * delete range operations themselves are applied in their original positions. A summary of
+ * the batch is logged for debugging.
+ *
+ * @throws RocksDatabaseException if an operation cannot be applied to the write batch.
+ */
void prepareBatchWrite() throws RocksDatabaseException {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
isCommit = true;
- for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
- final Bytes key = op.getKey();
- final Object value = op.getValue();
- if (value instanceof byte[]) {
- family.batchPut(writeBatch, key.array(), (byte[]) value);
- } else if (value instanceof CodecBuffer) {
- family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
- ((CodecBuffer) value).asReadOnlyByteBuffer());
- } else if (value == Op.DELETE) {
- family.batchDelete(writeBatch, key.array());
- } else {
- throw new IllegalStateException("Unexpected value: " + value
- + ", class=" + value.getClass().getSimpleName());
+ // Sort entries based on opIndex and flush the operations to the batch in the same order.
+ List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
+ .map(Map.Entry::getValue).collect(Collectors.toList());
+ List<List<Integer>> deleteRangeIndices = new ArrayList<>();
+ int index = 0;
+ int prevIndex = -2;
+ for (Operation op : ops) {
+ if (Op.DELETE_RANGE == op.getOpType()) {
+ if (index - prevIndex > 1) {
+ deleteRangeIndices.add(new ArrayList<>());
+ }
+ List<Integer> continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1);
+ continuousIndices.add(index);
+ prevIndex = index;
}
+ index++;
+ }
+ // This is to apply the last batch of entries after the last DeleteRangeOperation.
+ deleteRangeIndices.add(Collections.emptyList());
+ int startIndex = 0;
+ for (List<Integer> continuousDeleteRangeIndices : deleteRangeIndices) {
+ List<DeleteRangeOperation> deleteRangeOps = continuousDeleteRangeIndices.stream()
+ .map(i -> (DeleteRangeOperation)ops.get(i))
+ .collect(Collectors.toList());
+ List<Pair<Bytes, Bytes>> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream()
+ .map(i -> (DeleteRangeOperation)ops.get(i))
+ .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey)))
+ .collect(Collectors.toList());
+ int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0);
+
+ for (int i = startIndex; i < firstOpIndex; i++) {
+ Operation op = ops.get(i);
+ Bytes key = op.getKey();
+ // Compare the key with the startKey and endKey of each delete range operation. Add the op to the
+ // batch only if the key does not fall within any [startKey, endKey) range.
+ boolean keyInRange = false;
+ Pair<Bytes, Bytes> deleteRange = null;
+ for (Pair<Bytes, Bytes> deleteRangeOp : deleteRangeOpsRanges) {
+ if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) {
+ keyInRange = true;
+ deleteRange = deleteRangeOp;
+ break;
+ }
+ }
+ if (!keyInRange) {
+ op.apply(family, writeBatch);
+ } else {
+ Pair<Bytes, Bytes> finalDeleteRange = deleteRange;
+ debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
+ bytes2String(key.asReadOnlyByteBuffer()),
+ bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()),
+ bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer())));
+ discardedCount++;
+ discardedSize += op.totalLength();
+ }
+ }
+ for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) {
+ // Apply the delete range operation to the batch.
+ deleteRangeOp.apply(family, writeBatch);
+ }
+ // Update startIndex to the operation after this group of delete range operations.
+ startIndex = firstOpIndex + continuousDeleteRangeIndices.size();
}
-
debug(this::summary);
}
@@ -186,48 +483,38 @@ void clear() {
final boolean warn = !isCommit && batchSize > 0;
String details = warn ? summary() : null;
- for (Object value : ops.values()) {
- if (value instanceof CodecBuffer) {
- ((CodecBuffer) value).release(); // the key will also be released
- }
- }
- ops.clear();
+ IOUtils.close(LOG, batchOps.values());
+ batchOps.clear();
if (warn) {
LOG.warn("discarding changes {}", details);
}
}
- void putOrDelete(Bytes key, int keyLen, Object val, int valLen) {
- Preconditions.checkState(!isCommit, "%s is already committed.", this);
- batchSize += keyLen + valLen;
+ private void deleteIfExist(Bytes key, boolean removeFromIndexMap) {
// remove previous first in order to call release()
- final Object previous = ops.remove(key);
- if (previous != null) {
- final boolean isPut = previous != Op.DELETE;
- final int preLen;
- if (!isPut) {
- preLen = 0;
- } else if (previous instanceof CodecBuffer) {
- final CodecBuffer previousValue = (CodecBuffer) previous;
- preLen = previousValue.readableBytes();
- previousValue.release(); // key will also be released
- } else if (previous instanceof byte[]) {
- preLen = ((byte[]) previous).length;
- } else {
- throw new IllegalStateException("Unexpected previous: " + previous
- + ", class=" + previous.getClass().getSimpleName());
- }
- discardedSize += keyLen + preLen;
+ if (opsKeys.containsKey(key)) {
+ int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key);
+ final Operation previous = batchOps.remove(previousIndex);
+ previous.close();
+ discardedSize += previous.totalLength();
discardedCount++;
- debug(() -> String.format("%s overwriting a previous %s", this,
- isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del"));
+ debug(() -> String.format("%s overwriting a previous %s[valLen =>
%s]", this, previous.getOpType(),
+ previous.valLen()));
}
- final Object overwritten = ops.put(key, val);
- Preconditions.checkState(overwritten == null);
+ }
+ void overWriteOpIfExist(Bytes key, Operation operation) {
+ Preconditions.checkState(!isCommit, "%s is already committed.", this);
+ deleteIfExist(key, true);
+ batchSize += operation.totalLength();
+ int newIndex = opIndex.getAndIncrement();
+ final Integer overwritten = opsKeys.put(key, newIndex);
+ batchOps.put(newIndex, operation);
+ Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten));
debug(() -> String.format("%s %s, %s; key=%s", this,
- valLen == 0 ? delString(keyLen) : putString(keyLen, valLen),
+ Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(),
+ operation.valLen()),
batchSizeDiscardedString(), key));
}
@@ -235,19 +522,25 @@ void put(CodecBuffer key, CodecBuffer value) {
putCount++;
// always release the key with the value
- value.getReleaseFuture().thenAccept(v -> key.release());
- putOrDelete(new Bytes(key), key.readableBytes(),
- value, value.readableBytes());
+ Bytes keyBytes = new Bytes(key);
+ overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes));
}
void put(byte[] key, byte[] value) {
putCount++;
- putOrDelete(new Bytes(key), key.length, value, value.length);
+ Bytes keyBytes = new Bytes(key);
+ overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes));
}
void delete(byte[] key) {
delCount++;
- putOrDelete(new Bytes(key), key.length, Op.DELETE, 0);
+ Bytes keyBytes = new Bytes(key);
+ overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes));
+ }
+
+ void deleteRange(byte[] startKey, byte[] endKey) {
+ delRangeCount++;
+ batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
}
String putString(int keySize, int valueSize) {
@@ -287,6 +580,11 @@ void delete(ColumnFamily family, byte[] key) {
.delete(key);
}
+ void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
+ name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
+ .deleteRange(startKey, endKey);
+ }
+
/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -308,6 +606,7 @@ String getCommitString() {
int opSize = 0;
int discardedCount = 0;
int discardedSize = 0;
+ int delRangeCount = 0;
for (FamilyCache f : name2cache.values()) {
putCount += f.putCount;
@@ -315,12 +614,13 @@ String getCommitString() {
opSize += f.batchSize;
discardedCount += f.discardedCount;
discardedSize += f.discardedSize;
+ delRangeCount += f.delRangeCount;
}
final int opCount = putCount + delCount;
return String.format(
- "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s",
- putCount, delCount,
+ "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s,
committed: %s",
+ putCount, delCount, delRangeCount,
countSize2String(opCount, opSize),
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
@@ -374,4 +674,8 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
public void put(ColumnFamily family, byte[] key, byte[] value) {
opCache.put(family, key, value);
}
+
+ public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
+ opCache.deleteRange(family, startKey, endKey);
+ }
}
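
In short, prepareBatchWrite drops a queued put or delete when its key is byte-wise inside the [startKey, endKey) range of the delete-range operations that immediately follow it in the batch. A minimal sketch of that check (illustrative only, not the committed code; Bytes#compareTo above implements the same byte-wise ordering):

    // Returns true when the op for "key" would be covered by deleteRange(startKey, endKey).
    static boolean coveredByRange(byte[] key, byte[] startKey, byte[] endKey) {
      java.util.Comparator<byte[]> cmp = com.google.common.primitives.UnsignedBytes.lexicographicalComparator();
      return cmp.compare(key, startKey) >= 0 && cmp.compare(key, endKey) < 0; // end key is exclusive
    }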
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 4ad625ed511..3e784bec10f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -203,6 +203,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {
}
+ @Override
+ public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) {
+ if (batch instanceof RDBBatchOperation) {
+ ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey);
+ } else {
+ throw new IllegalArgumentException("batch should be RDBBatchOperation");
+ }
+ }
+
@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix,
KeyValueIterator.Type type)
throws RocksDatabaseException {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index cf0c84f375e..24f1971dccb 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -302,6 +302,16 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}
+ public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey)
+ throws RocksDatabaseException {
+ try (UncheckedAutoCloseable ignored = acquire()) {
+ writeBatch.deleteRange(getHandle(), beginKey, endKey);
+ } catch (RocksDBException e) {
+ throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " +
+ bytes2String(endKey), e);
+ }
+ }
+
public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
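
The new wrapper above delegates to RocksDB's WriteBatch#deleteRange. Roughly, outside of the Ozone managed wrappers (handles, options, and the db variable here are illustrative, not part of this patch):

    try (org.rocksdb.WriteBatch wb = new org.rocksdb.WriteBatch();
         org.rocksdb.WriteOptions opts = new org.rocksdb.WriteOptions()) {
      wb.deleteRange(columnFamilyHandle, beginKey, endKey); // deletes keys in [beginKey, endKey)
      db.write(opts, wb);                                   // db is an org.rocksdb.RocksDB
    }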
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 98ae6ff621b..7f5d74ad4ee 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
*/
void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException;
+ /**
+ * Deletes a range of keys from the metadata store as part of a batch operation.
+ * @param batch Batch operation to perform the delete operation.
+ * @param beginKey start metadata key, inclusive.
+ * @param endKey end metadata key, exclusive.
+ */
+ void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException;
+
/**
* Deletes a range of keys from the metadata store.
*
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 978e7168c20..cd02c91ecb3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -380,6 +380,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
rawTable.deleteWithBatch(batch, encodeKey(key));
}
+ @Override
+ public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
+ rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey));
+ }
+
@Override
public void deleteRange(KEY beginKey, KEY endKey) throws
RocksDatabaseException, CodecException {
rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
index f234364ade4..51baeb45177 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
@@ -79,6 +79,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) {
throw new UnsupportedOperationException();
}
+ @Override
+ public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void deleteRange(KEY beginKey, KEY endKey) {
throw new UnsupportedOperationException();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
new file mode 100644
index 00000000000..bbf53b9a960
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+
+/**
+ * Test class for verifying batch operations with delete ranges using the
+ * RDBBatchOperation and MockedConstruction of ManagedWriteBatch.
+ *
+ * This test class includes:
+ * - Mocking and tracking of operations including put, delete, and delete range
+ * within a batch operation.
+ * - Validation of committed operations using assertions on collected data.
+ * - Ensures that the batch operation interacts correctly with the
+ * RocksDatabase and ColumnFamilyHandle components.
+ *
+ * The test method:
+ * 1. Sets up mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily instances.
+ * 2. Mocks the write-batch methods so each put, delete, and deleteRange call is recorded and verified.
+ */
+public class TestRDBBatchOperation {
+ @Test
+ public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
+ final List<Pair<Pair<String, String>, Integer>> deleteKeyRangePairs = new ArrayList<>();
+ final List<Pair<Pair<String, String>, Integer>> putKeys = new ArrayList<>();
+ final List<Pair<String, Integer>> deleteKeys = new ArrayList<>();
+ AtomicInteger cnt = new AtomicInteger(0);
+ try (MockedConstruction<ManagedWriteBatch> mockedConstruction = Mockito.mockConstruction(ManagedWriteBatch.class,
+ (writeBatch, context) -> {
+ doAnswer(i -> {
+ deleteKeyRangePairs.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
+ bytes2String((byte[]) i.getArgument(2))), cnt.getAndIncrement()));
+ return null;
+ }).when(writeBatch).deleteRange(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class),
+ Mockito.any(byte[].class));
+ doAnswer(i -> {
+ putKeys.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
+ bytes2String((byte[]) i.getArgument(2))),
+ cnt.getAndIncrement()));
+ return null;
+ }).when(writeBatch)
+ .put(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class), Mockito.any(byte[].class));
+ doAnswer(i -> {
+ deleteKeys.add(Pair.of(bytes2String((byte[]) i.getArgument(1)), cnt.getAndIncrement()));
+ return null;
+ }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class));
+
+ });
+ RDBBatchOperation batchOperation = new RDBBatchOperation()) {
+ ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
+ RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .put(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
+ return null;
+ }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
+
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
+ return null;
+ }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
+
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .delete(columnFamilyHandle, (byte[]) i.getArgument(1));
+ return null;
+ }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(byte[].class));
+
+ when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
+ when(columnFamily.getName()).thenReturn("test");
+ batchOperation.put(columnFamily, string2Bytes("key01"),
string2Bytes("value01"));
+ batchOperation.put(columnFamily, string2Bytes("key02"),
string2Bytes("value02"));
+ batchOperation.put(columnFamily, string2Bytes("key03"),
string2Bytes("value03"));
+ batchOperation.put(columnFamily, string2Bytes("key03"),
string2Bytes("value04"));
+ batchOperation.delete(columnFamily, string2Bytes("key05"));
+ batchOperation.deleteRange(columnFamily, string2Bytes("key01"),
string2Bytes("key02"));
+ batchOperation.deleteRange(columnFamily, string2Bytes("key02"),
string2Bytes("key03"));
+ batchOperation.put(columnFamily, string2Bytes("key04"),
string2Bytes("value04"));
+ batchOperation.put(columnFamily, string2Bytes("key06"),
string2Bytes("value05"));
+ batchOperation.deleteRange(columnFamily, string2Bytes("key06"),
string2Bytes("key12"));
+ batchOperation.deleteRange(columnFamily, string2Bytes("key09"),
string2Bytes("key10"));
+ RocksDatabase db = Mockito.mock(RocksDatabase.class);
+ doNothing().when(db).batchWrite(any());
+ batchOperation.commit(db);
+ assertEquals(deleteKeys, Collections.singletonList(Pair.of("key05", 1)));
+ assertEquals(deleteKeyRangePairs, asList(Pair.of(Pair.of("key01", "key02"), 2),
+ Pair.of(Pair.of("key02", "key03"), 3),
+ Pair.of(Pair.of("key06", "key12"), 5),
+ Pair.of(Pair.of("key09", "key10"), 6)));
+ assertEquals(putKeys, Arrays.asList(Pair.of(Pair.of("key03", "value04"), 0),
+ Pair.of(Pair.of("key04", "value04"), 4)));
+ }
+ }
+}
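
Reading the assertions above against the dedup and range-filter logic in RDBBatchOperation, the expected flush order is (a walkthrough of the test's expectations, not additional test code):

    // 0: put(key03, value04)       latest put for key03 wins; key01/key02 puts fall in the ranges below
    // 1: delete(key05)
    // 2: deleteRange(key01, key02)
    // 3: deleteRange(key02, key03)
    // 4: put(key04, value04)
    // 5: deleteRange(key06, key12) put(key06, value05) is discarded: key06 lies in [key06, key12)
    // 6: deleteRange(key09, key10)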
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 008878de1d3..bd8a00becf0 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.utils.db;
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -66,7 +67,7 @@ public class TestRDBTableStore {
public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
private static int count = 0;
private final List<String> families =
- Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+ Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth", "Seventh",
@@ -635,21 +636,21 @@ public void testPrefixedRangeKVs() throws Exception {
// test start with a middle key
startKey = StringUtils.string2Bytes(
- StringUtils.bytes2String(samplePrefix) + "3");
+ bytes2String(samplePrefix) + "3");
rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix);
assertEquals(2, rangeKVs.size());
// test with a filter
- final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1");
+ final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1");
startKey = StringUtils.string2Bytes(
- StringUtils.bytes2String(samplePrefix));
+ bytes2String(samplePrefix));
rangeKVs = testTable.getRangeKVs(startKey, blockCount,
samplePrefix, filter1);
assertEquals(1, rangeKVs.size());
// test start with a non-exist key
startKey = StringUtils.string2Bytes(
- StringUtils.bytes2String(samplePrefix) + 123);
+ bytes2String(samplePrefix) + 123);
rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix);
assertEquals(0, rangeKVs.size());
}
@@ -775,4 +776,77 @@ private void populateTable(Table<String, String> table,
}
}
}
+
+ @Test
+ public void batchDeleteWithRange() throws Exception {
+ final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
+ try (BatchOperation batch = rdbStore.initBatchOperation()) {
+
+ //given
+ String keyStr = RandomStringUtils.secure().next(10);
+ byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] value =
+ RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8);
+ testTable.put(startKey, value);
+ testTable.put(keyInRange1, value);
+ testTable.put(keyInRange2, value);
+ testTable.put(endKey, value);
+ assertNotNull(testTable.get(startKey));
+ assertNotNull(testTable.get(keyInRange1));
+ assertNotNull(testTable.get(keyInRange2));
+ assertNotNull(testTable.get(endKey));
+
+ //when
+ testTable.deleteRangeWithBatch(batch, startKey, endKey);
+ rdbStore.commitBatchOperation(batch);
+
+ //then
+ assertNull(testTable.get(startKey));
+ assertNull(testTable.get(keyInRange1));
+ assertNull(testTable.get(keyInRange2));
+ assertNotNull(testTable.get(endKey));
+ }
+ }
+
+ @Test
+ public void orderOfBatchOperations() throws Exception {
+ final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
+ try (BatchOperation batch = rdbStore.initBatchOperation()) {
+
+ //given
+ String keyStr = RandomStringUtils.secure().next(10);
+ byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
+ byte[] value1 = ("value1-" +
RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+ byte[] value2 = ("value2-" +
RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+ byte[] value3 = ("value3-" +
RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
+
+ //when
+ testTable.putWithBatch(batch, startKey, value1);
+ testTable.putWithBatch(batch, keyInRange1, value1);
+ testTable.deleteWithBatch(batch, keyInRange1);
+ // ops map key should be <<startKey, endKey>, 1>
+ testTable.deleteRangeWithBatch(batch, startKey, endKey);
+ testTable.putWithBatch(batch, startKey, value2);
+ testTable.putWithBatch(batch, keyInRange1, value2);
+ // ops map key is <<startKey, keyInRange1>, 2>.
+ testTable.deleteRangeWithBatch(batch, startKey, keyInRange1);
+ testTable.putWithBatch(batch, endKey, value1);
+ testTable.putWithBatch(batch, endKey, value2);
+ // ops map key is <<startKey, endKey>, 3>.
+ testTable.deleteRangeWithBatch(batch, startKey, endKey);
+ testTable.putWithBatch(batch, startKey, value3);
+
+ rdbStore.commitBatchOperation(batch);
+
+ //then
+ assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey)));
+ assertNull(testTable.get(keyInRange1));
+ assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey)));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]