This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6d4cd1c4c6a HDDS-14384. Cleanup PipelineProvider and related classes (#9608)
6d4cd1c4c6a is described below
commit 6d4cd1c4c6a9bf36c34dab336d474848f1d33205
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 12 04:56:33 2026 -0800
HDDS-14384. Cleanup PipelineProvider and related classes (#9608)
---
.../hadoop/hdds/client/ReplicationFactor.java | 34 ++++++++--------------
.../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 9 ++----
.../hadoop/hdds/scm/node/SCMNodeManager.java | 10 +++----
.../hdds/scm/pipeline/ECPipelineProvider.java | 4 ---
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 4 ---
.../hdds/scm/pipeline/PipelineManagerImpl.java | 4 +--
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 9 ++----
.../hdds/scm/pipeline/RatisPipelineProvider.java | 20 ++++---------
.../hdds/scm/pipeline/SimplePipelineProvider.java | 4 ---
.../scm/pipeline/MockRatisPipelineProvider.java | 5 ----
.../ozone/recon/scm/ReconPipelineFactory.java | 4 ---
11 files changed, 28 insertions(+), 79 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
index c15f20424fe..9f47cc2fd26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
@@ -23,21 +23,8 @@
* The replication factor to be used while writing key into ozone.
*/
public enum ReplicationFactor {
- ONE(1),
- THREE(3);
-
- /**
- * Integer representation of replication.
- */
- private int value;
-
- /**
- * Initializes ReplicationFactor with value.
- * @param value replication value
- */
- ReplicationFactor(int value) {
- this.value = value;
- }
+ ONE,
+ THREE;
/**
* Returns enum value corresponding to the int value.
@@ -85,16 +72,19 @@ public HddsProtos.ReplicationFactor toProto() {
case THREE:
return HddsProtos.ReplicationFactor.THREE;
default:
- throw new IllegalArgumentException(
- "Unsupported ProtoBuf replication factor: " + this);
+ throw new IllegalStateException("Unexpected enum value: " + this);
}
}
- /**
- * Returns integer representation of ReplicationFactor.
- * @return replication value
- */
+ /** @return the number of replication(s). */
public int getValue() {
- return value;
+ switch (this) {
+ case ONE:
+ return 1;
+ case THREE:
+ return 3;
+ default:
+ throw new IllegalStateException("Unexpected enum value: " + this);
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 934e13bb53b..c51f6d0429f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -273,7 +273,7 @@ public List<DatanodeDetails> filterNodesWithSpace(List<DatanodeDetails> nodes,
int nodesRequired, long metadataSizeRequired, long dataSizeRequired)
throws SCMException {
List<DatanodeDetails> nodesWithSpace = nodes.stream().filter(d ->
- hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired, conf))
+ hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired))
.collect(Collectors.toList());
if (nodesWithSpace.size() < nodesRequired) {
@@ -298,8 +298,7 @@ public List<DatanodeDetails> filterNodesWithSpace(List<DatanodeDetails> nodes,
*/
public static boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
long metadataSizeRequired,
- long dataSizeRequired,
- ConfigurationSource conf) {
+ long dataSizeRequired) {
Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo);
boolean enoughForData = false;
@@ -523,9 +522,7 @@ public boolean isValidNode(DatanodeDetails datanodeDetails,
return false;
}
NodeStatus nodeStatus = datanodeInfo.getNodeStatus();
- if (nodeStatus.isNodeWritable() &&
- (hasEnoughSpace(datanodeInfo, metadataSizeRequired,
- dataSizeRequired, conf))) {
+ if (nodeStatus.isNodeWritable() && (hasEnoughSpace(datanodeInfo, metadataSizeRequired, dataSizeRequired))) {
LOG.debug("Datanode {} is chosen. Required metadata size is {} and " +
"required data size is {} and NodeStatus is {}",
datanodeDetails, metadataSizeRequired, dataSizeRequired, nodeStatus);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 4af4465a95a..c3ef2238769 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -127,7 +127,6 @@ public class SCMNodeManager implements NodeManager {
private final SCMNodeMetrics metrics;
// Node manager MXBean
private ObjectName nmInfoBean;
- private final OzoneConfiguration conf;
private final SCMStorageConfig scmStorageConfig;
private final NetworkTopology clusterMap;
private final Function<String, String> nodeResolver;
@@ -140,6 +139,7 @@ public class SCMNodeManager implements NodeManager {
private final SCMContext scmContext;
private final Map<SCMCommandProto.Type,
BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
+ private final NonWritableNodeFilter nonWritableNodeFilter;
/**
* Lock used to synchronize some operation in Node manager to ensure a
@@ -176,7 +176,6 @@ public SCMNodeManager(
SCMContext scmContext,
HDDSLayoutVersionManager layoutVersionManager,
Function<String, String> nodeResolver) {
- this.conf = conf;
this.scmNodeEventPublisher = eventPublisher;
this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
layoutVersionManager, scmContext);
@@ -199,6 +198,7 @@ public SCMNodeManager(
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.scmContext = scmContext;
this.sendCommandNotifyMap = new HashMap<>();
+ this.nonWritableNodeFilter = new NonWritableNodeFilter(conf);
}
@Override
@@ -1394,7 +1394,7 @@ private void nodeSpaceStatistics(Map<String, String> nodeStatics) {
private void nodeNonWritableStatistics(Map<String, String> nodeStatics) {
int nonWritableNodesCount = (int) getAllNodes().parallelStream()
- .filter(new NonWritableNodeFilter(conf))
+ .filter(nonWritableNodeFilter)
.count();
nodeStatics.put("NonWritableNodes", String.valueOf(nonWritableNodesCount));
@@ -1405,7 +1405,6 @@ static class NonWritableNodeFilter implements Predicate<DatanodeInfo> {
private final long blockSize;
private final long minRatisVolumeSizeBytes;
private final long containerSize;
- private final ConfigurationSource conf;
NonWritableNodeFilter(ConfigurationSource conf) {
blockSize = (long) conf.getStorageSize(
@@ -1420,13 +1419,12 @@ static class NonWritableNodeFilter implements Predicate<DatanodeInfo> {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
- this.conf = conf;
}
@Override
public boolean test(DatanodeInfo dn) {
return !dn.getNodeStatus().isNodeWritable()
- || (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize, conf)
+ || (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize)
&& !hasEnoughCommittedVolumeSpace(dn));
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index c1f14d8cc65..f9b94bd8d0d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -148,8 +148,4 @@ private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
protected void close(Pipeline pipeline) throws IOException {
}
- @Override
- protected void shutdown() {
- }
-
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index c07cfc4ff15..e55947d0d4d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -123,10 +123,6 @@ public void close(ReplicationType type, Pipeline pipeline)
providers.get(type).close(pipeline);
}
- public void shutdown() {
- providers.values().forEach(provider -> provider.shutdown());
- }
-
@VisibleForTesting
public Map<ReplicationType, PipelineProvider> getProviders() {
return providers;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 32474b04a81..d0d261b21fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -641,7 +641,7 @@ public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
if (!(node instanceof DatanodeInfo)) {
node = nodeManager.getDatanodeInfo(node);
}
- if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize, null)) {
+ if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) {
return false;
}
}
@@ -842,8 +842,6 @@ public void close() throws IOException {
SCMPipelineMetrics.unRegister();
- // shutdown pipeline provider.
- pipelineFactory.shutdown();
try {
stateManager.close();
} catch (Exception ex) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 99ace4b981f..b1b2d734906 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -23,7 +23,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -81,18 +80,14 @@ protected abstract Pipeline createForRead(
protected abstract void close(Pipeline pipeline) throws IOException;
- protected abstract void shutdown();
-
List<DatanodeDetails> pickNodesNotUsed(REPLICATION_CONFIG replicationConfig,
long metadataSizeRequired,
- long dataSizeRequired,
- ConfigurationSource conf)
+ long dataSizeRequired)
throws SCMException {
int nodesRequired = replicationConfig.getRequiredNodes();
List<DatanodeDetails> healthyDNs = pickAllNodesNotUsed(replicationConfig);
List<DatanodeDetails> healthyDNsWithSpace = healthyDNs.stream()
- .filter(dn -> SCMCommonPlacementPolicy
- .hasEnoughSpace(dn, metadataSizeRequired, dataSizeRequired, conf))
+ .filter(dn -> SCMCommonPlacementPolicy.hasEnoughSpace(dn, metadataSizeRequired, dataSizeRequired))
.limit(nodesRequired)
.collect(Collectors.toList());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 491e4d15adf..b35a1f28148 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -57,11 +57,10 @@ public class RatisPipelineProvider
private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);
- private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
private final PlacementPolicy placementPolicy;
- private int pipelineNumberLimit;
- private int maxPipelinePerDatanode;
+ private final int pipelineNumberLimit;
+ private final int maxPipelinePerDatanode;
private final LeaderChoosePolicy leaderChoosePolicy;
private final SCMContext scmContext;
private final long containerSizeBytes;
@@ -74,7 +73,6 @@ public RatisPipelineProvider(NodeManager nodeManager,
EventPublisher eventPublisher,
SCMContext scmContext) {
super(nodeManager, stateManager);
- this.conf = conf;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
this.placementPolicy = PipelinePlacementPolicyFactory
@@ -85,11 +83,11 @@ public RatisPipelineProvider(NodeManager nodeManager,
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.maxPipelinePerDatanode = dnLimit == null ? 0 :
Integer.parseInt(dnLimit);
- this.containerSizeBytes = (long) this.conf.getStorageSize(
+ this.containerSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
- this.minRatisVolumeSizeBytes = (long) this.conf.getStorageSize(
+ this.minRatisVolumeSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN_DEFAULT,
StorageUnit.BYTES);
@@ -158,14 +156,12 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
);
}
- List<DatanodeDetails> dns;
-
+ final List<DatanodeDetails> dns;
final ReplicationFactor factor =
replicationConfig.getReplicationFactor();
switch (factor) {
case ONE:
- dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes,
- containerSizeBytes, conf);
+ dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes, containerSizeBytes);
break;
case THREE:
List<DatanodeDetails> excludeDueToEngagement =
filterPipelineEngagement();
@@ -251,10 +247,6 @@ private List<DatanodeDetails> filterPipelineEngagement() {
return excluded;
}
- @Override
- public void shutdown() {
- }
-
/**
* Removes pipeline from SCM. Sends command to destroy pipeline on all
* the datanodes.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 85be042eccc..a7cfd4bd597 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -95,8 +95,4 @@ public void close(Pipeline pipeline) throws IOException {
}
- @Override
- public void shutdown() {
- // Do nothing.
- }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 82c661210c6..933465c0245 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -91,11 +91,6 @@ public static void markPipelineHealthy(Pipeline pipeline)
pipeline.setLeaderId(pipeline.getFirstNode().getID());
}
- @Override
- public void shutdown() {
- // Do nothing.
- }
-
@Override
public Pipeline create(RatisReplicationConfig replicationConfig,
List<DatanodeDetails> nodes) {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 869c6f90ca1..154849c5a45 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -79,9 +79,5 @@ public void close(Pipeline pipeline) {
// Do nothing in Recon.
}
- @Override
- public void shutdown() {
- // Do nothing
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]