This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 64bb019407b HDDS-14238. Move RDBBatchOperation Byte comparison to
native comparison for optimization (#9550)
64bb019407b is described below
commit 64bb019407bc001fbe2c6339141908c3e7d59b8f
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Dec 29 13:19:37 2025 -0500
HDDS-14238. Move RDBBatchOperation Byte comparison to native comparison for
optimization (#9550)
---
.../hadoop/hdds/utils/db/RDBBatchOperation.java | 70 +++++++++++++---------
.../org/apache/hadoop/hdds/utils/db/TestCodec.java | 15 ++---
pom.xml | 1 +
3 files changed, 51 insertions(+), 35 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index 3681959b4b4..de181ae0c8d 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.hdds.utils.db;
-import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+import static org.apache.ratis.util.Preconditions.assertTrue;
import com.google.common.base.Preconditions;
import java.io.Closeable;
@@ -30,10 +30,13 @@
import java.util.function.Supplier;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
import org.apache.ratis.util.TraditionalBinaryPrefix;
import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.rocksdb.AbstractSlice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,26 +83,26 @@ private static String countSize2String(int count, long
size) {
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
- static final class Bytes {
- private final byte[] array;
- private final CodecBuffer buffer;
+ static final class Bytes implements Closeable {
+ private final AbstractSlice<?> slice;
/** Cache the hash value. */
private final int hash;
- Bytes(CodecBuffer buffer) {
- this.array = null;
- this.buffer = Objects.requireNonNull(buffer, "buffer == null");
- this.hash = buffer.asReadOnlyByteBuffer().hashCode();
+ static Bytes newBytes(CodecBuffer buffer) {
+ return buffer.isDirect() ? new Bytes(buffer.asReadOnlyByteBuffer()) :
new Bytes(buffer.getArray());
}
- Bytes(byte[] array) {
- this.array = array;
- this.buffer = null;
- this.hash = ByteBuffer.wrap(array).hashCode();
+ Bytes(ByteBuffer buffer) {
+ Objects.requireNonNull(buffer, "buffer == null");
+ assertTrue(buffer.isDirect(), "buffer must be direct");
+ this.slice = new ManagedDirectSlice(buffer);
+ this.hash = buffer.hashCode();
}
- ByteBuffer asReadOnlyByteBuffer() {
- return buffer.asReadOnlyByteBuffer();
+ Bytes(byte[] array) {
+ Objects.requireNonNull(array, "array == null");
+ this.slice = new ManagedSlice(array);
+ this.hash = ByteBuffer.wrap(array).hashCode();
}
@Override
@@ -113,11 +116,7 @@ public boolean equals(Object obj) {
if (this.hash != that.hash) {
return false;
}
- final ByteBuffer thisBuf = this.array != null ?
- ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer();
- final ByteBuffer thatBuf = that.array != null ?
- ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer();
- return thisBuf.equals(thatBuf);
+ return slice.equals(that.slice);
}
@Override
@@ -127,12 +126,21 @@ public int hashCode() {
@Override
public String toString() {
- return array != null ? bytes2String(array)
- : bytes2String(asReadOnlyByteBuffer());
+ return slice.toString();
+ }
+
+ @Override
+ public void close() {
+ slice.close();
}
}
private abstract static class Op implements Closeable {
+ private final Bytes keyBytes;
+
+ private Op(Bytes keyBytes) {
+ this.keyBytes = keyBytes;
+ }
abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws
RocksDatabaseException;
@@ -148,6 +156,9 @@ int totalLength() {
@Override
public void close() {
+ if (keyBytes != null) {
+ keyBytes.close();
+ }
}
}
@@ -157,7 +168,8 @@ public void close() {
private static final class DeleteOp extends Op {
private final byte[] key;
- private DeleteOp(byte[] key) {
+ private DeleteOp(byte[] key, Bytes keyBytes) {
+ super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
this.key = Objects.requireNonNull(key, "key == null");
}
@@ -180,7 +192,8 @@ private final class PutOp extends Op {
private final CodecBuffer value;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private PutOp(CodecBuffer key, CodecBuffer value) {
+ private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
+ super(keyBytes);
this.key = key;
this.value = value;
}
@@ -217,7 +230,8 @@ private static final class ByteArrayPutOp extends Op {
private final byte[] key;
private final byte[] value;
- private ByteArrayPutOp(byte[] key, byte[] value) {
+ private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
+ super(keyBytes);
this.key = Objects.requireNonNull(key, "key == null");
this.value = Objects.requireNonNull(value, "value == null");
}
@@ -323,20 +337,20 @@ void overwriteIfExists(Bytes key, Op op) {
void put(CodecBuffer key, CodecBuffer value) {
putCount++;
// always release the key with the value
- Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new PutOp(key, value));
+ Bytes keyBytes = Bytes.newBytes(key);
+ overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
}
void put(byte[] key, byte[] value) {
putCount++;
Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value));
+ overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
}
void delete(byte[] key) {
delCount++;
Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new DeleteOp(key));
+ overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
}
String putString(int keySize, int valueSize) {
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
index f695f286405..4ce46b97cf8 100644
---
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
@@ -19,6 +19,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.utils.db.CodecTestUtil.gc;
+import static org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes.newBytes;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
@@ -49,6 +51,7 @@ public final class TestCodec {
static {
CodecBuffer.enableLeakDetection();
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
}
@Test
@@ -289,17 +292,15 @@ public void testUuidCodec() throws Exception {
public static <T> void runTest(Codec<T> codec, T original,
Integer serializedSize) throws Exception {
CodecTestUtil.runTest(codec, original, serializedSize, null);
- runTestBytes(original, codec);
+ runTestBytes(original, codec, CodecBuffer.Allocator.HEAP);
+ runTestBytes(original, codec, CodecBuffer.Allocator.DIRECT);
}
- static <T> void runTestBytes(T object, Codec<T> codec) throws IOException {
+ static <T> void runTestBytes(T object, Codec<T> codec, CodecBuffer.Allocator
allocator) throws IOException {
final byte[] array = codec.toPersistedFormat(object);
final Bytes fromArray = new Bytes(array);
-
- try (CodecBuffer buffer = codec.toCodecBuffer(object,
- CodecBuffer.Allocator.HEAP)) {
- final Bytes fromBuffer = new Bytes(buffer);
-
+ try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) {
+ final Bytes fromBuffer = newBytes(buffer);
assertEquals(fromArray.hashCode(), fromBuffer.hashCode());
assertEquals(fromArray, fromBuffer);
assertEquals(fromBuffer, fromArray);
diff --git a/pom.xml b/pom.xml
index 88c60ed8a31..813226c07b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1959,6 +1959,7 @@
<allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
<allowedImport>org.rocksdb.Env</allowedImport>
<allowedImport>org.rocksdb.Statistics</allowedImport>
+ <allowedImport>org.rocksdb.AbstractSlice</allowedImport>
<!-- Allow RocksDB constants and static methods to be
used. -->
<allowedImport>org.rocksdb.RocksDB.*</allowedImport>
</allowedImports>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]