This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 841f3ce752e HDDS-13614. Create separate bounded executor for
ClosePipelineCommandHandler and CreatePipelineCommandHandler. (#8977)
841f3ce752e is described below
commit 841f3ce752e7f9570d7e7c3d32a4086b1136abb6
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Wed Sep 10 23:39:12 2025 -0700
HDDS-13614. Create separate bounded executor for
ClosePipelineCommandHandler and CreatePipelineCommandHandler. (#8977)
---
.../common/statemachine/DatanodeStateMachine.java | 33 +++--
.../ClosePipelineCommandHandler.java | 146 ++++++++++++---------
.../CreatePipelineCommandHandler.java | 117 ++++++++++-------
3 files changed, 180 insertions(+), 116 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index b5a8362eb78..5275c047d43 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -26,7 +26,9 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -96,7 +98,8 @@ public class DatanodeStateMachine implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(DatanodeStateMachine.class);
private final ExecutorService executorService;
- private final ExecutorService pipelineCommandExecutorService;
+ private final ExecutorService closePipelineCommandExecutorService;
+ private final ExecutorService createPipelineCommandExecutorService;
private final ConfigurationSource conf;
private final SCMConnectionManager connectionManager;
private final ECReconstructionCoordinator ecReconstructionCoordinator;
@@ -236,11 +239,24 @@ public DatanodeStateMachine(HddsDatanodeService
hddsDatanodeService,
// datanode clients.
DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
certClient, secretKeyClient);
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
+ // Create separate bounded executors for pipeline command handlers
+ ThreadFactory closePipelineThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix +
"ClosePipelineCommandHandlerThread-%d")
.build();
- pipelineCommandExecutorService = Executors
- .newSingleThreadExecutor(threadFactory);
+ closePipelineCommandExecutorService = new ThreadPoolExecutor(
+ 1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
+ closePipelineThreadFactory);
+
+ ThreadFactory createPipelineThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix +
"CreatePipelineCommandHandlerThread-%d")
+ .build();
+ createPipelineCommandExecutorService = new ThreadPoolExecutor(
+ 1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
+ createPipelineThreadFactory);
// When we add new handlers just adding a new handler here should do the
// trick.
@@ -257,9 +273,9 @@ public DatanodeStateMachine(HddsDatanodeService
hddsDatanodeService,
dnConf.getContainerDeleteThreads(), clock,
dnConf.getCommandQueueLimit(), threadNamePrefix))
.addHandler(new ClosePipelineCommandHandler(conf,
- pipelineCommandExecutorService))
+ closePipelineCommandExecutorService))
.addHandler(new CreatePipelineCommandHandler(conf,
- pipelineCommandExecutorService))
+ createPipelineCommandExecutorService))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
@@ -436,7 +452,8 @@ public void close() throws IOException {
replicationSupervisorMetrics.unRegister();
ecReconstructionMetrics.unRegister();
executorServiceShutdownGraceful(executorService);
- executorServiceShutdownGraceful(pipelineCommandExecutorService);
+ executorServiceShutdownGraceful(closePipelineCommandExecutorService);
+ executorServiceShutdownGraceful(createPipelineCommandExecutorService);
if (connectionManager != null) {
connectionManager.close();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 1dd9ef24ecc..2e392ccf663 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -19,8 +19,12 @@
import java.io.IOException;
import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -62,6 +66,7 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
private final Executor executor;
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private final MutableRate opsLatencyMs;
+ private final Set<UUID> pipelinesInProgress;
/**
* Constructs a closePipelineCommand handler.
@@ -82,6 +87,7 @@ public ClosePipelineCommandHandler(
MetricsRegistry registry = new MetricsRegistry(
ClosePipelineCommandHandler.class.getSimpleName());
this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms");
+ this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
}
/**
@@ -95,70 +101,88 @@ public ClosePipelineCommandHandler(
@Override
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
- queuedCount.incrementAndGet();
- CompletableFuture.runAsync(() -> {
- invocationCount.incrementAndGet();
- final long startTime = Time.monotonicNow();
- final DatanodeDetails dn = context.getParent().getDatanodeDetails();
- ClosePipelineCommand closePipelineCommand =
- (ClosePipelineCommand) command;
- final PipelineID pipelineID = closePipelineCommand.getPipelineID();
- final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+ final ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand)
command;
+ final PipelineID pipelineID = closePipelineCommand.getPipelineID();
+ final UUID pipelineUUID = pipelineID.getId();
+
+ // Check if this pipeline is already being processed
+ if (!pipelinesInProgress.add(pipelineUUID)) {
+ LOG.debug("Close Pipeline command for pipeline {} is already in
progress, " +
+ "skipping duplicate command.", pipelineID);
+ return;
+ }
+
+ try {
+ queuedCount.incrementAndGet();
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+ final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
- try {
- XceiverServerSpi server = ozoneContainer.getWriteChannel();
- if (server.isExist(pipelineIdProto)) {
- if (server instanceof XceiverServerRatis) {
- // TODO: Refactor Ratis logic to XceiverServerRatis
- // Propagate the group remove to the other Raft peers in the
pipeline
- XceiverServerRatis ratisServer = (XceiverServerRatis) server;
- final RaftGroupId raftGroupId =
RaftGroupId.valueOf(pipelineID.getId());
- final boolean shouldDeleteRatisLogDirectory =
ratisServer.getShouldDeleteRatisLogDirectory();
- // This might throw GroupMismatchException if the Ratis group has
been closed by other datanodes
- final Collection<RaftPeer> peers =
ratisServer.getRaftPeersInPipeline(pipelineID);
- // Try to send remove group for the other datanodes first,
ignoring GroupMismatchException
- // if the Ratis group has been closed in the other datanodes
- peers.stream()
- .filter(peer ->
!peer.getId().equals(ratisServer.getServer().getId()))
- .forEach(peer -> {
- try (RaftClient client = newRaftClient.apply(peer,
ozoneContainer.getTlsClientConfig())) {
- client.getGroupManagementApi(peer.getId())
- .remove(raftGroupId, shouldDeleteRatisLogDirectory,
!shouldDeleteRatisLogDirectory);
- } catch (GroupMismatchException ae) {
- // ignore silently since this means that the group has
been closed by earlier close pipeline
- // command in another datanode
- LOG.debug("Failed to remove group {} for pipeline {} on
peer {} since the group has " +
- "been removed by earlier close pipeline command
handled in another datanode", raftGroupId,
- pipelineID, peer.getId());
- } catch (IOException ioe) {
- LOG.warn("Failed to remove group {} of pipeline {} on peer
{}",
- raftGroupId, pipelineID, peer.getId(), ioe);
- }
- });
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ if (server.isExist(pipelineIdProto)) {
+ if (server instanceof XceiverServerRatis) {
+ // TODO: Refactor Ratis logic to XceiverServerRatis
+ // Propagate the group remove to the other Raft peers in the
pipeline
+ XceiverServerRatis ratisServer = (XceiverServerRatis) server;
+ final RaftGroupId raftGroupId =
RaftGroupId.valueOf(pipelineID.getId());
+ final boolean shouldDeleteRatisLogDirectory =
ratisServer.getShouldDeleteRatisLogDirectory();
+ // This might throw GroupMismatchException if the Ratis group
has been closed by other datanodes
+ final Collection<RaftPeer> peers =
ratisServer.getRaftPeersInPipeline(pipelineID);
+ // Try to send remove group for the other datanodes first,
ignoring GroupMismatchException
+ // if the Ratis group has been closed in the other datanodes
+ peers.stream()
+ .filter(peer ->
!peer.getId().equals(ratisServer.getServer().getId()))
+ .forEach(peer -> {
+ try (RaftClient client = newRaftClient.apply(peer,
ozoneContainer.getTlsClientConfig())) {
+ client.getGroupManagementApi(peer.getId())
+ .remove(raftGroupId, shouldDeleteRatisLogDirectory,
!shouldDeleteRatisLogDirectory);
+ } catch (GroupMismatchException ae) {
+ // ignore silently since this means that the group has
been closed by earlier close pipeline
+ // command in another datanode
+ LOG.debug("Failed to remove group {} for pipeline {} on
peer {} since the group has " +
+ "been removed by earlier close pipeline command
handled in another datanode", raftGroupId,
+ pipelineID, peer.getId());
+ } catch (IOException ioe) {
+ LOG.warn("Failed to remove group {} of pipeline {} on
peer {}",
+ raftGroupId, pipelineID, peer.getId(), ioe);
+ }
+ });
+ }
+ // Remove the Ratis group from the current datanode pipeline,
might throw GroupMismatchException as
+ // well. It is a no-op for XceiverServerSpi implementations (e.g.
XceiverServerGrpc)
+ server.removeGroup(pipelineIdProto);
+ LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn);
+ } else {
+ LOG.debug("Ignoring close pipeline command for pipeline {} on
datanode {} " +
+ "as it does not exist", pipelineID, dn);
}
- // Remove the Ratis group from the current datanode pipeline, might
throw GroupMismatchException as
- // well. It is a no-op for XceiverServerSpi implementations (e.g.
XceiverServerGrpc)
- server.removeGroup(pipelineIdProto);
- LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn);
- } else {
- LOG.debug("Ignoring close pipeline command for pipeline {} on
datanode {} " +
- "as it does not exist", pipelineID, dn);
- }
- } catch (IOException e) {
- Throwable gme = HddsClientUtils.containsException(e,
GroupMismatchException.class);
- if (gme != null) {
- // ignore silently since this means that the group has been closed
by earlier close pipeline
- // command in another datanode
- LOG.debug("The group for pipeline {} on datanode {} has been removed
by earlier close " +
- "pipeline command handled in another datanode", pipelineID, dn);
- } else {
- LOG.error("Can't close pipeline {}", pipelineID, e);
+ } catch (IOException e) {
+ Throwable gme = HddsClientUtils.containsException(e,
GroupMismatchException.class);
+ if (gme != null) {
+ // ignore silently since this means that the group has been closed
by earlier close pipeline
+ // command in another datanode
+ LOG.debug("The group for pipeline {} on datanode {} has been
removed by earlier close " +
+ "pipeline command handled in another datanode", pipelineID,
dn);
+ } else {
+ LOG.error("Can't close pipeline {}", pipelineID, e);
+ }
+ } finally {
+ long endTime = Time.monotonicNow();
+ this.opsLatencyMs.add(endTime - startTime);
}
- } finally {
- long endTime = Time.monotonicNow();
- this.opsLatencyMs.add(endTime - startTime);
- }
- }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+ }, executor).whenComplete((v, e) -> {
+ queuedCount.decrementAndGet();
+ pipelinesInProgress.remove(pipelineUUID);
+ });
+ } catch (RejectedExecutionException ex) {
+ queuedCount.decrementAndGet();
+ pipelinesInProgress.remove(pipelineUUID);
+ LOG.warn("Close Pipeline command for pipeline {} is rejected as " +
+ "command queue has reached max size.", pipelineID);
+ }
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index 53734c9ffef..19f7b7c1633 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -19,8 +19,12 @@
import java.io.IOException;
import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -62,6 +66,7 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
private final Executor executor;
private final MutableRate opsLatencyMs;
+ private final Set<UUID> pipelinesInProgress;
/**
* Constructs a createPipelineCommand handler.
@@ -79,6 +84,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
MetricsRegistry registry = new MetricsRegistry(
CreatePipelineCommandHandler.class.getSimpleName());
this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.createPipelineCommand + "Ms");
+ this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
}
/**
@@ -92,55 +98,72 @@ public CreatePipelineCommandHandler(ConfigurationSource
conf,
@Override
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
- queuedCount.incrementAndGet();
- CompletableFuture.runAsync(() -> {
- invocationCount.incrementAndGet();
- final long startTime = Time.monotonicNow();
- final DatanodeDetails dn = context.getParent()
- .getDatanodeDetails();
- final CreatePipelineCommand createCommand =
- (CreatePipelineCommand) command;
- final PipelineID pipelineID = createCommand.getPipelineID();
- final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
- final List<DatanodeDetails> peers = createCommand.getNodeList();
- final List<Integer> priorityList = createCommand.getPriorityList();
+ final CreatePipelineCommand createCommand = (CreatePipelineCommand)
command;
+ final PipelineID pipelineID = createCommand.getPipelineID();
+ final UUID pipelineUUID = pipelineID.getId();
+
+ // Check if this pipeline is already being processed
+ if (!pipelinesInProgress.add(pipelineUUID)) {
+ LOG.debug("Create Pipeline command for pipeline {} is already in
progress, " +
+ "skipping duplicate command.", pipelineID);
+ return;
+ }
+
+ try {
+ queuedCount.incrementAndGet();
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+ final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+ final List<DatanodeDetails> peers = createCommand.getNodeList();
+ final List<Integer> priorityList = createCommand.getPriorityList();
- try {
- XceiverServerSpi server = ozoneContainer.getWriteChannel();
- if (!server.isExist(pipelineIdProto)) {
- final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
- final RaftGroup group =
- RatisHelper.newRaftGroup(groupId, peers, priorityList);
- server.addGroup(pipelineIdProto, peers, priorityList);
- peers.stream().filter(d -> !d.getID().equals(dn.getID()))
- .forEach(d -> {
- final RaftPeer peer = RatisHelper.toRaftPeer(d);
- try (RaftClient client = newRaftClient.apply(peer,
- ozoneContainer.getTlsClientConfig())) {
- client.getGroupManagementApi(peer.getId()).add(group);
- } catch (AlreadyExistsException ae) {
- // do not log
- } catch (IOException ioe) {
- LOG.warn("Add group failed for {}", d, ioe);
- }
- });
- LOG.info("Created Pipeline {} {} {}.",
- createCommand.getReplicationType(), createCommand.getFactor(),
- pipelineID);
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ if (!server.isExist(pipelineIdProto)) {
+ final RaftGroupId groupId =
RaftGroupId.valueOf(pipelineID.getId());
+ final RaftGroup group =
+ RatisHelper.newRaftGroup(groupId, peers, priorityList);
+ server.addGroup(pipelineIdProto, peers, priorityList);
+ peers.stream().filter(d -> !d.getID().equals(dn.getID()))
+ .forEach(d -> {
+ final RaftPeer peer = RatisHelper.toRaftPeer(d);
+ try (RaftClient client = newRaftClient.apply(peer,
+ ozoneContainer.getTlsClientConfig())) {
+ client.getGroupManagementApi(peer.getId()).add(group);
+ } catch (AlreadyExistsException ae) {
+ // do not log
+ } catch (IOException ioe) {
+ LOG.warn("Add group failed for {}", d, ioe);
+ }
+ });
+ LOG.info("Created Pipeline {} {} {}.",
+ createCommand.getReplicationType(), createCommand.getFactor(),
+ pipelineID);
+ }
+ } catch (IOException e) {
+ // The server.addGroup may exec after a getGroupManagementApi call
+ // from another peer, so we may got an AlreadyExistsException.
+ if (!(e.getCause() instanceof AlreadyExistsException)) {
+ LOG.error("Can't create pipeline {} {} {}",
+ createCommand.getReplicationType(),
+ createCommand.getFactor(), pipelineID, e);
+ }
+ } finally {
+ long endTime = Time.monotonicNow();
+ this.opsLatencyMs.add(endTime - startTime);
}
- } catch (IOException e) {
- // The server.addGroup may exec after a getGroupManagementApi call
- // from another peer, so we may got an AlreadyExistsException.
- if (!(e.getCause() instanceof AlreadyExistsException)) {
- LOG.error("Can't create pipeline {} {} {}",
- createCommand.getReplicationType(),
- createCommand.getFactor(), pipelineID, e);
- }
- } finally {
- long endTime = Time.monotonicNow();
- this.opsLatencyMs.add(endTime - startTime);
- }
- }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+ }, executor).whenComplete((v, e) -> {
+ queuedCount.decrementAndGet();
+ pipelinesInProgress.remove(pipelineUUID);
+ });
+ } catch (RejectedExecutionException ex) {
+ queuedCount.decrementAndGet();
+ pipelinesInProgress.remove(pipelineUUID);
+ LOG.warn("Create Pipeline command for pipeline {} is rejected as " +
+ "command queue has reached max size.", pipelineID);
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]