This is an automated email from the ASF dual-hosted git repository.

ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 52930c50af HDDS-12005. Refactor TestBlockDataStreamOutput (#7716)
52930c50af is described below

commit 52930c50af4f5368ed7191d081dd7ecbba17eaf5
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Tue Jan 21 23:43:05 2025 +0800

    HDDS-12005. Refactor TestBlockDataStreamOutput (#7716)
---
 .../client/rpc/TestBlockDataStreamOutput.java      | 423 +++++++++++----------
 1 file changed, 227 insertions(+), 196 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index c1345207d9..0238f4f498 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -19,15 +19,19 @@
 
 import org.apache.hadoop.hdds.DatanodeVersion;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
-import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.ClientConfigForTesting;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -44,19 +48,26 @@
 import org.apache.ozone.test.tag.Flaky;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
-import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -64,131 +75,157 @@
 /**
  * Tests BlockDataStreamOutput class.
  */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @Timeout(300)
 public class TestBlockDataStreamOutput {
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf = new OzoneConfiguration();
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int flushSize;
-  private static int maxFlushSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
+  private MiniOzoneCluster cluster;
+  private static final int CHUNK_SIZE = 100;
+  private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+  private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+  private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+  private static final String VOLUME_NAME = "testblockoutputstream";
+  private static final String BUCKET_NAME = VOLUME_NAME;
+  private static String keyString = UUID.randomUUID().toString();
   private static final DatanodeVersion DN_OLD_VERSION = 
DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE;
 
-  @BeforeAll
-  public static void init() throws Exception {
-    chunkSize = 100;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
+  static MiniOzoneCluster createCluster() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration conf = new OzoneConfiguration();
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    clientConfig.setEnablePutblockPiggybacking(true);
     conf.setFromObject(clientConfig);
 
-    conf.setBoolean(OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY, true);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
     conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
-        StorageUnit.MB);
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB);
+    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 3);
+
+    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(5));
+    conf.setFromObject(raftClientConfig);
+
+    RatisClientConfig ratisClientConfig =
+        conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
+    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
+    conf.setFromObject(ratisClientConfig);
 
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setDataStreamBufferFlushSize(maxFlushSize)
-        .setDataStreamMinPacketSize(chunkSize)
-        .setDataStreamWindowSize(5 * chunkSize)
+        .setBlockSize(BLOCK_SIZE)
+        .setChunkSize(CHUNK_SIZE)
+        .setStreamBufferFlushSize(FLUSH_SIZE)
+        .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+        .setDataStreamBufferFlushSize(MAX_FLUSH_SIZE)
+        .setDataStreamMinPacketSize(CHUNK_SIZE)
+        .setDataStreamWindowSize(5 * CHUNK_SIZE)
         .applyTo(conf);
 
-    cluster = MiniOzoneCluster.newBuilder(conf)
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .setDatanodeFactory(UniformDatanodesFactory.newBuilder()
             .setCurrentVersion(DN_OLD_VERSION)
             .build())
         .build();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
+        180000);
     cluster.waitForClusterToBeReady();
-    //the easiest way to create an open container is creating a key
-    client = OzoneClientFactory.getRpcClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "testblockdatastreamoutput";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
+
+    try (OzoneClient client = cluster.newClient()) {
+      ObjectStore objectStore = client.getObjectStore();
+      objectStore.createVolume(VOLUME_NAME);
+      objectStore.getVolume(VOLUME_NAME).createBucket(BUCKET_NAME);
+    }
+
+    return cluster;
   }
 
