This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 41dd1509c76 HDDS-14117. Add nonStreamRead and fileRead cases to tests. (#9476)
41dd1509c76 is described below

commit 41dd1509c76ff0a16fe6bda00fdce16cea217996
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 22 04:35:08 2025 -0800

    HDDS-14117. Add nonStreamRead and fileRead cases to tests. (#9476)
---
 .../hdds/scm/storage/MultipartInputStream.java     |   4 +
 .../ozone/client/rpc/read/TestStreamRead.java      | 201 ++++++++++++++-------
 2 files changed, 144 insertions(+), 61 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index a28658b1ebb..221a48be828 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -82,6 +82,10 @@ public MultipartInputStream(String keyName,
     this.length = streamLength;
   }
 
+  public boolean isStreamBlockInputStream() {
+    return isStreamBlockInputStream;
+  }
+
   @Override
   protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
       throws IOException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
index 29a43876630..9fc217b6df3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -19,12 +19,21 @@
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Files;
 import java.security.MessageDigest;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -33,15 +42,22 @@
 import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
 import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.jupiter.api.Test;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -52,15 +68,11 @@
 public class TestStreamRead {
   {
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), 
Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"),
 Level.ERROR);
-    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"),
 Level.ERROR);
-    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), 
Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org"), Level.ERROR);
+
+    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"),
 Level.ERROR);
+    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"),
 Level.ERROR);
+    
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"),
 Level.ERROR);
     GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), 
Level.ERROR);
   }
 
@@ -68,8 +80,10 @@ public class TestStreamRead {
   static final int FLUSH_SIZE = 2 * CHUNK_SIZE;       // 2MB
   static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;   // 4MB
 
-  static final int BLOCK_SIZE = 64 << 20;
   static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
+  static final int BLOCK_SIZE = KEY_SIZE.getSizeInt();
+
+  static final String DUMMY_KEY = "dummyKey";
 
   static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
     final OzoneConfiguration conf = new OzoneConfiguration();
@@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
     conf.setFromObject(config);
 
     conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
-    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 1);
     conf.setQuietMode(true);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, 
StorageUnit.MB);
 
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
         .setBlockSize(BLOCK_SIZE)
@@ -114,53 +127,127 @@ void testReadKey256k() throws Exception {
   }
 
   void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) 
