This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 66cb1e72606 HDDS-14185. Fix ManagedDirectSlice usage as wrapper for ByteBuffer class (#9508)
66cb1e72606 is described below
commit 66cb1e72606e206352f54282e7f19461ad7761bc
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Dec 19 19:18:28 2025 -0500
HDDS-14185. Fix ManagedDirectSlice usage as wrapper for ByteBuffer class (#9508)
---
.../hdds/utils/db/managed/ManagedDirectSlice.java | 93 +++++++++++++++++----
.../hdds/utils/db/managed/ManagedObject.java | 4 +-
.../utils/db/managed/TestManagedDirectSlice.java | 86 +++++++++++++++++++
.../hadoop/hdds/utils/db/RDBSstFileWriter.java | 8 +-
.../hadoop/hdds/utils/db/TestRDBSstFileWriter.java | 97 ++++++++++++++++++++++
5 files changed, 268 insertions(+), 20 deletions(-)
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
index b68a250cb8b..52b8b5aabb1 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java
@@ -17,35 +17,98 @@
package org.apache.hadoop.hdds.utils.db.managed;
-import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB.NOT_FOUND;
+import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
-import org.apache.ratis.util.UncheckedAutoCloseable;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
import org.rocksdb.DirectSlice;
+import org.rocksdb.RocksDBException;
/**
- * Managed Direct Slice.
+ * ManagedDirectSlice is a managed wrapper around the DirectSlice object. It ensures
+ * proper handling of native resources associated with DirectSlice, utilizing
+ * the ManagedObject infrastructure to prevent resource leaks. It works in tandem
+ * with a ByteBuffer, which acts as the data source for the managed slice.
+ *
+ * This class overrides certain operations to tightly control the lifecycle and
+ * behavior of the DirectSlice it manages. It specifically caters to use cases
+ * where the slice is used in RocksDB operations, providing methods for safely
+ * interacting with the slice for put-like operations.
*/
-public class ManagedDirectSlice extends DirectSlice {
- private final UncheckedAutoCloseable leakTracker = track(this);
+public class ManagedDirectSlice extends ManagedObject<DirectSlice> {
+
+ private final ByteBuffer data;
public ManagedDirectSlice(ByteBuffer data) {
- super(data);
+ super(new DirectSlice(data));
+ this.data = data;
}
@Override
- public synchronized long getNativeHandle() {
- return super.getNativeHandle();
+ public DirectSlice get() {
+ throw new UnsupportedOperationException("get() is not supported.");
}
- @Override
- protected void disposeInternal() {
- // RocksMutableObject.close is final thus can't be decorated.
- // So, we decorate disposeInternal instead to track closure.
+ /**
+ * Executes the provided consumer on the internal {@code DirectSlice} after
+ * adjusting the slice's prefix and length based on the current position and
+ * remaining data in the associated {@code ByteBuffer}. If the consumer throws
+ * a {@code RocksDBException}, it is wrapped and rethrown as a
+ * {@code RocksDatabaseException}.
+ *
+ * @param consumer the operation to perform on the managed {@code DirectSlice}.
+ * The consumer must handle a {@code DirectSlice} and may throw
+ * a {@code RocksDBException}.
+ * @throws RocksDatabaseException if the provided consumer throws a
+ * {@code RocksDBException}.
+ */
+ public void putFromBuffer(CheckedConsumer<DirectSlice, ? extends RocksDBException> consumer)
+ throws RocksDatabaseException {
+ DirectSlice slice = super.get();
+ slice.removePrefix(this.data.position());
+ slice.setLength(this.data.remaining());
try {
- super.disposeInternal();
- } finally {
- leakTracker.close();
+ consumer.accept(slice);
+ } catch (RocksDBException e) {
+ throw new RocksDatabaseException("Error while performing put op with directSlice", e);
}
+ data.position(data.limit());
+ }
+
+ /**
+ * Retrieves data from the associated DirectSlice into the buffer managed by this instance.
+ * The supplied function is applied to the DirectSlice to process the data, and the method
+ * adjusts the buffer's limit based on the result.
+ *
+ * @param function a function that operates on a DirectSlice and returns the number
+ * of bytes written to the buffer, or a specific "not found" value
+ * when the requested data is absent. The function may throw a RocksDBException.
+ * @return the number of bytes written to the buffer if successful, or a specific
+ * "not found" value indicating the requested data was absent.
+ * @throws RocksDatabaseException if the provided function throws a RocksDBException,
+ * wrapping the original exception.
+ */
+ public int getToBuffer(CheckedFunction<DirectSlice, Integer, ? extends RocksDBException> function)
+ throws RocksDatabaseException {
+ DirectSlice slice = super.get();
+ slice.removePrefix(this.data.position());
+ slice.setLength(this.data.remaining());
+ try {
+ int lengthWritten = function.apply(slice);
+ if (lengthWritten != NOT_FOUND) {
+ this.data.limit(Math.min(data.limit(), data.position() + lengthWritten));
+ }
+ return lengthWritten;
+ } catch (RocksDBException e) {
+ throw new RocksDatabaseException("Error while performing get op with directSlice", e);
+ }
+ }
+
+ @VisibleForTesting
+ DirectSlice getDirectSlice() {
+ return super.get();
}
}
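For illustration, a minimal caller-side sketch of the new write path (not part of the commit; assumes an org.rocksdb.SstFileWriter, whose delete(DirectSlice) overload throws RocksDBException, and a key that fits in the 64-byte buffer):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
    import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
    import org.rocksdb.SstFileWriter;

    final class DeleteKeySketch {
      // Writes a tombstone for the given key through the managed slice wrapper.
      static void deleteKey(SstFileWriter writer, String key) throws RocksDatabaseException {
        ByteBuffer buf = ByteBuffer.allocateDirect(64);
        buf.put(key.getBytes(StandardCharsets.UTF_8));
        buf.flip(); // the slice covers the readable window [position, limit)
        try (ManagedDirectSlice slice = new ManagedDirectSlice(buf)) {
          // putFromBuffer trims the native slice to buf's readable window, runs
          // the consumer (wrapping any RocksDBException as RocksDatabaseException),
          // then advances buf's position to its limit.
          slice.putFromBuffer(writer::delete);
        }
      }
    }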
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
index df3a23b4bf8..a5c77748ec5 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedObject.java
@@ -20,13 +20,13 @@
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;
import org.apache.ratis.util.UncheckedAutoCloseable;
-import org.rocksdb.RocksObject;
+import org.rocksdb.AbstractNativeReference;
/**
* General template for a managed RocksObject.
* @param <T>
*/
-class ManagedObject<T extends RocksObject> implements AutoCloseable {
+class ManagedObject<T extends AbstractNativeReference> implements AutoCloseable {
private final T original;
private final UncheckedAutoCloseable leakTracker = track(this);
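For context on the widened bound: in the RocksDB Java API, DirectSlice descends from RocksMutableObject rather than RocksObject, so the previous bound could not cover it; AbstractNativeReference is the common supertype of both branches:

    AbstractNativeReference
    |-- AbstractImmutableNativeReference -- RocksObject (previous bound)
    `-- RocksMutableObject -- AbstractSlice<ByteBuffer> -- DirectSlice

With T extends AbstractNativeReference, ManagedObject<DirectSlice> type-checks, which is what ManagedDirectSlice above now relies on.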
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
new file mode 100644
index 00000000000..c332d32704f
--- /dev/null
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.rocksdb.DirectSlice;
+
+/**
+ * Tests for ManagedDirectSlice.
+ */
+public class TestManagedDirectSlice {
+
+ static {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"0, 1024", "1024, 1024", "512, 1024", "0, 100", "10, 512", "0, 0"})
+ public void testManagedDirectSliceWithOffsetMovedAheadByteBuffer(int offset, int numberOfBytesWritten)
+ throws RocksDatabaseException {
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
+ byte[] randomBytes = RandomUtils.secure().nextBytes(numberOfBytesWritten);
+ byteBuffer.put(randomBytes);
+ byteBuffer.flip();
+ try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer);
+ ManagedSlice slice = new ManagedSlice(Arrays.copyOfRange(randomBytes, offset, numberOfBytesWritten))) {
+ byteBuffer.position(offset);
+ directSlice.putFromBuffer((ds) -> {
+ DirectSlice directSliceFromByteBuffer = directSlice.getDirectSlice();
+ assertEquals(numberOfBytesWritten - offset, ds.size());
+ assertEquals(0, directSliceFromByteBuffer.compare(slice));
+ assertEquals(0, slice.compare(directSliceFromByteBuffer));
+ });
+ Assertions.assertEquals(numberOfBytesWritten, byteBuffer.position());
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({"0, 1024, 512", "1024, 1024, 5", "512, 1024, 600", "0, 100, 80", "10, 512, 80", "0, 0, 10",
+ "100, 256, -1"})
+ public void testManagedDirectSliceWithOpPutToByteBuffer(int offset, int maxNumberOfBytesWrite,
+ int numberOfBytesToWrite) throws RocksDatabaseException {
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
+ byte[] randomBytes = RandomUtils.secure().nextBytes(offset);
+ byteBuffer.put(randomBytes);
+ try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer)) {
+ byteBuffer.position(offset);
+ byteBuffer.limit(Math.min(offset + maxNumberOfBytesWrite, 1024));
+ assertEquals(numberOfBytesToWrite, directSlice.getToBuffer((ds) -> {
+ assertEquals(byteBuffer.remaining(), ds.size());
+ return numberOfBytesToWrite;
+ }));
+ Assertions.assertEquals(offset, byteBuffer.position());
+ if (numberOfBytesToWrite == -1) {
+ assertEquals(offset + maxNumberOfBytesWrite, byteBuffer.limit());
+ } else {
+ Assertions.assertEquals(Math.min(Math.min(offset + numberOfBytesToWrite, 1024), maxNumberOfBytesWrite),
+ byteBuffer.limit());
+ }
+
+ }
+ }
+}
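The read path mirrors the write path: getToBuffer hands the trimmed slice to a function that fills the slice's backing memory and returns the byte count, or NOT_FOUND (-1) when the key is absent. A minimal sketch, with fillValue(...) as a hypothetical stand-in for whatever native call copies a value into the slice:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
    import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
    import org.rocksdb.DirectSlice;
    import org.rocksdb.RocksDBException;

    final class ReadValueSketch {
      static ByteBuffer readValue() throws RocksDatabaseException {
        ByteBuffer out = ByteBuffer.allocateDirect(1024);
        try (ManagedDirectSlice slice = new ManagedDirectSlice(out)) {
          int n = slice.getToBuffer(ReadValueSketch::fillValue);
          // On success (n != -1) the wrapper clamps out.limit() to
          // out.position() + n, so [position, limit) is exactly what was read.
        }
        return out;
      }

      // Hypothetical: copies a value into the slice's memory and returns its
      // length, or -1 (NOT_FOUND) when the key is absent.
      static int fillValue(DirectSlice ds) throws RocksDBException {
        return -1; // placeholder
      }
    }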
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
similarity index 96%
rename from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
rename to hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index a689e9fdea1..14f553a9b18 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -85,9 +85,11 @@ public void delete(byte[] key) throws RocksDatabaseException {
public void delete(CodecBuffer key) throws RocksDatabaseException {
try (ManagedDirectSlice slice = new ManagedDirectSlice(key.asReadOnlyByteBuffer())) {
- sstFileWriter.delete(slice);
- keyCounter.incrementAndGet();
- } catch (RocksDBException e) {
+ slice.putFromBuffer(directSlice -> {
+ sstFileWriter.delete(directSlice);
+ keyCounter.incrementAndGet();
+ });
+ } catch (RocksDatabaseException e) {
closeOnFailure();
throw new RocksDatabaseException("Failed to delete key (length=" + key.readableBytes()
+ "), sstFile=" + sstFile.getAbsolutePath(), e);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java
new file mode 100644
index 00000000000..c3129da85c4
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBSstFileWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.utils.NativeConstants.ROCKS_TOOLS_NATIVE_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Test for RDBSstFileWriter.
+ */
+public class TestRDBSstFileWriter {
+
+ @TempDir
+ private Path path;
+
+ @EnabledIfSystemProperty(named = ROCKS_TOOLS_NATIVE_PROPERTY, matches = "true")
+ @Test
+ public void testSstFileTombstoneCreationWithCodecBufferReuse() throws IOException, RocksDBException {
+ ManagedRawSSTFileReader.tryLoadLibrary();
+ Path sstPath = path.resolve("test.sst").toAbsolutePath();
+ try (CodecBuffer codecBuffer = CodecBuffer.allocateDirect(1024);
+ RDBSstFileWriter sstFileWriter = new RDBSstFileWriter(sstPath.toFile());
+ CodecBuffer emptyBuffer = CodecBuffer.getEmptyBuffer()) {
+ Queue<String> keys = new LinkedList<>(ImmutableList.of("key1_renamed", "key1", "key1_renamed"));
+ PutToByteBuffer<IOException> putFunc = byteBuffer -> {
+ byte[] keyBytes = StringUtils.string2Bytes(keys.peek());
+ byteBuffer.put(keyBytes);
+ return keyBytes.length;
+ };
+ int len = codecBuffer.putFromSource(putFunc);
+ assertEquals(codecBuffer.readableBytes(), len);
+ assertEquals(keys.poll(), StringUtils.bytes2String(codecBuffer.getArray()));
+ codecBuffer.clear();
+ int idx = 0;
+ while (!keys.isEmpty()) {
+ codecBuffer.putFromSource(putFunc);
+ byte[] keyBytes = new byte[codecBuffer.readableBytes()];
+ assertEquals(keyBytes.length, codecBuffer.getInputStream().read(keyBytes));
+ if (idx++ % 2 == 0) {
+ sstFileWriter.delete(codecBuffer);
+ } else {
+ sstFileWriter.put(codecBuffer, emptyBuffer);
+ }
+ assertEquals(keys.poll(), StringUtils.bytes2String(codecBuffer.getArray()));
+ codecBuffer.clear();
+ }
+ }
+ Assertions.assertTrue(sstPath.toFile().exists());
+ try (ManagedOptions options = new ManagedOptions();
+ ManagedRawSSTFileReader<ManagedRawSSTFileIterator.KeyValue> reader = new ManagedRawSSTFileReader<>(options,
+ sstPath.toString(), 1024);
+ ManagedRawSSTFileIterator<ManagedRawSSTFileIterator.KeyValue> itr =
+ reader.newIterator(kv -> kv, null, null, IteratorType.KEY_AND_VALUE)) {
+
+ int idx = 0;
+ List<String> keys = ImmutableList.of("key1", "key1_renamed");
+ while (itr.hasNext()) {
+ ManagedRawSSTFileIterator.KeyValue kv = itr.next();
+ assertEquals(idx, kv.getType());
+ assertEquals(keys.get(idx++), StringUtils.bytes2String(kv.getKey()));
+ assertEquals(0, kv.getValue().length);
+ }
+ assertEquals(2, idx);
+ }
+ }
+}