This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4bb7f278d HDDS-14369. RatisPipelineProvider does not honor OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT (#9609)
ef4bb7f278d is described below

commit ef4bb7f278df1374638d313e47a286581b63e0ce
Author: Russole <[email protected]>
AuthorDate: Wed Jan 14 01:30:15 2026 +0800

    HDDS-14369. RatisPipelineProvider does not honor OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT (#9609)
---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 11 +++--
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 11 +++--
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 16 +++----
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 22 +++++++++
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 54 ++++++++++++++++++++++
 .../scm/pipeline/TestRatisPipelineProvider.java    | 50 ++++++++++++++++++++
 6 files changed, 146 insertions(+), 18 deletions(-)
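
The core of the fix, repeated in all three main classes, is the same: the limit was previously read with conf.get() plus a manual null check, so an unset key collapsed to 0 and OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT was never applied; conf.getInt(key, default) fixes that. Below is a minimal, self-contained sketch of the before/after lookup — java.util.Properties stands in for Hadoop's Configuration, the key string is assumed from the constant's name, and the default of 2 is taken from the new tests further down.

    import java.util.Properties;

    public final class LimitLookupSketch {
      // Key string assumed from the constant name; default of 2 per the tests.
      private static final String KEY = "ozone.datanode.pipeline.limit";
      private static final int DEFAULT_LIMIT = 2;

      public static void main(String[] args) {
        Properties conf = new Properties(); // key deliberately left unset

        // Old pattern: an unset key yields 0, silently bypassing the default.
        String raw = conf.getProperty(KEY);
        int before = raw == null ? 0 : Integer.parseInt(raw);

        // New pattern: equivalent to conf.getInt(KEY, DEFAULT_LIMIT); the
        // default applies whenever the key is absent.
        String val = conf.getProperty(KEY);
        int after = val == null ? DEFAULT_LIMIT : Integer.parseInt(val);

        System.out.println(before); // 0 -> per-datanode checks were skipped
        System.out.println(after);  // 2 -> default limit is honored
      }
    }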

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 26d9e49bafc..e2a17dd1d5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -133,7 +133,7 @@ public class SCMNodeManager implements NodeManager {
   private final boolean useHostname;
  private final Map<String, Set<DatanodeID>> dnsToDnIdMap = new ConcurrentHashMap<>();
   private final int numPipelinesPerMetadataVolume;
-  private final int heavyNodeCriteria;
+  private final int datanodePipelineLimit;
   private final HDDSLayoutVersionManager scmLayoutVersionManager;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
@@ -195,8 +195,9 @@ public SCMNodeManager(
     this.numPipelinesPerMetadataVolume =
         conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME,
             ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
     this.numContainerPerVolume = conf.getInt(
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
@@ -1602,8 +1603,8 @@ public int totalHealthyVolumeCount() {
   @Override
   public int pipelineLimit(DatanodeDetails dn) {
     try {
-      if (heavyNodeCriteria > 0) {
-        return heavyNodeCriteria;
+      if (datanodePipelineLimit > 0) {
+        return datanodePipelineLimit;
       } else if (nodeStateManager.getNode(dn).getHealthyVolumeCount() > 0) {
         return numPipelinesPerMetadataVolume *
             nodeStateManager.getNode(dn).getMetaDataVolumeCount();
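
Read together, the hunks above make pipelineLimit() return the configured value — or now the default — whenever it is positive; only a non-positive limit falls through to the per-metadata-volume computation. A standalone sketch of that decision (the final zero fallback is an assumption, since the closing else branch lies outside the hunk):

    public final class PipelineLimitSketch {

      // Not the real method: parameters stand in for the config values and
      // node state that SCMNodeManager consults.
      static int pipelineLimit(int datanodePipelineLimit,
          int pipelinesPerMetadataVolume, int healthyVolumeCount,
          int metadataVolumeCount) {
        if (datanodePipelineLimit > 0) {
          return datanodePipelineLimit; // explicit or default limit wins
        } else if (healthyVolumeCount > 0) {
          return pipelinesPerMetadataVolume * metadataVolumeCount;
        }
        return 0; // assumed fallback; not shown in the hunk above
      }

      public static void main(String[] args) {
        // With the default limit of 2 now in effect, volume counts are moot:
        System.out.println(pipelineLimit(2, 2, 10, 3)); // 2
        // Only a non-positive limit reaches the per-volume computation:
        System.out.println(pipelineLimit(0, 2, 10, 3)); // 6
      }
    }
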
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 696d6ecc336..366a33dccf2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -54,7 +54,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
-  private final int heavyNodeCriteria;
+  private final int datanodePipelineLimit;
   private static final int REQUIRED_RACKS = 2;
 
   public static final String MULTIPLE_RACK_PIPELINE_MSG =
@@ -76,8 +76,9 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
   }
 
   public static int currentRatisThreePipelineCount(
@@ -182,7 +183,7 @@ List<DatanodeDetails> filterViableNodes(
     if (healthyList.size() < nodesRequired) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Unable to find enough nodes that meet the criteria that" +
-            " cannot engage in more than" + heavyNodeCriteria +
+            " cannot engage in more than" + datanodePipelineLimit +
             " pipelines. Nodes required: " + nodesRequired + " Excluded: " +
             excludedNodesSize + " Found:" +
             healthyList.size() + " healthy nodes count in NodeManager: " +
@@ -191,7 +192,7 @@ List<DatanodeDetails> filterViableNodes(
      msg = String.format("Pipeline creation failed because nodes are engaged" +
               " in other pipelines and every node can only be engaged in" +
               " max %d pipelines. Required %d. Found %d. Excluded: %d.",
-          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          datanodePipelineLimit, nodesRequired, healthyList.size(),
           excludedNodesSize);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index b35a1f28148..800a7ebfd83 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -60,7 +60,7 @@ public class RatisPipelineProvider
   private final EventPublisher eventPublisher;
   private final PlacementPolicy placementPolicy;
   private final int pipelineNumberLimit;
-  private final int maxPipelinePerDatanode;
+  private final int datanodePipelineLimit;
   private final LeaderChoosePolicy leaderChoosePolicy;
   private final SCMContext scmContext;
   private final long containerSizeBytes;
@@ -80,9 +80,9 @@ public RatisPipelineProvider(NodeManager nodeManager,
     this.pipelineNumberLimit = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
         ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
-    String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
-    this.maxPipelinePerDatanode = dnLimit == null ? 0 :
-        Integer.parseInt(dnLimit);
+    this.datanodePipelineLimit = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
     this.containerSizeBytes = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
@@ -110,10 +110,10 @@ private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConf
    int closedPipelines = pipelineStateManager.getPipelines(replicationConfig, PipelineState.CLOSED).size();
     int openPipelines = totalActivePipelines - closedPipelines;
     // Check per-datanode pipeline limit
-    if (maxPipelinePerDatanode > 0) {
+    if (datanodePipelineLimit > 0) {
       int healthyNodeCount = getNodeManager()
           .getNodeCount(NodeStatus.inServiceHealthy());
-      int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
+      int allowedOpenPipelines = (datanodePipelineLimit * healthyNodeCount)
           / replicationConfig.getRequiredNodes();
       return openPipelines >= allowedOpenPipelines;
     }
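
A worked example of the cap computed above, using the default limit of 2, RATIS/THREE (3 required nodes), and the 8 healthy nodes the new TestRatisPipelineProvider test assumes:

    public final class PipelineCapExample {
      public static void main(String[] args) {
        int datanodePipelineLimit = 2; // OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT
        int healthyNodeCount = 8;      // assumed, per the test's comment
        int requiredNodes = 3;         // RATIS/THREE

        // Same integer arithmetic as the per-datanode branch above.
        int allowedOpenPipelines =
            (datanodePipelineLimit * healthyNodeCount) / requiredNodes;

        System.out.println(allowedOpenPipelines); // 16 / 3 = 5
        // openPipelines >= 5, so a sixth create() attempt is rejected.
      }
    }
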
@@ -145,8 +145,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException {
     if (exceedPipelineNumberLimit(replicationConfig)) {
-      String limitInfo = (maxPipelinePerDatanode > 0)
-          ? String.format("per datanode: %d", maxPipelinePerDatanode)
+      String limitInfo = (datanodePipelineLimit > 0)
+          ? String.format("per datanode: %d", datanodePipelineLimit)
           : String.format(": %d", pipelineNumberLimit);
 
       throw new SCMException(
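
The new test at the end of this mail asserts on the message built here; below is a small sketch of how limitInfo selects between the two formats. The sentence wrapped around the fragment is not reproduced, since the diff truncates the SCMException argument.

    public final class LimitMessageSketch {
      public static void main(String[] args) {
        int datanodePipelineLimit = 2; // default limit in effect
        int pipelineNumberLimit = 0;   // unused when the per-DN limit is set

        String limitInfo = (datanodePipelineLimit > 0)
            ? String.format("per datanode: %d", datanodePipelineLimit)
            : String.format(": %d", pipelineNumberLimit);

        // The fragment the test asserts on:
        System.out.println("limit " + limitInfo); // limit per datanode: 2
      }
    }
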
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 84b548a4c04..928e38295f5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -2059,6 +2059,28 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
     }
   }
 
+  /**
+   * Test that pipelineLimit() uses the default value when the config is not set.
+   */
+  @Test
+  public void testUsesDefaultPipelineLimitWhenUnset()
+      throws IOException, AuthenticationException {
+
+    // Creates a node manager from a config with the limit unset
+    OzoneConfiguration conf = getConf();
+    conf.unset(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+
+      // Registers datanode with healthy volumes
+      DatanodeDetails dn = registerWithCapacity(nodeManager);
+
+      // Calls pipelineLimit() and verifies it returns the default value
+      int limit = nodeManager.pipelineLimit(dn);
+      assertEquals(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT, limit);
+    }
+  }
+
   private static Stream<Arguments> nodeStateTransitions() {
     return Stream.of(
         // start decommissioning or entering maintenance
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9b5ac88455..7094a39c79e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -722,6 +722,60 @@ public void testCurrentRatisThreePipelineCount()
     assertEquals(pipelineCount, 2);
   }
 
+  @Test
+  public void testPipelinePlacementPolicyDefaultLimitFiltersNodeAtLimit()
+      throws IOException, TimeoutException {
+
+    // 1) Creates policy with config without limit set
+    OzoneConfiguration localConf = new OzoneConfiguration(conf);
+    localConf.unset(OZONE_DATANODE_PIPELINE_LIMIT);
+
+    MockNodeManager localNodeManager = new MockNodeManager(cluster,
+        getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+
+    // Ensure NodeManager uses default limit (=2) when limit is not set in conf
+    localNodeManager.setNumPipelinePerDatanode(
+        ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
+
+    PipelineStateManager localStateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(localNodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+    PipelinePlacementPolicy localPolicy = new PipelinePlacementPolicy(
+        localNodeManager, localStateManager, localConf);
+
+    List<DatanodeDetails> healthy =
+        localNodeManager.getNodes(NodeStatus.inServiceHealthy());
+    DatanodeDetails target = healthy.get(0);
+
+    // 2) Adds exactly 2 pipelines to test node (default limit)
+    List<DatanodeDetails> p1Dns = new ArrayList<>();
+    p1Dns.add(target);
+    p1Dns.add(healthy.get(1));
+    p1Dns.add(healthy.get(2));
+    createPipelineWithReplicationConfig(p1Dns, RATIS, THREE);
+
+    List<DatanodeDetails> p2Dns = new ArrayList<>();
+    p2Dns.add(target);
+    p2Dns.add(healthy.get(3));
+    p2Dns.add(healthy.get(4));
+    createPipelineWithReplicationConfig(p2Dns, RATIS, THREE);
+
+    assertEquals(2, PipelinePlacementPolicy.currentRatisThreePipelineCount(
+        localNodeManager, localStateManager, target));
+
+    // 3) Verifies node is filtered out when choosing nodes for new pipeline
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> chosen = localPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0, 0);
+
+    assertEquals(nodesRequired, chosen.size());
+    assertThat(chosen).doesNotContain(target);
+  }
+
  private void createPipelineWithReplicationConfig(List<DatanodeDetails> dnList,
                                                    HddsProtos.ReplicationType
                                                        replicationType,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index cfeba61c320..ca9d1f5a6c3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -365,6 +365,56 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws
     }
   }
 
+  @Test
+  public void testCreatePipelineWithDefaultLimit() throws Exception {
+    // Create conf without setting OZONE_DATANODE_PIPELINE_LIMIT
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+
+    dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get());
+
+    // MockNodeManager(true, 10) typically gives 8 healthy nodes in this test suite.
+    nodeManager = new MockNodeManager(true, nodeCount);
+    // Give a large quota in MockNodeManager so we don't fail early due to mock quota.
+    nodeManager.setNumPipelinePerDatanode(100);
+
+    SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true);
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+    provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf);
+
+    int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy()).size();
+    int defaultLimit = ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
+    assertEquals(2, defaultLimit);
+
+    // Max pipelines before exceeding per-DN default limit.
+    int maxPipelines = (healthyCount * defaultLimit)
+        / ReplicationFactor.THREE.getNumber();
+
+    // Create pipelines up to maxPipelines.
+    for (int i = 0; i < maxPipelines; i++) {
+      Pipeline p = provider.create(
+          RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+          new ArrayList<>(), new ArrayList<>());
+      stateManager.addPipeline(p.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+    }
+
+    // Next pipeline creation should fail with default limit message.
+    SCMException ex = assertThrows(SCMException.class, () ->
+        provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+            new ArrayList<>(), new ArrayList<>())
+    );
+
+    assertThat(ex.getMessage())
+        .contains("limit per datanode: " + defaultLimit)
+        .contains("replicationConfig: RATIS/THREE");
+  }
+
   @ParameterizedTest
   @CsvSource({ "1, 3", "2, 6"})
  public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
