This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cc2ba99b42a HDDS-14022. Move getSCMAddressForDatanodes and getReconAddresses to HddsServerUtil (#9556)
cc2ba99b42a is described below
commit cc2ba99b42af1356ac25fdfc0b985b2cd8a2d0f3
Author: Russole <[email protected]>
AuthorDate: Sat Dec 27 20:37:35 2025 +0800
HDDS-14022. Move getSCMAddressForDatanodes and getReconAddresses to HddsServerUtil (#9556)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 116 -----------------
.../java/org/apache/hadoop/hdds/TestHddsUtils.java | 133 --------------------
.../apache/hadoop/ozone/HddsDatanodeService.java | 4 +-
.../common/statemachine/DatanodeStateMachine.java | 3 +-
.../common/states/datanode/InitDatanodeState.java | 6 +-
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 121 ++++++++++++++++++
.../apache/hadoop/hdds/scm/TestHddsServerUtil.java | 137 +++++++++++++++++++++
.../hadoop/ozone/freon/DatanodeSimulator.java | 6 +-
.../org/apache/hadoop/ozone/recon/ReconServer.java | 3 +-
9 files changed, 268 insertions(+), 261 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index e002de647a8..d74cae83942 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -25,15 +25,9 @@
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_INTERFACE_KEY;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_NAMESERVER_KEY;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_HOST_NAME_KEY;
-import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
-import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
@@ -58,12 +52,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
-import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import javax.management.ObjectName;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -280,113 +271,6 @@ public static OptionalInt getPortNumberFromConfigKeys(
return OptionalInt.empty();
}
- /**
- * Retrieve the socket addresses of all storage container managers.
- *
- * @return A collection of SCM addresses
- * @throws IllegalArgumentException If the configuration is invalid
- */
- public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
- ConfigurationSource conf) {
-
- // First check HA style config, if not defined fall back to OZONE_SCM_NAMES
-
- if (getScmServiceId(conf) != null) {
- List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
- Collection<InetSocketAddress> scmAddressList =
- new HashSet<>(scmNodeInfoList.size());
- for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
- scmAddressList.add(
- NetUtils.createSocketAddr(scmNodeInfo.getScmDatanodeAddress()));
- }
- return scmAddressList;
- } else {
- // fall back to OZONE_SCM_NAMES.
- Collection<String> names =
- conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
- if (names.isEmpty()) {
- throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
- + " need to be a set of valid DNS names or IP addresses."
- + " Empty address list found.");
- }
-
- Collection<InetSocketAddress> addresses = new HashSet<>(names.size());
- for (String address : names) {
- Optional<String> hostname = getHostName(address);
- if (!hostname.isPresent()) {
- throw new IllegalArgumentException("Invalid hostname for SCM: "
- + address);
- }
- int port = getHostPort(address)
- .orElse(conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
- OZONE_SCM_DATANODE_PORT_DEFAULT));
- InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
- port);
- addresses.add(addr);
- }
-
- if (addresses.size() > 1) {
- LOG.warn("When SCM HA is configured, configure {} appended with " +
- "serviceId and nodeId. {} is deprecated.", OZONE_SCM_ADDRESS_KEY,
- OZONE_SCM_NAMES);
- }
- return addresses;
- }
- }
-
- /**
- * Returns the SCM address for datanodes based on the service ID and the SCM addresses.
- * @param conf Configuration
- * @param scmServiceId SCM service ID
- * @param scmNodeIds Requested SCM node IDs
- * @return A collection with addresses of the request SCM node IDs.
- * Null if there is any wrongly configured SCM address. Note that the returned collection
- * might not be ordered the same way as the requested SCM node IDs
- */
- public static Collection<Pair<String, InetSocketAddress>>
getSCMAddressForDatanodes(
- ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
- Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new
HashSet<>(scmNodeIds.size());
- for (String scmNodeId : scmNodeIds) {
- String addressKey = ConfUtils.addKeySuffixes(
- OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
- String scmAddress = conf.get(addressKey);
- if (scmAddress == null) {
- LOG.warn("The SCM address configuration {} is not defined, return
nothing", addressKey);
- return null;
- }
-
- int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
- OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
- OZONE_SCM_DATANODE_PORT_DEFAULT);
-
- String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress,
scmDatanodePort);
- InetSocketAddress scmDatanodeAddress =
NetUtils.createSocketAddr(scmDatanodeAddressStr);
- scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
- }
- return scmNodeAddress;
- }
-
- /**
- * Retrieve the socket addresses of recon.
- *
- * @return Recon address
- * @throws IllegalArgumentException If the configuration is invalid
- */
- public static InetSocketAddress getReconAddresses(
- ConfigurationSource conf) {
- String name = conf.get(OZONE_RECON_ADDRESS_KEY);
- if (StringUtils.isEmpty(name)) {
- return null;
- }
- Optional<String> hostname = getHostName(name);
- if (!hostname.isPresent()) {
- throw new IllegalArgumentException("Invalid hostname for Recon: "
- + name);
- }
- int port = getHostPort(name).orElse(OZONE_RECON_DATANODE_PORT_DEFAULT);
- return NetUtils.createSocketAddr(hostname.get(), port);
- }
-
/**
* Returns the hostname for this datanode. If the hostname is not
* explicitly configured in the given config, then it is determined
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index 5eb1b5e31fe..650db8c5439 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -17,25 +17,14 @@
package org.apache.hadoop.hdds;
-import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static org.apache.hadoop.hdds.HddsUtils.processForLogging;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.net.InetSocketAddress;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -103,128 +92,6 @@ void validatePathRejectsInvalidPath(String path, String
ancestor) {
() -> HddsUtils.validatePath(Paths.get(path), Paths.get(ancestor)));
}
- @Test
- void testGetSCMAddresses() {
- final OzoneConfiguration conf = new OzoneConfiguration();
- Collection<InetSocketAddress> addresses;
- InetSocketAddress addr;
- Iterator<InetSocketAddress> it;
-
- // Verify valid IP address setup
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
- addresses = getSCMAddressForDatanodes(conf);
- assertEquals(1, addresses.size());
- addr = addresses.iterator().next();
- assertEquals("1.2.3.4", addr.getHostName());
- assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
-
- // Verify valid hostname setup
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
- addresses = getSCMAddressForDatanodes(conf);
- assertEquals(1, addresses.size());
- addr = addresses.iterator().next();
- assertEquals("scm1", addr.getHostName());
- assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
-
- // Verify valid hostname and port
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
- addresses = getSCMAddressForDatanodes(conf);
- assertEquals(1, addresses.size());
- addr = addresses.iterator().next();
- assertEquals("scm1", addr.getHostName());
- assertEquals(1234, addr.getPort());
-
- final Map<String, Integer> hostsAndPorts = new HashMap<>();
- hostsAndPorts.put("scm1", 1234);
- hostsAndPorts.put("scm2", 2345);
- hostsAndPorts.put("scm3", 3456);
-
- // Verify multiple hosts and port
- conf.setStrings(
- ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
- addresses = getSCMAddressForDatanodes(conf);
- assertEquals(3, addresses.size());
- it = addresses.iterator();
- HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
- while (it.hasNext()) {
- InetSocketAddress current = it.next();
- assertTrue(expected1.remove(current.getHostName(),
- current.getPort()));
- }
- assertThat(expected1).isEmpty();
-
- // Verify names with spaces
- conf.setStrings(
- ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
- addresses = getSCMAddressForDatanodes(conf);
- assertEquals(3, addresses.size());
- it = addresses.iterator();
- HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
- while (it.hasNext()) {
- InetSocketAddress current = it.next();
- assertTrue(expected2.remove(current.getHostName(),
- current.getPort()));
- }
- assertThat(expected2).isEmpty();
-
- // Verify empty value
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
- assertThrows(IllegalArgumentException.class,
- () -> getSCMAddressForDatanodes(conf),
- "Empty value should cause an IllegalArgumentException");
-
- // Verify invalid hostname
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
- assertThrows(IllegalArgumentException.class,
- () -> getSCMAddressForDatanodes(conf),
- "An invalid hostname should cause an IllegalArgumentException");
-
- // Verify invalid port
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
- assertThrows(IllegalArgumentException.class,
- () -> getSCMAddressForDatanodes(conf),
- "An invalid port should cause an IllegalArgumentException");
-
- // Verify a mixed case (valid and invalid value both appears)
- conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
- assertThrows(IllegalArgumentException.class,
- () -> getSCMAddressForDatanodes(conf),
- "An invalid value should cause an IllegalArgumentException");
- }
-
- @Test
- void testGetSCMAddressesWithHAConfig() {
- OzoneConfiguration conf = new OzoneConfiguration();
- String scmServiceId = "scmserviceId";
- String[] nodes = new String[]{"scm1", "scm2", "scm3"};
- conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
- conf.set(ScmConfigKeys.OZONE_SCM_NODES_KEY + "." + scmServiceId,
- "scm1,scm2,scm3");
-
- int port = 9880;
- List<String> expected = new ArrayList<>();
- for (String nodeId : nodes) {
- conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_ADDRESS_KEY,
- scmServiceId, nodeId), "scm");
- conf.setInt(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
- scmServiceId, nodeId), ++port);
- expected.add("scm" + ":" + port);
- }
-
- Collection<InetSocketAddress> scmAddressList =
- HddsUtils.getSCMAddressForDatanodes(conf);
-
- assertNotNull(scmAddressList);
- assertEquals(3, scmAddressList.size());
-
- for (InetSocketAddress next : scmAddressList) {
- expected.remove(next.getHostName() + ":" + next.getPort());
- }
-
- assertEquals(0, expected.size());
-
- }
-
@Test
void testGetNumberFromConfigKeys() {
final String testnum1 = "8";
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 28d5d5a2cb1..9bd10a83b3e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -739,12 +739,12 @@ private String reconfigScmNodes(String value) {
LOG.info("Reconfiguring SCM nodes for service ID {} with new SCM nodes {}
and remove SCM nodes {}",
scmServiceId, scmNodesIdsToAdd, scmNodesIdsToRemove);
- Collection<Pair<String, InetSocketAddress>> scmToAdd =
HddsUtils.getSCMAddressForDatanodes(
+ Collection<Pair<String, InetSocketAddress>> scmToAdd =
HddsServerUtil.getSCMAddressForDatanodes(
getConf(), scmServiceId, scmNodesIdsToAdd);
if (scmToAdd == null) {
throw new IllegalStateException("Reconfiguration failed to get SCM
address to add due to wrong configuration");
}
- Collection<Pair<String, InetSocketAddress>> scmToRemove =
HddsUtils.getSCMAddressForDatanodes(
+ Collection<Pair<String, InetSocketAddress>> scmToRemove =
HddsServerUtil.getSCMAddressForDatanodes(
getConf(), scmServiceId, scmNodesIdsToRemove);
if (scmToRemove == null) {
throw new IllegalArgumentException(
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index f05a14e2109..dfe89a17aa2 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
@@ -314,7 +313,7 @@ private int getEndPointTaskThreadPoolSize() {
int totalServerCount = 1;
try {
- totalServerCount += HddsUtils.getSCMAddressForDatanodes(conf).size();
+ totalServerCount +=
HddsServerUtil.getSCMAddressForDatanodes(conf).size();
} catch (Exception e) {
LOG.error("Fail to get scm addresses", e);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
index 6fe0e111bad..2787093b1bf 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -17,8 +17,8 @@
package org.apache.hadoop.ozone.container.common.states.datanode;
-import static org.apache.hadoop.hdds.HddsUtils.getReconAddresses;
-import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
+import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getReconAddressForDatanodes;
+import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getSCMAddressForDatanodes;
import com.google.common.base.Strings;
import java.io.File;
@@ -104,7 +104,7 @@ public DatanodeStateMachine.DatanodeStates call() throws
Exception {
connectionManager.addSCMServer(addr, context.getThreadNamePrefix());
this.context.addEndpoint(addr);
}
- InetSocketAddress reconAddress = getReconAddresses(conf);
+ InetSocketAddress reconAddress = getReconAddressForDatanodes(conf);
if (reconAddress != null) {
connectionManager.addReconServer(reconAddress,
context.getThreadNamePrefix());
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index df4c31077aa..3d8b6178025 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -25,9 +25,17 @@
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_INITIAL_HEARTBEAT_INTERVAL;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_INITIAL_HEARTBEAT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.hdds.HddsUtils.getHostPort;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.hdds.HddsUtils.getScmServiceId;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
@@ -40,6 +48,7 @@
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_RETRY_INTERVAL_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
@@ -58,6 +67,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
@@ -68,7 +79,9 @@
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
@@ -85,6 +98,7 @@
import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
import
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
@@ -104,6 +118,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.ratis.util.Preconditions;
@@ -757,4 +772,110 @@ public static void setPoolSize(ThreadPoolExecutor
executor, int size, Logger log
}
}
+ /**
+ * Retrieve the socket addresses of all storage container managers.
+ *
+ * @return A collection of SCM addresses
+ * @throws IllegalArgumentException If the configuration is invalid
+ */
+ public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
+ ConfigurationSource conf) {
+
+ // First check HA style config, if not defined fall back to OZONE_SCM_NAMES
+
+ if (getScmServiceId(conf) != null) {
+ List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+ Collection<InetSocketAddress> scmAddressList =
+ new HashSet<>(scmNodeInfoList.size());
+ for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+ scmAddressList.add(
+ NetUtils.createSocketAddr(scmNodeInfo.getScmDatanodeAddress()));
+ }
+ return scmAddressList;
+ } else {
+ // fall back to OZONE_SCM_NAMES.
+ Collection<String> names =
+ conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
+ if (names.isEmpty()) {
+ throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
+ + " need to be a set of valid DNS names or IP addresses."
+ + " Empty address list found.");
+ }
+
+ Collection<InetSocketAddress> addresses = new HashSet<>(names.size());
+ for (String address : names) {
+ Optional<String> hostname = getHostName(address);
+ if (!hostname.isPresent()) {
+ throw new IllegalArgumentException("Invalid hostname for SCM: "
+ + address);
+ }
+ int port = getHostPort(address)
+ .orElse(conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
+ OZONE_SCM_DATANODE_PORT_DEFAULT));
+ InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
+ port);
+ addresses.add(addr);
+ }
+
+ if (addresses.size() > 1) {
+ LOG.warn("When SCM HA is configured, configure {} appended with " +
+ "serviceId and nodeId. {} is deprecated.",
OZONE_SCM_ADDRESS_KEY,
+ OZONE_SCM_NAMES);
+ }
+ return addresses;
+ }
+ }
+
+ /**
+ * Returns the SCM address for datanodes based on the service ID and the SCM addresses.
+ * @param conf Configuration
+ * @param scmServiceId SCM service ID
+ * @param scmNodeIds Requested SCM node IDs
+ * @return A collection with addresses of the request SCM node IDs.
+ * Null if there is any wrongly configured SCM address. Note that the returned collection
+ * might not be ordered the same way as the requested SCM node IDs
+ */
+ public static Collection<Pair<String, InetSocketAddress>>
getSCMAddressForDatanodes(
+ ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
+ Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new
HashSet<>(scmNodeIds.size());
+ for (String scmNodeId : scmNodeIds) {
+ String addressKey = ConfUtils.addKeySuffixes(
+ OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmAddress = conf.get(addressKey);
+ if (scmAddress == null) {
+ LOG.warn("The SCM address configuration {} is not defined, return
nothing", addressKey);
+ return null;
+ }
+
+ int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
+ OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
+ OZONE_SCM_DATANODE_PORT_DEFAULT);
+
+ String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress,
scmDatanodePort);
+ InetSocketAddress scmDatanodeAddress =
NetUtils.createSocketAddr(scmDatanodeAddressStr);
+ scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
+ }
+ return scmNodeAddress;
+ }
+
+ /**
+ * Retrieve the socket addresses of recon.
+ *
+ * @return Recon address
+ * @throws IllegalArgumentException If the configuration is invalid
+ */
+ public static InetSocketAddress getReconAddressForDatanodes(
+ ConfigurationSource conf) {
+ String name = conf.get(OZONE_RECON_ADDRESS_KEY);
+ if (StringUtils.isEmpty(name)) {
+ return null;
+ }
+ Optional<String> hostname = getHostName(name);
+ if (!hostname.isPresent()) {
+ throw new IllegalArgumentException("Invalid hostname for Recon: "
+ + name);
+ }
+ int port = getHostPort(name).orElse(OZONE_RECON_DATANODE_PORT_DEFAULT);
+ return NetUtils.createSocketAddr(hostname.get(), port);
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtil.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtil.java
index 6feba1a48f3..2878304f386 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtil.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtil.java
@@ -17,13 +17,28 @@
package org.apache.hadoop.hdds.scm;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getSCMAddressForDatanodes;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
import org.junit.jupiter.api.Test;
/**
@@ -169,4 +184,126 @@ public void testScmDataNodeBindHostDefault() {
assertEquals(200, addr.getPort());
}
+ @Test
+ void testGetSCMAddresses() {
+ final OzoneConfiguration conf = new OzoneConfiguration();
+ Collection<InetSocketAddress> addresses;
+ InetSocketAddress addr;
+ Iterator<InetSocketAddress> it;
+
+ // Verify valid IP address setup
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
+ addresses = getSCMAddressForDatanodes(conf);
+ assertEquals(1, addresses.size());
+ addr = addresses.iterator().next();
+ assertEquals("1.2.3.4", addr.getHostName());
+ assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
+
+ // Verify valid hostname setup
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
+ addresses = getSCMAddressForDatanodes(conf);
+ assertEquals(1, addresses.size());
+ addr = addresses.iterator().next();
+ assertEquals("scm1", addr.getHostName());
+ assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());
+
+ // Verify valid hostname and port
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
+ addresses = getSCMAddressForDatanodes(conf);
+ assertEquals(1, addresses.size());
+ addr = addresses.iterator().next();
+ assertEquals("scm1", addr.getHostName());
+ assertEquals(1234, addr.getPort());
+
+ final Map<String, Integer> hostsAndPorts = new HashMap<>();
+ hostsAndPorts.put("scm1", 1234);
+ hostsAndPorts.put("scm2", 2345);
+ hostsAndPorts.put("scm3", 3456);
+
+ // Verify multiple hosts and port
+ conf.setStrings(
+ ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
+ addresses = getSCMAddressForDatanodes(conf);
+ assertEquals(3, addresses.size());
+ it = addresses.iterator();
+ HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
+ while (it.hasNext()) {
+ InetSocketAddress current = it.next();
+ assertTrue(expected1.remove(current.getHostName(),
+ current.getPort()));
+ }
+ assertThat(expected1).isEmpty();
+
+ // Verify names with spaces
+ conf.setStrings(
+ ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
+ addresses = getSCMAddressForDatanodes(conf);
+ assertEquals(3, addresses.size());
+ it = addresses.iterator();
+ HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
+ while (it.hasNext()) {
+ InetSocketAddress current = it.next();
+ assertTrue(expected2.remove(current.getHostName(),
+ current.getPort()));
+ }
+ assertThat(expected2).isEmpty();
+
+ // Verify empty value
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
+ assertThrows(IllegalArgumentException.class,
+ () -> getSCMAddressForDatanodes(conf),
+ "Empty value should cause an IllegalArgumentException");
+
+ // Verify invalid hostname
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
+ assertThrows(IllegalArgumentException.class,
+ () -> getSCMAddressForDatanodes(conf),
+ "An invalid hostname should cause an IllegalArgumentException");
+
+ // Verify invalid port
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
+ assertThrows(IllegalArgumentException.class,
+ () -> getSCMAddressForDatanodes(conf),
+ "An invalid port should cause an IllegalArgumentException");
+
+ // Verify a mixed case (valid and invalid value both appears)
+ conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
+ assertThrows(IllegalArgumentException.class,
+ () -> getSCMAddressForDatanodes(conf),
+ "An invalid value should cause an IllegalArgumentException");
+ }
+
+ @Test
+ void testGetSCMAddressesWithHAConfig() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String scmServiceId = "scmserviceId";
+ String[] nodes = new String[]{"scm1", "scm2", "scm3"};
+ conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
+ conf.set(ScmConfigKeys.OZONE_SCM_NODES_KEY + "." + scmServiceId,
+ "scm1,scm2,scm3");
+
+ int port = 9880;
+ List<String> expected = new ArrayList<>();
+ for (String nodeId : nodes) {
+ conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_ADDRESS_KEY,
+ scmServiceId, nodeId), "scm");
+ conf.setInt(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ scmServiceId, nodeId), ++port);
+ expected.add("scm" + ":" + port);
+ }
+
+ Collection<InetSocketAddress> scmAddressList =
+ getSCMAddressForDatanodes(conf);
+
+ assertNotNull(scmAddressList);
+ assertEquals(3, scmAddressList.size());
+
+ for (InetSocketAddress next : scmAddressList) {
+ expected.remove(next.getHostName() + ":" + next.getPort());
+ }
+
+ assertEquals(0, expected.size());
+
+ }
+
}
diff --git
a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
index 300a506305d..cb3769e7fd2 100644
---
a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
+++
b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
@@ -19,10 +19,10 @@
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getReconAddresses;
-import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getReconAddressForDatanodes;
+import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getSCMAddressForDatanodes;
import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryCount;
import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryInterval;
@@ -430,7 +430,7 @@ private void init() throws IOException {
scmClients.put(address, createScmClient(address));
}
- reconAddress = getReconAddresses(conf);
+ reconAddress = getReconAddressForDatanodes(conf);
reconClient = createReconClient(reconAddress);
heartbeatScheduler = Executors.newScheduledThreadPool(threadCount);
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
index 493e340bc5d..c9fc0ab3470 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
@@ -34,7 +34,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
-import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
@@ -381,7 +380,7 @@ private static void loginReconUser(OzoneConfiguration conf)
reconConfig.getKerberosPrincipal(),
reconConfig.getKerberosKeytab());
UserGroupInformation.setConfiguration(conf);
- InetSocketAddress socAddr = HddsUtils.getReconAddresses(conf);
+ InetSocketAddress socAddr =
HddsServerUtil.getReconAddressForDatanodes(conf);
SecurityUtil.login(conf,
OZONE_RECON_KERBEROS_KEYTAB_FILE_KEY,
OZONE_RECON_KERBEROS_PRINCIPAL_KEY,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]