-  static String getKeyName() {
-    return UUID.randomUUID().toString();
+  private static Stream<Arguments> clientParameters() {
+    return Stream.of(
+        Arguments.of(true),
+        Arguments.of(false)
+    );
   }
 
-  @AfterAll
-  public static void shutdown() {
-    IOUtils.closeQuietly(client);
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+  private static Stream<Arguments> dataLengthParameters() {
+    return Stream.of(
+        Arguments.of(CHUNK_SIZE / 2),
+        Arguments.of(CHUNK_SIZE),
+        Arguments.of(CHUNK_SIZE + 50),
+        Arguments.of(BLOCK_SIZE + 50)
+    );
   }
 
-  @Test
-  public void testHalfChunkWrite() throws Exception {
-    testWrite(chunkSize / 2);
-    testWriteWithFailure(chunkSize / 2);
+  static OzoneClientConfig newClientConfig(ConfigurationSource source,
+                                           boolean flushDelay) {
+    OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(flushDelay);
+    return clientConfig;
   }
 
-  @Test
-  public void testSingleChunkWrite() throws Exception {
-    testWrite(chunkSize);
-    testWriteWithFailure(chunkSize);
+  static OzoneClient newClient(OzoneConfiguration conf,
+                               OzoneClientConfig config) throws IOException {
+    OzoneConfiguration copy = new OzoneConfiguration(conf);
+    copy.setFromObject(config);
+    return OzoneClientFactory.getRpcClient(copy);
   }
 
-  @Test
-  public void testMultiChunkWrite() throws Exception {
-    testWrite(chunkSize + 50);
-    testWriteWithFailure(chunkSize + 50);
+  @BeforeAll
+  public void init() throws Exception {
+    cluster = createCluster();
   }
 
-  @Test
-  @Flaky("HDDS-12027")
-  public void testMultiBlockWrite() throws Exception {
-    testWrite(blockSize + 50);
-    testWriteWithFailure(blockSize + 50);
+  static String getKeyName() {
+    return UUID.randomUUID().toString();
   }
 
-  static void testWrite(int dataLength) throws Exception {
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
-    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
+  @AfterAll
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("dataLengthParameters")
+  @Flaky("HDDS-12027")
+  public void testStreamWrite(int dataLength) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), false);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      testWrite(client, dataLength);
+      testWriteWithFailure(client, dataLength);
+    }
+  }
 
+  static void testWrite(OzoneClient client, int dataLength) throws Exception {
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, dataLength);
+        client, keyName, dataLength);
     final byte[] data = ContainerTestHelper.generateData(dataLength, false);
     key.write(ByteBuffer.wrap(data));
     // now close the stream, It will update the key length.
     key.close();
-    validateData(keyName, data);
-
-    assertEquals(pendingPutBlockCount,
-        metrics.getPendingContainerOpCountMetrics(PutBlock));
-    assertEquals(pendingWriteChunkCount,
-        metrics.getPendingContainerOpCountMetrics(WriteChunk));
+    validateData(client, keyName, data);
   }
 
-  private void testWriteWithFailure(int dataLength) throws Exception {
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
-    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
-
+  private void testWriteWithFailure(OzoneClient client, int dataLength) throws 
Exception {
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, dataLength);
+        client, keyName, dataLength);
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
@@ -203,129 +240,123 @@ private void testWriteWithFailure(int dataLength) 
throws Exception {
     key.write(b);
     key.close();
     String dataString = new String(data, UTF_8);
-    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
-
-    assertEquals(pendingPutBlockCount,
-        metrics.getPendingContainerOpCountMetrics(PutBlock));
-    assertEquals(pendingWriteChunkCount,
-        metrics.getPendingContainerOpCountMetrics(WriteChunk));
-  }
-
-  @Test
-  public void testPutBlockAtBoundary() throws Exception {
-    int dataLength = maxFlushSize + 100;
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock);
-    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
-    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
-
-    String keyName = getKeyName();
-    OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, 0);
-    byte[] data =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
-            .getBytes(UTF_8);
-    key.write(ByteBuffer.wrap(data));
-    assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-        .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-    assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
-        .isLessThanOrEqualTo(pendingWriteChunkCount + 5);
-    key.close();
-    // Since data length is 500 , first putBlock will be at 400(flush boundary)
-    // and the other at 500
-    assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(PutBlock));
-    // Each chunk is 100 so there will be 500 / 100 = 5 chunks.
-    assertEquals(writeChunkCount + 5,
-        metrics.getContainerOpCountMetrics(WriteChunk));
-    assertEquals(totalOpCount + 7,
-        metrics.getTotalOpCount());
-    assertEquals(pendingPutBlockCount,
-        metrics.getPendingContainerOpCountMetrics(PutBlock));
-    assertEquals(pendingWriteChunkCount,
-        metrics.getPendingContainerOpCountMetrics(WriteChunk));
-
-    validateData(keyName, data);
+    validateData(client, keyName, 
dataString.concat(dataString).getBytes(UTF_8));
   }
 
-
-  static OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
+  static OzoneDataStreamOutput createKey(OzoneClient client, String keyName,
                                          long size) throws Exception {
-    return TestHelper.createStreamKey(
-        keyName, type, size, objectStore, volumeName, bucketName);
+    return TestHelper.createStreamKey(keyName, ReplicationType.RATIS, size,
+        client.getObjectStore(), VOLUME_NAME, BUCKET_NAME);
   }
-  static void validateData(String keyName, byte[] data) throws Exception {
+
+  static void validateData(OzoneClient client, String keyName, byte[] data) 
throws Exception {
     TestHelper.validateData(
-        keyName, data, objectStore, volumeName, bucketName);
+        keyName, data, client.getObjectStore(), VOLUME_NAME, BUCKET_NAME);
   }
 
