This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d4cd1c4c6a HDDS-14384. Cleanup PipelineProvider and related classes 
(#9608)
6d4cd1c4c6a is described below

commit 6d4cd1c4c6a9bf36c34dab336d474848f1d33205
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 12 04:56:33 2026 -0800

    HDDS-14384. Cleanup PipelineProvider and related classes (#9608)
---
 .../hadoop/hdds/client/ReplicationFactor.java      | 34 ++++++++--------------
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |  9 ++----
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 10 +++----
 .../hdds/scm/pipeline/ECPipelineProvider.java      |  4 ---
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  4 ---
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  4 +--
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |  9 ++----
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 20 ++++---------
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |  4 ---
 .../scm/pipeline/MockRatisPipelineProvider.java    |  5 ----
 .../ozone/recon/scm/ReconPipelineFactory.java      |  4 ---
 11 files changed, 28 insertions(+), 79 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
index c15f20424fe..9f47cc2fd26 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
@@ -23,21 +23,8 @@
  * The replication factor to be used while writing key into ozone.
  */
 public enum ReplicationFactor {
-  ONE(1),
-  THREE(3);
-
-  /**
-   * Integer representation of replication.
-   */
-  private int value;
-
-  /**
-   * Initializes ReplicationFactor with value.
-   * @param value replication value
-   */
-  ReplicationFactor(int value) {
-    this.value = value;
-  }
+  ONE,
+  THREE;
 
   /**
    * Returns enum value corresponding to the int value.
@@ -85,16 +72,19 @@ public HddsProtos.ReplicationFactor toProto() {
     case THREE:
       return HddsProtos.ReplicationFactor.THREE;
     default:
-      throw new IllegalArgumentException(
-          "Unsupported ProtoBuf replication factor: " + this);
+      throw new IllegalStateException("Unexpected enum value: " + this);
     }
   }
 
-  /**
-   * Returns integer representation of ReplicationFactor.
-   * @return replication value
-   */
+  /** @return the number of replication(s). */
   public int getValue() {
-    return value;
+    switch (this) {
+    case ONE:
+      return 1;
+    case THREE:
+      return 3;
+    default:
+      throw new IllegalStateException("Unexpected enum value: " + this);
+    }
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 934e13bb53b..c51f6d0429f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -273,7 +273,7 @@ public List<DatanodeDetails> 
filterNodesWithSpace(List<DatanodeDetails> nodes,
       int nodesRequired, long metadataSizeRequired, long dataSizeRequired)
       throws SCMException {
     List<DatanodeDetails> nodesWithSpace = nodes.stream().filter(d ->
-        hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired, conf))
+        hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired))
         .collect(Collectors.toList());
 
     if (nodesWithSpace.size() < nodesRequired) {
@@ -298,8 +298,7 @@ public List<DatanodeDetails> 
filterNodesWithSpace(List<DatanodeDetails> nodes,
    */
   public static boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
                                        long metadataSizeRequired,
-                                       long dataSizeRequired,
-                                       ConfigurationSource conf) {
+                                       long dataSizeRequired) {
     Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo);
 
     boolean enoughForData = false;
@@ -523,9 +522,7 @@ public boolean isValidNode(DatanodeDetails datanodeDetails,
       return false;
     }
     NodeStatus nodeStatus = datanodeInfo.getNodeStatus();
-    if (nodeStatus.isNodeWritable() &&
-        (hasEnoughSpace(datanodeInfo, metadataSizeRequired,
-            dataSizeRequired, conf))) {
+    if (nodeStatus.isNodeWritable() && (hasEnoughSpace(datanodeInfo, 
metadataSizeRequired, dataSizeRequired))) {
       LOG.debug("Datanode {} is chosen. Required metadata size is {} and " +
               "required data size is {} and NodeStatus is {}",
           datanodeDetails, metadataSizeRequired, dataSizeRequired, nodeStatus);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 4af4465a95a..c3ef2238769 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -127,7 +127,6 @@ public class SCMNodeManager implements NodeManager {
   private final SCMNodeMetrics metrics;
   // Node manager MXBean
   private ObjectName nmInfoBean;
-  private final OzoneConfiguration conf;
   private final SCMStorageConfig scmStorageConfig;
   private final NetworkTopology clusterMap;
   private final Function<String, String> nodeResolver;
@@ -140,6 +139,7 @@ public class SCMNodeManager implements NodeManager {
   private final SCMContext scmContext;
   private final Map<SCMCommandProto.Type,
       BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
+  private final NonWritableNodeFilter nonWritableNodeFilter;
 
   /**
    * Lock used to synchronize some operation in Node manager to ensure a
@@ -176,7 +176,6 @@ public SCMNodeManager(
       SCMContext scmContext,
       HDDSLayoutVersionManager layoutVersionManager,
       Function<String, String> nodeResolver) {
-    this.conf = conf;
     this.scmNodeEventPublisher = eventPublisher;
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
         layoutVersionManager, scmContext);
@@ -199,6 +198,7 @@ public SCMNodeManager(
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
     this.scmContext = scmContext;
     this.sendCommandNotifyMap = new HashMap<>();
+    this.nonWritableNodeFilter = new NonWritableNodeFilter(conf);
   }
 
   @Override
@@ -1394,7 +1394,7 @@ private void nodeSpaceStatistics(Map<String, String> 
nodeStatics) {
 
   private void nodeNonWritableStatistics(Map<String, String> nodeStatics) {
     int nonWritableNodesCount = (int) getAllNodes().parallelStream()
-        .filter(new NonWritableNodeFilter(conf))
+        .filter(nonWritableNodeFilter)
         .count();
 
     nodeStatics.put("NonWritableNodes", String.valueOf(nonWritableNodesCount));
@@ -1405,7 +1405,6 @@ static class NonWritableNodeFilter implements 
Predicate<DatanodeInfo> {
     private final long blockSize;
     private final long minRatisVolumeSizeBytes;
     private final long containerSize;
-    private final ConfigurationSource conf;
 
     NonWritableNodeFilter(ConfigurationSource conf) {
       blockSize = (long) conf.getStorageSize(
@@ -1420,13 +1419,12 @@ static class NonWritableNodeFilter implements 
Predicate<DatanodeInfo> {
           ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
           ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
           StorageUnit.BYTES);
-      this.conf = conf;
     }
 
     @Override
     public boolean test(DatanodeInfo dn) {
       return !dn.getNodeStatus().isNodeWritable()
-          || (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize, conf)
+          || (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize)
           && !hasEnoughCommittedVolumeSpace(dn));
     }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index c1f14d8cc65..f9b94bd8d0d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -148,8 +148,4 @@ private Pipeline createPipelineInternal(ECReplicationConfig 
repConfig,
   protected void close(Pipeline pipeline) throws IOException {
   }
 
-  @Override
-  protected void shutdown() {
-  }
-
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index c07cfc4ff15..e55947d0d4d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -123,10 +123,6 @@ public void close(ReplicationType type, Pipeline pipeline)
     providers.get(type).close(pipeline);
   }
 
-  public void shutdown() {
-    providers.values().forEach(provider -> provider.shutdown());
-  }
-
   @VisibleForTesting
   public Map<ReplicationType, PipelineProvider> getProviders() {
     return providers;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 32474b04a81..d0d261b21fd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -641,7 +641,7 @@ public boolean hasEnoughSpace(Pipeline pipeline, long 
containerSize) {
       if (!(node instanceof DatanodeInfo)) {
         node = nodeManager.getDatanodeInfo(node);
       }
-      if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize, 
null)) {
+      if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) {
         return false;
       }
     }
@@ -842,8 +842,6 @@ public void close() throws IOException {
 
     SCMPipelineMetrics.unRegister();
 
-    // shutdown pipeline provider.
-    pipelineFactory.shutdown();
     try {
       stateManager.close();
     } catch (Exception ex) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 99ace4b981f..b1b2d734906 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -23,7 +23,6 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -81,18 +80,14 @@ protected abstract Pipeline createForRead(
 
   protected abstract void close(Pipeline pipeline) throws IOException;
 
-  protected abstract void shutdown();
-
   List<DatanodeDetails> pickNodesNotUsed(REPLICATION_CONFIG replicationConfig,
                                          long metadataSizeRequired,
-                                         long dataSizeRequired,
-                                         ConfigurationSource conf)
+                                         long dataSizeRequired)
       throws SCMException {
     int nodesRequired = replicationConfig.getRequiredNodes();
     List<DatanodeDetails> healthyDNs = pickAllNodesNotUsed(replicationConfig);
     List<DatanodeDetails> healthyDNsWithSpace = healthyDNs.stream()
-        .filter(dn -> SCMCommonPlacementPolicy
-            .hasEnoughSpace(dn, metadataSizeRequired, dataSizeRequired, conf))
+        .filter(dn -> SCMCommonPlacementPolicy.hasEnoughSpace(dn, 
metadataSizeRequired, dataSizeRequired))
         .limit(nodesRequired)
         .collect(Collectors.toList());
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 491e4d15adf..b35a1f28148 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -57,11 +57,10 @@ public class RatisPipelineProvider
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineProvider.class);
 
-  private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
   private final PlacementPolicy placementPolicy;
-  private int pipelineNumberLimit;
-  private int maxPipelinePerDatanode;
+  private final int pipelineNumberLimit;
+  private final int maxPipelinePerDatanode;
   private final LeaderChoosePolicy leaderChoosePolicy;
   private final SCMContext scmContext;
   private final long containerSizeBytes;
@@ -74,7 +73,6 @@ public RatisPipelineProvider(NodeManager nodeManager,
                                EventPublisher eventPublisher,
                                SCMContext scmContext) {
     super(nodeManager, stateManager);
-    this.conf = conf;
     this.eventPublisher = eventPublisher;
     this.scmContext = scmContext;
     this.placementPolicy = PipelinePlacementPolicyFactory
@@ -85,11 +83,11 @@ public RatisPipelineProvider(NodeManager nodeManager,
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.maxPipelinePerDatanode = dnLimit == null ? 0 :
         Integer.parseInt(dnLimit);
-    this.containerSizeBytes = (long) this.conf.getStorageSize(
+    this.containerSizeBytes = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
-    this.minRatisVolumeSizeBytes = (long) this.conf.getStorageSize(
+    this.minRatisVolumeSizeBytes = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
         ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN_DEFAULT,
         StorageUnit.BYTES);
@@ -158,14 +156,12 @@ public synchronized Pipeline 
create(RatisReplicationConfig replicationConfig,
       );
     }
 
-    List<DatanodeDetails> dns;
-
+    final List<DatanodeDetails> dns;
     final ReplicationFactor factor =
         replicationConfig.getReplicationFactor();
     switch (factor) {
     case ONE:
-      dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes,
-          containerSizeBytes, conf);
+      dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes, 
containerSizeBytes);
       break;
     case THREE:
       List<DatanodeDetails> excludeDueToEngagement = 
filterPipelineEngagement();
@@ -251,10 +247,6 @@ private List<DatanodeDetails> filterPipelineEngagement() {
     return excluded;
   }
 
-  @Override
-  public void shutdown() {
-  }
-
   /**
    * Removes pipeline from SCM. Sends command to destroy pipeline on all
    * the datanodes.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 85be042eccc..a7cfd4bd597 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -95,8 +95,4 @@ public void close(Pipeline pipeline) throws IOException {
 
   }
 
-  @Override
-  public void shutdown() {
-    // Do nothing.
-  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 82c661210c6..933465c0245 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -91,11 +91,6 @@ public static void markPipelineHealthy(Pipeline pipeline)
     pipeline.setLeaderId(pipeline.getFirstNode().getID());
   }
 
-  @Override
-  public void shutdown() {
-    // Do nothing.
-  }
-
   @Override
   public Pipeline create(RatisReplicationConfig replicationConfig,
       List<DatanodeDetails> nodes) {
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 869c6f90ca1..154849c5a45 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -79,9 +79,5 @@ public void close(Pipeline pipeline) {
       // Do nothing in Recon.
     }
 
-    @Override
-    public void shutdown() {
-      // Do nothing
-    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to