This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 90f62ad63b9 HDDS-14240. Revert HDDS-13415 to separate out Refactoring and implementation of Delete Range with batch (#9554)
90f62ad63b9 is described below
commit 90f62ad63b99b1534f73a8c5790157ed40619ee0
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Thu Dec 25 00:20:57 2025 -0500
HDDS-14240. Revert HDDS-13415 to separate out Refactoring and implementation of Delete Range with batch (#9554)
---
.../ozone/container/metadata/DatanodeTable.java | 5 -
.../hadoop/hdds/utils/db/RDBBatchOperation.java | 434 +++------------------
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 9 -
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 10 -
.../org/apache/hadoop/hdds/utils/db/Table.java | 8 -
.../apache/hadoop/hdds/utils/db/TypedTable.java | 5 -
.../hadoop/hdds/utils/db/InMemoryTestTable.java | 5 -
.../hdds/utils/db/TestRDBBatchOperation.java | 129 ------
.../hadoop/hdds/utils/db/TestRDBTableStore.java | 84 +---
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 7 +-
10 files changed, 76 insertions(+), 620 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 5b39147f3e2..ed7a05027e8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -73,11 +73,6 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
table.deleteWithBatch(batch, key);
}
- @Override
- public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
- table.deleteRangeWithBatch(batch, beginKey, endKey);
- }
-
@Override
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 49693bd2967..87d9ffc625c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -20,22 +20,12 @@
import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.UnsignedBytes;
-import java.io.Closeable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
@@ -60,7 +50,7 @@ public final class RDBBatchOperation implements BatchOperation {
private final OpCache opCache = new OpCache();
- private enum Op { DELETE, PUT, DELETE_RANGE }
+ private enum Op { DELETE }
public static RDBBatchOperation newAtomicOperation() {
return newAtomicOperation(new ManagedWriteBatch());
@@ -85,11 +75,11 @@ private static String countSize2String(int count, long size) {
}
/**
- * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}.
+ * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#ops}.
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
- static final class Bytes implements Comparable<Bytes> {
+ static final class Bytes {
private final byte[] array;
private final CodecBuffer buffer;
/** Cache the hash value. */
@@ -107,6 +97,10 @@ static final class Bytes implements Comparable<Bytes> {
this.hash = ByteBuffer.wrap(array).hashCode();
}
+ byte[] array() {
+ return array;
+ }
+
ByteBuffer asReadOnlyByteBuffer() {
return buffer.asReadOnlyByteBuffer();
}
@@ -139,195 +133,6 @@ public String toString() {
return array != null ? bytes2String(array)
: bytes2String(asReadOnlyByteBuffer());
}
-
- // This method mimics the ByteWiseComparator in RocksDB.
- @Override
- public int compareTo(RDBBatchOperation.Bytes that) {
- final ByteBuffer thisBuf = this.array != null ?
- ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
- final ByteBuffer thatBuf = that.array != null ?
- ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
-
- for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) {
- int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i));
- if (cmp != 0) {
- return cmp;
- }
- }
- return thisBuf.remaining() - thatBuf.remaining();
- }
- }
-
- private abstract class Operation implements Closeable {
- private Bytes keyBytes;
-
- private Operation(Bytes keyBytes) {
- this.keyBytes = keyBytes;
- }
-
- abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
-
- abstract int keyLen();
-
- abstract int valLen();
-
- Bytes getKey() {
- return keyBytes;
- }
-
- int totalLength() {
- return keyLen() + valLen();
- }
-
- abstract Op getOpType();
-
- @Override
- public void close() {
- }
- }
-
- /**
- * Delete operation to be applied to a {@link ColumnFamily} batch.
- */
- private final class DeleteOperation extends Operation {
- private final byte[] key;
-
- private DeleteOperation(byte[] key, Bytes keyBytes) {
- super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
- this.key = Objects.requireNonNull(key, "key == null");
- }
-
- @Override
- public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchDelete(batch, this.key);
- }
-
- @Override
- public int keyLen() {
- return key.length;
- }
-
- @Override
- public int valLen() {
- return 0;
- }
-
- @Override
- public Op getOpType() {
- return Op.DELETE;
- }
- }
-
- /**
- * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
- */
- private final class CodecBufferPutOperation extends Operation {
- private final CodecBuffer key;
- private final CodecBuffer value;
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
- super(keyBytes);
- this.key = key;
- this.value = value;
- }
-
- @Override
- public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
- }
-
- @Override
- public int keyLen() {
- return key.readableBytes();
- }
-
- @Override
- public int valLen() {
- return value.readableBytes();
- }
-
- @Override
- public Op getOpType() {
- return Op.PUT;
- }
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- key.release();
- value.release();
- }
- super.close();
- }
- }
-
- /**
- * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
- */
- private final class ByteArrayPutOperation extends Operation {
- private final byte[] key;
- private final byte[] value;
-
- private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) {
- super(Objects.requireNonNull(keyBytes));
- this.key = Objects.requireNonNull(key, "key == null");
- this.value = Objects.requireNonNull(value, "value == null");
- }
-
- @Override
- public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchPut(batch, key, value);
- }
-
- @Override
- public int keyLen() {
- return key.length;
- }
-
- @Override
- public int valLen() {
- return value.length;
- }
-
- @Override
- public Op getOpType() {
- return Op.PUT;
- }
- }
-
- /**
- * Delete range operation to be applied to a {@link ColumnFamily} batch.
- */
- private final class DeleteRangeOperation extends Operation {
- private final byte[] startKey;
- private final byte[] endKey;
-
- private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
- super(null);
- this.startKey = Objects.requireNonNull(startKey, "startKey == null");
- this.endKey = Objects.requireNonNull(endKey, "endKey == null");
- }
-
- @Override
- public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchDeleteRange(batch, startKey, endKey);
- }
-
- @Override
- public int keyLen() {
- return startKey.length + endKey.length;
- }
-
- @Override
- public int valLen() {
- return 0;
- }
-
- @Override
- public Op getOpType() {
- return Op.DELETE_RANGE;
- }
}
/** Cache and deduplicate db ops (put/delete). */
@@ -339,40 +144,12 @@ private class OpCache {
private class FamilyCache {
private final ColumnFamily family;
/**
- * A mapping of operation keys to their respective indices in {@code FamilyCache}.
- *
- * Key details:
- * - Maintains a mapping of unique operation keys to their insertion or processing order.
- * - Used internally to manage and sort operations during batch writes.
- * - Facilitates filtering, overwriting, or deletion of operations based on their keys.
- *
- * Constraints:
- * - Keys must be unique, represented using {@link Bytes}, to avoid collisions.
- * - Each key is associated with a unique integer index to track insertion order.
- *
- * This field plays a critical role in managing the logical consistency and proper execution
- * order of operations stored in the batch when interacting with a RocksDB-backed system.
- */
- private final Map<Bytes, Integer> opsKeys = new HashMap<>();
- /**
- * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances.
- *
- * This map serves as the primary container for recording operations in preparation for a batch write
- * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines
- * its insertion order and ensures correct sequencing during batch execution.
- *
- * Key characteristics:
- * - Stores operations of type {@code Operation}.
- * - Uses a unique integer key (index) for mapping each operation.
- * - Serves as an intermediary structure during batch preparation and execution.
- *
- * Usage context:
- * - This map is managed as part of the batch-writing process, which involves organizing,
- * filtering, and applying multiple operations in a single cohesive batch.
- * - Operations stored in this map are expected to define specific actions (e.g., put, delete,
- * delete range) and their associated data (e.g., keys, values).
+ * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes}
+ * and the dbValue type is {@link Object}.
+ * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
+ * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
*/
- private final Map<Integer, Operation> batchOps = new HashMap<>();
+ private final Map<Bytes, Object> ops = new HashMap<>();
private boolean isCommit;
private long batchSize;
@@ -380,105 +157,31 @@ private class FamilyCache {
private int discardedCount;
private int putCount;
private int delCount;
- private int delRangeCount;
- private AtomicInteger opIndex;
FamilyCache(ColumnFamily family) {
this.family = family;
- this.opIndex = new AtomicInteger(0);
}
- /**
- * Prepares a batch write operation for a RocksDB-backed system.
- *
- * This method ensures the orderly execution of operations accumulated in the batch,
- * respecting their respective types and order of insertion.
- *
- * Key functionalities:
- * 1. Ensures that the batch is not already committed before proceeding.
- * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order.
- * 3. Filters and adapts operations to account for any delete range operations that might
- * affect other operations in the batch:
- * - Operations with keys that fall within the range specified by a delete range operation
- * are discarded.
- * - Delete range operations are executed in their correct order.
- * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution.
- * 5. Logs a summary of the batch execution for debugging purposes.
- *
- * Throws:
- * - RocksDatabaseException if any error occurs while applying operations to the write batch.
- *
- * Prerequisites:
- * - The method assumes that the operations are represented by `Operation` objects, each of which
- * encapsulates the logic for its specific type.
- * - Delete range operations must be represented by the `DeleteRangeOperation` class.
- */
+ /** Prepare batch write for the entire family. */
void prepareBatchWrite() throws RocksDatabaseException {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
isCommit = true;
- // Sort Entries based on opIndex and flush the operation to the batch in the same order.
- List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
- .map(Map.Entry::getValue).collect(Collectors.toList());
- List<List<Integer>> deleteRangeIndices = new ArrayList<>();
- int index = 0;
- int prevIndex = -2;
- for (Operation op : ops) {
- if (Op.DELETE_RANGE == op.getOpType()) {
- if (index - prevIndex > 1) {
- deleteRangeIndices.add(new ArrayList<>());
- }
- List<Integer> continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1);
- continuousIndices.add(index);
- prevIndex = index;
- }
- index++;
- }
- // This is to apply the last batch of entries after the last DeleteRangeOperation.
- deleteRangeIndices.add(Collections.emptyList());
- int startIndex = 0;
- for (List<Integer> continuousDeleteRangeIndices : deleteRangeIndices) {
- List<DeleteRangeOperation> deleteRangeOps = continuousDeleteRangeIndices.stream()
- .map(i -> (DeleteRangeOperation)ops.get(i))
- .collect(Collectors.toList());
- List<Pair<Bytes, Bytes>> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream()
- .map(i -> (DeleteRangeOperation)ops.get(i))
- .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey)))
- .collect(Collectors.toList());
- int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0);
-
- for (int i = startIndex; i < firstOpIndex; i++) {
- Operation op = ops.get(i);
- Bytes key = op.getKey();
- // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key
- // doesn't fall [startKey, endKey) range.
- boolean keyInRange = false;
- Pair<Bytes, Bytes> deleteRange = null;
- for (Pair<Bytes, Bytes> deleteRangeOp : deleteRangeOpsRanges) {
- if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) {
- keyInRange = true;
- deleteRange = deleteRangeOp;
- break;
- }
- }
- if (!keyInRange) {
- op.apply(family, writeBatch);
- } else {
- Pair<Bytes, Bytes> finalDeleteRange = deleteRange;
- debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
- bytes2String(key.asReadOnlyByteBuffer()),
- bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()),
- bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer())));
- discardedCount++;
- discardedSize += op.totalLength();
- }
+ for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
+ final Bytes key = op.getKey();
+ final Object value = op.getValue();
+ if (value instanceof byte[]) {
+ family.batchPut(writeBatch, key.array(), (byte[]) value);
+ } else if (value instanceof CodecBuffer) {
+ family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
+ ((CodecBuffer) value).asReadOnlyByteBuffer());
+ } else if (value == Op.DELETE) {
+ family.batchDelete(writeBatch, key.array());
+ } else {
+ throw new IllegalStateException("Unexpected value: " + value
+ + ", class=" + value.getClass().getSimpleName());
}
- for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) {
- // Apply the delete range operation to the batch.
- deleteRangeOp.apply(family, writeBatch);
- }
- // Update the startIndex to start from the next operation after the delete range operation.
- startIndex = firstOpIndex + continuousDeleteRangeIndices.size();
}
+
debug(this::summary);
}
@@ -491,38 +194,48 @@ void clear() {
final boolean warn = !isCommit && batchSize > 0;
String details = warn ? summary() : null;
- IOUtils.close(LOG, batchOps.values());
- batchOps.clear();
+ for (Object value : ops.values()) {
+ if (value instanceof CodecBuffer) {
+ ((CodecBuffer) value).release(); // the key will also be released
+ }
+ }
+ ops.clear();
if (warn) {
LOG.warn("discarding changes {}", details);
}
}
- private void deleteIfExist(Bytes key, boolean removeFromIndexMap) {
+ void putOrDelete(Bytes key, int keyLen, Object val, int valLen) {
+ Preconditions.checkState(!isCommit, "%s is already committed.", this);
+ batchSize += keyLen + valLen;
// remove previous first in order to call release()
- if (opsKeys.containsKey(key)) {
- int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key);
- final Operation previous = batchOps.remove(previousIndex);
- previous.close();
- discardedSize += previous.totalLength();
+ final Object previous = ops.remove(key);
+ if (previous != null) {
+ final boolean isPut = previous != Op.DELETE;
+ final int preLen;
+ if (!isPut) {
+ preLen = 0;
+ } else if (previous instanceof CodecBuffer) {
+ final CodecBuffer previousValue = (CodecBuffer) previous;
+ preLen = previousValue.readableBytes();
+ previousValue.release(); // key will also be released
+ } else if (previous instanceof byte[]) {
+ preLen = ((byte[]) previous).length;
+ } else {
+ throw new IllegalStateException("Unexpected previous: " + previous
+ + ", class=" + previous.getClass().getSimpleName());
+ }
+ discardedSize += keyLen + preLen;
discardedCount++;
- debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(),
- previous.valLen()));
+ debug(() -> String.format("%s overwriting a previous %s", this,
+ isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del"));
}
- }
+ final Object overwritten = ops.put(key, val);
+ Preconditions.checkState(overwritten == null);
- void overWriteOpIfExist(Bytes key, Operation operation) {
- Preconditions.checkState(!isCommit, "%s is already committed.", this);
- deleteIfExist(key, true);
- batchSize += operation.totalLength();
- int newIndex = opIndex.getAndIncrement();
- final Integer overwritten = opsKeys.put(key, newIndex);
- batchOps.put(newIndex, operation);
- Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten));
debug(() -> String.format("%s %s, %s; key=%s", this,
- Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(),
- operation.valLen()),
+ valLen == 0 ? delString(keyLen) : putString(keyLen, valLen),
batchSizeDiscardedString(), key));
}
@@ -530,25 +243,19 @@ void put(CodecBuffer key, CodecBuffer value) {
putCount++;
// always release the key with the value
- Bytes keyBytes = new Bytes(key);
- overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes));
+ value.getReleaseFuture().thenAccept(v -> key.release());
+ putOrDelete(new Bytes(key), key.readableBytes(),
+ value, value.readableBytes());
}
void put(byte[] key, byte[] value) {
putCount++;
- Bytes keyBytes = new Bytes(key);
- overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes));
+ putOrDelete(new Bytes(key), key.length, value, value.length);
}
void delete(byte[] key) {
delCount++;
- Bytes keyBytes = new Bytes(key);
- overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes));
- }
-
- void deleteRange(byte[] startKey, byte[] endKey) {
- delRangeCount++;
- batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
+ putOrDelete(new Bytes(key), key.length, Op.DELETE, 0);
}
String putString(int keySize, int valueSize) {
@@ -588,11 +295,6 @@ void delete(ColumnFamily family, byte[] key) {
.delete(key);
}
- void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
- name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
- .deleteRange(startKey, endKey);
- }
-
/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -614,7 +316,6 @@ String getCommitString() {
int opSize = 0;
int discardedCount = 0;
int discardedSize = 0;
- int delRangeCount = 0;
for (FamilyCache f : name2cache.values()) {
putCount += f.putCount;
@@ -622,13 +323,12 @@ String getCommitString() {
opSize += f.batchSize;
discardedCount += f.discardedCount;
discardedSize += f.discardedSize;
- delRangeCount += f.delRangeCount;
}
final int opCount = putCount + delCount;
return String.format(
- "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s",
- putCount, delCount, delRangeCount,
+ "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s",
+ putCount, delCount,
countSize2String(opCount, opSize),
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
@@ -678,8 +378,4 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
public void put(ColumnFamily family, byte[] key, byte[] value) {
opCache.put(family, key, value);
}
-
- public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
- opCache.deleteRange(family, startKey, endKey);
- }
}
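For context on the state this revert returns to, below is a minimal, standalone sketch of the per-key deduplication idea described in the restored javadoc above: the family cache keeps only the latest operation per key, where the map value is either the bytes to put or a DELETE marker. The class and names are illustrative only and are not part of the Ozone codebase; they merely stand in for RDBBatchOperation's internal FamilyCache.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: deduplicate batched puts/deletes by key, keeping only the latest op per key. */
public final class BatchDedupSketch {
  /** Marker object standing in for a delete, mirroring the Op.DELETE enum in the diff above. */
  private static final Object DELETE = new Object();

  /** Wrap byte[] so it can be a map key (raw arrays use identity-based equals/hashCode). */
  private static final class Key {
    private final byte[] bytes;
    Key(byte[] bytes) { this.bytes = bytes; }
    @Override public boolean equals(Object o) {
      return o instanceof Key && Arrays.equals(bytes, ((Key) o).bytes);
    }
    @Override public int hashCode() { return Arrays.hashCode(bytes); }
  }

  private final Map<Key, Object> ops = new LinkedHashMap<>();

  void put(byte[] key, byte[] value) { ops.put(new Key(key), value); }

  void delete(byte[] key) { ops.put(new Key(key), DELETE); }

  /** Replay the deduplicated ops; a real implementation would write into a RocksDB WriteBatch. */
  void flush() {
    for (Map.Entry<Key, Object> e : ops.entrySet()) {
      if (e.getValue() == DELETE) {
        System.out.println("delete " + new String(e.getKey().bytes));
      } else {
        System.out.println("put    " + new String(e.getKey().bytes)
            + " = " + new String((byte[]) e.getValue()));
      }
    }
  }

  public static void main(String[] args) {
    BatchDedupSketch batch = new BatchDedupSketch();
    batch.put("k1".getBytes(), "v1".getBytes());
    batch.put("k1".getBytes(), "v2".getBytes()); // overwrites the earlier put for k1
    batch.delete("k2".getBytes());
    batch.flush(); // prints: put k1 = v2, then delete k2
  }
}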
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 045f020b2fe..f732735cbe3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -203,15 +203,6 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {
}
- @Override
- public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) {
- if (batch instanceof RDBBatchOperation) {
- ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey);
- } else {
- throw new IllegalArgumentException("batch should be RDBBatchOperation");
- }
- }
-
@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index bdc5124ac3b..659954a861b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -308,16 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}
- public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey)
- throws RocksDatabaseException {
- try (UncheckedAutoCloseable ignored = acquire()) {
- writeBatch.deleteRange(getHandle(), beginKey, endKey);
- } catch (RocksDBException e) {
- throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " +
- bytes2String(endKey), e);
- }
- }
-
public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 6904f22d7d8..fc049034406 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -134,14 +134,6 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
*/
void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException;
- /**
- * Deletes a range of keys from the metadata store as part of a batch operation.
- * @param batch Batch operation to perform the delete operation.
- * @param beginKey start metadata key, inclusive.
- * @param endKey end metadata key, exclusive.
- */
- void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException;
-
/**
* Deletes a range of keys from the metadata store.
*
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 6d2fa3a99ff..8000d48c618 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -380,11 +380,6 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
rawTable.deleteWithBatch(batch, encodeKey(key));
}
- @Override
- public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
- rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey));
- }
-
@Override
public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException {
rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
index 7f2ce3fc3a5..1dbb5029713 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
@@ -90,11 +90,6 @@ public void deleteWithBatch(BatchOperation batch, KEY key) {
throw new UnsupportedOperationException();
}
- @Override
- public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) {
- throw new UnsupportedOperationException();
- }
-
@Override
public void deleteRange(KEY beginKey, KEY endKey) {
map.subMap(beginKey, endKey).clear();
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
deleted file mode 100644
index bd33ab070ce..00000000000
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils.db;
-
-import static java.util.Arrays.asList;
-import static org.apache.hadoop.hdds.StringUtils.bytes2String;
-import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedConstruction;
-import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyHandle;
-
-/**
- * Test class for verifying batch operations with delete ranges using the
- * RDBBatchOperation and MockedConstruction of ManagedWriteBatch.
- *
- * This test class includes:
- * - Mocking and tracking of operations including put, delete, and delete range
- * within a batch operation.
- * - Validation of committed operations using assertions on collected data.
- * - Ensures that the batch operation interacts correctly with the
- * RocksDatabase and ColumnFamilyHandle components.
- *
- * The test method includes:
- * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily.
- * 2. Mocking of methods to track operations performed on*/
-public class TestRDBBatchOperation {
- @Test
- public void testBatchOperationWithDeleteRange() throws RocksDatabaseException {
- final List<Pair<Pair<String, String>, Integer>> deleteKeyRangePairs = new ArrayList<>();
- final List<Pair<Pair<String, String>, Integer>> putKeys = new ArrayList<>();
- final List<Pair<String, Integer>> deleteKeys = new ArrayList<>();
- AtomicInteger cnt = new AtomicInteger(0);
- try (MockedConstruction<ManagedWriteBatch> mockedConstruction = Mockito.mockConstruction(ManagedWriteBatch.class,
- (writeBatch, context) -> {
- doAnswer(i -> {
- deleteKeyRangePairs.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
- bytes2String((byte[]) i.getArgument(2))), cnt.getAndIncrement()));
- return null;
- }).when(writeBatch).deleteRange(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class),
- Mockito.any(byte[].class));
- doAnswer(i -> {
- putKeys.add(Pair.of(Pair.of(bytes2String((byte[]) i.getArgument(1)),
- bytes2String((byte[]) i.getArgument(2))),
- cnt.getAndIncrement()));
- return null;
- }).when(writeBatch)
- .put(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class), Mockito.any(byte[].class));
- doAnswer(i -> {
- deleteKeys.add(Pair.of(bytes2String((byte[]) i.getArgument(1)), cnt.getAndIncrement()));
- return null;
- }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class));
-
- });
- RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation()) {
- ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
- RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
- doAnswer((i) -> {
- ((ManagedWriteBatch)i.getArgument(0))
- .put(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
- return null;
- }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
-
- doAnswer((i) -> {
- ((ManagedWriteBatch)i.getArgument(0))
- .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2));
- return null;
- }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class));
-
- doAnswer((i) -> {
- ((ManagedWriteBatch)i.getArgument(0))
- .delete(columnFamilyHandle, (byte[]) i.getArgument(1));
- return null;
- }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(byte[].class));
-
- when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
- when(columnFamily.getName()).thenReturn("test");
- batchOperation.put(columnFamily, string2Bytes("key01"), string2Bytes("value01"));
- batchOperation.put(columnFamily, string2Bytes("key02"), string2Bytes("value02"));
- batchOperation.put(columnFamily, string2Bytes("key03"), string2Bytes("value03"));
- batchOperation.put(columnFamily, string2Bytes("key03"), string2Bytes("value04"));
- batchOperation.delete(columnFamily, string2Bytes("key05"));
- batchOperation.deleteRange(columnFamily, string2Bytes("key01"), string2Bytes("key02"));
- batchOperation.deleteRange(columnFamily, string2Bytes("key02"), string2Bytes("key03"));
- batchOperation.put(columnFamily, string2Bytes("key04"), string2Bytes("value04"));
- batchOperation.put(columnFamily, string2Bytes("key06"), string2Bytes("value05"));
- batchOperation.deleteRange(columnFamily, string2Bytes("key06"), string2Bytes("key12"));
- batchOperation.deleteRange(columnFamily, string2Bytes("key09"), string2Bytes("key10"));
- RocksDatabase db = Mockito.mock(RocksDatabase.class);
- doNothing().when(db).batchWrite(any());
- batchOperation.commit(db);
- assertEquals(deleteKeys, Collections.singletonList(Pair.of("key05", 1)));
- assertEquals(deleteKeyRangePairs, asList(Pair.of(Pair.of("key01", "key02"), 2),
- Pair.of(Pair.of("key02", "key03"), 3),
- Pair.of(Pair.of("key06", "key12"), 5),
- Pair.of(Pair.of("key09", "key10"), 6)));
- assertEquals(putKeys, Arrays.asList(Pair.of(Pair.of("key03", "value04"), 0),
- Pair.of(Pair.of("key04", "value04"), 4)));
- }
- }
-}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index cd155f27a96..2741834c9d7 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.hdds.utils.db;
-import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -67,7 +66,7 @@ public class TestRDBTableStore {
public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
private static int count = 0;
private final List<String> families =
- Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+ Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth", "Seventh",
@@ -636,21 +635,21 @@ public void testPrefixedRangeKVs() throws Exception {
// test start with a middle key
startKey = StringUtils.string2Bytes(
- bytes2String(samplePrefix) + "3");
+ StringUtils.bytes2String(samplePrefix) + "3");
rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix);
assertEquals(2, rangeKVs.size());
// test with a filter
- final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1");
+ final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1");
startKey = StringUtils.string2Bytes(
- bytes2String(samplePrefix));
+ StringUtils.bytes2String(samplePrefix));
rangeKVs = testTable.getRangeKVs(startKey, blockCount,
samplePrefix, filter1);
assertEquals(1, rangeKVs.size());
// test start with a non-exist key
startKey = StringUtils.string2Bytes(
- bytes2String(samplePrefix) + 123);
+ StringUtils.bytes2String(samplePrefix) + 123);
rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix);
assertEquals(0, rangeKVs.size());
}
@@ -776,77 +775,4 @@ private void populateTable(Table<String, String> table,
}
}
}
-
- @Test
- public void batchDeleteWithRange() throws Exception {
- final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
- try (BatchOperation batch = rdbStore.initBatchOperation()) {
-
- //given
- String keyStr = RandomStringUtils.secure().next(10);
- byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] value =
- RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8);
- testTable.put(startKey, value);
- testTable.put(keyInRange1, value);
- testTable.put(keyInRange2, value);
- testTable.put(endKey, value);
- assertNotNull(testTable.get(startKey));
- assertNotNull(testTable.get(keyInRange1));
- assertNotNull(testTable.get(keyInRange2));
- assertNotNull(testTable.get(endKey));
-
- //when
- testTable.deleteRangeWithBatch(batch, startKey, endKey);
- rdbStore.commitBatchOperation(batch);
-
- //then
- assertNull(testTable.get(startKey));
- assertNull(testTable.get(keyInRange1));
- assertNull(testTable.get(keyInRange2));
- assertNotNull(testTable.get(endKey));
- }
- }
-
- @Test
- public void orderOfBatchOperations() throws Exception {
- final Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth");
- try (BatchOperation batch = rdbStore.initBatchOperation()) {
-
- //given
- String keyStr = RandomStringUtils.secure().next(10);
- byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8);
- byte[] value1 = ("value1-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
- byte[] value2 = ("value2-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
- byte[] value3 = ("value3-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8);
-
- //when
- testTable.putWithBatch(batch, startKey, value1);
- testTable.putWithBatch(batch, keyInRange1, value1);
- testTable.deleteWithBatch(batch, keyInRange1);
- // ops map key should be <<startKey, endKey>, 1>
- testTable.deleteRangeWithBatch(batch, startKey, endKey);
- testTable.putWithBatch(batch, startKey, value2);
- testTable.putWithBatch(batch, keyInRange1, value2);
- // ops map key is <<startKey, keyInRange1>, 2>.
- testTable.deleteRangeWithBatch(batch, startKey, keyInRange1);
- testTable.putWithBatch(batch, endKey, value1);
- testTable.putWithBatch(batch, endKey, value2);
- // ops map key is <<startKey, endKey>, 3>.
- testTable.deleteRangeWithBatch(batch, startKey, endKey);
- testTable.putWithBatch(batch, startKey, value3);
-
- rdbStore.commitBatchOperation(batch);
-
- //then
- assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey)));
- assertNull(testTable.get(keyInRange1));
- assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey)));
- }
- }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 77ed8a82487..b488997b522 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -92,6 +92,7 @@
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -583,7 +584,11 @@ private static void deleteKeysFromTableWithBucketPrefix(OMMetadataManager metada
String endKey = getLexicographicallyHigherString(prefix);
LOG.debug("Deleting key range from {} - startKey: {}, endKey: {}",
table.getName(), prefix, endKey);
- table.deleteRangeWithBatch(batchOperation, prefix, endKey);
+ try (TableIterator<String, String> itr = table.keyIterator(prefix)) {
+ while (itr.hasNext()) {
+ table.deleteWithBatch(batchOperation, itr.next());
+ }
+ }
}
@VisibleForTesting
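For context, the hunk above replaces the batched range delete with a prefix scan plus per-key batch deletes. Below is a minimal, standalone sketch of that pattern; the TreeMap stands in for the RocksDB-backed table and the plain list stands in for the batch, so none of these names are the actual Ozone Table/BatchOperation API.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch: iterate keys under a prefix and queue an individual delete for each. */
public final class PrefixDeleteSketch {
  public static void main(String[] args) {
    // A sorted map stands in for the table backed by RocksDB.
    TreeMap<String, String> table = new TreeMap<>();
    table.put("/vol1/bucket1/key1", "a");
    table.put("/vol1/bucket1/key2", "b");
    table.put("/vol1/bucket2/key1", "c");

    String prefix = "/vol1/bucket1/";
    List<String> batchDeletes = new ArrayList<>();

    // Mirrors iterating table.keyIterator(prefix): start at the prefix, stop once keys no longer match it.
    for (Map.Entry<String, String> e : table.tailMap(prefix).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break;
      }
      batchDeletes.add(e.getKey()); // stands in for table.deleteWithBatch(batchOperation, key)
    }

    // "Committing the batch": apply all collected deletes together.
    batchDeletes.forEach(table::remove);
    System.out.println(table); // only /vol1/bucket2/key1 remains
  }
}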
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]