This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 841f3ce752e HDDS-13614. Create separate bounded executor for 
ClosePipelineCommandHandler and CreatePipelineCommandHandler. (#8977)
841f3ce752e is described below

commit 841f3ce752e7f9570d7e7c3d32a4086b1136abb6
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Wed Sep 10 23:39:12 2025 -0700

    HDDS-13614. Create separate bounded executor for 
ClosePipelineCommandHandler and CreatePipelineCommandHandler. (#8977)
---
 .../common/statemachine/DatanodeStateMachine.java  |  33 +++--
 .../ClosePipelineCommandHandler.java               | 146 ++++++++++++---------
 .../CreatePipelineCommandHandler.java              | 117 ++++++++++-------
 3 files changed, 180 insertions(+), 116 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index b5a8362eb78..5275c047d43 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -26,7 +26,9 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -96,7 +98,8 @@ public class DatanodeStateMachine implements Closeable {
   private static final Logger LOG =
       LoggerFactory.getLogger(DatanodeStateMachine.class);
   private final ExecutorService executorService;
-  private final ExecutorService pipelineCommandExecutorService;
+  private final ExecutorService closePipelineCommandExecutorService;
+  private final ExecutorService createPipelineCommandExecutorService;
   private final ConfigurationSource conf;
   private final SCMConnectionManager connectionManager;
   private final ECReconstructionCoordinator ecReconstructionCoordinator;
@@ -236,11 +239,24 @@ public DatanodeStateMachine(HddsDatanodeService 
hddsDatanodeService,
     //  datanode clients.
     DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
certClient, secretKeyClient);
 
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
+    // Create separate bounded executors for pipeline command handlers
+    ThreadFactory closePipelineThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(threadNamePrefix + 
"ClosePipelineCommandHandlerThread-%d")
         .build();
-    pipelineCommandExecutorService = Executors
-        .newSingleThreadExecutor(threadFactory);
+    closePipelineCommandExecutorService = new ThreadPoolExecutor(
+        1, 1,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
+        closePipelineThreadFactory);
+
+    ThreadFactory createPipelineThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(threadNamePrefix + 
"CreatePipelineCommandHandlerThread-%d")
+        .build();
+    createPipelineCommandExecutorService = new ThreadPoolExecutor(
+        1, 1,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
+        createPipelineThreadFactory);
 
     // When we add new handlers just adding a new handler here should do the
     // trick.
@@ -257,9 +273,9 @@ public DatanodeStateMachine(HddsDatanodeService 
hddsDatanodeService,
             dnConf.getContainerDeleteThreads(), clock,
             dnConf.getCommandQueueLimit(), threadNamePrefix))
         .addHandler(new ClosePipelineCommandHandler(conf,
-            pipelineCommandExecutorService))
+            closePipelineCommandExecutorService))
         .addHandler(new CreatePipelineCommandHandler(conf,
-            pipelineCommandExecutorService))
+            createPipelineCommandExecutorService))
         .addHandler(new SetNodeOperationalStateCommandHandler(conf,
             supervisor::nodeStateUpdated))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