throws Exception {
+    System.out.println("cluster starting ...");
     try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) 
{
       cluster.waitForClusterToBeReady();
-
       System.out.println("cluster ready");
 
+      final List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
+      assertEquals(1, datanodes.size());
+      final HddsDatanodeService datanode = datanodes.get(0);
+
       OzoneConfiguration conf = cluster.getConf();
       OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
       clientConfig.setStreamReadBlock(true);
-      OzoneConfiguration copy = new OzoneConfiguration(conf);
-      copy.setFromObject(clientConfig);
+      final OzoneConfiguration steamReadConf = new OzoneConfiguration(conf);
+      steamReadConf.setFromObject(clientConfig);
+
+      clientConfig.setStreamReadBlock(false);
+      final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration(conf);
+      nonSteamReadConf.setFromObject(clientConfig);
 
-      final int n = 5;
-      final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
-      final SizeInBytes[] readBufferSizes = {
+      final SizeInBytes[] bufferSizes = {
           SizeInBytes.valueOf("32M"),
           SizeInBytes.valueOf("8M"),
           SizeInBytes.valueOf("1M"),
           SizeInBytes.valueOf("4k"),
       };
 
-      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
-        final TestBucket bucket = TestBucket.newBuilder(client).build();
+      try (OzoneClient streamReadClient = 
OzoneClientFactory.getRpcClient(steamReadConf);
+           OzoneClient nonStreamReadClient = 
OzoneClientFactory.getRpcClient(nonSteamReadConf)) {
+        final TestBucket testBucket = 
TestBucket.newBuilder(streamReadClient).build();
+        final String volume = testBucket.delegate().getVolumeName();
+        final String bucket = testBucket.delegate().getName();
+        final String keyName = "key0";
+
+        // get the client ready by writing a dummy key
+        createKey(testBucket.delegate(), DUMMY_KEY, SizeInBytes.ONE_KB, 
SizeInBytes.ONE_KB);
+
+        for (SizeInBytes bufferSize : bufferSizes) {
+          // create key
+          
System.out.println("---------------------------------------------------------");
+          createKey(testBucket.delegate(), keyName, keySize, bufferSize);
+
+          // get block file and generate md5
+          final OmKeyInfo info = 
nonStreamReadClient.getProxy().getKeyInfo(volume, bucket, keyName, false);
+          final List<OmKeyLocationInfo> locations = 
info.getLatestVersionLocations().getLocationList();
+          assertEquals(1, locations.size());
+          final BlockID blockId = locations.get(0).getBlockID();
+          final ContainerData containerData = 
datanode.getDatanodeStateMachine().getContainer().getContainerSet()
+              .getContainer(blockId.getContainerID()).getContainerData();
+          final File blockFile = 
ContainerLayoutVersion.FILE_PER_BLOCK.getChunkFile(containerData, blockId, 
null);
+          assertTrue(blockFile.exists());
+          assertEquals(BLOCK_SIZE, blockFile.length());
+          final String expectedMd5 = generateMd5(keySize, SizeInBytes.ONE_MB, 
blockFile);
 
-        for (int i = 0; i < n; i++) {
-          final String keyName = "key" + i;
+          // run tests
           
System.out.println("---------------------------------------------------------");
           System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
               keyName, keySize, bytesPerChecksum);
 
-          final String md5 = createKey(bucket.delegate(), keyName, keySize, 
writeBufferSize);
-          for (SizeInBytes readBufferSize : readBufferSizes) {
-            runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
-            runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
+          final CheckedBiConsumer<SizeInBytes, String, Exception> streamRead = 
(readBufferSize, md5)
+              -> streamRead(keySize, readBufferSize, md5, testBucket, keyName);
+          final CheckedBiConsumer<SizeInBytes, String, Exception> 
nonStreamRead = (readBufferSize, md5)
+              -> nonStreamRead(keySize, readBufferSize, md5, 
nonStreamReadClient, volume, bucket, keyName);
+          final CheckedBiConsumer<SizeInBytes, String, Exception> fileRead = 
(readBufferSize, md5)
+              -> fileRead(keySize, readBufferSize, md5, blockFile);
+          final List<CheckedBiConsumer<SizeInBytes, String, Exception>> 
operations
+              = Arrays.asList(streamRead, nonStreamRead, fileRead);
+          Collections.shuffle(operations);
+
+          for (CheckedBiConsumer<SizeInBytes, String, Exception> op : 
operations) {
+            for (int i = 0; i < 5; i++) {
+              op.accept(bufferSize, null);
+            }
+            op.accept(bufferSize, expectedMd5);
           }
         }
       }
     }
   }
 
+  static void streamRead(SizeInBytes keySize, SizeInBytes bufferSize, String 
expectedMD5,
+      TestBucket bucket, String keyName) throws Exception {
+    try (KeyInputStream in = bucket.getKeyInputStream(keyName)) {
+      assertTrue(in.isStreamBlockInputStream());
+      runTestReadKey(keySize, bufferSize, expectedMD5, in);
+    }
+  }
+
+  static void nonStreamRead(SizeInBytes keySize, SizeInBytes bufferSize, 
String expectedMD5,
+      OzoneClient nonStreamReadClient, String volume, String bucket, String 
keyName) throws Exception {
+    final ClientProtocol proxy = nonStreamReadClient.getProxy();
+    try (KeyInputStream in = (KeyInputStream) proxy.getKey(volume, bucket, 
keyName).getInputStream()) {
+      assertFalse(in.isStreamBlockInputStream());
+      runTestReadKey(keySize, bufferSize, expectedMD5, in);
+    }
+  }
+
+  static void fileRead(SizeInBytes keySize, SizeInBytes bufferSize, String 
expectedMD5,
+      File blockFile) throws Exception {
+    try (InputStream in = new 
BufferedInputStream(Files.newInputStream(blockFile.toPath()), 
bufferSize.getSizeInt())) {
+      runTestReadKey(keySize, bufferSize, expectedMD5, in);
+    }
+  }
+
+  static String generateMd5(SizeInBytes keySize, SizeInBytes bufferSize, File 
blockFile) throws Exception {
+    try (InputStream in = new 
BufferedInputStream(Files.newInputStream(blockFile.toPath()), 
bufferSize.getSizeInt())) {
+      return runTestReadKey("generateMd5", keySize, bufferSize, true, in);
+    }
+  }
+
   static void print(String name, long keySizeByte, long elapsedNanos, 
SizeInBytes bufferSize, String computedMD5) {
     final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
     final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
-    System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f 
MB, md5=%s)%n",
-        name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, 
keySizeMb, computedMD5);
+    if (computedMD5 == null) {
+      System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f 
MB)%n",
+          name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, 
keySizeMb);
+    } else {
+      System.out.printf("%16s md5=%s%n", name, computedMD5);
+    }
   }
 
