This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 41dd1509c76 HDDS-14117. Add nonStreamRead and fileRead cases to tests.
(#9476)
41dd1509c76 is described below
commit 41dd1509c76ff0a16fe6bda00fdce16cea217996
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 22 04:35:08 2025 -0800
HDDS-14117. Add nonStreamRead and fileRead cases to tests. (#9476)
---
.../hdds/scm/storage/MultipartInputStream.java | 4 +
.../ozone/client/rpc/read/TestStreamRead.java | 201 ++++++++++++++-------
2 files changed, 144 insertions(+), 61 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index a28658b1ebb..221a48be828 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -82,6 +82,10 @@ public MultipartInputStream(String keyName,
this.length = streamLength;
}
+ public boolean isStreamBlockInputStream() {
+ return isStreamBlockInputStream;
+ }
+
@Override
protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
throws IOException {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
index 29a43876630..9fc217b6df3 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -19,12 +19,21 @@
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Files;
import java.security.MessageDigest;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -33,15 +42,22 @@
import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
@@ -52,15 +68,11 @@
public class TestStreamRead {
{
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"),
Level.ERROR);
-
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"),
Level.ERROR);
- GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"),
Level.ERROR);
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org"), Level.ERROR);
+
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"),
Level.ERROR);
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"),
Level.ERROR);
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"),
Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class),
Level.ERROR);
}
@@ -68,8 +80,10 @@ public class TestStreamRead {
static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB
static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB
- static final int BLOCK_SIZE = 64 << 20;
static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
+ static final int BLOCK_SIZE = KEY_SIZE.getSizeInt();
+
+ static final String DUMMY_KEY = "dummyKey";
static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
final OzoneConfiguration conf = new OzoneConfiguration();
@@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum)
throws Exception {
conf.setFromObject(config);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
- conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
+ conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 1);
conf.setQuietMode(true);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64,
StorageUnit.MB);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
.setBlockSize(BLOCK_SIZE)
@@ -114,53 +127,127 @@ void testReadKey256k() throws Exception {
}
void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum)
throws Exception {
+ System.out.println("cluster starting ...");
try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt()))
{
cluster.waitForClusterToBeReady();
-
System.out.println("cluster ready");
+ final List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
+ assertEquals(1, datanodes.size());
+ final HddsDatanodeService datanode = datanodes.get(0);
+
OzoneConfiguration conf = cluster.getConf();
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamReadBlock(true);
- OzoneConfiguration copy = new OzoneConfiguration(conf);
- copy.setFromObject(clientConfig);
+ final OzoneConfiguration steamReadConf = new OzoneConfiguration(conf);
+ steamReadConf.setFromObject(clientConfig);
+
+ clientConfig.setStreamReadBlock(false);
+ final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration(conf);
+ nonSteamReadConf.setFromObject(clientConfig);
- final int n = 5;
- final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
- final SizeInBytes[] readBufferSizes = {
+ final SizeInBytes[] bufferSizes = {
SizeInBytes.valueOf("32M"),
SizeInBytes.valueOf("8M"),
SizeInBytes.valueOf("1M"),
SizeInBytes.valueOf("4k"),
};
- try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
- final TestBucket bucket = TestBucket.newBuilder(client).build();
+ try (OzoneClient streamReadClient =
OzoneClientFactory.getRpcClient(steamReadConf);
+ OzoneClient nonStreamReadClient =
OzoneClientFactory.getRpcClient(nonSteamReadConf)) {
+ final TestBucket testBucket =
TestBucket.newBuilder(streamReadClient).build();
+ final String volume = testBucket.delegate().getVolumeName();
+ final String bucket = testBucket.delegate().getName();
+ final String keyName = "key0";
+
+ // get the client ready by writing a dummy key
+ createKey(testBucket.delegate(), DUMMY_KEY, SizeInBytes.ONE_KB,
SizeInBytes.ONE_KB);
+
+ for (SizeInBytes bufferSize : bufferSizes) {
+ // create key
+
System.out.println("---------------------------------------------------------");
+ createKey(testBucket.delegate(), keyName, keySize, bufferSize);
+
+ // get block file and generate md5
+ final OmKeyInfo info =
nonStreamReadClient.getProxy().getKeyInfo(volume, bucket, keyName, false);
+ final List<OmKeyLocationInfo> locations =
info.getLatestVersionLocations().getLocationList();
+ assertEquals(1, locations.size());
+ final BlockID blockId = locations.get(0).getBlockID();
+ final ContainerData containerData =
datanode.getDatanodeStateMachine().getContainer().getContainerSet()
+ .getContainer(blockId.getContainerID()).getContainerData();
+ final File blockFile =
ContainerLayoutVersion.FILE_PER_BLOCK.getChunkFile(containerData, blockId,
null);
+ assertTrue(blockFile.exists());
+ assertEquals(BLOCK_SIZE, blockFile.length());
+ final String expectedMd5 = generateMd5(keySize, SizeInBytes.ONE_MB,
blockFile);
- for (int i = 0; i < n; i++) {
- final String keyName = "key" + i;
+ // run tests
System.out.println("---------------------------------------------------------");
System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
keyName, keySize, bytesPerChecksum);
- final String md5 = createKey(bucket.delegate(), keyName, keySize,
writeBufferSize);
- for (SizeInBytes readBufferSize : readBufferSizes) {
- runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
- runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
+ final CheckedBiConsumer<SizeInBytes, String, Exception> streamRead =
(readBufferSize, md5)
+ -> streamRead(keySize, readBufferSize, md5, testBucket, keyName);
+ final CheckedBiConsumer<SizeInBytes, String, Exception>
nonStreamRead = (readBufferSize, md5)
+ -> nonStreamRead(keySize, readBufferSize, md5,
nonStreamReadClient, volume, bucket, keyName);
+ final CheckedBiConsumer<SizeInBytes, String, Exception> fileRead =
(readBufferSize, md5)
+ -> fileRead(keySize, readBufferSize, md5, blockFile);
+ final List<CheckedBiConsumer<SizeInBytes, String, Exception>>
operations
+ = Arrays.asList(streamRead, nonStreamRead, fileRead);
+ Collections.shuffle(operations);
+
+ for (CheckedBiConsumer<SizeInBytes, String, Exception> op :
operations) {
+ for (int i = 0; i < 5; i++) {
+ op.accept(bufferSize, null);
+ }
+ op.accept(bufferSize, expectedMd5);
}
}
}
}
}
+ static void streamRead(SizeInBytes keySize, SizeInBytes bufferSize, String
expectedMD5,
+ TestBucket bucket, String keyName) throws Exception {
+ try (KeyInputStream in = bucket.getKeyInputStream(keyName)) {
+ assertTrue(in.isStreamBlockInputStream());
+ runTestReadKey(keySize, bufferSize, expectedMD5, in);
+ }
+ }
+
+ static void nonStreamRead(SizeInBytes keySize, SizeInBytes bufferSize,
String expectedMD5,
+ OzoneClient nonStreamReadClient, String volume, String bucket, String
keyName) throws Exception {
+ final ClientProtocol proxy = nonStreamReadClient.getProxy();
+ try (KeyInputStream in = (KeyInputStream) proxy.getKey(volume, bucket,
keyName).getInputStream()) {
+ assertFalse(in.isStreamBlockInputStream());
+ runTestReadKey(keySize, bufferSize, expectedMD5, in);
+ }
+ }
+
+ static void fileRead(SizeInBytes keySize, SizeInBytes bufferSize, String
expectedMD5,
+ File blockFile) throws Exception {
+ try (InputStream in = new
BufferedInputStream(Files.newInputStream(blockFile.toPath()),
bufferSize.getSizeInt())) {
+ runTestReadKey(keySize, bufferSize, expectedMD5, in);
+ }
+ }
+
+ static String generateMd5(SizeInBytes keySize, SizeInBytes bufferSize, File
blockFile) throws Exception {
+ try (InputStream in = new
BufferedInputStream(Files.newInputStream(blockFile.toPath()),
bufferSize.getSizeInt())) {
+ return runTestReadKey("generateMd5", keySize, bufferSize, true, in);
+ }
+ }
+
static void print(String name, long keySizeByte, long elapsedNanos,
SizeInBytes bufferSize, String computedMD5) {
final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
- System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f
MB, md5=%s)%n",
- name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize,
keySizeMb, computedMD5);
+ if (computedMD5 == null) {
+ System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f
MB)%n",
+ name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize,
keySizeMb);
+ } else {
+ System.out.printf("%16s md5=%s%n", name, computedMD5);
+ }
}
- static String createKey(OzoneBucket bucket, String keyName, SizeInBytes
keySize, SizeInBytes bufferSize)
+ static void createKey(OzoneBucket bucket, String keyName, SizeInBytes
keySize, SizeInBytes bufferSize)
throws Exception {
final byte[] buffer = new byte[bufferSize.getSizeInt()];
ThreadLocalRandom.current().nextBytes(buffer);
@@ -176,50 +263,42 @@ static String createKey(OzoneBucket bucket, String
keyName, SizeInBytes keySize,
}
}
final long elapsedNanos = System.nanoTime() - startTime;
-
- final MessageDigest md5 = MessageDigest.getInstance("MD5");
- for (long pos = 0; pos < keySizeByte;) {
- final int writeSize = Math.toIntExact(Math.min(buffer.length,
keySizeByte - pos));
- md5.update(buffer, 0, writeSize);
- pos += writeSize;
+ if (!keyName.startsWith(DUMMY_KEY)) {
+ print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, null);
}
+ }
- final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
- print("createStreamKey", keySizeByte, elapsedNanos, bufferSize,
computedMD5);
- return computedMD5;
+ static void runTestReadKey(SizeInBytes keySize, SizeInBytes bufferSize,
String expectedMD5,
+ InputStream in) throws Exception {
+ final String method =
JavaUtils.getCallerStackTraceElement().getMethodName();
+ final String computedMD5 = runTestReadKey(method, keySize, bufferSize,
expectedMD5 != null, in);
+ assertEquals(expectedMD5, computedMD5);
}
- private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes
bufferSize, String expectedMD5,
- TestBucket bucket) throws Exception {
+ static String runTestReadKey(String name, SizeInBytes keySize, SizeInBytes
bufferSize, boolean generateMd5,
+ InputStream in) throws Exception {
final long keySizeByte = keySize.getSize();
final MessageDigest md5 = MessageDigest.getInstance("MD5");
// Read the data fully into a large enough byte array
final byte[] buffer = new byte[bufferSize.getSizeInt()];
final long startTime = System.nanoTime();
- try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
- int pos = 0;
- for (; pos < keySizeByte;) {
- final int read = keyInputStream.read(buffer, 0, buffer.length);
- if (read == -1) {
- break;
- }
+ int pos = 0;
+ for (; pos < keySizeByte;) {
+ final int read = in.read(buffer, 0, buffer.length);
+ if (read == -1) {
+ break;
+ }
- if (expectedMD5 != null) {
- md5.update(buffer, 0, read);
- }
- pos += read;
+ if (generateMd5) {
+ md5.update(buffer, 0, read);
}
- assertEquals(keySizeByte, pos);
+ pos += read;
}
+ assertEquals(keySizeByte, pos);
final long elapsedNanos = System.nanoTime() - startTime;
- final String computedMD5;
- if (expectedMD5 == null) {
- computedMD5 = null;
- } else {
- computedMD5 = StringUtils.bytes2Hex(md5.digest());
- assertEquals(expectedMD5, computedMD5);
- }
- print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
+ final String computedMD5 = generateMd5 ?
StringUtils.bytes2Hex(md5.digest()) : null;
+ print(name, keySizeByte, elapsedNanos, bufferSize, computedMD5);
+ return computedMD5;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]