This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fd8948134ef HDDS-13890. Datanode supports dynamic configuration of SCM (#9385)
fd8948134ef is described below
commit fd8948134efa1e96505d4446d4657f2ad5003396
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Dec 15 16:48:22 2025 +0800
HDDS-13890. Datanode supports dynamic configuration of SCM (#9385)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 35 ++
.../org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java | 7 +-
.../hadoop/hdds/conf/ConfigurationSource.java | 4 +
.../apache/hadoop/ozone/HddsDatanodeService.java | 128 ++++++
.../common/statemachine/DatanodeQueueMetrics.java | 22 +
.../common/statemachine/DatanodeStateMachine.java | 16 +
.../common/statemachine/SCMConnectionManager.java | 20 +-
.../common/statemachine/StateContext.java | 11 +
.../container/common/states/DatanodeState.java | 3 +-
.../states/datanode/RunningDatanodeState.java | 14 +-
.../states/datanode/TestRunningDatanodeState.java | 1 +
.../hdds/scm/server/StorageContainerManager.java | 21 +-
.../scm/TestDatanodeSCMNodesReconfiguration.java | 446 +++++++++++++++++++++
hadoop-ozone/mini-cluster/pom.xml | 4 +
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 203 ++++++++++
15 files changed, 923 insertions(+), 12 deletions(-)
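In short: the datanode now registers "ozone.scm.nodes.<serviceId>" with its ReconfigurationHandler and diffs the old and new SCM node ID sets to add or remove SCM endpoint connections at runtime. A minimal sketch of how a caller could exercise the hook (the service ID "scmservice" and node IDs below are illustrative, not from this commit; the reconfigureProperty call mirrors the integration test further down):

    // Hedged sketch: assumes a running HddsDatanodeService `datanode` whose
    // cluster uses SCM service ID "scmservice" with nodes scm1..scm3.
    OzoneConfiguration conf = datanode.getConf();
    // The address of the SCM being added must be configured first, since
    // reconfiguration can only change one property at a time.
    conf.set("ozone.scm.address.scmservice.scm4", "scm4.example.com");
    // Then reconfigure the node list; reconfigScmNodes diffs old vs. new IDs
    // and adds/removes endpoint state machines accordingly.
    // (reconfigureProperty throws a checked ReconfigurationException on failure.)
    datanode.getReconfigurationHandler().reconfigureProperty(
        "ozone.scm.nodes.scmservice", "scm1,scm2,scm3,scm4");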
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 9170f61a3f7..e002de647a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -31,6 +31,7 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
@@ -57,10 +58,12 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import javax.management.ObjectName;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -331,6 +334,38 @@ public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
}
}
+  /**
+   * Returns the SCM addresses for datanodes based on the service ID and the requested SCM node IDs.
+   * @param conf Configuration
+   * @param scmServiceId SCM service ID
+   * @param scmNodeIds Requested SCM node IDs
+   * @return A collection with the addresses of the requested SCM node IDs,
+   * or null if any SCM address is wrongly configured. Note that the returned collection
+   * might not be ordered the same way as the requested SCM node IDs.
+   */
+  public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
+      ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
+    Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
+    for (String scmNodeId : scmNodeIds) {
+      String addressKey = ConfUtils.addKeySuffixes(
+          OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+      String scmAddress = conf.get(addressKey);
+      if (scmAddress == null) {
+        LOG.warn("The SCM address configuration {} is not defined, returning nothing", addressKey);
+        return null;
+      }
+
+      int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
+          OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
+          OZONE_SCM_DATANODE_PORT_DEFAULT);
+
+      String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
+      InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
+      scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
+    }
+    return scmNodeAddress;
+  }
+
/**
* Retrieve the socket addresses of recon.
*
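For reference, a hedged sketch of using the new HddsUtils overload added above (keys and hosts are made up; 9861 is the default SCM datanode port):

    // Illustrative only; not part of this commit.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("ozone.scm.address.scmservice.scm1", "10.0.0.1");
    conf.set("ozone.scm.address.scmservice.scm2", "10.0.0.2");
    Collection<Pair<String, InetSocketAddress>> addrs =
        HddsUtils.getSCMAddressForDatanodes(conf, "scmservice",
            new HashSet<>(Arrays.asList("scm1", "scm2")));
    // Each pair maps a node ID to host:9861 unless
    // ozone.scm.datanode.address/port overrides it; a null return signals a
    // missing ozone.scm.address.* entry.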
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
index d0806b34d58..53bc67b1fe9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java
@@ -181,12 +181,11 @@ public static List<SCMNodeInfo> buildNodeInfo(ConfigurationSource conf) {
}
- private static String buildAddress(String address, int port) {
- return new StringBuilder().append(address).append(':')
- .append(port).toString();
+ public static String buildAddress(String address, int port) {
+ return address + ':' + port;
}
- private static int getPort(ConfigurationSource conf,
+ public static int getPort(ConfigurationSource conf,
String scmServiceId, String scmNodeId, String configKey,
String portKey, int defaultPort) {
String suffixKey = ConfUtils.addKeySuffixes(configKey, scmServiceId,
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index ff659ddba35..5fc7c3c3ddc 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -98,6 +98,10 @@ default String getTrimmed(String key, String defaultValue) {
default String[] getTrimmedStrings(String name) {
String valueString = get(name);
+ return getTrimmedStringsFromValue(valueString);
+ }
+
+ static String[] getTrimmedStringsFromValue(String valueString) {
if (null == valueString) {
return EMPTY_STRING_ARRAY;
}
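The refactoring above extracts the splitting logic into a static helper so the reconfiguration code can parse a raw value that is not (yet) stored under a key. A small sketch of the expected behavior:

    // Sketch: same trimming/splitting as getTrimmedStrings(name).
    String[] ids = ConfigurationSource.getTrimmedStringsFromValue(" scm1, scm2 ,scm3 ");
    // -> ["scm1", "scm2", "scm3"]; a null value yields an empty array.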
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 3efa83a55cd..28d5d5a2cb1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTP;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.HTTPS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
@@ -34,23 +35,33 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.management.ObjectName;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,6 +83,9 @@
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -124,6 +138,7 @@ public class HddsDatanodeService extends GenericCli implements Callable<Void>, S
private HddsDatanodeClientProtocolServer clientProtocolServer;
private OzoneAdmins admins;
private ReconfigurationHandler reconfigurationHandler;
+ private String scmServiceId;
//Constructor for DataNode PluginService
public HddsDatanodeService() { }
@@ -207,6 +222,7 @@ public void start(OzoneConfiguration configuration) {
start();
}
+ @SuppressWarnings("methodlength")
public void start() {
serviceRuntimeInfo = new DNMXBeanImpl(HddsVersionInfo.HDDS_VERSION_INFO) {
@Override
@@ -294,6 +310,12 @@ public String getNamespace() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);
+ scmServiceId = HddsUtils.getScmServiceId(conf);
+ if (scmServiceId != null) {
+      reconfigurationHandler.register(OZONE_SCM_NODES_KEY + "." + scmServiceId,
+ this::reconfigScmNodes);
+ }
+
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
    datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
@@ -680,6 +702,112 @@ private String reconfigBlockDeletingServiceTimeout(String value) {
return value;
}
+  /**
+   * Reconfigure the SCM nodes configuration, which will trigger the creation and removal of
+   * SCM connections based on the difference between the old and the new SCM nodes configuration.
+   * <p>
+   * The assumption is that the SCM node address configuration exists for all the involved node IDs,
+   * since reconfiguration can only support one configuration field at a time.
+   * @param value The new configuration value for "ozone.scm.nodes.SERVICEID"
+   * @return new configuration for "ozone.scm.nodes.SERVICEID" which reflects the SCMs that the datanode
+   * is now connected to.
+   */
+  private String reconfigScmNodes(String value) {
+    if (StringUtils.isBlank(value)) {
+      throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
+          "configuration is not allowed");
+    }
+    Set<String> previousNodeIds = new HashSet<>(HddsUtils.getSCMNodeIds(getConf(), scmServiceId));
+    Set<String> newScmNodeIds = Stream.of(ConfigurationSource.getTrimmedStringsFromValue(value))
+        .collect(Collectors.toSet());
+
+    if (newScmNodeIds.isEmpty()) {
+      throw new IllegalArgumentException("Reconfiguration failed since setting the empty SCM nodes " +
+          "configuration is not allowed");
+    }
+
+    Set<String> scmNodesIdsToAdd = Sets.difference(newScmNodeIds, previousNodeIds);
+    Set<String> scmNodesIdsToRemove = Sets.difference(previousNodeIds, newScmNodeIds);
+
+    // We should only update the configuration with the SCMs that were actually added / removed.
+    // If there is a partial reconfiguration (e.g. one successful add and one failed add),
+    // we want to be able to retry the failed node reconfiguration.
+    // If we don't handle this, the subsequent reconfiguration will not work since the node
+    // configuration already exists / was already removed.
+    Set<String> effectiveScmNodeIds = new HashSet<>(previousNodeIds);
+
+    LOG.info("Reconfiguring SCM nodes for service ID {}: SCM nodes to add {}, SCM nodes to remove {}",
+        scmServiceId, scmNodesIdsToAdd, scmNodesIdsToRemove);
+
+    Collection<Pair<String, InetSocketAddress>> scmToAdd = HddsUtils.getSCMAddressForDatanodes(
+        getConf(), scmServiceId, scmNodesIdsToAdd);
+    if (scmToAdd == null) {
+      throw new IllegalStateException("Reconfiguration failed to get SCM address to add due to wrong configuration");
+    }
+    Collection<Pair<String, InetSocketAddress>> scmToRemove = HddsUtils.getSCMAddressForDatanodes(
+        getConf(), scmServiceId, scmNodesIdsToRemove);
+    if (scmToRemove == null) {
+      throw new IllegalArgumentException(
+          "Reconfiguration failed to get SCM address to remove due to wrong configuration");
+    }
+
+    StateContext context = datanodeStateMachine.getContext();
+    SCMConnectionManager connectionManager = datanodeStateMachine.getConnectionManager();
+
+    // Assert that the datanode is in RUNNING state since
+    // 1. If the datanode state is INIT, there might be concurrent connection manager operations
+    //    that might cause unpredictable behaviors
+    // 2. If the datanode state is SHUTDOWN, the datanode is shutting down and there is no need
+    //    to reconfigure the connections.
+    if (!DatanodeStates.RUNNING.equals(context.getState())) {
+      throw new IllegalStateException("Reconfiguration failed since the datanode's current state " +
+          context.getState().toString() + " is not RUNNING");
+    }
+
+    // Add the new SCM servers
+    for (Pair<String, InetSocketAddress> pair : scmToAdd) {
+      String scmNodeId = pair.getLeft();
+      InetSocketAddress scmAddress = pair.getRight();
+      if (scmAddress.isUnresolved()) {
+        LOG.warn("Reconfiguration failed to add SCM address {} for SCM service {} since it can't " +
+            "be resolved, skipping", scmAddress, scmServiceId);
+        continue;
+      }
+      try {
+        connectionManager.addSCMServer(scmAddress, context.getThreadNamePrefix());
+        context.addEndpoint(scmAddress);
+        effectiveScmNodeIds.add(scmNodeId);
+        LOG.info("Reconfiguration successfully added SCM address {} for SCM service {}", scmAddress, scmServiceId);
+      } catch (IOException e) {
+        LOG.error("Reconfiguration failed to add SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
+      }
+    }
+
+    // Remove the old SCM servers
+    for (Pair<String, InetSocketAddress> pair : scmToRemove) {
+      String scmNodeId = pair.getLeft();
+      InetSocketAddress scmAddress = pair.getRight();
+      try {
+        connectionManager.removeSCMServer(scmAddress);
+        context.removeEndpoint(scmAddress);
+        effectiveScmNodeIds.remove(scmNodeId);
+        LOG.info("Reconfiguration successfully removed SCM address {} for SCM service {}",
+            scmAddress, scmServiceId);
+      } catch (IOException e) {
+        LOG.error("Reconfiguration failed to remove SCM address {} for SCM service {}", scmAddress, scmServiceId, e);
+      }
+    }
+
+    // Resize the executor pool size to (number of SCMs + 1 Recon)
+    // Refer to DatanodeStateMachine#getEndPointTaskThreadPoolSize
+    datanodeStateMachine.resizeExecutor(connectionManager.getNumOfConnections());
+
+    // TODO: In the future, we might also do some assertions on the SCM
+    //  - The SCM cannot be a leader since this causes the datanode to disappear
+    //  - The SCM should be decommissioned
+    return String.join(",", effectiveScmNodeIds);
+  }
+
/**
* Returns the initial version of the datanode.
*/
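The heart of reconfigScmNodes above is a pair of set differences plus per-node add/remove with individual failure handling. A condensed sketch of the decision it makes (IDs hypothetical):

    // Condensed from reconfigScmNodes; illustrative IDs.
    Set<String> previous = new HashSet<>(Arrays.asList("scm1", "scm2", "scm3"));
    Set<String> requested = new HashSet<>(Arrays.asList("scm2", "scm3", "scm4"));
    Set<String> toAdd = Sets.difference(requested, previous);    // [scm4]
    Set<String> toRemove = Sets.difference(previous, requested); // [scm1]
    // Only nodes whose add/remove actually succeeded end up in the returned
    // effective value, so a partially failed reconfiguration can be retried
    // by applying the same target value again.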
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
index c0ed734da69..d442b95285d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.metrics2.lib.Interns.info;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import java.net.InetSocketAddress;
import java.util.HashMap;
@@ -171,6 +172,27 @@ public void addEndpoint(InetSocketAddress endpoint) {
.to(CaseFormat.UPPER_CAMEL, k.getHostName())));
}
+ public void removeEndpoint(InetSocketAddress endpoint) {
+ incrementalReportsQueueMap.remove(endpoint);
+ containerActionQueueMap.remove(endpoint);
+ pipelineActionQueueMap.remove(endpoint);
+ }
+
+ @VisibleForTesting
+ public int getIncrementalReportsQueueMapSize() {
+ return incrementalReportsQueueMap.size();
+ }
+
+ @VisibleForTesting
+ public int getContainerActionQueueMapSize() {
+ return containerActionQueueMap.size();
+ }
+
+ @VisibleForTesting
+ public int getPipelineActionQueueMapSize() {
+ return pipelineActionQueueMap.size();
+ }
+
private MetricsInfo getMetricsInfo(String prefix, String metricName) {
String metric = prefix + WordUtils.capitalize(metricName) + "Size";
String description = "Queue size of " + metricName + " from " + prefix;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ad64b291e16..f05a14e2109 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -795,4 +796,19 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
public void setNextHB(long time) {
nextHB.set(time);
}
+
+ @VisibleForTesting
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ /**
+ * Resize the executor based on the number of active endpoint tasks.
+ */
+ public void resizeExecutor(int size) {
+ if (executorService instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
+ HddsServerUtil.setPoolSize(tpe, size, LOG);
+ }
+ }
}
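resizeExecutor delegates the actual resizing to HddsServerUtil.setPoolSize, which is not shown in this diff. Presumably (this is an assumption, not confirmed by the patch) it adjusts the pool in the order that keeps corePoolSize <= maximumPoolSize at every step, roughly:

    // Hedged sketch of the expected resize semantics.
    static void resize(ThreadPoolExecutor tpe, int size) {
      if (size <= 0) {
        return; // ignore nonsensical sizes
      }
      if (size < tpe.getCorePoolSize()) {
        tpe.setCorePoolSize(size);      // shrink: lower core first
        tpe.setMaximumPoolSize(size);
      } else {
        tpe.setMaximumPoolSize(size);   // grow: raise max first
        tpe.setCorePoolSize(size);
      }
    }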
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
index 27bbb30d77b..e5d586832c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
@@ -227,15 +228,14 @@ public void addReconServer(InetSocketAddress address,
public void removeSCMServer(InetSocketAddress address) throws IOException {
writeLock();
try {
- if (!scmMachines.containsKey(address)) {
+ EndpointStateMachine endPoint = scmMachines.remove(address);
+ if (endPoint == null) {
LOG.warn("Trying to remove a non-existent SCM machine. " +
"Ignoring the request.");
return;
}
-
- EndpointStateMachine endPoint = scmMachines.get(address);
+ endPoint.setState(EndPointStates.SHUTDOWN);
endPoint.close();
- scmMachines.remove(address);
} finally {
writeUnlock();
}
@@ -274,4 +274,16 @@ public List<EndpointStateMachineMBean> getSCMServers() {
readUnlock();
}
}
+
+ /**
+ * @return the number of connections (both SCM and Recon)
+ */
+ public int getNumOfConnections() {
+ readLock();
+ try {
+ return scmMachines.size();
+ } finally {
+ readUnlock();
+ }
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 24496525a56..150159eb84a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -911,6 +911,17 @@ public void addEndpoint(InetSocketAddress endpoint) {
}
}
+ public void removeEndpoint(InetSocketAddress endpoint) {
+ this.endpoints.remove(endpoint);
+ this.containerActions.remove(endpoint);
+ this.pipelineActions.remove(endpoint);
+ this.incrementalReportsQueue.remove(endpoint);
+ this.isFullReportReadyToBeSent.remove(endpoint);
+ if (getQueueMetrics() != null) {
+ getQueueMetrics().removeEndpoint(endpoint);
+ }
+ }
+
@VisibleForTesting
public Message getContainerReports() {
return containerReports.get();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
index d1d0f6dc7bd..8eeee8793d1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
@@ -38,8 +38,9 @@ public interface DatanodeState<T> {
/**
* Executes one or more tasks that is needed by this state.
+ * Note that it is unsafe to call this method concurrently.
*
- * @param executor - ExecutorService
+   * @param executor - ExecutorService that can be used by the DatanodeState to submit tasks to.
*/
void execute(ExecutorService executor);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index e246238b541..6b8ea712a93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -52,6 +52,10 @@ public class RunningDatanodeState implements DatanodeState {
private final ConfigurationSource conf;
private final StateContext context;
private CompletionService<EndPointStates> ecs;
+  // Since the connectionManager endpoints can be changed by reconfiguration,
+  // we should not rely on ConnectionManager#getValues being unchanged between
+  // execute and await
+  private int executingEndpointCount = 0;
public RunningDatanodeState(ConfigurationSource conf,
SCMConnectionManager connectionManager,
@@ -85,6 +89,7 @@ public void onExit() {
@Override
public void execute(ExecutorService executor) {
ecs = new ExecutorCompletionService<>(executor);
+ executingEndpointCount = 0;
for (EndpointStateMachine endpoint : connectionManager.getValues()) {
Callable<EndPointStates> endpointTask = buildEndPointTask(endpoint);
if (endpointTask != null) {
@@ -109,6 +114,7 @@ public void execute(ExecutorService executor) {
throw timeoutEx;
}
});
+ executingEndpointCount++;
} else {
// This can happen if a task is taking more time than the timeOut
// specified for the task in await, and when it is completed the task
@@ -125,6 +131,11 @@ public void setExecutorCompletionService(ExecutorCompletionService e) {
this.ecs = e;
}
+ @VisibleForTesting
+ public void setExecutingEndpointCount(int executingEndpointCount) {
+ this.executingEndpointCount = executingEndpointCount;
+ }
+
private Callable<EndPointStates> buildEndPointTask(
EndpointStateMachine endpoint) {
switch (endpoint.getState()) {
@@ -199,14 +210,13 @@ private Callable<EndPointStates> buildEndPointTask(
public DatanodeStateMachine.DatanodeStates
await(long duration, TimeUnit timeUnit)
throws InterruptedException {
- int count = connectionManager.getValues().size();
int returned = 0;
long durationMS = timeUnit.toMillis(duration);
long timeLeft = durationMS;
long startTime = Time.monotonicNow();
List<Future<EndPointStates>> results = new LinkedList<>();
- while (returned < count && timeLeft > 0) {
+ while (returned < executingEndpointCount && timeLeft > 0) {
Future<EndPointStates> result =
ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
if (result != null) {
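Why the snapshot counter matters: endpoints can now be added or removed between execute() and await(), so connectionManager.getValues().size() may no longer equal the number of tasks actually submitted to the completion service. A hedged sketch of the pattern (task() is a hypothetical stand-in for buildEndPointTask, timeLeft as in await above):

    ExecutorCompletionService<EndPointStates> ecs = new ExecutorCompletionService<>(executor);
    int submitted = 0;
    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
      ecs.submit(task(endpoint));
      submitted++;                 // snapshot taken at submit time
    }
    // ... reconfiguration may add or remove endpoints here ...
    for (int i = 0; i < submitted && timeLeft > 0; i++) {  // NOT getValues().size()
      ecs.poll(timeLeft, TimeUnit.MILLISECONDS);  // never waits for a task
    }                                             // that was never submitted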
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
index 84313c82b98..51390ae632b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
@@ -64,6 +64,7 @@ public void testAwait() throws InterruptedException {
for (int i = 0; i < threadPoolSize; i++) {
ecs.submit(() -> futureOne.get());
}
+ state.setExecutingEndpointCount(threadPoolSize);
long startTime = Time.monotonicNow();
state.await(500, TimeUnit.MILLISECONDS);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 2a9c5f0ae49..f9a7f94f8e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -283,7 +283,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private RootCARotationManager rootCARotationManager;
private ContainerTokenSecretManager containerTokenMgr;
- private final OzoneConfiguration configuration;
+ private OzoneConfiguration configuration;
private SCMContainerMetrics scmContainerMetrics;
private SCMContainerPlacementMetrics placementMetrics;
private PlacementPolicy containerPlacementPolicy;
@@ -603,6 +603,11 @@ public OzoneConfiguration getConfiguration() {
return configuration;
}
+ @VisibleForTesting
+ public void setConfiguration(OzoneConfiguration conf) {
+ this.configuration = conf;
+ }
+
/**
* Create an SCM instance based on the supplied configuration.
*
@@ -2245,6 +2250,20 @@ public boolean removePeerFromHARing(String scmId)
}
+ /**
+ * Check if the input scmId exists in the peers list.
+   * @return true if the scmId is self, or it exists in the peer node list,
+ * false otherwise.
+ */
+ @VisibleForTesting
+ public boolean doesPeerExist(String scmId) {
+ if (getScmId().equals(scmId)) {
+ return true;
+ }
+ return getScmHAManager().getRatisServer().getDivision()
+ .getGroup().getPeer(RaftPeerId.valueOf(scmId)) != null;
+ }
+
public void scmHAMetricsUpdate(String leaderId) {
// unregister, in case metrics already exist
// so that the metric tags will get updated.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java
new file mode 100644
index 00000000000..d2c90c21d65
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestDatanodeSCMNodesReconfiguration.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test datanode's SCM nodes reconfiguration.
+ */
+@Timeout(300)
+public class TestDatanodeSCMNodesReconfiguration {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestDatanodeSCMNodesReconfiguration.class);
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private String scmServiceId;
+
+ @BeforeEach
+ public void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.set(ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+ "5s");
+ conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, "1");
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
+ MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setQuietMode(false);
+ scmServiceId = "scm-service-test1";
+ cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setOMServiceId("om-service-test1")
+ .setSCMServiceId(scmServiceId)
+ .setNumOfStorageContainerManagers(3)
+ .setNumOfOzoneManagers(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @AfterEach
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+  /**
+   * This tests the SCM migration scenario using datanode reconfiguration with zero restarts.
+   */
+ @Test
+ void testSCMMigration() throws Exception {
+ assertEquals(3, cluster.getStorageContainerManagersList().size());
+ // Bootstrap three new SCMs (there will be 6 SCMs after this)
+ // (SCM ID, SCM Node ID)
+    List<Pair<String, String>> initialSCMs = cluster.getStorageContainerManagersList().stream()
+        .map(scm -> Pair.of(scm.getScmId(), scm.getSCMNodeId())).collect(Collectors.toList());
+ String newScmNodeIdPrefix = "newScmNode-";
+ List<String> newSCMIds = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ String scmNodeId = newScmNodeIdPrefix + i;
+ cluster.bootstrapSCM(scmNodeId, true);
+ StorageContainerManager newSCM = cluster.getSCM(scmNodeId);
+ newSCMIds.add(newSCM.getScmId());
+ }
+ cluster.waitForClusterToBeReady();
+
+ // Reconfigure the datanodes to add the three new SCMs
+ String scmNodesKey = ConfUtils.addKeySuffixes(
+ OZONE_SCM_NODES_KEY, scmServiceId);
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      assertIsPropertyReconfigurable(datanode, scmNodesKey);
+      // SCM addresses need to be added to the datanode configuration first; reconfiguration will fail otherwise
+      for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+        String scmAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(scmAddrKey, cluster.getConf().get(scmAddrKey));
+        String dnPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(dnPortKey, cluster.getConf().get(dnPortKey));
+      }
+
+      // Sanity check before reconfiguration, there should still be 3 SCM nodes configured
+      assertEquals(3, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Trigger reconfiguration which will create new connections to the SCMs
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has added the new SCMs
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(6, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(6, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getIncrementalReportsQueueMapSize());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getContainerActionQueueMapSize());
+      assertEquals(6, datanode.getDatanodeStateMachine().getQueueMetrics().getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(6, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(6, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine().getConnectionManager()
+          .getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(6, scmMachines.size());
+    }
+
+ // Ensure that the datanodes have registered to the new SCMs
+ cluster.waitForClusterToBeReady();
+
+    GenericTestUtils.waitFor(() -> {
+      for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+        Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+            .getConnectionManager().getValues();
+        for (EndpointStateMachine scmMachine : scmMachines) {
+          if (!scmMachine.getState().equals(EndPointStates.HEARTBEAT)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 1000, 30000);
+
+    // Wait until the datanodes are HEALTHY for all the SCMs, which indicates
+    // that the datanodes have already registered to the SCMs
+    List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      String scmNodeId = scm.getSCMNodeId();
+      if (cluster.isSCMActive(scmNodeId)) {
+        activeSCMs.add(scm);
+      }
+    }
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager activeSCM : activeSCMs) {
+        int healthy = activeSCM.getNodeCount(HEALTHY);
+        int staleOrDead = activeSCM.getNodeCount(STALE) + activeSCM.getNodeCount(DEAD);
+        if (healthy != 3 || staleOrDead != 0) {
+          LOG.info("SCM {} currently has mismatched healthy (current {}, expected 3) or stale/dead DNs " +
+                  "(current {}, expected 0), waiting for next checks",
+              activeSCM.getSCMNodeId(), healthy, staleOrDead);
+          return false;
+        }
+      }
+      return true;
+    }, 1000, 120000);
+
+    // Transfer SCM leadership to one of the new SCMs before decommissioning the old SCMs
+    Collections.shuffle(newSCMIds);
+    String newLeaderScmId = newSCMIds.get(0);
+    cluster.getStorageContainerLocationClient().transferLeadership(newLeaderScmId);
+
+ // Decommission the initial SCMs (there will be 3 SCMs after this)
+ for (Pair<String, String> pair : initialSCMs) {
+ decommissionSCM(pair.getLeft(), pair.getRight());
+ }
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      // Sanity check before reconfiguration, there should still be 6 SCM nodes configured
+      assertEquals(6, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Reconfigure the datanodes to remove the three initial SCMs
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has removed the initial SCMs
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(3, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(3, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics().getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(3, scmMachines.size());
+    }
+ }
+
+ /**
+   * Test the addition and removal of a single SCM using datanode reconfiguration.
+ * @throws Exception
+ */
+ @Test
+ void testAddAndRemoveOneSCM() throws Exception {
+ assertEquals(3, cluster.getStorageContainerManagersList().size());
+ // Bootstrap a single SCM
+ String newScmNodeId = "newScmNode-1";
+ cluster.bootstrapSCM(newScmNodeId, true);
+ StorageContainerManager newSCM = cluster.getSCM(newScmNodeId);
+ cluster.waitForClusterToBeReady();
+
+ // Reconfigure the datanode to add the new SCM
+    String scmNodesKey = ConfUtils.addKeySuffixes(OZONE_SCM_NODES_KEY, scmServiceId);
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      assertIsPropertyReconfigurable(datanode, scmNodesKey);
+      // SCM addresses need to be added to the datanode configuration first; reconfiguration will fail otherwise
+      for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+        String scmAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(scmAddrKey, cluster.getConf().get(scmAddrKey));
+        String dnPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, scmServiceId, scm.getSCMNodeId());
+        datanode.getConf().set(dnPortKey, cluster.getConf().get(dnPortKey));
+      }
+
+      // Sanity check before reconfiguration, there should still be 3 SCM nodes configured
+      assertEquals(3, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Trigger reconfiguration which will create a new connection to the new SCM
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has added the new SCM
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(4, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(4, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(4, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(4, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(4, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(4, scmMachines.size());
+    }
+
+ // Ensure that the datanodes have registered to the new SCM
+ cluster.waitForClusterToBeReady();
+
+    GenericTestUtils.waitFor(() -> {
+      for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+        Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+            .getConnectionManager().getValues();
+        for (EndpointStateMachine scmMachine : scmMachines) {
+          if (!scmMachine.getState().equals(EndPointStates.HEARTBEAT)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 1000, 30000);
+
+    // Wait until the datanodes are HEALTHY for all the SCMs, which indicates
+    // that the datanodes have already registered to the SCMs
+    List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      String scmNodeId = scm.getSCMNodeId();
+      if (cluster.isSCMActive(scmNodeId)) {
+        activeSCMs.add(scm);
+      }
+    }
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager activeSCM : activeSCMs) {
+        int healthy = activeSCM.getNodeCount(HEALTHY);
+        int staleOrDead = activeSCM.getNodeCount(STALE) + activeSCM.getNodeCount(DEAD);
+        if (healthy != 3 || staleOrDead != 0) {
+          LOG.info("SCM {} currently has mismatched healthy (current {}, expected 3) or stale/dead DNs " +
+                  "(current {}, expected 0), waiting for next checks",
+              activeSCM.getSCMNodeId(), healthy, staleOrDead);
+          return false;
+        }
+      }
+      return true;
+    }, 1000, 120000);
+
+    // Now reconfigure the DNs to remove the new SCM
+    Collection<String> scmNodes = cluster.getConf().getTrimmedStringCollection(scmNodesKey);
+    scmNodes.remove(newScmNodeId);
+    cluster.getConf().setStrings(scmNodesKey, scmNodes.toArray(new String[0]));
+
+    for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) {
+      // Sanity check before reconfiguration, there should be 4 SCM nodes in the configuration
+      assertEquals(4, datanode.getConf().getTrimmedStringCollection(scmNodesKey).size());
+      // Reconfigure the datanodes to remove the new SCM
+      datanode.getReconfigurationHandler().reconfigureProperty(
+          scmNodesKey, cluster.getConf().get(scmNodesKey)
+      );
+      String newValue = datanode.getConf().get(scmNodesKey);
+
+      // Assert that the datanode has removed the new SCM
+      String[] scmNodeIds = newValue.split(",");
+      assertEquals(3, scmNodeIds.length);
+      for (String scmNodeId : scmNodeIds) {
+        assertTrue(cluster.isSCMActive(scmNodeId));
+      }
+      assertEquals(3, datanode.getDatanodeStateMachine().getConnectionManager().getSCMServers().size());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getIncrementalReportsQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getContainerActionQueueMapSize());
+      assertEquals(3, datanode.getDatanodeStateMachine().getQueueMetrics()
+          .getPipelineActionQueueMapSize());
+      // There is no Recon, so the thread pool size is equal to the number of SCMs
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getCorePoolSize());
+      assertEquals(3, ((ThreadPoolExecutor) datanode.getDatanodeStateMachine().getExecutorService())
+          .getMaximumPoolSize());
+
+      Collection<EndpointStateMachine> scmMachines = datanode.getDatanodeStateMachine()
+          .getConnectionManager().getValues();
+      for (EndpointStateMachine scmMachine : scmMachines) {
+        assertEquals("SCM", scmMachine.getType());
+      }
+      assertEquals(3, scmMachines.size());
+    }
+
+    // Since all DNs have stopped sending heartbeats to the new SCM, the new SCM should mark
+    // these DNs as STALE/DEAD
+    GenericTestUtils.waitFor(() -> {
+      int healthy = newSCM.getNodeCount(HEALTHY);
+      int staleOrDead = newSCM.getNodeCount(STALE) + newSCM.getNodeCount(DEAD);
+      return healthy == 0 || staleOrDead == 3;
+    }, 1000, 120000);
+  }
+ }
+
+  private void decommissionSCM(String decommScmId, String decommScmNodeId) throws Exception {
+    // Decommissioned SCM will be stopped automatically, see SCMStateMachine#close
+    DecommissionScmResponseProto response = cluster.getStorageContainerLocationClient().decommissionScm(decommScmId);
+ assertTrue(response.getSuccess());
+ assertTrue(StringUtils.isEmpty(response.getErrorMsg()));
+
+ cluster.deactivateSCM(decommScmNodeId);
+
+ List<StorageContainerManager> activeSCMs = new ArrayList<>();
+ for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+ String scmNodeId = scm.getSCMNodeId();
+ if (cluster.isSCMActive(scmNodeId)) {
+ activeSCMs.add(scm);
+ }
+ }
+
+ // Update the configuration
+ String scmNodesKey = ConfUtils.addKeySuffixes(
+ OZONE_SCM_NODES_KEY, scmServiceId);
+
+    Collection<String> scmNodes = cluster.getConf().getTrimmedStringCollection(scmNodesKey);
+ scmNodes.remove(decommScmNodeId);
+ cluster.getConf().setStrings(scmNodesKey, scmNodes.toArray(new String[0]));
+
+ // Verify decomm node is removed from the HA ring
+ GenericTestUtils.waitFor(() -> {
+ for (StorageContainerManager scm : activeSCMs) {
+ if (scm.doesPeerExist(decommScmId)) {
+ return false;
+ }
+ }
+ return true;
+ }, 100, 60000);
+
+ cluster.waitForClusterToBeReady();
+ }
+
+  private void assertIsPropertyReconfigurable(HddsDatanodeService datanode, String configKey) throws IOException {
+    assertTrue(datanode.getReconfigurationHandler().isPropertyReconfigurable(configKey));
+    assertTrue(datanode.getReconfigurationHandler().listReconfigureProperties().contains(configKey));
+ }
+}
diff --git a/hadoop-ozone/mini-cluster/pom.xml b/hadoop-ozone/mini-cluster/pom.xml
index a96cfc23184..055ecc8ee8c 100644
--- a/hadoop-ozone/mini-cluster/pom.xml
+++ b/hadoop-ozone/mini-cluster/pom.xml
@@ -109,6 +109,10 @@
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ratis</groupId>
+ <artifactId>ratis-server-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f1173496583..d3369534617 100644
--- a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -117,6 +117,10 @@ public boolean isOMActive(String omNodeId) {
return omhaService.isServiceActive(omNodeId);
}
+ public boolean isSCMActive(String scmNodeId) {
+ return scmhaService.isServiceActive(scmNodeId);
+ }
+
public Iterator<StorageContainerManager> getInactiveSCM() {
return scmhaService.inactiveServices();
}
@@ -151,6 +155,20 @@ public StorageContainerManager getScmLeader() {
.findFirst().orElse(null);
}
+ public StorageContainerManager getScmLeader(boolean waitForLeaderElection)
+ throws TimeoutException, InterruptedException {
+ if (waitForLeaderElection) {
+ final StorageContainerManager[] scm = new StorageContainerManager[1];
+ GenericTestUtils.waitFor(() -> {
+ scm[0] = getScmLeader();
+ return scm[0] != null;
+ }, 200, waitForClusterToBeReadyTimeout);
+ return scm[0];
+ } else {
+ return getScmLeader();
+ }
+ }
+
public OzoneManager waitForLeaderOM()
throws TimeoutException, InterruptedException {
final OzoneManager[] om = new OzoneManager[1];
@@ -332,6 +350,20 @@ private static void configureOMPorts(ConfigurationTarget conf,
conf.setInt(omRatisPortKey, getFreePort());
}
+ private void stopAndDeactivate(StorageContainerManager scm) {
+ stopSCM(scm.getSCMNodeId());
+ scmhaService.deactivate(scm);
+ }
+
+ public void stopSCM(String scmNodeId) {
+ stopAndDeactivate(scmhaService.getServiceById(scmNodeId));
+ }
+
+ public void deactivateSCM(String scmNodeId) {
+ StorageContainerManager scm = scmhaService.getServiceById(scmNodeId);
+ scmhaService.deactivate(scm);
+ }
+
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@@ -902,6 +934,177 @@ public void setupExitManagerForTesting() {
}
}
+  private OzoneConfiguration addNewSCMToConfig(String scmServiceId, String scmNodeId) {
+ OzoneConfiguration newConf = new OzoneConfiguration(getConf());
+ StringBuilder scmNames = new StringBuilder();
+ scmNames.append(newConf.get(ScmConfigKeys.OZONE_SCM_NAMES));
+ configureSCMPorts(newConf, scmServiceId, scmNodeId, scmNames);
+
+ String scmNodesKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+ newConf.set(scmNodesKey, newConf.get(scmNodesKey) + "," + scmNodeId);
+ newConf.set(ScmConfigKeys.OZONE_SCM_NAMES, scmNames.toString());
+
+ return newConf;
+ }
+
+  private static void configureSCMPorts(ConfigurationTarget conf, String scmServiceId, String scmNodeId,
+ StringBuilder scmNames) {
+ String scmAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmHttpAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmHttpsAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmRatisPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, scmServiceId, scmNodeId);
+ String dnPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String blockClientKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String ssClientKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String scmGrpcPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, scmServiceId, scmNodeId);
+ String scmSecurityAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY, scmServiceId,
+ scmNodeId);
+
+ conf.set(scmAddrKey, "127.0.0.1");
+ conf.set(scmHttpAddrKey, localhostWithFreePort());
+ conf.set(scmHttpsAddrKey, localhostWithFreePort());
+ conf.set(scmSecurityAddrKey, localhostWithFreePort());
+ conf.set("ozone.scm.update.service.port", "0");
+
+ int ratisPort = getFreePort();
+ conf.setInt(scmRatisPortKey, ratisPort);
+ //conf.setInt("ozone.scm.ha.ratis.bind.port", ratisPort);
+
+ int dnPort = getFreePort();
+ conf.set(dnPortKey, "127.0.0.1:" + dnPort);
+ scmNames.append(",localhost:").append(dnPort);
+
+ conf.set(ssClientKey, localhostWithFreePort());
+ conf.setInt(scmGrpcPortKey, getFreePort());
+
+ String blockAddress = localhostWithFreePort();
+ conf.set(blockClientKey, blockAddress);
+ conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+ blockAddress);
+ }
+
+  /**
+   * Update the configuration of all active SCMs on the SCM HA service.
+   */
+ public void updateSCMConfigs(OzoneConfiguration newConf) {
+ for (StorageContainerManager scm : scmhaService.getActiveServices()) {
+ scm.setConfiguration(newConf);
+ }
+ }
+
+  public void bootstrapSCM(String scmNodeId, boolean updateConfigs) throws Exception {
+ int retryCount = 0;
+ StorageContainerManager scm = null;
+
+ StorageContainerManager scmLeader = getScmLeader(true);
+    long leaderSnapshotIndex = scmLeader.getScmHAManager().getRatisServer().getSCMStateMachine()
+ .getLatestSnapshot().getIndex();
+
+ while (true) {
+ try {
+        OzoneConfiguration newConf = addNewSCMToConfig(scmhaService.getServiceId(), scmNodeId);
+
+ if (updateConfigs) {
+ updateSCMConfigs(newConf);
+ }
+
+ scm = bootstrapNewSCM(scmNodeId, newConf);
+
+ LOG.info("Bootstrapped SCM {} RPC server at {}", scmNodeId,
+ scm.getClientRpcAddress());
+
+        // Add new SCMs to the cluster's in-memory map and update existing SCMs' conf.
+ setConf(newConf);
+
+ break;
+ } catch (IOException e) {
+ // Existing SCM config could have been updated with new conf. Reset it.
+        for (StorageContainerManager existingSCM : scmhaService.getServices()) {
+ existingSCM.setConfiguration(getConf());
+ }
+ if (e instanceof BindException ||
+ e.getCause() instanceof BindException) {
+ ++retryCount;
+ LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+ retryCount, e);
+ } else {
+ throw e;
+ }
+      }
+    }
+
+    waitForBootstrappedNodeToBeReady(scm, leaderSnapshotIndex);
+    if (updateConfigs) {
+      waitForConfigUpdateOnActiveSCMs(scmNodeId);
+    }
+  }
+
+
+ /**
+ * Start a new SCM in Bootstrap mode. Configs (address and ports) for the new
+ * SCM must already be set in the newConf.
+ */
+ private StorageContainerManager bootstrapNewSCM(String nodeId,
+ OzoneConfiguration newConf) throws Exception {
+ OzoneConfiguration config = new OzoneConfiguration(newConf);
+
+ // For bootstrapping node, set the nodeId config also.
+ config.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, nodeId);
+
+ // Set metadata/DB dir base path
+ String metaDirPath = clusterMetaPath + "/" + nodeId;
+ config.set(OZONE_METADATA_DIRS, metaDirPath);
+
+ StorageContainerManager.scmBootstrap(config);
+ StorageContainerManager scm = StorageContainerManager.createSCM(config);
+
+ scmhaService.addInstance(scm, false);
+ startInactiveSCM(nodeId);
+
+ return scm;
+ }
+
+ private void waitForBootstrappedNodeToBeReady(StorageContainerManager newSCM,
+ long leaderSnapshotIndex) throws Exception {
+ // Wait for bootstrapped nodes to catch up with others
+ GenericTestUtils.waitFor(() -> {
+ return newSCM.getScmHAManager().getRatisServer().getSCMStateMachine()
+ .getLatestSnapshot().getIndex() >= leaderSnapshotIndex;
+ }, 1000, waitForClusterToBeReadyTimeout);
+ }
+
+ private void waitForConfigUpdateOnActiveSCMs(
+ String newSCMNodeId) throws Exception {
+ StorageContainerManager newSCMNode = scmhaService
+ .getServiceById(newSCMNodeId);
+ GenericTestUtils.waitFor(() -> {
+ // Each existing active SCM should contain the new SCM in its peerList.
+      // Also, the new SCM should contain each existing active SCM in its SCM
+ // peer list and RatisServer peerList.
+ for (StorageContainerManager scm : scmhaService.getActiveServices()) {
+        if (!scm.doesPeerExist(newSCMNode.getScmId())) {
+ return false;
+ }
+ if (!newSCMNode.doesPeerExist(scm.getSCMNodeId())) {
+ return false;
+ }
+ }
+ return true;
+ }, 1000, waitForClusterToBeReadyTimeout);
+ }
+
/**
* MiniOzoneHAService is a helper class used for both SCM and OM HA.
* This class keeps track of active and inactive OM/SCM services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]