@@ -436,7 +452,8 @@ public void close() throws IOException {
     replicationSupervisorMetrics.unRegister();
     ecReconstructionMetrics.unRegister();
     executorServiceShutdownGraceful(executorService);
-    executorServiceShutdownGraceful(pipelineCommandExecutorService);
+    executorServiceShutdownGraceful(closePipelineCommandExecutorService);
+    executorServiceShutdownGraceful(createPipelineCommandExecutorService);
 
     if (connectionManager != null) {
       connectionManager.close();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 1dd9ef24ecc..2e392ccf663 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -19,8 +19,12 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
@@ -62,6 +66,7 @@ public class ClosePipelineCommandHandler implements 
CommandHandler {
   private final Executor executor;
   private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
   private final MutableRate opsLatencyMs;
+  private final Set<UUID> pipelinesInProgress;
 
   /**
    * Constructs a closePipelineCommand handler.
@@ -82,6 +87,7 @@ public ClosePipelineCommandHandler(
     MetricsRegistry registry = new MetricsRegistry(
         ClosePipelineCommandHandler.class.getSimpleName());
     this.opsLatencyMs = 
registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms");
+    this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
   }
 
   /**
@@ -95,70 +101,88 @@ public ClosePipelineCommandHandler(
   @Override
   public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    queuedCount.incrementAndGet();
-    CompletableFuture.runAsync(() -> {
-      invocationCount.incrementAndGet();
-      final long startTime = Time.monotonicNow();
-      final DatanodeDetails dn = context.getParent().getDatanodeDetails();
-      ClosePipelineCommand closePipelineCommand =
-          (ClosePipelineCommand) command;
-      final PipelineID pipelineID = closePipelineCommand.getPipelineID();
-      final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+    final ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand) 
command;
+    final PipelineID pipelineID = closePipelineCommand.getPipelineID();
+    final UUID pipelineUUID = pipelineID.getId();
+    
+    // Check if this pipeline is already being processed
+    if (!pipelinesInProgress.add(pipelineUUID)) {
+      LOG.debug("Close Pipeline command for pipeline {} is already in 
progress, " +
+          "skipping duplicate command.", pipelineID);
+      return;
+    }
+    
+    try {
+      queuedCount.incrementAndGet();
+      CompletableFuture.runAsync(() -> {
+        invocationCount.incrementAndGet();
+        final long startTime = Time.monotonicNow();
+        final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+        final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
 
-      try {
-        XceiverServerSpi server = ozoneContainer.getWriteChannel();
-        if (server.isExist(pipelineIdProto)) {
-          if (server instanceof XceiverServerRatis) {
-            // TODO: Refactor Ratis logic to XceiverServerRatis
-            // Propagate the group remove to the other Raft peers in the 
pipeline
-            XceiverServerRatis ratisServer = (XceiverServerRatis) server;
-            final RaftGroupId raftGroupId = 
RaftGroupId.valueOf(pipelineID.getId());
-            final boolean shouldDeleteRatisLogDirectory = 
ratisServer.getShouldDeleteRatisLogDirectory();
-            // This might throw GroupMismatchException if the Ratis group has 
been closed by other datanodes
-            final Collection<RaftPeer> peers = 
ratisServer.getRaftPeersInPipeline(pipelineID);
-            // Try to send remove group for the other datanodes first, 
ignoring GroupMismatchException
-            // if the Ratis group has been closed in the other datanodes
-            peers.stream()
-                .filter(peer -> 
!peer.getId().equals(ratisServer.getServer().getId()))
-                .forEach(peer -> {
-                  try (RaftClient client = newRaftClient.apply(peer, 
ozoneContainer.getTlsClientConfig())) {
-                    client.getGroupManagementApi(peer.getId())
-                        .remove(raftGroupId, shouldDeleteRatisLogDirectory, 
!shouldDeleteRatisLogDirectory);
-                  } catch (GroupMismatchException ae) {
-                    // ignore silently since this means that the group has 
been closed by earlier close pipeline
-                    // command in another datanode
-                    LOG.debug("Failed to remove group {} for pipeline {} on 
peer {} since the group has " +
-                        "been removed by earlier close pipeline command 
handled in another datanode", raftGroupId,
-                        pipelineID, peer.getId());
-                  } catch (IOException ioe) {
-                    LOG.warn("Failed to remove group {} of pipeline {} on peer 
{}",
-                        raftGroupId, pipelineID, peer.getId(), ioe);
-                  }
-                });
+        try {
+          XceiverServerSpi server = ozoneContainer.getWriteChannel();
+          if (server.isExist(pipelineIdProto)) {
+            if (server instanceof XceiverServerRatis) {
+              // TODO: Refactor Ratis logic to XceiverServerRatis
+              // Propagate the group remove to the other Raft peers in the 
pipeline
+              XceiverServerRatis ratisServer = (XceiverServerRatis) server;
+              final RaftGroupId raftGroupId = 
RaftGroupId.valueOf(pipelineID.getId());
+              final boolean shouldDeleteRatisLogDirectory = 
ratisServer.getShouldDeleteRatisLogDirectory();
+              // This might throw GroupMismatchException if the Ratis group 
has been closed by other datanodes
+              final Collection<RaftPeer> peers = 
ratisServer.getRaftPeersInPipeline(pipelineID);
+              // Try to send remove group for the other datanodes first, 
ignoring GroupMismatchException
+              // if the Ratis group has been closed in the other datanodes
+              peers.stream()
+                  .filter(peer -> 
!peer.getId().equals(ratisServer.getServer().getId()))
+                  .forEach(peer -> {
+                    try (RaftClient client = newRaftClient.apply(peer, 
ozoneContainer.getTlsClientConfig())) {
+                      client.getGroupManagementApi(peer.getId())
+                          .remove(raftGroupId, shouldDeleteRatisLogDirectory, 
!shouldDeleteRatisLogDirectory);
+                    } catch (GroupMismatchException ae) {
+                      // ignore silently since this means that the group has 
been closed by earlier close pipeline
+                      // command in another datanode
+                      LOG.debug("Failed to remove group {} for pipeline {} on 
peer {} since the group has " +
+                          "been removed by earlier close pipeline command 
handled in another datanode", raftGroupId,
+                          pipelineID, peer.getId());
+                    } catch (IOException ioe) {
+                      LOG.warn("Failed to remove group {} of pipeline {} on 
peer {}",
+                          raftGroupId, pipelineID, peer.getId(), ioe);
+                    }
+                  });
+            }
+            // Remove the Ratis group from the current datanode pipeline, 
might throw GroupMismatchException as
+            // well. It is a no-op for XceiverServerSpi implementations (e.g. 
XceiverServerGrpc)
+            server.removeGroup(pipelineIdProto);
+            LOG.info("Close Pipeline {} command on datanode {}.", pipelineID, 
dn);
+          } else {
+            LOG.debug("Ignoring close pipeline command for pipeline {} on 
datanode {} " +
+                "as it does not exist", pipelineID, dn);
           }
-          // Remove the Ratis group from the current datanode pipeline, might 
throw GroupMismatchException as
-          // well. It is a no-op for XceiverServerSpi implementations (e.g. 
XceiverServerGrpc)
-          server.removeGroup(pipelineIdProto);
-          LOG.info("Close Pipeline {} command on datanode {}.", pipelineID, 
dn);
-        } else {
-          LOG.debug("Ignoring close pipeline command for pipeline {} on 
datanode {} " +
-              "as it does not exist", pipelineID, dn);
-        }
-      } catch (IOException e) {
-        Throwable gme = HddsClientUtils.containsException(e, 
GroupMismatchException.class);
-        if (gme != null) {
-          // ignore silently since this means that the group has been closed 
by earlier close pipeline
-          // command in another datanode
-          LOG.debug("The group for pipeline {} on datanode {} has been removed 
by earlier close " +
-              "pipeline command handled in another datanode", pipelineID, dn);
-        } else {
-          LOG.error("Can't close pipeline {}", pipelineID, e);
+        } catch (IOException e) {
+          Throwable gme = HddsClientUtils.containsException(e, 
GroupMismatchException.class);
+          if (gme != null) {
+            // ignore silently since this means that the group has been closed 
by earlier close pipeline
+            // command in another datanode
+            LOG.debug("The group for pipeline {} on datanode {} has been 
removed by earlier close " +
+                "pipeline command handled in another datanode", pipelineID, 
dn);
+          } else {
+            LOG.error("Can't close pipeline {}", pipelineID, e);
+          }
+        } finally {
+          long endTime = Time.monotonicNow();
+          this.opsLatencyMs.add(endTime - startTime);
         }
-      } finally {
-        long endTime = Time.monotonicNow();
-        this.opsLatencyMs.add(endTime - startTime);
-      }
-    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+      }, executor).whenComplete((v, e) -> {
+        queuedCount.decrementAndGet();
+        pipelinesInProgress.remove(pipelineUUID);
+      });
+    } catch (RejectedExecutionException ex) {
+      queuedCount.decrementAndGet();
+      pipelinesInProgress.remove(pipelineUUID);
+      LOG.warn("Close Pipeline command for pipeline {} is rejected as " +
+          "command queue has reached max size.", pipelineID);
+    }
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index 53734c9ffef..19f7b7c1633 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -19,8 +19,12 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
@@ -62,6 +66,7 @@ public class CreatePipelineCommandHandler implements 
CommandHandler {
 
   private final Executor executor;
   private final MutableRate opsLatencyMs;
+  private final Set<UUID> pipelinesInProgress;
 
   /**
    * Constructs a createPipelineCommand handler.
@@ -79,6 +84,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
     MetricsRegistry registry = new MetricsRegistry(
         CreatePipelineCommandHandler.class.getSimpleName());
     this.opsLatencyMs = 
registry.newRate(SCMCommandProto.Type.createPipelineCommand + "Ms");
+    this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
   }
 
   /**
@@ -92,55 +98,72 @@ public CreatePipelineCommandHandler(ConfigurationSource 
conf,
   @Override
   public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    queuedCount.incrementAndGet();
-    CompletableFuture.runAsync(() -> {
-      invocationCount.incrementAndGet();
-      final long startTime = Time.monotonicNow();
-      final DatanodeDetails dn = context.getParent()
-          .getDatanodeDetails();
-      final CreatePipelineCommand createCommand =
-          (CreatePipelineCommand) command;
-      final PipelineID pipelineID = createCommand.getPipelineID();
-      final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
-      final List<DatanodeDetails> peers = createCommand.getNodeList();
-      final List<Integer> priorityList = createCommand.getPriorityList();
+    final CreatePipelineCommand createCommand = (CreatePipelineCommand) 
command;
+    final PipelineID pipelineID = createCommand.getPipelineID();
+    final UUID pipelineUUID = pipelineID.getId();
+    
+    // Check if this pipeline is already being processed
+    if (!pipelinesInProgress.add(pipelineUUID)) {
+      LOG.debug("Create Pipeline command for pipeline {} is already in 
progress, " +
+          "skipping duplicate command.", pipelineID);
+      return;
+    }
+    
+    try {
+      queuedCount.incrementAndGet();
+      CompletableFuture.runAsync(() -> {
+        invocationCount.incrementAndGet();
+        final long startTime = Time.monotonicNow();
+        final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+        final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+        final List<DatanodeDetails> peers = createCommand.getNodeList();
+        final List<Integer> priorityList = createCommand.getPriorityList();
 
-      try {
-        XceiverServerSpi server = ozoneContainer.getWriteChannel();
-        if (!server.isExist(pipelineIdProto)) {
-          final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
-          final RaftGroup group =
-              RatisHelper.newRaftGroup(groupId, peers, priorityList);
-          server.addGroup(pipelineIdProto, peers, priorityList);
-          peers.stream().filter(d -> !d.getID().equals(dn.getID()))
-              .forEach(d -> {
-                final RaftPeer peer = RatisHelper.toRaftPeer(d);
-                try (RaftClient client = newRaftClient.apply(peer,
-                    ozoneContainer.getTlsClientConfig())) {
-                  client.getGroupManagementApi(peer.getId()).add(group);
-                } catch (AlreadyExistsException ae) {
-                  // do not log
-                } catch (IOException ioe) {
-                  LOG.warn("Add group failed for {}", d, ioe);
-                }
-              });
-          LOG.info("Created Pipeline {} {} {}.",
-              createCommand.getReplicationType(), createCommand.getFactor(),
-              pipelineID);
+        try {
+          XceiverServerSpi server = ozoneContainer.getWriteChannel();
+          if (!server.isExist(pipelineIdProto)) {
+            final RaftGroupId groupId = 
RaftGroupId.valueOf(pipelineID.getId());
+            final RaftGroup group =
+                RatisHelper.newRaftGroup(groupId, peers, priorityList);
+            server.addGroup(pipelineIdProto, peers, priorityList);
+            peers.stream().filter(d -> !d.getID().equals(dn.getID()))
+                .forEach(d -> {
+                  final RaftPeer peer = RatisHelper.toRaftPeer(d);
+                  try (RaftClient client = newRaftClient.apply(peer,
+                      ozoneContainer.getTlsClientConfig())) {
+                    client.getGroupManagementApi(peer.getId()).add(group);
+                  } catch (AlreadyExistsException ae) {
+                    // do not log
+                  } catch (IOException ioe) {
+                    LOG.warn("Add group failed for {}", d, ioe);
+                  }
+                });
+            LOG.info("Created Pipeline {} {} {}.",
+                createCommand.getReplicationType(), createCommand.getFactor(),
+                pipelineID);
+          }
+        } catch (IOException e) {
+          // The server.addGroup may exec after a getGroupManagementApi call
+          // from another peer, so we may got an AlreadyExistsException.
+          if (!(e.getCause() instanceof AlreadyExistsException)) {
+            LOG.error("Can't create pipeline {} {} {}",
+                createCommand.getReplicationType(),
+                createCommand.getFactor(), pipelineID, e);
+          }
+        } finally {
+          long endTime = Time.monotonicNow();
+          this.opsLatencyMs.add(endTime - startTime);
         }
-      } catch (IOException e) {
-        // The server.addGroup may exec after a getGroupManagementApi call
-        // from another peer, so we may got an AlreadyExistsException.
-        if (!(e.getCause() instanceof AlreadyExistsException)) {
-          LOG.error("Can't create pipeline {} {} {}",
-              createCommand.getReplicationType(),
-              createCommand.getFactor(), pipelineID, e);
-        }
-      } finally {
-        long endTime = Time.monotonicNow();
-        this.opsLatencyMs.add(endTime - startTime);
-      }
-    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+      }, executor).whenComplete((v, e) -> {
+        queuedCount.decrementAndGet();
+        pipelinesInProgress.remove(pipelineUUID);
+      });
+    } catch (RejectedExecutionException ex) {
+      queuedCount.decrementAndGet();
+      pipelinesInProgress.remove(pipelineUUID);
+      LOG.warn("Create Pipeline command for pipeline {} is rejected as " +
+          "command queue has reached max size.", pipelineID);
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to