This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f72de13ff53 HDDS-14070. set statemachine ready on election performed
for follower (#9671)
f72de13ff53 is described below
commit f72de13ff53badbdf14cf288c958862aae33c051
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Feb 6 10:56:37 2026 -0800
HDDS-14070. set statemachine ready on election performed for follower
(#9671)
---
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 24 +++--
.../hdds/scm/safemode/StateMachineReadyRule.java | 4 +-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 15 +++
.../hdds/scm/safemode/TestSafeModeSCMHA.java | 110 +++++++++++++++++++++
4 files changed, 141 insertions(+), 12 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 840b3880269..e157a430033 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -86,7 +86,7 @@ public class SCMStateMachine extends BaseStateMachine {
private List<ManagedSecretKey> installingSecretKeys = null;
private AtomicLong currentLeaderTerm = new AtomicLong(-1L);
- private AtomicBoolean refreshedAfterLeaderReady = new AtomicBoolean();
+ private AtomicBoolean isStateMachineReady = new AtomicBoolean();
public SCMStateMachine(final StorageContainerManager scm,
SCMHADBTransactionBuffer buffer) {
@@ -164,7 +164,7 @@ public CompletableFuture<Message> applyTransaction(
// After previous term transactions are applied, still in safe mode,
// perform refreshAndValidate to update the safemode rule state.
- if (scm.isInSafeMode() && refreshedAfterLeaderReady.get()) {
+ if (scm.isInSafeMode() && isStateMachineReady.get()) {
scm.getScmSafeModeManager().refreshAndValidate();
}
final TermIndex appliedTermIndex = TermIndex.valueOf(trx.getLogEntry());
@@ -285,6 +285,14 @@ public void notifyLeaderChanged(RaftGroupMemberId
groupMemberId,
currentLeaderTerm.set(scm.getScmHAManager().getRatisServer().getDivision()
.getInfo().getCurrentTerm());
+ if (isStateMachineReady.compareAndSet(false, true)) {
+ // refresh and validate safe mode rules if it can exit safe mode
+ // if being leader, all previous term transactions have been applied
+ // if other states, just refresh safe mode rules, and transaction keeps flushing from leader
+ // and does not depend on pending transactions.
+ scm.getScmSafeModeManager().refreshAndValidate();
+ }
+
if (!groupMemberId.getPeerId().equals(newLeaderId)) {
LOG.info("leader changed, yet current SCM is still follower.");
return;
@@ -355,21 +363,17 @@ public void notifyTermIndexUpdated(long term, long index)
{
}
if (currentLeaderTerm.get() == term) {
- // Means all transactions before this term have been applied.
// This means after a restart, all pending transactions have been applied.
- // Perform
- // 1. Refresh Safemode rules state.
- // 2. Start DN Rpc server.
- if (!refreshedAfterLeaderReady.get()) {
- refreshedAfterLeaderReady.set(true);
+ if (isStateMachineReady.compareAndSet(false, true)) {
+ // Refresh Safemode rules state if not already done.
scm.getScmSafeModeManager().refreshAndValidate();
}
currentLeaderTerm.set(-1L);
}
}
- public boolean isRefreshedAfterLeaderReady() {
- return refreshedAfterLeaderReady.get();
+ public boolean getIsStateMachineReady() {
+ return isStateMachineReady.get();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java
index d5724979f30..8c6762b1087 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java
@@ -41,7 +41,7 @@ protected TypedEvent<Boolean> getEventType() {
@Override
protected boolean validate() {
if (null != scmStateMachine) {
- return scmStateMachine.isRefreshedAfterLeaderReady();
+ return scmStateMachine.getIsStateMachineReady();
}
// if no HA, always return true.
return true;
@@ -58,7 +58,7 @@ protected void cleanup() {
@Override
public String getStatusText() {
return String.format("Refreshed SCM State Machine after leader ready: %s",
- scmStateMachine != null ?
scmStateMachine.isRefreshedAfterLeaderReady() : "NA");
+ scmStateMachine != null ? scmStateMachine.getIsStateMachineReady() :
"NA");
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index c05c66b3cc1..fdf38a7a67c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -54,8 +54,11 @@
import
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.SCMStateMachine;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -67,6 +70,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -177,6 +181,16 @@ public void testSafeModeExitRule() throws Exception {
}
ContainerManager containerManager = mock(ContainerManager.class);
when(containerManager.getContainers(ReplicationType.RATIS)).thenReturn(containers);
+
+ StorageContainerManager mockScmManager =
mock(StorageContainerManager.class);
+ SCMHAManager mockScmhaManager = mock(SCMHAManager.class);
+ when(mockScmManager.getScmHAManager()).thenReturn(mockScmhaManager);
+ SCMRatisServer mockScmRatisServer = mock(SCMRatisServer.class);
+ when(mockScmhaManager.getRatisServer()).thenReturn(mockScmRatisServer);
+ SCMStateMachine mockScmStateMachine = mock(SCMStateMachine.class);
+
when(mockScmRatisServer.getSCMStateMachine()).thenReturn(mockScmStateMachine);
+ when((mockScmStateMachine.getIsStateMachineReady())).thenReturn(true);
+ scmContext = new SCMContext.Builder().setSCM(mockScmManager).build();
scmSafeModeManager = new SCMSafeModeManager(config, null, null,
containerManager,
serviceManager, queue, scmContext);
scmSafeModeManager.start();
@@ -207,6 +221,7 @@ public void testSafeModeExitRule() throws Exception {
testContainerThreshold(containers.subList(75, 100), 1.0);
assertEquals(100, scmSafeModeManager.getSafeModeMetrics()
.getCurrentContainersWithOneReplicaReportedCount().value());
+
scmSafeModeManager.validateSafeModeExitRules(StateMachineReadyRule.class.getSimpleName());
GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(),
100, 1000 * 5);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java
new file mode 100644
index 00000000000..27d0c81923e
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.safemode;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMStateMachine;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests safemode with SCM HA setup.
+ */
+public class TestSafeModeSCMHA {
+ private static final String OM_SERVICE_ID = "om-service-test1";
+ private static final String SCM_SERVICE_ID = "scm-service-test1";
+ private static final int NUM_OF_OMS = 1;
+ private static final int NUM_OF_SCMS = 3;
+
+ private MiniOzoneHAClusterImpl cluster = null;
+
+ @BeforeEach
+ public void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setOMServiceId(OM_SERVICE_ID)
+ .setSCMServiceId(SCM_SERVICE_ID).setNumOfOzoneManagers(NUM_OF_OMS)
+ .setNumOfStorageContainerManagers(NUM_OF_SCMS).setNumOfActiveSCMs(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @AfterEach
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFollowerRestartExitSafeMode() throws Exception {
+ try (OzoneClient client = cluster.newClient()) {
+ createTestData(client);
+ }
+
+ StorageContainerManager followerScm = null;
+ StorageContainerManager leaderScm = null;
+ for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+ if (!scm.checkLeader()) {
+ followerScm = scm;
+ } else {
+ leaderScm = scm;
+ }
+ }
+
+ assertNotNull(followerScm);
+ assertNotNull(leaderScm);
+ // wait for sync between leader and follower
+ SCMStateMachine leaderScmStateMachine =
leaderScm.getScmHAManager().getRatisServer().getSCMStateMachine();
+ SCMStateMachine followerScmStateMachine =
followerScm.getScmHAManager().getRatisServer().getSCMStateMachine();
+ GenericTestUtils.waitFor(() ->
leaderScmStateMachine.getLastAppliedTermIndex().getIndex()
+ == followerScmStateMachine.getLastAppliedTermIndex().getIndex(), 1000,
60000);
+
+ // wait for follower to exit safe mode
+ StorageContainerManager newFollowerScm =
cluster.restartStorageContainerManager(followerScm, false);
+ GenericTestUtils.waitFor(() -> !newFollowerScm.isInSafeMode(), 1000,
60000);
+ }
+
+ private void createTestData(OzoneClient client) throws IOException {
+ ObjectStore objectStore = client.getObjectStore();
+ objectStore.createVolume("testvolume");
+ OzoneVolume volume = objectStore.getVolume("testvolume");
+ volume.createBucket("testbucket");
+
+ OzoneBucket bucket = volume.getBucket("testbucket");
+
+ TestDataUtil.createKey(bucket, "testkey123",
+ RatisReplicationConfig.getInstance(THREE), "Hello".getBytes(UTF_8));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]