-  static String createKey(OzoneBucket bucket, String keyName, SizeInBytes 
keySize, SizeInBytes bufferSize)
+  static void createKey(OzoneBucket bucket, String keyName, SizeInBytes 
keySize, SizeInBytes bufferSize)
       throws Exception {
     final byte[] buffer = new byte[bufferSize.getSizeInt()];
     ThreadLocalRandom.current().nextBytes(buffer);
@@ -176,50 +263,42 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
       }
     }
     final long elapsedNanos = System.nanoTime() - startTime;
-
-    final MessageDigest md5 = MessageDigest.getInstance("MD5");
-    for (long pos = 0; pos < keySizeByte;) {
-      final int writeSize = Math.toIntExact(Math.min(buffer.length, 
keySizeByte - pos));
-      md5.update(buffer, 0, writeSize);
-      pos += writeSize;
+    if (!keyName.startsWith(DUMMY_KEY)) {
+      print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, null);
     }
+  }
 
-    final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
-    print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, 
computedMD5);
-    return computedMD5;
+  static void runTestReadKey(SizeInBytes keySize, SizeInBytes bufferSize, 
String expectedMD5,
+      InputStream in) throws Exception {
+    final String method = 
JavaUtils.getCallerStackTraceElement().getMethodName();
+    final String computedMD5 = runTestReadKey(method, keySize, bufferSize, 
expectedMD5 != null, in);
+    assertEquals(expectedMD5, computedMD5);
   }
 
-  private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes 
bufferSize, String expectedMD5,
-      TestBucket bucket) throws Exception {
+  static String runTestReadKey(String name, SizeInBytes keySize, SizeInBytes 
bufferSize, boolean generateMd5,
+      InputStream in) throws Exception {
     final long keySizeByte = keySize.getSize();
     final MessageDigest md5 = MessageDigest.getInstance("MD5");
     // Read the data fully into a large enough byte array
     final byte[] buffer = new byte[bufferSize.getSizeInt()];
     final long startTime = System.nanoTime();
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-      int pos = 0;
-      for (; pos < keySizeByte;) {
-        final int read = keyInputStream.read(buffer, 0, buffer.length);
-        if (read == -1) {
-          break;
-        }
+    int pos = 0;
+    for (; pos < keySizeByte;) {
+      final int read = in.read(buffer, 0, buffer.length);
+      if (read == -1) {
+        break;
+      }
 
-        if (expectedMD5 != null) {
-          md5.update(buffer, 0, read);
-        }
-        pos += read;
+      if (generateMd5) {
+        md5.update(buffer, 0, read);
       }
-      assertEquals(keySizeByte, pos);
+      pos += read;
     }
+    assertEquals(keySizeByte, pos);
     final long elapsedNanos = System.nanoTime() - startTime;
 
-    final String computedMD5;
-    if (expectedMD5 == null) {
-      computedMD5 = null;
-    } else {
-      computedMD5 = StringUtils.bytes2Hex(md5.digest());
-      assertEquals(expectedMD5, computedMD5);
-    }
-    print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
+    final String computedMD5 = generateMd5 ? 
StringUtils.bytes2Hex(md5.digest()) : null;
+    print(name, keySizeByte, elapsedNanos, bufferSize, computedMD5);
+    return computedMD5;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to