This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 329719836d5 HDDS-14237. Simplify ManagedDirectSlice to use ByteBuffers (#9549)
329719836d5 is described below
commit 329719836d515a25afbfe4b05328476d456387f8
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Dec 26 21:48:42 2025 -0500
HDDS-14237. Simplify ManagedDirectSlice to use ByteBuffers (#9549)
---
.../hdds/utils/db/managed/ManagedDirectSlice.java | 96 +++++-----------------
.../utils/db/managed/TestManagedDirectSlice.java | 42 +---------
.../hadoop/hdds/utils/db/RDBSstFileWriter.java | 8 +-
3 files changed, 25 insertions(+), 121 deletions(-)
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
index 52b8b5aabb1..8842e4a9bf0 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
@@ -17,98 +17,40 @@
package org.apache.hadoop.hdds.utils.db.managed;
-import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB.NOT_FOUND;
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
-import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
-import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
-import org.apache.ratis.util.function.CheckedConsumer;
-import org.apache.ratis.util.function.CheckedFunction;
+import java.util.Objects;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.DirectSlice;
-import org.rocksdb.RocksDBException;
/**
- * ManagedDirectSlice is a managed wrapper around the DirectSlice object. It ensures
- * proper handling of native resources associated with DirectSlice, utilizing
- * the ManagedObject infrastructure to prevent resource leaks. It works in tandem
- * with a ByteBuffer, which acts as the data source for the managed slice.
+ * Managed {@link DirectSlice} for leak detection.
*
- * This class overrides certain operations to tightly control the lifecycle and
- * behavior of the DirectSlice it manages. It specifically caters to use cases
- * where the slice is used in RocksDB operations, providing methods for safely
- * interacting with the slice for put-like operations.
+ * The {@link DirectSlice} class is designed to handle specific memory slicing operations while ensuring that the
+ * provided ByteBuffer’s constraints are respected.
*/
-public class ManagedDirectSlice extends ManagedObject<DirectSlice> {
+public class ManagedDirectSlice extends DirectSlice {
- private final ByteBuffer data;
+ private final UncheckedAutoCloseable leakTracker = track(this);
- public ManagedDirectSlice(ByteBuffer data) {
- super(new DirectSlice(data));
- this.data = data;
+ public ManagedDirectSlice(ByteBuffer buffer) {
+ super(Objects.requireNonNull(buffer, "buffer == null").slice(), buffer.remaining());
}
@Override
- public DirectSlice get() {
- throw new UnsupportedOperationException("get() is not supported.");
+ public synchronized long getNativeHandle() {
+ return super.getNativeHandle();
}
- /**
- * Executes the provided consumer on the internal {@code DirectSlice} after
- * adjusting the slice's prefix and length based on the current position and
- * remaining data in the associated {@code ByteBuffer}. If the consumer throws
- * a {@code RocksDBException}, it is wrapped and rethrown as a
- * {@code RocksDatabaseException}.
- *
- * @param consumer the operation to perform on the managed {@code DirectSlice}.
- * The consumer must handle a {@code DirectSlice} and may throw
- * a {@code RocksDBException}.
- * @throws RocksDatabaseException if the provided consumer throws a
- * {@code RocksDBException}.
- */
- public void putFromBuffer(CheckedConsumer<DirectSlice, ? extends RocksDBException> consumer)
- throws RocksDatabaseException {
- DirectSlice slice = super.get();
- slice.removePrefix(this.data.position());
- slice.setLength(this.data.remaining());
- try {
- consumer.accept(slice);
- } catch (RocksDBException e) {
- throw new RocksDatabaseException("Error while performing put op with directSlice", e);
- }
- data.position(data.limit());
- }
-
- /**
- * Retrieves data from the associated DirectSlice into the buffer managed by this instance.
- * The supplied function is applied to the DirectSlice to process the data, and the method
- * adjusts the buffer's position and limit based on the result.
- *
- * @param function a function that operates on a DirectSlice and returns the number
- * of bytes written to the buffer, or a specific "not found" value
- * if the operation fails. The function may throw a RocksDBException.
- * @return the number of bytes written to the buffer if successful, or a specific
- * "not found" value indicating the requested data was absent.
- * @throws RocksDatabaseException if the provided function throws a RocksDBException,
- * wrapping the original exception.
- */
- public int getToBuffer(CheckedFunction<DirectSlice, Integer, ? extends RocksDBException> function)
- throws RocksDatabaseException {
- DirectSlice slice = super.get();
- slice.removePrefix(this.data.position());
- slice.setLength(this.data.remaining());
+ @Override
+ protected void disposeInternal() {
+ // RocksMutableObject.close is final thus can't be decorated.
+ // So, we decorate disposeInternal instead to track closure.
try {
- int lengthWritten = function.apply(slice);
- if (lengthWritten != NOT_FOUND) {
- this.data.limit(Math.min(data.limit(), data.position() + lengthWritten));
- }
- return lengthWritten;
- } catch (RocksDBException e) {
- throw new RocksDatabaseException("Error while performing put op with directSlice", e);
+ super.disposeInternal();
+ } finally {
+ leakTracker.close();
}
}
-
- @VisibleForTesting
- DirectSlice getDirectSlice() {
- return super.get();
- }
}
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
index c332d32704f..f13aba39a7b 100644
--- a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
@@ -22,11 +22,8 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
-import org.rocksdb.DirectSlice;
/**
* Tests for ManagedDirectSlice.
@@ -39,48 +36,15 @@ public class TestManagedDirectSlice {
@ParameterizedTest
@CsvSource({"0, 1024", "1024, 1024", "512, 1024", "0, 100", "10, 512", "0, 0"})
- public void testManagedDirectSliceWithOffsetMovedAheadByteBuffer(int offset, int numberOfBytesWritten)
- throws RocksDatabaseException {
+ public void testManagedDirectSliceWithOffset(int offset, int numberOfBytesWritten) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
byte[] randomBytes = RandomUtils.secure().nextBytes(numberOfBytesWritten);
byteBuffer.put(randomBytes);
byteBuffer.flip();
+ byteBuffer.position(offset);
try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer);
ManagedSlice slice = new ManagedSlice(Arrays.copyOfRange(randomBytes, offset, numberOfBytesWritten))) {
- byteBuffer.position(offset);
- directSlice.putFromBuffer((ds) -> {
- DirectSlice directSliceFromByteBuffer = directSlice.getDirectSlice();
- assertEquals(numberOfBytesWritten - offset, ds.size());
- assertEquals(0, directSliceFromByteBuffer.compare(slice));
- assertEquals(0, slice.compare(directSliceFromByteBuffer));
- });
- Assertions.assertEquals(numberOfBytesWritten, byteBuffer.position());
- }
- }
-
- @ParameterizedTest
- @CsvSource({"0, 1024, 512", "1024, 1024, 5", "512, 1024, 600", "0, 100, 80", "10, 512, 80", "0, 0, 10",
- "100, 256, -1"})
- public void testManagedDirectSliceWithOpPutToByteBuffer(int offset, int maxNumberOfBytesWrite,
- int numberOfBytesToWrite) throws RocksDatabaseException {
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
- byte[] randomBytes = RandomUtils.secure().nextBytes(offset);
- byteBuffer.put(randomBytes);
- try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer)) {
- byteBuffer.position(offset);
- byteBuffer.limit(Math.min(offset + maxNumberOfBytesWrite, 1024));
- assertEquals(numberOfBytesToWrite, directSlice.getToBuffer((ds) -> {
- assertEquals(byteBuffer.remaining(), ds.size());
- return numberOfBytesToWrite;
- }));
- Assertions.assertEquals(offset, byteBuffer.position());
- if (numberOfBytesToWrite == -1) {
- assertEquals(offset + maxNumberOfBytesWrite, byteBuffer.limit());
- } else {
- Assertions.assertEquals(Math.min(Math.min(offset + numberOfBytesToWrite, 1024), maxNumberOfBytesWrite),
- byteBuffer.limit());
- }
-
+ assertEquals(slice, directSlice);
}
}
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index 14f553a9b18..a689e9fdea1 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -85,11 +85,9 @@ public void delete(byte[] key) throws RocksDatabaseException {
public void delete(CodecBuffer key) throws RocksDatabaseException {
try (ManagedDirectSlice slice = new ManagedDirectSlice(key.asReadOnlyByteBuffer())) {
- slice.putFromBuffer(directSlice -> {
- sstFileWriter.delete(directSlice);
- keyCounter.incrementAndGet();
- });
- } catch (RocksDatabaseException e) {
+ sstFileWriter.delete(slice);
+ keyCounter.incrementAndGet();
+ } catch (RocksDBException e) {
closeOnFailure();
throw new RocksDatabaseException("Failed to delete key (length=" + key.readableBytes()
+ "), sstFile=" + sstFile.getAbsolutePath(), e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]