This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ebacba1a49f HDDS-14040. Ozone client hang for data write in failure scenario (#9401)
ebacba1a49f is described below

commit ebacba1a49ffb744e61772c8f3619c8dd7e6d9c5
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Dec 5 12:01:05 2025 +0530

    HDDS-14040. Ozone client hang for data write in failure scenario (#9401)
---
 .../server/ratis/ContainerStateMachine.java        | 14 ++---
 .../server/ratis/TestContainerStateMachine.java    | 43 ++++++++-------
 .../rpc/TestContainerStateMachineFailures.java     | 62 +++++++++++++++++++++-
 3 files changed, 93 insertions(+), 26 deletions(-)
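
In short: on a write failure, writeStateMachineData used to complete the Raft
write future exceptionally (and rethrow), which could leave the Ozone client
hanging. The patch instead wraps the error in a ContainerCommandResponseProto
and completes the future normally, so the failure travels back to the client
as a regular error response. A minimal sketch of the pattern, assuming the
proto and util types shown in the hunks below (failWithResponse is a
hypothetical helper name, not code from the patch):

    // Sketch: turn a StorageContainerException into an error response and
    // complete the Ratis future with it, instead of completeExceptionally().
    private CompletableFuture<Message> failWithResponse(
        ContainerCommandRequestProto requestProto, StorageContainerException e) {
      // logAndReturnError logs the error and builds a response proto carrying
      // e.getResult() and e.getMessage().
      ContainerCommandResponseProto result =
          ContainerUtils.logAndReturnError(LOG, e, requestProto);
      // org.apache.ratis.protocol.Message is a functional interface over
      // getContent(), so result::toByteString satisfies it.
      return CompletableFuture.completedFuture(result::toByteString);
    }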

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index d5773b5abe5..1ec30914e2c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -582,7 +582,8 @@ private CompletableFuture<Message> writeStateMachineData(
     try {
       validateLongRunningWrite();
     } catch (StorageContainerException e) {
-      return completeExceptionally(e);
+      ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
+      return CompletableFuture.completedFuture(result::toByteString);
     }
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
     RaftServer server = ratisServer.getServer();
@@ -631,8 +632,11 @@ private CompletableFuture<Message> writeStateMachineData(
             // see the stateMachine is marked unhealthy by other parallel thread
             unhealthyContainers.add(write.getBlockID().getContainerID());
             stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
+            StorageContainerException sce = new StorageContainerException("Failed to write chunk data",
+                e, ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+            ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, sce, requestProto);
+            raftFuture.complete(result::toByteString);
+            return result;
           } finally {
             // Remove the future once it finishes execution from the
             writeChunkFutureMap.remove(entryIndex);
@@ -657,8 +661,6 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long
         // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
         // that should not crash the pipeline.
         && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
-      StorageContainerException sce =
-          new StorageContainerException(r.getMessage(), r.getResult());
       LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
           write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
           write.getChunkData().getChunkName() + " Error message: " +
@@ -669,7 +671,7 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long
       // handling the entry for the write chunk in cache.
       stateMachineHealthy.set(false);
       unhealthyContainers.add(write.getBlockID().getContainerID());
-      raftFuture.completeExceptionally(sce);
+      raftFuture.complete(r::toByteString);
     } else {
       metrics.incNumBytesWrittenCount(
           requestProto.getWriteChunk().getChunkData().getLen());
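
With this change, callers get a normally completed Message whose content is a
serialized ContainerCommandResponseProto. The updated unit tests below decode
it along these lines (a sketch; handleFailure is a hypothetical placeholder):

    Message message = stateMachine.write(entry, trx).get();  // no longer completes exceptionally
    ContainerProtos.ContainerCommandResponseProto response =
        ContainerProtos.ContainerCommandResponseProto.parseFrom(message.getContent());
    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
      // e.g. CONTAINER_INTERNAL_ERROR with message "Failed to write chunk data"
      handleFailure(response.getResult(), response.getMessage());
    }
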
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
index d0b2dc5358e..fd869c6127e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
@@ -22,6 +22,7 @@
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -120,7 +122,8 @@ public void shutdown() {
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
-  public void testWriteFailure(boolean failWithException) throws ExecutionException, InterruptedException {
+  public void testWriteFailure(boolean failWithException)
+      throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
     RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
     when(entry.getTerm()).thenReturn(1L);
     when(entry.getIndex()).thenReturn(1L);
@@ -134,23 +137,28 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
     setUpMockDispatcherReturn(failWithException);
     setUpMockRequestProtoReturn(context, 1, 1);
 
-    ThrowableCatcher catcher = new ThrowableCatcher();
-
-    stateMachine.write(entry, trx).exceptionally(catcher.asSetter()).get();
+    Message message = stateMachine.write(entry, trx).get();
     verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
         any(DispatcherContext.class));
     reset(dispatcher);
-    assertNotNull(catcher.getReceived());
-    assertResults(failWithException, catcher.getCaught());
+    ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto =
+        ContainerProtos.ContainerCommandResponseProto.parseFrom(message.getContent());
+    if (failWithException) {
+      assertEquals("Failed to write chunk data", containerCommandResponseProto.getMessage());
+      assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
+    } else {
+      // If the dispatcher returned a failure response, the state machine should propagate the same failure.
+      assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
+    }
 
     // Writing data to another container(containerId 2) should also fail.
     setUpMockRequestProtoReturn(context, 2, 1);
-    stateMachine.write(entryNext, trx).exceptionally(catcher.asSetter()).get();
+    message = stateMachine.write(entryNext, trx).get();
     verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
         any(DispatcherContext.class));
-    assertInstanceOf(StorageContainerException.class, catcher.getReceived());
-    StorageContainerException sce = (StorageContainerException) catcher.getReceived();
-    assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
+    containerCommandResponseProto
+        = ContainerProtos.ContainerCommandResponseProto.parseFrom(message.getContent());
+    assertTrue(containerCommandResponseProto.getMessage().contains("failed, stopping all writes to container"));
   }
 
   @ParameterizedTest
@@ -224,20 +232,17 @@ public void testWriteTimout() throws Exception {
     }).when(dispatcher).dispatch(any(), any());
 
     setUpMockRequestProtoReturn(context, 1, 1);
-    ThrowableCatcher catcher = new ThrowableCatcher();
 
     CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
     Thread.sleep(2000);
     CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
-    firstWrite.exceptionally(catcher.asSetter()).get();
-    assertNotNull(catcher.getCaught());
-    assertInstanceOf(InterruptedException.class, catcher.getReceived());
+    ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
+        = ContainerProtos.ContainerCommandResponseProto.parseFrom(firstWrite.get().getContent());
+    assertEquals("Failed to write chunk data", containerCommandResponseProto.getMessage());
 
-    secondWrite.exceptionally(catcher.asSetter()).get();
-    assertNotNull(catcher.getReceived());
-    assertInstanceOf(StorageContainerException.class, catcher.getReceived());
-    StorageContainerException sce = (StorageContainerException) catcher.getReceived();
-    assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
+    containerCommandResponseProto
+        = ContainerProtos.ContainerCommandResponseProto.parseFrom(secondWrite.get().getContent());
+    assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
   }
 
   private void setUpMockDispatcherReturn(boolean failWithException) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index c5ff85010bd..22d3a2b5450 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -53,6 +53,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -87,10 +88,12 @@
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -98,6 +101,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.apache.ozone.test.tag.Flaky;
@@ -135,7 +139,7 @@ public static void init() throws Exception {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 2000,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 200, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
@@ -764,9 +768,14 @@ void testContainerStateMachineSingleFailureRetry()
       assertEquals(1, locationInfoList.size());
 
       OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+      ContainerSet containerSet = cluster.getHddsDatanode(omKeyLocationInfo.getPipeline().getLeaderNode())
+          .getDatanodeStateMachine().getContainer().getContainerSet();
 
       induceFollowerFailure(omKeyLocationInfo, 2);
       key.flush();
+      // wait for the container to close, since the flush fails with applyTransaction failures on both followers
+      GenericTestUtils.waitFor(() -> containerSet.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
+              .getState().equals(ContainerProtos.ContainerDataProto.State.CLOSED), 100, 30000);
       key.write("ratis".getBytes(UTF_8));
       key.flush();
     }
@@ -805,6 +814,57 @@ void testContainerStateMachineDualFailureRetry()
     validateData("ratis1", 2, "ratisratisratisratis");
   }
 
+  @Test
+  void testContainerStateMachineAllNodeFailure()
+      throws Exception {
+    // mark all DN volumes as full to induce failure
+    List<Pair<StorageVolume, Long>> increasedVolumeSpace = new ArrayList<>();
+    cluster.getHddsDatanodes().forEach(
+        dn -> {
+          List<StorageVolume> volumesList = dn.getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList();
+          volumesList.forEach(sv -> {
+            if (sv.getVolumeUsage().isPresent()) {
+              increasedVolumeSpace.add(Pair.of(sv, sv.getCurrentUsage().getAvailable()));
+              sv.getVolumeUsage().get().incrementUsedSpace(sv.getCurrentUsage().getAvailable());
+            }
+          });
+        }
+    );
+
+    long startTime = Time.monotonicNow();
+    ReplicationConfig replicationConfig = ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+        ReplicationFactor.THREE);
+    try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName).createKey(
+        "testkey1", 1024, replicationConfig, new HashMap<>())) {
+
+      key.write("ratis".getBytes(UTF_8));
+      key.flush();
+      fail();
+    } catch (IOException ex) {
+      assertTrue(ex.getMessage().contains("Retry request failed. retries get failed due to exceeded" +
+          " maximum allowed retries number: 5"), ex.getMessage());
+    } finally {
+      increasedVolumeSpace.forEach(e -> e.getLeft().getVolumeUsage().ifPresent(
+          p -> p.decrementUsedSpace(e.getRight())));
+      // test execution takes under 2 sec; allow 30 sec to be safe, as without the fix it takes more than 60 sec
+      assertTrue(Time.monotonicNow() - startTime < 30000, "Operation took longer than expected: "
+          + (Time.monotonicNow() - startTime));
+    }
+
+    // The previous pipeline gets closed due to the disk-full failure, so create a new pipeline; the write should
+    // succeed. This also ensures later test cases can pass (they should not fail due to pipeline unavailability,
+    // since the pipeline-creation timeout is 200ms and creation can fail in a later test case).
+    Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager().createPipeline(replicationConfig);
+    cluster.getStorageContainerManager().getPipelineManager().waitPipelineReady(pipeline.getId(), 60000);
+
+    try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName).createKey(
+        "testkey2", 1024, replicationConfig, new HashMap<>())) {
+
+      key.write("ratis".getBytes(UTF_8));
+      key.flush();
+    }
+  }
+
   private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo,
                                      int failureCount) {
     DatanodeID leader = omKeyLocationInfo.getPipeline().getLeaderId();
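
For reference, the new all-node-failure test induces the failure by inflating
the reported used space on every datanode volume and restoring it afterwards.
Roughly, using the StorageVolume/VolumeUsage accessors from the diff above
(volumeSet stands in for one datanode's volume set; this is a sketch, not the
test code verbatim):

    List<Pair<StorageVolume, Long>> saved = new ArrayList<>();
    for (StorageVolume sv : volumeSet.getVolumesList()) {
      sv.getVolumeUsage().ifPresent(usage -> {
        long available = sv.getCurrentUsage().getAvailable();
        saved.add(Pair.of(sv, available));    // remember the inflation amount
        usage.incrementUsedSpace(available);  // volume now reports as full
      });
    }
    // ... run the write that is now expected to fail fast with a retry error ...
    saved.forEach(e -> e.getLeft().getVolumeUsage()
        .ifPresent(u -> u.decrementUsedSpace(e.getRight())));  // restore capacity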

