This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c713042169 HDDS-14100. Close OutputStream properly in TestFailureHandlingByClient (#9455)
4c713042169 is described below

commit 4c713042169027e56bb7e7030b1cc3308d4fd2c4
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Dec 11 12:07:06 2025 +0100

    HDDS-14100. Close OutputStream properly in TestFailureHandlingByClient (#9455)
---
 .../client/rpc/TestFailureHandlingByClient.java    | 335 +++++++++++----------
 .../rpc/TestFailureHandlingByClientFlushDelay.java |  70 ++---
 2 files changed, 205 insertions(+), 200 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index f115b3d99ff..a465930b323 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -19,6 +19,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.tag.Flaky;
 import org.apache.ratis.proto.RaftProtos;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -174,35 +176,35 @@ public void shutdown() {
   @Test
   public void testBlockWritesWithDnFailures() throws Exception {
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     byte[] data = ContainerTestHelper.getFixedLengthString(
         keyString, 2 * chunkSize + chunkSize / 2).getBytes(UTF_8);
-    key.write(data);
-
-    // get the name of a valid container
-    KeyOutputStream groupOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    // assert that the exclude list's expire time equals to
-    // default value 600000 ms in OzoneClientConfig.java
-    assertEquals(groupOutputStream.getExcludeList().getExpiryTime(), 600000);
-    List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
-    assertEquals(1, locationInfoList.size());
-    long containerId = locationInfoList.get(0).getContainerID();
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager()
-        .getContainer(ContainerID.valueOf(containerId));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-    cluster.shutdownHddsDatanode(datanodes.get(0));
-    cluster.shutdownHddsDatanode(datanodes.get(1));
-    restartDataNodes.add(datanodes.get(0));
-    restartDataNodes.add(datanodes.get(1));
-    // The write will fail but exception will be handled and length will be
-    // updated correctly in OzoneManager once the steam is closed
-    key.close();
+    try (OzoneOutputStream key = createKey(keyName, RATIS, 0)) {
+      key.write(data);
+
+      // get the name of a valid container
+      KeyOutputStream groupOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      // assert that the exclude list's expire time equals to
+      // default value 600000 ms in OzoneClientConfig.java
+      assertEquals(groupOutputStream.getExcludeList().getExpiryTime(), 600000);
+      List<OmKeyLocationInfo> locationInfoList =
+          groupOutputStream.getLocationInfoList();
+      assertEquals(1, locationInfoList.size());
+      long containerId = locationInfoList.get(0).getContainerID();
+      ContainerInfo container = cluster.getStorageContainerManager()
+          .getContainerManager()
+          .getContainer(ContainerID.valueOf(containerId));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+      cluster.shutdownHddsDatanode(datanodes.get(0));
+      cluster.shutdownHddsDatanode(datanodes.get(1));
+      restartDataNodes.add(datanodes.get(0));
+      restartDataNodes.add(datanodes.get(1));
+      // The write will fail but the exception will be handled and length will be
+      // updated correctly in OzoneManager once the stream is closed
+    }
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -314,31 +316,33 @@ private void testBlockCountOnFailures(OmKeyInfo 
omKeyInfo) throws Exception {
   @Test
   public void testWriteSmallFile() throws Exception {
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, 0);
     String data = ContainerTestHelper
         .getFixedLengthString(keyString,  chunkSize / 2);
-    key.write(data.getBytes(UTF_8));
-    // get the name of a valid container
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    List<OmKeyLocationInfo> locationInfoList =
-        keyOutputStream.getLocationInfoList();
-    long containerId = locationInfoList.get(0).getContainerID();
-    BlockID blockId = locationInfoList.get(0).getBlockID();
-    ContainerInfo container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainer(ContainerID.valueOf(containerId));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-
-    cluster.shutdownHddsDatanode(datanodes.get(0));
-    cluster.shutdownHddsDatanode(datanodes.get(1));
-    restartDataNodes.add(datanodes.get(0));
-    restartDataNodes.add(datanodes.get(1));
-    key.close();
+
+    BlockID blockId;
+    try (OzoneOutputStream key = createKey(keyName, RATIS, 0)) {
+      key.write(data.getBytes(UTF_8));
+      // get the name of a valid container
+      KeyOutputStream keyOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      List<OmKeyLocationInfo> locationInfoList =
+          keyOutputStream.getLocationInfoList();
+      long containerId = locationInfoList.get(0).getContainerID();
+      blockId = locationInfoList.get(0).getBlockID();
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueOf(containerId));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      cluster.shutdownHddsDatanode(datanodes.get(0));
+      cluster.shutdownHddsDatanode(datanodes.get(1));
+      restartDataNodes.add(datanodes.get(0));
+      restartDataNodes.add(datanodes.get(1));
+    }
+
     // this will throw AlreadyClosedException and and current stream
     // will be discarded and write a new block
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
@@ -360,42 +364,43 @@ public void testWriteSmallFile() throws Exception {
   public void testContainerExclusionWithClosedContainerException()
       throws Exception {
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, blockSize);
     String data = ContainerTestHelper
         .getFixedLengthString(keyString,  chunkSize);
 
-    // get the name of a valid container
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
-
-    // Assert that 1 block will be preallocated
-    assertEquals(1, streamEntryList.size());
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    long containerId = streamEntryList.get(0).getBlockID().getContainerID();
-    BlockID blockId = streamEntryList.get(0).getBlockID();
-    List<Long> containerIdList = new ArrayList<>();
-    containerIdList.add(containerId);
-
-    // below check will assert if the container does not get closed
-    TestHelper
-        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
-
-    // This write will hit ClosedContainerException and this container should
-    // will be added in the excludelist
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-
-    assertThat(keyOutputStream.getExcludeList().getContainerIds())
-        .contains(ContainerID.valueOf(containerId));
-    assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
-    assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
+    BlockID blockId;
+    try (OzoneOutputStream key = createKey(keyName, RATIS, blockSize)) {
+      // get the name of a valid container
+      KeyOutputStream keyOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      List<BlockOutputStreamEntry> streamEntryList =
+          keyOutputStream.getStreamEntries();
+
+      // Assert that 1 block will be preallocated
+      assertEquals(1, streamEntryList.size());
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+      blockId = streamEntryList.get(0).getBlockID();
+      List<Long> containerIdList = new ArrayList<>();
+      containerIdList.add(containerId);
+
+      // below check will assert if the container does not get closed
+      TestHelper
+          .waitForContainerClose(cluster, containerIdList.toArray(new 
Long[0]));
+
+      // This write will hit ClosedContainerException and this container
+      // will be added to the exclude list
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+
+      assertThat(keyOutputStream.getExcludeList().getContainerIds())
+          .contains(ContainerID.valueOf(containerId));
+      assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
+      assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
+
+      // The close will just write to the buffer
+    }
 
-    // The close will just write to the buffer
-    key.close();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
@@ -413,6 +418,7 @@ public void 
testContainerExclusionWithClosedContainerException()
 
   @ParameterizedTest
   @EnumSource(value = RaftProtos.ReplicationLevel.class, names = 
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+  @Flaky("HDDS-13972")
   public void 
testDatanodeExclusionWithMajorityCommit(RaftProtos.ReplicationLevel type) 
throws Exception {
     OzoneConfiguration localConfig = new OzoneConfiguration(conf);
     RatisClientConfig ratisClientConfig = 
localConfig.getObject(RatisClientConfig.class);
@@ -422,52 +428,52 @@ public void 
testDatanodeExclusionWithMajorityCommit(RaftProtos.ReplicationLevel
     ObjectStore localObjectStore = localClient.getObjectStore();
 
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key =
-        TestHelper.createKey(keyName, ReplicationType.RATIS, blockSize, 
localObjectStore, volumeName,
-            bucketName);
     String data = ContainerTestHelper
-        .getFixedLengthString(keyString,  chunkSize);
-
-    // get the name of a valid container
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
-
-    // Assert that 1 block will be preallocated
-    assertEquals(1, streamEntryList.size());
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    long containerId = streamEntryList.get(0).getBlockID().getContainerID();
-    BlockID blockId = streamEntryList.get(0).getBlockID();
-    ContainerInfo container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainer(ContainerID.valueOf(containerId));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-
-    // shutdown 1 datanode. This will make sure the 2 way commit happens for
-    // next write ops.
-    cluster.shutdownHddsDatanode(datanodes.get(0));
-    restartDataNodes.add(datanodes.get(0));
-
-    HddsDatanodeService hddsDatanode = 
cluster.getHddsDatanode(datanodes.get(0));
-    GenericTestUtils.waitFor(hddsDatanode::isStopped, 1000, 30000);
-
-    key.write(data.getBytes(UTF_8));
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-
-    if (type == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
-      assertThat(keyOutputStream.getExcludeList().getDatanodes())
-          .contains(datanodes.get(0));
+        .getFixedLengthString(keyString, chunkSize);
+
+    BlockID blockId;
+    try (OzoneOutputStream key = TestHelper.createKey(keyName, RATIS, 
blockSize, localObjectStore, volumeName,
+        bucketName)) {
+      // get the name of a valid container
+      KeyOutputStream keyOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      List<BlockOutputStreamEntry> streamEntryList =
+          keyOutputStream.getStreamEntries();
+
+      // Assert that 1 block will be preallocated
+      assertEquals(1, streamEntryList.size());
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+      blockId = streamEntryList.get(0).getBlockID();
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueOf(containerId));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      // shutdown 1 datanode. This will make sure the 2 way commit happens for
+      // next write ops.
+      cluster.shutdownHddsDatanode(datanodes.get(0));
+      restartDataNodes.add(datanodes.get(0));
+
+      HddsDatanodeService hddsDatanode = 
cluster.getHddsDatanode(datanodes.get(0));
+      GenericTestUtils.waitFor(hddsDatanode::isStopped, 1000, 30000);
+
+      key.write(data.getBytes(UTF_8));
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+
+      if (type == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+        assertThat(keyOutputStream.getExcludeList().getDatanodes())
+            .contains(datanodes.get(0));
+      }
+      assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
+      assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
+      // The close will just write to the buffer
     }
-    assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
-    assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
-    // The close will just write to the buffer
-    key.close();
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -489,47 +495,46 @@ public void 
testDatanodeExclusionWithMajorityCommit(RaftProtos.ReplicationLevel
   @Test
   public void testPipelineExclusionWithPipelineFailure() throws Exception {
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, blockSize);
     String data = ContainerTestHelper
         .getFixedLengthString(keyString,  chunkSize);
-
-    // get the name of a valid container
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
-
-    // Assert that 1 block will be preallocated
-    assertEquals(1, streamEntryList.size());
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    long containerId = streamEntryList.get(0).getBlockID().getContainerID();
-    BlockID blockId = streamEntryList.get(0).getBlockID();
-    ContainerInfo container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainer(ContainerID.valueOf(containerId));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-
-    // Two nodes, next write will hit AlreadyClosedException , the pipeline
-    // will be added in the exclude list
-    cluster.shutdownHddsDatanode(datanodes.get(0));
-    cluster.shutdownHddsDatanode(datanodes.get(1));
-    restartDataNodes.add(datanodes.get(0));
-    restartDataNodes.add(datanodes.get(1));
-
-    key.write(data.getBytes(UTF_8));
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    assertThat(keyOutputStream.getExcludeList().getPipelineIds())
-        .contains(pipeline.getId());
-    assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
-    assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
-    // The close will just write to the buffer
-    key.close();
+    BlockID blockId;
+    try (OzoneOutputStream key = createKey(keyName, RATIS, blockSize)) {
+      // get the name of a valid container
+      KeyOutputStream keyOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      List<BlockOutputStreamEntry> streamEntryList =
+          keyOutputStream.getStreamEntries();
+
+      // Assert that 1 block will be preallocated
+      assertEquals(1, streamEntryList.size());
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+      blockId = streamEntryList.get(0).getBlockID();
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueOf(containerId));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      // Two nodes, next write will hit AlreadyClosedException , the pipeline
+      // will be added in the exclude list
+      cluster.shutdownHddsDatanode(datanodes.get(0));
+      cluster.shutdownHddsDatanode(datanodes.get(1));
+      restartDataNodes.add(datanodes.get(0));
+      restartDataNodes.add(datanodes.get(1));
+
+      key.write(data.getBytes(UTF_8));
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      assertThat(keyOutputStream.getExcludeList().getPipelineIds())
+          .contains(pipeline.getId());
+      assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
+      assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
+      // The close will just write to the buffer
+    }
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
index a4d79e99f06..e356ef64e88 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
@@ -153,44 +153,44 @@ public void shutdown() {
   public void testPipelineExclusionWithPipelineFailure() throws Exception {
     startCluster();
     String keyName = UUID.randomUUID().toString();
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, BLOCK_SIZE);
     String data = ContainerTestHelper
         .getFixedLengthString(keyString, CHUNK_SIZE);
 
-    // get the name of a valid container
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
-
-    // Assert that 1 block will be preallocated
-    assertEquals(1, streamEntryList.size());
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    long containerId = streamEntryList.get(0).getBlockID().getContainerID();
-    BlockID blockId = streamEntryList.get(0).getBlockID();
-    ContainerInfo container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainer(ContainerID.valueOf(containerId));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-
-    // Two nodes, next write will hit AlreadyClosedException , the pipeline
-    // will be added in the exclude list
-    cluster.shutdownHddsDatanode(datanodes.get(0));
-    cluster.shutdownHddsDatanode(datanodes.get(1));
-
-    key.write(data.getBytes(UTF_8));
-    key.flush();
-    assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
-    assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
-    assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
-    key.write(data.getBytes(UTF_8));
-    // The close will just write to the buffer
-    key.close();
+    BlockID blockId;
+    try (OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 
BLOCK_SIZE)) {
+      // get the name of a valid container
+      KeyOutputStream keyOutputStream =
+          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      List<BlockOutputStreamEntry> streamEntryList =
+          keyOutputStream.getStreamEntries();
+
+      // Assert that 1 block will be preallocated
+      assertEquals(1, streamEntryList.size());
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+      blockId = streamEntryList.get(0).getBlockID();
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueOf(containerId));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      // Two nodes, next write will hit AlreadyClosedException , the pipeline
+      // will be added in the exclude list
+      cluster.shutdownHddsDatanode(datanodes.get(0));
+      cluster.shutdownHddsDatanode(datanodes.get(1));
+
+      key.write(data.getBytes(UTF_8));
+      key.flush();
+      assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
+      assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
+      assertThat(keyOutputStream.getExcludeList().getDatanodes()).isEmpty();
+      key.write(data.getBytes(UTF_8));
+      // The close will just write to the buffer
+    }
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to