This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a6e31d50cf HDDS-13618. Avoid frequent pipeline close action from DN (#9024)
2a6e31d50cf is described below

commit 2a6e31d50cf87609fb510fe0bd4fe146d13cd51a
Author: Sarveksha Yeshavantha Raju <[email protected]>
AuthorDate: Thu Sep 18 16:44:43 2025 +0530

    HDDS-13618. Avoid frequent pipeline close action from DN (#9024)
---
 .../common/statemachine/StateContext.java          |  8 +++
 .../ClosePipelineCommandHandler.java               |  9 ++++
 .../commandhandler/CommandDispatcher.java          |  4 ++
 .../transport/server/ratis/XceiverServerRatis.java | 12 ++++-
 .../TestClosePipelineCommandHandler.java           | 58 ++++++++++++++++++++++
 .../hdds/scm/pipeline/TestPipelineClose.java       | 52 +++++++++++++++++++
 6 files changed, 142 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3b7fd7efe03..305b7b55a22 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -43,6 +43,7 @@
 import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -70,6 +71,7 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import 
org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import 
org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
@@ -807,6 +809,12 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
     return summary;
   }
 
+  public boolean isPipelineCloseInProgress(UUID pipelineID) {
+    ClosePipelineCommandHandler handler = 
parentDatanodeStateMachine.getCommandDispatcher()
+        .getClosePipelineCommandHandler();
+    return handler.isPipelineCloseInProgress(pipelineID);
+  }
+
   /**
    * Returns the count of the Execution.
    * @return long
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 2e392ccf663..38924e9dcac 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -90,6 +90,15 @@ public ClosePipelineCommandHandler(
     this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
   }
 
+  /**
+   * Returns true if pipeline close is in progress, else false.
+   *
+   * @return boolean
+   */
+  public boolean isPipelineCloseInProgress(UUID pipelineID) {
+    return pipelinesInProgress.contains(pipelineID);
+  }
+
   /**
    * Handles a given SCM command.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 696b04defe3..ece91ffdd1c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -80,6 +80,10 @@ public CommandHandler getDeleteBlocksCommandHandler() {
     return handlerMap.get(Type.deleteBlocksCommand);
   }
 
+  public ClosePipelineCommandHandler getClosePipelineCommandHandler() {
+    return (ClosePipelineCommandHandler) 
handlerMap.get(Type.closePipelineCommand);
+  }
+
   /**
    * Dispatch the command to the correct handler.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6661823f9a1..02167d750b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -741,9 +741,19 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr
     triggerPipelineClose(groupId, b.toString(), 
ClosePipelineInfo.Reason.PIPELINE_FAILED);
   }
 
-  private void triggerPipelineClose(RaftGroupId groupId, String detail,
+  @VisibleForTesting
+  public void triggerPipelineClose(RaftGroupId groupId, String detail,
       ClosePipelineInfo.Reason reasonCode) {
     PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    if (context != null) {
+      if (context.isPipelineCloseInProgress(pipelineID.getId())) {
+        LOG.debug("Skipped triggering pipeline close for {} as it is already 
in progress. Reason: {}",
+            pipelineID.getId(), detail);
+        return;
+      }
+    }
+
     ClosePipelineInfo.Builder closePipelineInfo =
         ClosePipelineInfo.newBuilder()
             .setPipelineID(pipelineID.getProtobuf())
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
index 2120615081e..70744efbbed 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -17,8 +17,12 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -31,6 +35,11 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -132,6 +141,55 @@ void testCommandIdempotency() throws IOException {
         .remove(any(), anyBoolean(), anyBoolean());
   }
 
+  @Test
+  void testPendingPipelineClose() throws IOException, InterruptedException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final UUID pipelineUUID = pipelineID.getId();
+    final SCMCommand<ClosePipelineCommandProto> command1 = new 
ClosePipelineCommand(pipelineID);
+    final SCMCommand<ClosePipelineCommandProto> command2 = new 
ClosePipelineCommand(pipelineID);
+    StateContext stateContext = 
ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    final boolean shouldDeleteRatisLogDirectory = true;
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    
when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+    Collection<RaftPeer> raftPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+    
when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+    
when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+    CountDownLatch firstCommandStarted = new CountDownLatch(1);
+    CountDownLatch secondCommandSubmitted = new CountDownLatch(1);
+
+    doAnswer(invocation -> {
+      firstCommandStarted.countDown();
+      secondCommandSubmitted.await();
+      return null;
+    }).when(writeChannel).removeGroup(pipelineID.getProtobuf());
+
+    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler((leader, tls) -> raftClient, 
singleThreadExecutor);
+    assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+    commandHandler.handle(command1, ozoneContainer, stateContext, 
connectionManager);
+    assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS));
+    commandHandler.handle(command2, ozoneContainer, stateContext, 
connectionManager);
+    secondCommandSubmitted.countDown();
+
+    singleThreadExecutor.shutdown();
+    assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS));
+    
+    // Only one command should have been processed due to duplicate prevention
+    assertEquals(1, commandHandler.getInvocationCount());
+    assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+  }
+
   private List<DatanodeDetails> getDatanodes() {
     final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
     final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 1763d9e269b..1e6aa4e04bc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -27,11 +27,13 @@
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -39,6 +41,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -56,6 +59,8 @@
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
@@ -66,6 +71,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.mockito.ArgumentCaptor;
+import org.slf4j.event.Level;
 
 /**
  * Tests for Pipeline Closing.
@@ -256,6 +262,52 @@ void testPipelineCloseWithLogFailure()
     verifyCloseForPipeline(openPipeline, actionsFromDatanode);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws 
Exception {
+    ContainerInfo allocateContainer = containerManager
+        .allocateContainer(RatisReplicationConfig.getInstance(
+            ReplicationFactor.THREE), "newTestOwner");
+    ContainerWithPipeline containerWithPipeline = new 
ContainerWithPipeline(allocateContainer,
+        pipelineManager.getPipeline(allocateContainer.getPipelineID()));
+    
+    DatanodeStateMachine datanodeStateMachine = 
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine();
+    XceiverServerRatis xceiverRatis = (XceiverServerRatis) 
datanodeStateMachine.getContainer().getWriteChannel();
+
+    GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.DEBUG);
+    GenericTestUtils.LogCapturer xceiverLogCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class);
+
+    RaftGroupId groupId = 
RaftGroupId.valueOf(containerWithPipeline.getPipeline().getId().getId());
+    PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    ClosePipelineCommandHandler handler = 
datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler();
+     
+    Field pipelinesInProgressField = 
handler.getClass().getDeclaredField("pipelinesInProgress");
+    pipelinesInProgressField.setAccessible(true);
+    Set<UUID> pipelinesInProgress = (Set<UUID>) 
pipelinesInProgressField.get(handler);
+
+    try {
+      pipelinesInProgress.add(pipelineID.getId());
+
+      String detail = "test duplicate trigger ";
+      int numOfDuplicateTriggers = 10;
+      for (int i = 1; i <= numOfDuplicateTriggers; i++) {
+        xceiverRatis.triggerPipelineClose(groupId, detail + i, 
ClosePipelineInfo.Reason.PIPELINE_FAILED);
+      }
+
+      String xceiverLogs = xceiverLogCapturer.getOutput();
+      int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), 
"skipped triggering pipeline close");
+      assertEquals(numOfDuplicateTriggers, skippedCount);
+    } finally {
+      pipelinesInProgress.remove(pipelineID.getId());
+      xceiverLogCapturer.stopCapturing();
+
+      
pipelineManager.closePipeline(containerWithPipeline.getPipeline().getId());
+      
pipelineManager.deletePipeline(containerWithPipeline.getPipeline().getId());
+    }
+  }
+
   private boolean verifyCloseForPipeline(Pipeline pipeline,
       PipelineActionsFromDatanode report) {
     UUID uuidToFind = pipeline.getId().getId();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to