-
-  @Test
-  public void testMinPacketSize() throws Exception {
-    String keyName = getKeyName();
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
-    long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
-    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
-    byte[] data =
-        ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
-            .getBytes(UTF_8);
-    key.write(ByteBuffer.wrap(data));
-    // minPacketSize= 100, so first write of 50 wont trigger a writeChunk
-    assertEquals(writeChunkCount,
-        metrics.getContainerOpCountMetrics(WriteChunk));
-    key.write(ByteBuffer.wrap(data));
-    assertEquals(writeChunkCount + 1,
-        metrics.getContainerOpCountMetrics(WriteChunk));
-    // now close the stream, It will update the key length.
-    key.close();
-    assertEquals(pendingWriteChunkCount,
-        metrics.getPendingContainerOpCountMetrics(WriteChunk));
-    String dataString = new String(data, UTF_8);
-    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  public void testPutBlockAtBoundary(boolean flushDelay) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      int dataLength = 500;
+      XceiverClientMetrics metrics =
+          XceiverClientManager.getXceiverClientMetrics();
+      long putBlockCount = metrics.getContainerOpCountMetrics(
+          ContainerProtos.Type.PutBlock);
+      long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
+          ContainerProtos.Type.PutBlock);
+      String keyName = getKeyName();
+      OzoneDataStreamOutput key = createKey(
+          client, keyName, 0);
+      byte[] data =
+          ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+              .getBytes(UTF_8);
+      key.write(ByteBuffer.wrap(data));
+      
assertThat(metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock))
+          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+      key.close();
+      // Since data length is 500 , first putBlock will be at 400(flush 
boundary)
+      // and the other at 500
+      assertEquals(
+          metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock),
+          putBlockCount + 2);
+      validateData(client, keyName, data);
+    }
   }
 
-  @Test
-  public void testTotalAckDataLength() throws Exception {
-    int dataLength = 400;
-    String keyName = getKeyName();
-    OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, 0);
-    byte[] data =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
-            .getBytes(UTF_8);
-    KeyDataStreamOutput keyDataStreamOutput =
-        (KeyDataStreamOutput) key.getByteBufStreamOutput();
-    BlockDataStreamOutputEntry stream =
-        keyDataStreamOutput.getStreamEntries().get(0);
-    key.write(ByteBuffer.wrap(data));
-    key.close();
-    assertEquals(dataLength, stream.getTotalAckDataLength());
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  public void testMinPacketSize(boolean flushDelay) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      String keyName = getKeyName();
+      XceiverClientMetrics metrics =
+          XceiverClientManager.getXceiverClientMetrics();
+      OzoneDataStreamOutput key = createKey(client, keyName, 0);
+      long writeChunkCount =
+          metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+      byte[] data =
+          ContainerTestHelper.getFixedLengthString(keyString, CHUNK_SIZE / 2)
+              .getBytes(UTF_8);
+      key.write(ByteBuffer.wrap(data));
+      // minPacketSize= 100, so first write of 50 won't trigger a writeChunk
+      assertEquals(writeChunkCount,
+          metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+      key.write(ByteBuffer.wrap(data));
+      assertEquals(writeChunkCount + 1,
+          metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+      // now close the stream, It will update the key length.
+      key.close();
+      String dataString = new String(data, UTF_8);
+      validateData(client, keyName, 
dataString.concat(dataString).getBytes(UTF_8));
+    }
   }
 
-  @Test
-  public void testDatanodeVersion() throws Exception {
-    // Verify all DNs internally have versions set correctly
-    List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
-    for (HddsDatanodeService dn : dns) {
-      DatanodeDetails details = dn.getDatanodeDetails();
-      assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  public void testTotalAckDataLength(boolean flushDelay) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      int dataLength = 400;
+      String keyName = getKeyName();
+      OzoneDataStreamOutput key = createKey(
+          client, keyName, 0);
+      byte[] data =
+          ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+              .getBytes(UTF_8);
+      KeyDataStreamOutput keyDataStreamOutput =
+          (KeyDataStreamOutput) key.getByteBufStreamOutput();
+      BlockDataStreamOutputEntry stream =
+          keyDataStreamOutput.getStreamEntries().get(0);
+      key.write(ByteBuffer.wrap(data));
+      key.close();
+      assertEquals(dataLength, stream.getTotalAckDataLength());
     }
+  }
 
-    String keyName = getKeyName();
-    OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
-    KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) 
key.getByteBufStreamOutput();
-    BlockDataStreamOutputEntry stream = 
keyDataStreamOutput.getStreamEntries().get(0);
-
-    // Now check 3 DNs in a random pipeline returns the correct DN versions
-    List<DatanodeDetails> streamDnDetails = stream.getPipeline().getNodes();
-    for (DatanodeDetails details : streamDnDetails) {
-      assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  public void testDatanodeVersion(boolean flushDelay) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      // Verify all DNs internally have versions set correctly
+      List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
+      for (HddsDatanodeService dn : dns) {
+        DatanodeDetails details = dn.getDatanodeDetails();
+        assertEquals(DN_OLD_VERSION.toProtoValue(), 
details.getCurrentVersion());
+      }
+
+      String keyName = getKeyName();
+      OzoneDataStreamOutput key = createKey(client, keyName, 0);
+      KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) 
key.getByteBufStreamOutput();
+      BlockDataStreamOutputEntry stream = 
keyDataStreamOutput.getStreamEntries().get(0);
+
+      // Now check 3 DNs in a random pipeline returns the correct DN versions
+      List<DatanodeDetails> streamDnDetails = stream.getPipeline().getNodes();
+      for (DatanodeDetails details : streamDnDetails) {
+        assertEquals(DN_OLD_VERSION.toProtoValue(), 
details.getCurrentVersion());
+      }
     }
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to