This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 66cb1e72606 HDDS-14185. Fix ManagedDirectSlice usage as wrapper for ByteBuffer class (#9508)
66cb1e72606 is described below

commit 66cb1e72606e206352f54282e7f19461ad7761bc
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Dec 19 19:18:28 2025 -0500

    HDDS-14185. Fix ManagedDirectSlice usage as wrapper for ByteBuffer class (#9508)
---
 .../hdds/utils/db/managed/ManagedDirectSlice.java  | 93 +++++++++++++++++----
 .../hdds/utils/db/managed/ManagedObject.java       |  4 +-
 .../utils/db/managed/TestManagedDirectSlice.java   | 86 +++++++++++++++++++
 .../hadoop/hdds/utils/db/RDBSstFileWriter.java     |  8 +-
 .../hadoop/hdds/utils/db/TestRDBSstFileWriter.java | 97 ++++++++++++++++++++++
 5 files changed, 268 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
index b68a250cb8b..52b8b5aabb1 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
@@ -17,35 +17,98 @@
 
 package org.apache.hadoop.hdds.utils.db.managed;
 
-import static 
org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB.NOT_FOUND;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.nio.ByteBuffer;
-import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.rocksdb.DirectSlice;
+import org.rocksdb.RocksDBException;
 
 /**
- * Managed Direct Slice.
+ * ManagedDirectSlice is a managed wrapper around the DirectSlice object. It ensures
+ * proper handling of native resources associated with DirectSlice, utilizing
+ * the ManagedObject infrastructure to prevent resource leaks. It works in tandem
+ * with a ByteBuffer, which acts as the data source for the managed slice.
+ *
+ * This class overrides certain operations to tightly control the lifecycle and
+ * behavior of the DirectSlice it manages. It specifically caters to use cases
+ * where the slice is used in RocksDB operations, providing methods for safely
+ * interacting with the slice for put-like operations.
  */
-public class ManagedDirectSlice extends DirectSlice {
-  private final UncheckedAutoCloseable leakTracker = track(this);
+public class ManagedDirectSlice extends ManagedObject<DirectSlice> {
+
+  private final ByteBuffer data;
 
   public ManagedDirectSlice(ByteBuffer data) {
-    super(data);
+    super(new DirectSlice(data));
+    this.data = data;
   }
 
   @Override
-  public synchronized long getNativeHandle() {
-    return super.getNativeHandle();
+  public DirectSlice get() {
+    throw new UnsupportedOperationException("get() is not supported.");
   }
 
-  @Override
-  protected void disposeInternal() {
-    // RocksMutableObject.close is final thus can't be decorated.
-    // So, we decorate disposeInternal instead to track closure.
+  /**
+   * Executes the provided consumer on the internal {@code DirectSlice} after
+   * adjusting the slice's prefix and length based on the current position and
+   * remaining data in the associated {@code ByteBuffer}. If the consumer throws
+   * a {@code RocksDBException}, it is wrapped and rethrown as a
+   * {@code RocksDatabaseException}.
+   *
+   * @param consumer the operation to perform on the managed {@code DirectSlice}.
+   *                 The consumer must handle a {@code DirectSlice} and may throw
+   *                 a {@code RocksDBException}.
+   * @throws RocksDatabaseException if the provided consumer throws a
+   *                                {@code RocksDBException}.
+   */
+  public void putFromBuffer(CheckedConsumer<DirectSlice, ? extends 
RocksDBException> consumer)
+      throws RocksDatabaseException {
+    DirectSlice slice = super.get();
+    slice.removePrefix(this.data.position());
+    slice.setLength(this.data.remaining());
     try {
-      super.disposeInternal();
-    } finally {
-      leakTracker.close();
+      consumer.accept(slice);
+    } catch (RocksDBException e) {
+      throw new RocksDatabaseException("Error while performing put op with 
directSlice", e);
     }
+    data.position(data.limit());
+  }
+
+  /**
+   * Retrieves data from the associated DirectSlice into the buffer managed by this instance.
+   * The supplied function is applied to the DirectSlice to process the data, and the method
+   * adjusts the buffer's position and limit based on the result.
+   *
+   * @param function a function that operates on a DirectSlice and returns the number
+   *                 of bytes written to the buffer, or a specific "not found" value
+   *                 if the operation fails. The function may throw a RocksDBException.
+   * @return the number of bytes written to the buffer if successful, or a specific
+   *         "not found" value indicating the requested data was absent.
+   * @throws RocksDatabaseException if the provided function throws a RocksDBException,
+   *                                wrapping the original exception.
+   */
+  public int getToBuffer(CheckedFunction<DirectSlice, Integer, ? extends 
RocksDBException> function)
+      throws RocksDatabaseException {
+    DirectSlice slice = super.get();
+    slice.removePrefix(this.data.position());
+    slice.setLength(this.data.remaining());
+    try {
+      int lengthWritten = function.apply(slice);
+      if (lengthWritten != NOT_FOUND) {
+        this.data.limit(Math.min(data.limit(), data.position() + 
lengthWritten));
+      }
+      return lengthWritten;
+    } catch (RocksDBException e) {
+      throw new RocksDatabaseException("Error while performing put op with 
directSlice", e);
+    }
+  }
+
+  @VisibleForTesting
+  DirectSlice getDirectSlice() {
+    return super.get();
   }
 }
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
index df3a23b4bf8..a5c77748ec5 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
@@ -20,13 +20,13 @@
 import static 
org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
 
 import org.apache.ratis.util.UncheckedAutoCloseable;
-import org.rocksdb.RocksObject;
+import org.rocksdb.AbstractNativeReference;
 
 /**
  * General template for a managed RocksObject.
  * @param <T>
  */
-class ManagedObject<T extends RocksObject> implements AutoCloseable {
+class ManagedObject<T extends AbstractNativeReference> implements 
AutoCloseable {
   private final T original;
   private final UncheckedAutoCloseable leakTracker = track(this);
 
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
new file mode 100644
index 00000000000..c332d32704f
--- /dev/null
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.rocksdb.DirectSlice;
+
+/**
+ * Tests for ManagedDirectSlice.
+ */
+public class TestManagedDirectSlice {
+
+  static {
+    ManagedRocksObjectUtils.loadRocksDBLibrary();
+  }
+
+  @ParameterizedTest
+  @CsvSource({"0, 1024", "1024, 1024", "512, 1024", "0, 100", "10, 512", "0, 
0"})
+  public void testManagedDirectSliceWithOffsetMovedAheadByteBuffer(int offset, 
int numberOfBytesWritten)
+      throws RocksDatabaseException {
+    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
+    byte[] randomBytes = RandomUtils.secure().nextBytes(numberOfBytesWritten);
+    byteBuffer.put(randomBytes);
+    byteBuffer.flip();
+    try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer);
+         ManagedSlice slice = new ManagedSlice(Arrays.copyOfRange(randomBytes, 
offset, numberOfBytesWritten))) {
+      byteBuffer.position(offset);
+      directSlice.putFromBuffer((ds) -> {
+        DirectSlice directSliceFromByteBuffer = directSlice.getDirectSlice();
+        assertEquals(numberOfBytesWritten - offset, ds.size());
+        assertEquals(0, directSliceFromByteBuffer.compare(slice));
+        assertEquals(0, slice.compare(directSliceFromByteBuffer));
+      });
+      Assertions.assertEquals(numberOfBytesWritten, byteBuffer.position());
+    }
+  }
+
+  @ParameterizedTest
+  @CsvSource({"0, 1024, 512", "1024, 1024, 5", "512, 1024, 600", "0, 100, 80", 
"10, 512, 80", "0, 0, 10",
+      "100, 256, -1"})
+  public void testManagedDirectSliceWithOpPutToByteBuffer(int offset, int 
maxNumberOfBytesWrite,
+      int numberOfBytesToWrite) throws RocksDatabaseException {
+    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
+    byte[] randomBytes = RandomUtils.secure().nextBytes(offset);
+    byteBuffer.put(randomBytes);
+    try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer)) {
+      byteBuffer.position(offset);
+      byteBuffer.limit(Math.min(offset + maxNumberOfBytesWrite, 1024));
+      assertEquals(numberOfBytesToWrite, directSlice.getToBuffer((ds) -> {
+        assertEquals(byteBuffer.remaining(), ds.size());
+        return numberOfBytesToWrite;
+      }));
+      Assertions.assertEquals(offset, byteBuffer.position());
+      if (numberOfBytesToWrite == -1) {
+        assertEquals(offset + maxNumberOfBytesWrite, byteBuffer.limit());
+      } else {
+        Assertions.assertEquals(Math.min(Math.min(offset + 
numberOfBytesToWrite, 1024), maxNumberOfBytesWrite),
+            byteBuffer.limit());
+      }
+
+    }
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
similarity index 96%
rename from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
rename to hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index a689e9fdea1..14f553a9b18 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -85,9 +85,11 @@ public void delete(byte[] key) throws RocksDatabaseException {
 
   public void delete(CodecBuffer key) throws RocksDatabaseException {
     try (ManagedDirectSlice slice = new 
ManagedDirectSlice(key.asReadOnlyByteBuffer())) {
-      sstFileWriter.delete(slice);
-      keyCounter.incrementAndGet();
-    } catch (RocksDBException e) {
+      slice.putFromBuffer(directSlice -> {
+        sstFileWriter.delete(directSlice);
+        keyCounter.incrementAndGet();
+      });
+    } catch (RocksDatabaseException e) {
       closeOnFailure();
       throw new RocksDatabaseException("Failed to delete key (length=" + 
key.readableBytes()
           + "), sstFile=" + sstFile.getAbsolutePath(), e);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java
new file mode 100644
index 00000000000..c3129da85c4
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static 
org.apache.hadoop.hdds.utils.NativeConstants.ROCKS_TOOLS_NATIVE_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Test for RDBSstFileWriter.
+ */
+public class TestRDBSstFileWriter {
+
+  @TempDir
+  private Path path;
+
+  @EnabledIfSystemProperty(named = ROCKS_TOOLS_NATIVE_PROPERTY, matches = 
"true")
+  @Test
+  public void testSstFileTombstoneCreationWithCodecBufferReuse() throws 
IOException, RocksDBException {
+    ManagedRawSSTFileReader.tryLoadLibrary();
+    Path sstPath = path.resolve("test.sst").toAbsolutePath();
+    try (CodecBuffer codecBuffer = CodecBuffer.allocateDirect(1024);
+         RDBSstFileWriter sstFileWriter = new 
RDBSstFileWriter(sstPath.toFile());
+         CodecBuffer emptyBuffer = CodecBuffer.getEmptyBuffer()) {
+      Queue<String> keys = new LinkedList<>(ImmutableList.of("key1_renamed", 
"key1", "key1_renamed"));
+      PutToByteBuffer<IOException> putFunc = byteBuffer -> {
+        byte[] keyBytes = StringUtils.string2Bytes(keys.peek());
+        byteBuffer.put(keyBytes);
+        return keyBytes.length;
+      };
+      int len = codecBuffer.putFromSource(putFunc);
+      assertEquals(codecBuffer.readableBytes(), len);
+      assertEquals(keys.poll(), 
StringUtils.bytes2String(codecBuffer.getArray()));
+      codecBuffer.clear();
+      int idx = 0;
+      while (!keys.isEmpty()) {
+        codecBuffer.putFromSource(putFunc);
+        byte[] keyBytes = new byte[codecBuffer.readableBytes()];
+        assertEquals(keyBytes.length, 
codecBuffer.getInputStream().read(keyBytes));
+        if (idx++ % 2 == 0) {
+          sstFileWriter.delete(codecBuffer);
+        } else {
+          sstFileWriter.put(codecBuffer, emptyBuffer);
+        }
+        assertEquals(keys.poll(), 
StringUtils.bytes2String(codecBuffer.getArray()));
+        codecBuffer.clear();
+      }
+    }
+    Assertions.assertTrue(sstPath.toFile().exists());
+    try (ManagedOptions options = new ManagedOptions();
+         ManagedRawSSTFileReader<ManagedRawSSTFileIterator.KeyValue> reader = 
new ManagedRawSSTFileReader<>(options,
+             sstPath.toString(), 1024);
+         ManagedRawSSTFileIterator<ManagedRawSSTFileIterator.KeyValue> itr =
+             reader.newIterator(kv -> kv, null, null, 
IteratorType.KEY_AND_VALUE)) {
+
+      int idx = 0;
+      List<String> keys = ImmutableList.of("key1", "key1_rename");
+      while (itr.hasNext()) {
+        ManagedRawSSTFileIterator.KeyValue kv = itr.next();
+        assertEquals(idx, kv.getType());
+        assertEquals(keys.get(idx), keys.get(idx++));
+        assertEquals(0, kv.getValue().length);
+      }
+      assertEquals(2, idx);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to