This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2a6e31d50cf HDDS-13618. Avoid frequent pipeline close action from DN (#9024)
2a6e31d50cf is described below
commit 2a6e31d50cf87609fb510fe0bd4fe146d13cd51a
Author: Sarveksha Yeshavantha Raju <[email protected]>
AuthorDate: Thu Sep 18 16:44:43 2025 +0530
HDDS-13618. Avoid frequent pipeline close action from DN (#9024)
---
.../common/statemachine/StateContext.java | 8 +++
.../ClosePipelineCommandHandler.java | 9 ++++
.../commandhandler/CommandDispatcher.java | 4 ++
.../transport/server/ratis/XceiverServerRatis.java | 12 ++++-
.../TestClosePipelineCommandHandler.java | 58 ++++++++++++++++++++++
.../hdds/scm/pipeline/TestPipelineClose.java | 52 +++++++++++++++++++
6 files changed, 142 insertions(+), 1 deletion(-)
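
In short: before XceiverServerRatis on a datanode queues another close-pipeline action for SCM, it now asks the datanode StateContext whether a close for that pipeline is already being handled; StateContext delegates (via CommandDispatcher) to ClosePipelineCommandHandler, which tracks the pipelines whose close command is currently in flight. The sketch below illustrates that guard with a single simplified class; it is not the Ozone code itself (in the real flow the trigger only queues a PipelineAction for SCM, and the handler maintains the in-progress set while it executes the resulting ClosePipelineCommand), and the class name is hypothetical.

    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    // Toy stand-in for the guard added by this commit. Only the pattern mirrors the diff.
    public class PipelineCloseGuardSketch {

      // Mirrors ClosePipelineCommandHandler#pipelinesInProgress.
      private final Set<UUID> pipelinesInProgress = ConcurrentHashMap.newKeySet();

      public boolean isPipelineCloseInProgress(UUID pipelineId) {
        return pipelinesInProgress.contains(pipelineId);
      }

      // Called when a Ratis pipeline failure is detected on the datanode.
      public void triggerPipelineClose(UUID pipelineId, String detail) {
        if (isPipelineCloseInProgress(pipelineId)) {
          System.out.println("Skipped triggering pipeline close for " + pipelineId
              + " as it is already in progress. Reason: " + detail);
          return;
        }
        // The real code builds a ClosePipelineInfo and queues a PipelineAction for SCM here;
        // this sketch collapses that round trip and marks the close in progress directly.
        pipelinesInProgress.add(pipelineId);
      }

      // Called once the close command has been fully processed.
      public void pipelineCloseCompleted(UUID pipelineId) {
        pipelinesInProgress.remove(pipelineId);
      }

      public static void main(String[] args) {
        PipelineCloseGuardSketch guard = new PipelineCloseGuardSketch();
        UUID pipeline = UUID.randomUUID();
        guard.triggerPipelineClose(pipeline, "follower slowness"); // triggers the close
        guard.triggerPipelineClose(pipeline, "no leader");         // skipped: already in progress
        guard.pipelineCloseCompleted(pipeline);
      }
    }
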
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3b7fd7efe03..305b7b55a22 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -43,6 +43,7 @@
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -70,6 +71,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
@@ -807,6 +809,12 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
return summary;
}
+  public boolean isPipelineCloseInProgress(UUID pipelineID) {
+    ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher()
+        .getClosePipelineCommandHandler();
+    return handler.isPipelineCloseInProgress(pipelineID);
+  }
+
/**
* Returns the count of the Execution.
* @return long
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 2e392ccf663..38924e9dcac 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -90,6 +90,15 @@ public ClosePipelineCommandHandler(
this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
}
+ /**
+ * Returns true if pipeline close is in progress, else false.
+ *
+ * @return boolean
+ */
+ public boolean isPipelineCloseInProgress(UUID pipelineID) {
+ return pipelinesInProgress.contains(pipelineID);
+ }
+
/**
* Handles a given SCM command.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 696b04defe3..ece91ffdd1c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -80,6 +80,10 @@ public CommandHandler getDeleteBlocksCommandHandler() {
return handlerMap.get(Type.deleteBlocksCommand);
}
+  public ClosePipelineCommandHandler getClosePipelineCommandHandler() {
+    return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand);
+  }
+
/**
* Dispatch the command to the correct handler.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6661823f9a1..02167d750b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -741,9 +741,19 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr
triggerPipelineClose(groupId, b.toString(),
ClosePipelineInfo.Reason.PIPELINE_FAILED);
}
- private void triggerPipelineClose(RaftGroupId groupId, String detail,
+ @VisibleForTesting
+ public void triggerPipelineClose(RaftGroupId groupId, String detail,
ClosePipelineInfo.Reason reasonCode) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    if (context != null) {
+      if (context.isPipelineCloseInProgress(pipelineID.getId())) {
+        LOG.debug("Skipped triggering pipeline close for {} as it is already in progress. Reason: {}",
+            pipelineID.getId(), detail);
+        return;
+      }
+    }
+
ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
.setPipelineID(pipelineID.getProtobuf())
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
index 2120615081e..70744efbbed 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -17,8 +17,12 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -31,6 +35,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -132,6 +141,55 @@ void testCommandIdempotency() throws IOException {
.remove(any(), anyBoolean(), anyBoolean());
}
+  @Test
+  void testPendingPipelineClose() throws IOException, InterruptedException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final UUID pipelineUUID = pipelineID.getId();
+    final SCMCommand<ClosePipelineCommandProto> command1 = new ClosePipelineCommand(pipelineID);
+    final SCMCommand<ClosePipelineCommandProto> command2 = new ClosePipelineCommand(pipelineID);
+    StateContext stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    final boolean shouldDeleteRatisLogDirectory = true;
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+    Collection<RaftPeer> raftPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+    when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+    when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+    CountDownLatch firstCommandStarted = new CountDownLatch(1);
+    CountDownLatch secondCommandSubmitted = new CountDownLatch(1);
+
+    doAnswer(invocation -> {
+      firstCommandStarted.countDown();
+      secondCommandSubmitted.await();
+      return null;
+    }).when(writeChannel).removeGroup(pipelineID.getProtobuf());
+
+    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor);
+    assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+    commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager);
+    assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS));
+    commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager);
+    secondCommandSubmitted.countDown();
+
+    singleThreadExecutor.shutdown();
+    assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS));
+
+    // Only one command should have been processed due to duplicate prevention
+    assertEquals(1, commandHandler.getInvocationCount());
+    assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
+  }
+
+
private List<DatanodeDetails> getDatanodes() {
final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 1763d9e269b..1e6aa4e04bc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -27,11 +27,13 @@
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -39,6 +41,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -56,6 +59,8 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.ozone.test.GenericTestUtils;
@@ -66,6 +71,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.mockito.ArgumentCaptor;
+import org.slf4j.event.Level;
/**
* Tests for Pipeline Closing.
@@ -256,6 +262,52 @@ void testPipelineCloseWithLogFailure()
verifyCloseForPipeline(openPipeline, actionsFromDatanode);
}
+  @Test
+  @SuppressWarnings("unchecked")
+  void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception {
+    ContainerInfo allocateContainer = containerManager
+        .allocateContainer(RatisReplicationConfig.getInstance(
+            ReplicationFactor.THREE), "newTestOwner");
+    ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(allocateContainer,
+        pipelineManager.getPipeline(allocateContainer.getPipelineID()));
+
+    DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine();
+    XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel();
+
+    GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.DEBUG);
+    GenericTestUtils.LogCapturer xceiverLogCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class);
+
+    RaftGroupId groupId = RaftGroupId.valueOf(containerWithPipeline.getPipeline().getId().getId());
+    PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
+
+    ClosePipelineCommandHandler handler = datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler();
+
+    Field pipelinesInProgressField = handler.getClass().getDeclaredField("pipelinesInProgress");
+    pipelinesInProgressField.setAccessible(true);
+    Set<UUID> pipelinesInProgress = (Set<UUID>) pipelinesInProgressField.get(handler);
+
+    try {
+      pipelinesInProgress.add(pipelineID.getId());
+
+      String detail = "test duplicate trigger ";
+      int numOfDuplicateTriggers = 10;
+      for (int i = 1; i <= numOfDuplicateTriggers; i++) {
+        xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED);
+      }
+
+      String xceiverLogs = xceiverLogCapturer.getOutput();
+      int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), "skipped triggering pipeline close");
+      assertEquals(numOfDuplicateTriggers, skippedCount);
+    } finally {
+      pipelinesInProgress.remove(pipelineID.getId());
+      xceiverLogCapturer.stopCapturing();
+
+      pipelineManager.closePipeline(containerWithPipeline.getPipeline().getId());
+      pipelineManager.deletePipeline(containerWithPipeline.getPipeline().getId());
+    }
+  }
+
private boolean verifyCloseForPipeline(Pipeline pipeline,
PipelineActionsFromDatanode report) {
UUID uuidToFind = pipeline.getId().getId();
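
Making triggerPipelineClose public and @VisibleForTesting is what lets the integration test above fire duplicate triggers directly against a live XceiverServerRatis. A minimal usage fragment in the same spirit (not standalone code: it assumes a running MiniOzoneCluster named cluster, an existing RATIS pipeline, and the imports from TestPipelineClose):

    // Fragment only: cluster, pipeline and the necessary imports come from the
    // surrounding test, as set up in TestPipelineClose.
    XceiverServerRatis ratisServer = (XceiverServerRatis) cluster.getHddsDatanodes().get(0)
        .getDatanodeStateMachine().getContainer().getWriteChannel();
    RaftGroupId groupId = RaftGroupId.valueOf(pipeline.getId().getId());

    // While a close for this pipeline is already in progress on the datanode,
    // each call is skipped and only logged at DEBUG level.
    ratisServer.triggerPipelineClose(groupId, "simulated pipeline failure",
        ClosePipelineInfo.Reason.PIPELINE_FAILED);
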
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]