This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e76d99041f HDDS-12088. Speed up TestStorageContainerManager (#7706)
e76d99041f is described below
commit e76d99041fb2150b402181183a998826f10e6c96
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Fri Jan 17 21:09:08 2025 +0100
HDDS-12088. Speed up TestStorageContainerManager (#7706)
---
.../hdds/scm/TestStorageContainerManager.java | 470 +++++++++------------
1 file changed, 193 insertions(+), 277 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
index b00c7f8040..c7e6e96284 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
@@ -21,9 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.conf.DefaultConfigManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -89,30 +89,25 @@ import
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.ArgumentMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
@@ -131,7 +126,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
import java.util.stream.Stream;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
@@ -162,55 +156,57 @@ import static org.mockito.Mockito.verify;
*/
@Timeout(900)
public class TestStorageContainerManager {
+ private static final int KEY_COUNT = 5;
private static final String LOCALHOST_IP = "127.0.0.1";
- private static XceiverClientManager xceiverClientManager;
private static final Logger LOG = LoggerFactory.getLogger(
TestStorageContainerManager.class);
- @BeforeAll
- public static void setup() throws IOException {
- xceiverClientManager = new XceiverClientManager(new OzoneConfiguration());
- }
+ /** This runs most test cases in a single cluster. */
+ @Test
+ void test(@TempDir Path tempDir) throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ configureTopology(conf);
+ configureBlockDeletion(conf);
+ Path scmPath = tempDir.resolve("scm-meta");
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
- @AfterAll
- public static void cleanup() {
- if (xceiverClientManager != null) {
- xceiverClientManager.close();
- }
- }
+ try (MiniOzoneCluster cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build()) {
+ cluster.waitForClusterToBeReady();
- @AfterEach
- public void cleanupDefaults() {
- DefaultConfigManager.clearDefaultConfigs();
+ // non-destructive test cases
+ testBlockDeletionTransactions(cluster);
+ testRpcPermission(cluster);
+ testScmProcessDatanodeHeartbeat(cluster);
+
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ List<File> directories = Arrays.asList(
+ new File(SCMHAUtils.getRatisStorageDir(scm.getConfiguration())),
+ scm.getScmMetadataStore().getStore().getDbLocation(),
+ new File(scm.getScmStorageConfig().getStorageDir())
+ );
+
+ // re-init
+ testSCMReinitialization(cluster);
+
+ // re-init after delete
+ directories.forEach(FileUtil::fullyDelete);
+ testOldDNRegistersToReInitialisedSCM(cluster);
+ }
}
- @Test
- public void testRpcPermission() throws Exception {
+ private void testRpcPermission(MiniOzoneCluster cluster) throws Exception {
// Test with default configuration
- OzoneConfiguration defaultConf = new OzoneConfiguration();
- testRpcPermissionWithConf(defaultConf, any -> false, "unknownUser");
+ testRpcPermission(cluster, "anyUser", true);
+
+ // Update ozone.administrators in configuration
+ cluster.getStorageContainerManager()
+ .getReconfigurationHandler()
+ .reconfigureProperty(OzoneConfigKeys.OZONE_ADMINISTRATORS,
"adminUser1, adminUser2");
- // Test with ozone.administrators defined in configuration
- String admins = "adminUser1, adminUser2";
- OzoneConfiguration ozoneConf = new OzoneConfiguration();
- ozoneConf.setStrings(OzoneConfigKeys.OZONE_ADMINISTRATORS, admins);
// Non-admin user will get permission denied.
+ testRpcPermission(cluster, "unknownUser", true);
// Admin user will pass the permission check.
- testRpcPermissionWithConf(ozoneConf, admins::contains,
- "unknownUser", "adminUser2");
- }
-
- private void testRpcPermissionWithConf(
- OzoneConfiguration ozoneConf,
- Predicate<String> isAdmin,
- String... usernames) throws Exception {
- try (MiniOzoneCluster cluster =
MiniOzoneCluster.newBuilder(ozoneConf).build()) {
- cluster.waitForClusterToBeReady();
- for (String username : usernames) {
- testRpcPermission(cluster, username,
- !isAdmin.test(username));
- }
- } // The cluster is automatically closed here
+ testRpcPermission(cluster, "adminUser2", false);
}
private void testRpcPermission(MiniOzoneCluster cluster,
@@ -260,10 +256,64 @@ public class TestStorageContainerManager {
assertEquals(expectedErrorMessage, e.getMessage());
}
- @Test
- public void testBlockDeletionTransactions() throws Exception {
- int numKeys = 5;
- OzoneConfiguration conf = new OzoneConfiguration();
+ private void testBlockDeletionTransactions(MiniOzoneCluster cluster) throws
Exception {
+ DeletedBlockLog delLog = cluster.getStorageContainerManager()
+ .getScmBlockManager().getDeletedBlockLog();
+ assertEquals(0, delLog.getNumOfValidTransactions());
+
+ Map<String, OmKeyInfo> keyLocations = TestDataUtil.createKeys(cluster,
KEY_COUNT);
+ // Wait for container report
+ Thread.sleep(1000);
+ for (OmKeyInfo keyInfo : keyLocations.values()) {
+ OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
+ cluster.getStorageContainerManager());
+ }
+ Map<Long, List<Long>> containerBlocks = createDeleteTXLog(
+ cluster.getStorageContainerManager(),
+ delLog, keyLocations, cluster);
+
+ // Verify a few TX gets created in the TX log.
+ assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0);
+
+ // Once TXs are written into the log, SCM starts to fetch TX
+ // entries from the log and schedule block deletions in HB interval;
+ // after some time, all the TXs should be processed, and by then
+ // the number of containerBlocks of all known containers will be
+ // empty again.
+ OzoneTestUtils.waitBlockDeleted(cluster.getStorageContainerManager());
+ assertTrue(verifyBlocksWithTxnTable(cluster, containerBlocks));
+ // Continue the work, add some TXs that with known container names,
+ // but unknown block IDs.
+ for (Long containerID : containerBlocks.keySet()) {
+ // Add 2 TXs per container.
+ Map<Long, List<Long>> deletedBlocks = new HashMap<>();
+ List<Long> blocks = new ArrayList<>();
+ blocks.add(RandomUtils.nextLong());
+ blocks.add(RandomUtils.nextLong());
+ deletedBlocks.put(containerID, blocks);
+ addTransactions(cluster.getStorageContainerManager(), delLog,
+ deletedBlocks);
+ }
+
+ // Verify a few TX gets created in the TX log.
+ assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0);
+
+ // These blocks cannot be found in the container, so deleting them is
+ // skipped; eventually these TXs will succeed.
+ GenericTestUtils.waitFor(() -> {
+ try {
+ if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) {
+ cluster.getStorageContainerManager().getScmHAManager()
+ .asSCMHADBTransactionBuffer().flush();
+ }
+ return delLog.getFailedTransactions(-1, 0).size() == 0;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 1000, 20000);
+ }
+
+ private static void configureBlockDeletion(OzoneConfiguration conf) {
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
TimeUnit.MILLISECONDS);
DatanodeConfiguration datanodeConfiguration = conf.getObject(
@@ -289,153 +339,71 @@ public class TestStorageContainerManager {
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
// Reset container provision size, otherwise only one container
// is created by default.
- conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
- numKeys);
-
- try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
- .build()) {
- cluster.waitForClusterToBeReady();
- DeletedBlockLog delLog = cluster.getStorageContainerManager()
- .getScmBlockManager().getDeletedBlockLog();
- assertEquals(0, delLog.getNumOfValidTransactions());
-
- Map<String, OmKeyInfo> keyLocations = TestDataUtil.createKeys(cluster,
numKeys);
- // Wait for container report
- Thread.sleep(1000);
- for (OmKeyInfo keyInfo : keyLocations.values()) {
- OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
- cluster.getStorageContainerManager());
- }
- Map<Long, List<Long>> containerBlocks = createDeleteTXLog(
- cluster.getStorageContainerManager(),
- delLog, keyLocations, cluster, conf);
-
- // Verify a few TX gets created in the TX log.
- assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0);
-
- // Once TXs are written into the log, SCM starts to fetch TX
- // entries from the log and schedule block deletions in HB interval,
- // after sometime, all the TX should be proceed and by then
- // the number of containerBlocks of all known containers will be
- // empty again.
- OzoneTestUtils.waitBlockDeleted(cluster.getStorageContainerManager());
- assertTrue(verifyBlocksWithTxnTable(cluster, conf, containerBlocks));
- // Continue the work, add some TXs that with known container names,
- // but unknown block IDs.
- for (Long containerID : containerBlocks.keySet()) {
- // Add 2 TXs per container.
- Map<Long, List<Long>> deletedBlocks = new HashMap<>();
- List<Long> blocks = new ArrayList<>();
- blocks.add(RandomUtils.nextLong());
- blocks.add(RandomUtils.nextLong());
- deletedBlocks.put(containerID, blocks);
- addTransactions(cluster.getStorageContainerManager(), delLog,
- deletedBlocks);
- }
-
- // Verify a few TX gets created in the TX log.
- assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0);
-
- // These blocks cannot be found in the container, skip deleting them
- // eventually these TX will success.
- GenericTestUtils.waitFor(() -> {
- try {
- if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) {
- cluster.getStorageContainerManager().getScmHAManager()
- .asSCMHADBTransactionBuffer().flush();
- }
- return delLog.getFailedTransactions(-1, 0).size() == 0;
- } catch (IOException e) {
- return false;
- }
- }, 1000, 20000);
- }
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 10 *
KEY_COUNT);
}
- @Test
- public void testOldDNRegistersToReInitialisedSCM() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1000, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
3000, TimeUnit.MILLISECONDS);
-
-
-
- try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(1)
- .build()) {
- cluster.waitForClusterToBeReady();
- HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0);
- StorageContainerManager scm = cluster.getStorageContainerManager();
- File dbDir = scm.getScmMetadataStore().getStore().getDbLocation();
- scm.stop();
-
- // re-initialise SCM with new clusterID
-
- GenericTestUtils.deleteDirectory(new
File(SCMHAUtils.getRatisStorageDir(conf)));
- GenericTestUtils.deleteDirectory(dbDir);
- GenericTestUtils.deleteDirectory(
- new File(scm.getScmStorageConfig().getStorageDir()));
- String newClusterId = UUID.randomUUID().toString();
- StorageContainerManager.scmInit(scm.getConfiguration(), newClusterId);
- scm = HddsTestUtils.getScmSimple(scm.getConfiguration());
-
- DatanodeStateMachine dsm = datanode.getDatanodeStateMachine();
- assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+ // assumes SCM is already stopped
+ private void testOldDNRegistersToReInitialisedSCM(MiniOzoneCluster cluster)
throws Exception {
+ HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0);
+
+ // re-initialise SCM with new clusterID
+ String newClusterId = UUID.randomUUID().toString();
+ StorageContainerManager.scmInit(cluster.getConf(), newClusterId);
+ StorageContainerManager scm =
HddsTestUtils.getScmSimple(cluster.getConf());
+
+ DatanodeStateMachine dsm = datanode.getDatanodeStateMachine();
+ assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+ dsm.getContext().getState());
+ // DN Endpoint State has already gone through GetVersion and Register,
+ // so it will be in HEARTBEAT state.
+ for (EndpointStateMachine endpoint : dsm.getConnectionManager()
+ .getValues()) {
+ assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+ endpoint.getState());
+ }
+ GenericTestUtils.LogCapturer scmDnHBDispatcherLog =
+ GenericTestUtils.LogCapturer.captureLogs(
+ SCMDatanodeHeartbeatDispatcher.LOG);
+ LogManager.getLogger(HeartbeatEndpointTask.class).setLevel(Level.DEBUG);
+ GenericTestUtils.LogCapturer heartbeatEndpointTaskLog =
+ GenericTestUtils.LogCapturer.captureLogs(HeartbeatEndpointTask.LOG);
+ GenericTestUtils.LogCapturer versionEndPointTaskLog =
+ GenericTestUtils.LogCapturer.captureLogs(VersionEndpointTask.LOG);
+ // Initially empty
+ assertThat(scmDnHBDispatcherLog.getOutput()).isEmpty();
+ assertThat(versionEndPointTaskLog.getOutput()).isEmpty();
+ // start the new SCM
+ try {
+ scm.start();
+ // DN heartbeats to new SCM, SCM doesn't recognize the node, sends the
+ // command to DN to re-register. Wait for SCM to send re-register command
+ String expectedLog = String.format(
+ "SCM received heartbeat from an unregistered datanode %s. "
+ + "Asking datanode to re-register.",
+ datanode.getDatanodeDetails());
+ GenericTestUtils.waitFor(
+ () -> scmDnHBDispatcherLog.getOutput().contains(expectedLog), 100,
+ 30000);
+ ExitUtil.disableSystemExit();
+ // As part of processing response for re-register, DN
EndpointStateMachine
+ // goes to GET-VERSION state which checks if there is already existing
+ // version file on the DN & if the clusterID matches with that of the SCM
+ // In this case, it won't match and gets
InconsistentStorageStateException
+ // and DN shuts down.
+ String expectedLog2 = "Received SCM notification to register."
+ + " Interrupt HEARTBEAT and transit to GETVERSION state.";
+ GenericTestUtils.waitFor(
+ () -> heartbeatEndpointTaskLog.getOutput().contains(expectedLog2),
+ 100, 5000);
+ GenericTestUtils.waitFor(() -> dsm.getContext().getShutdownOnError(),
100,
+ 5000);
+ assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
dsm.getContext().getState());
- // DN Endpoint State has already gone through GetVersion and Register,
- // so it will be in HEARTBEAT state.
- for (EndpointStateMachine endpoint : dsm.getConnectionManager()
- .getValues()) {
- assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
- endpoint.getState());
- }
- GenericTestUtils.LogCapturer scmDnHBDispatcherLog =
- GenericTestUtils.LogCapturer.captureLogs(
- SCMDatanodeHeartbeatDispatcher.LOG);
- LogManager.getLogger(HeartbeatEndpointTask.class).setLevel(Level.DEBUG);
- GenericTestUtils.LogCapturer heartbeatEndpointTaskLog =
- GenericTestUtils.LogCapturer.captureLogs(HeartbeatEndpointTask.LOG);
- GenericTestUtils.LogCapturer versionEndPointTaskLog =
- GenericTestUtils.LogCapturer.captureLogs(VersionEndpointTask.LOG);
- // Initially empty
- assertThat(scmDnHBDispatcherLog.getOutput()).isEmpty();
- assertThat(versionEndPointTaskLog.getOutput()).isEmpty();
- // start the new SCM
- try {
- scm.start();
- // Initially DatanodeStateMachine will be in Running state
- assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
- dsm.getContext().getState());
- // DN heartbeats to new SCM, SCM doesn't recognize the node, sends the
- // command to DN to re-register. Wait for SCM to send re-register
command
- String expectedLog = String.format(
- "SCM received heartbeat from an unregistered datanode %s. "
- + "Asking datanode to re-register.",
- datanode.getDatanodeDetails());
- GenericTestUtils.waitFor(
- () -> scmDnHBDispatcherLog.getOutput().contains(expectedLog), 100,
- 30000);
- ExitUtil.disableSystemExit();
- // As part of processing response for re-register, DN
EndpointStateMachine
- // goes to GET-VERSION state which checks if there is already existing
- // version file on the DN & if the clusterID matches with that of the
SCM
- // In this case, it won't match and gets
InconsistentStorageStateException
- // and DN shuts down.
- String expectedLog2 = "Received SCM notification to register."
- + " Interrupt HEARTBEAT and transit to GETVERSION state.";
- GenericTestUtils.waitFor(
- () -> heartbeatEndpointTaskLog.getOutput().contains(expectedLog2),
- 100, 5000);
- GenericTestUtils.waitFor(() -> dsm.getContext().getShutdownOnError(),
100,
- 5000);
- assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
- dsm.getContext().getState());
- assertThat(versionEndPointTaskLog.getOutput()).contains(
- "org.apache.hadoop.ozone.common" +
- ".InconsistentStorageStateException: Mismatched ClusterIDs");
- } finally {
- scm.stop();
- }
+ assertThat(versionEndPointTaskLog.getOutput()).contains(
+ "org.apache.hadoop.ozone.common" +
+ ".InconsistentStorageStateException: Mismatched ClusterIDs");
+ } finally {
+ scm.stop();
}
}
@@ -481,7 +449,7 @@ public class TestStorageContainerManager {
}
createDeleteTXLog(cluster.getStorageContainerManager(),
- delLog, keyLocations, cluster, conf);
+ delLog, keyLocations, cluster);
// Verify a few TX gets created in the TX log.
assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0);
@@ -508,7 +476,7 @@ public class TestStorageContainerManager {
private Map<Long, List<Long>> createDeleteTXLog(
StorageContainerManager scm,
DeletedBlockLog delLog,
- Map<String, OmKeyInfo> keyLocations, MiniOzoneCluster cluster,
OzoneConfiguration conf)
+ Map<String, OmKeyInfo> keyLocations, MiniOzoneCluster cluster)
throws IOException, TimeoutException {
// These keys will be written into a bunch of containers,
// gets a set of container names, verify container containerBlocks
@@ -527,7 +495,7 @@ public class TestStorageContainerManager {
}
assertThat(totalCreatedBlocks).isGreaterThan(0);
assertEquals(totalCreatedBlocks,
- getAllBlocks(cluster, conf, containerNames).size());
+ getAllBlocks(cluster, containerNames).size());
// Create a deletion TX for each key.
Map<Long, List<Long>> containerBlocks = Maps.newHashMap();
@@ -571,25 +539,15 @@ public class TestStorageContainerManager {
validateRatisGroupExists(conf, clusterId.toString());
}
- @Test
- public void testSCMReinitialization(@TempDir Path tempDir) throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- Path scmPath = tempDir.resolve("scm-meta");
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
- //This will set the cluster id in the version file
-
+ private void testSCMReinitialization(MiniOzoneCluster cluster) throws
Exception {
+ cluster.getStorageContainerManager().stop();
- try (MiniOzoneCluster cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build()) {
- cluster.waitForClusterToBeReady();
- cluster.getStorageContainerManager().stop();
- final UUID clusterId = UUID.randomUUID();
- // This will initialize SCM
- StorageContainerManager.scmInit(conf, clusterId.toString());
- SCMStorageConfig scmStore = new SCMStorageConfig(conf);
- assertNotEquals(clusterId.toString(), scmStore.getClusterID());
- assertTrue(scmStore.isSCMHAEnabled());
- }
+ final UUID clusterId = UUID.randomUUID();
+ // This will initialize SCM
+ StorageContainerManager.scmInit(cluster.getConf(), clusterId.toString());
+ SCMStorageConfig scmStore = new SCMStorageConfig(cluster.getConf());
+ assertNotEquals(clusterId.toString(), scmStore.getClusterID());
+ assertTrue(scmStore.isSCMHAEnabled());
}
@VisibleForTesting
@@ -678,10 +636,22 @@ public class TestStorageContainerManager {
/**
* Test datanode heartbeat well processed with a 4-layer network topology.
*/
- @Test
- public void testScmProcessDatanodeHeartbeat() throws Exception {
+ private void testScmProcessDatanodeHeartbeat(MiniOzoneCluster cluster) {
+ NodeManager nodeManager =
cluster.getStorageContainerManager().getScmNodeManager();
+ List<DatanodeDetails> allNodes = nodeManager.getAllNodes();
+ assertEquals(cluster.getHddsDatanodes().size(), allNodes.size());
+
+ for (DatanodeDetails node : allNodes) {
+ DatanodeInfo datanodeInfo = assertInstanceOf(DatanodeInfo.class,
nodeManager.getNodeByUuid(node.getUuid()));
+ assertNotNull(datanodeInfo);
+ assertThat(datanodeInfo.getLastHeartbeatTime()).isPositive();
+ assertEquals(datanodeInfo.getUuidString(),
datanodeInfo.getNetworkName());
+ assertEquals("/rack1", datanodeInfo.getNetworkLocation());
+ }
+ }
+
+ private static void configureTopology(OzoneConfiguration conf) throws
UnknownHostException {
String rackName = "/rack1";
- OzoneConfiguration conf = new OzoneConfiguration();
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack(NetUtils.normalizeHostName(HddsUtils.getHostName(conf)),
@@ -689,34 +659,6 @@ public class TestStorageContainerManager {
// In case of JDK17, the IP address is resolved to localhost mapped to
127.0.0.1 which is not in sync with JDK8
// and hence need to make following entry under HDDS-10132
StaticMapping.addNodeToRack(LOCALHOST_IP, rackName);
-
- final int datanodeNum = 3;
-
- try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(datanodeNum)
- .build()) {
- cluster.waitForClusterToBeReady();
- StorageContainerManager scm = cluster.getStorageContainerManager();
- // first sleep 10s
- Thread.sleep(10000);
- // verify datanode heartbeats are well processed
- long heartbeatCheckerIntervalMs = cluster.getConf()
- .getTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1000,
- TimeUnit.MILLISECONDS);
- long start = Time.monotonicNow();
- Thread.sleep(heartbeatCheckerIntervalMs * 2);
-
- List<DatanodeDetails> allNodes = scm.getScmNodeManager().getAllNodes();
- assertEquals(datanodeNum, allNodes.size());
- for (DatanodeDetails node : allNodes) {
- DatanodeInfo datanodeInfo = (DatanodeInfo) scm.getScmNodeManager()
- .getNodeByUuid(node.getUuidString());
- assertThat(datanodeInfo.getLastHeartbeatTime()).isGreaterThan(start);
- assertEquals(datanodeInfo.getUuidString(),
- datanodeInfo.getNetworkName());
- assertEquals("/rack1", datanodeInfo.getNetworkLocation());
- }
- }
}
@Test
@@ -951,44 +893,18 @@ public class TestStorageContainerManager {
}
}
- private static class CloseContainerCommandMatcher
- implements ArgumentMatcher<CommandForDatanode> {
-
- private final CommandForDatanode cmd;
- private final UUID uuid;
-
- CloseContainerCommandMatcher(UUID uuid, CommandForDatanode cmd) {
- this.uuid = uuid;
- this.cmd = cmd;
- }
-
- @Override
- public boolean matches(CommandForDatanode cmdRight) {
- CloseContainerCommand left = (CloseContainerCommand) cmd.getCommand();
- CloseContainerCommand right =
- (CloseContainerCommand) cmdRight.getCommand();
- return cmdRight.getDatanodeId().equals(uuid)
- && left.getContainerID() == right.getContainerID()
- && left.getPipelineID().equals(right.getPipelineID())
- && left.getType() == right.getType()
- && left.getProto().equals(right.getProto());
- }
- }
-
- public List<Long> getAllBlocks(MiniOzoneCluster cluster, OzoneConfiguration
conf, Set<Long> containerIDs)
- throws IOException {
+ public List<Long> getAllBlocks(MiniOzoneCluster cluster, Set<Long>
containerIDs) throws IOException {
List<Long> allBlocks = Lists.newArrayList();
for (Long containerID : containerIDs) {
- allBlocks.addAll(getAllBlocks(cluster, conf, containerID));
+ allBlocks.addAll(getAllBlocks(cluster, containerID));
}
return allBlocks;
}
- public List<Long> getAllBlocks(MiniOzoneCluster cluster,
- OzoneConfiguration conf, Long containerID) throws IOException {
+ public List<Long> getAllBlocks(MiniOzoneCluster cluster, Long containerID)
throws IOException {
List<Long> allBlocks = Lists.newArrayList();
KeyValueContainerData cData = getContainerMetadata(cluster, containerID);
- try (DBHandle db = BlockUtils.getDB(cData, conf)) {
+ try (DBHandle db = BlockUtils.getDB(cData, cluster.getConf())) {
List<? extends Table.KeyValue<String, BlockData>> kvs =
db.getStore().getBlockDataTable()
@@ -1003,12 +919,12 @@ public class TestStorageContainerManager {
return allBlocks;
}
- public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster,
OzoneConfiguration conf,
+ public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster,
Map<Long, List<Long>> containerBlocks)
throws IOException {
for (Map.Entry<Long, List<Long>> entry : containerBlocks.entrySet()) {
KeyValueContainerData cData = getContainerMetadata(cluster,
entry.getKey());
- try (DBHandle db = BlockUtils.getDB(cData, conf)) {
+ try (DBHandle db = BlockUtils.getDB(cData, cluster.getConf())) {
DatanodeStore ds = db.getStore();
DatanodeStoreSchemaThreeImpl dnStoreImpl =
(DatanodeStoreSchemaThreeImpl) ds;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]