This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d79ea9c47a5 HDDS-13281. Disable Ratis metadata write to Raft Log on OM 
& SCM. (#8637)
d79ea9c47a5 is described below

commit d79ea9c47a5aaf1be8c1388dcfc2af4bd7ff3020
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed Jul 2 16:29:30 2025 +0530

    HDDS-13281. Disable Ratis metadata write to Raft Log on OM & SCM. (#8637)
---
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |  6 ++
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  4 +-
 ...estStorageContainerManagerHAWithAllRunning.java |  2 +-
 .../hadoop/hdds/upgrade/TestScmHAFinalization.java | 87 +++++++++++++---------
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |  2 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  | 10 ++-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  5 ++
 .../ozone/om/request/upgrade/OMPrepareRequest.java | 27 +++----
 8 files changed, 90 insertions(+), 53 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index a482ae34f23..a921ffe7167 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -196,6 +196,12 @@ private static int setRaftLogProperties(final 
RaftProperties properties,
             ozoneConf.getInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP,
                     ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP_DEFAULT));
     Log.setSegmentCacheNumMax(properties, 2);
+
+    // This avoids writing commit metadata to Raft Log, which can be used to 
recover the
+    // commit index even if a majority of servers are dead. We don't need this 
for StorageContainerManager,
+    // disabling this will avoid the additional disk IO.
+    Log.setLogMetadataEnabled(properties, false);
+
     return logAppenderQueueByteLimit;
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index a948b9a7a87..ef9ffc03f26 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -167,7 +167,9 @@ public CompletableFuture<Message> applyTransaction(
       if (scm.isInSafeMode() && refreshedAfterLeaderReady.get()) {
         scm.getScmSafeModeManager().refreshAndValidate();
       }
-      
transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(TermIndex.valueOf(trx.getLogEntry())));
+      final TermIndex appliedTermIndex = TermIndex.valueOf(trx.getLogEntry());
+      
transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(appliedTermIndex));
+      updateLastAppliedTermIndex(appliedTermIndex);
     } catch (Exception ex) {
       applyTransactionFuture.completeExceptionally(ex);
       ExitUtils.terminate(1, ex.getMessage(), ex, StateMachine.LOG);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
index bfc3d61953d..9a3722c2528 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
@@ -145,7 +145,7 @@ private void doPutKey() throws Exception {
           .setReplicationConfig(replication)
           .setKeyName(keyName)
           .build();
-      final OmKeyInfo keyInfo = cluster().getOzoneManager().lookupKey(keyArgs);
+      final OmKeyInfo keyInfo = cluster().getOMLeader().lookupKey(keyArgs);
       final List<OmKeyLocationInfo> keyLocationInfos =
           keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
       long index = -1;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index b55c55fe6a6..e4960cce160 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -19,8 +19,6 @@
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -162,13 +160,13 @@ public void testFinalizationWithLeaderChange(
         oldLeaderScm.getSCMNodeId());
     cluster.shutdownStorageContainerManager(oldLeaderScm);
 
+    // Wait for the remaining two SCMs to elect a new leader.
+    cluster.waitForClusterToBeReady();
+
     // While finalization is paused, check its state on the remaining SCMs.
     checkMidFinalizationConditions(haltingPoint,
         cluster.getStorageContainerManagersList());
 
-    // Wait for the remaining two SCMs to elect a new leader.
-    cluster.waitForClusterToBeReady();
-
     // Restart actually creates a new SCM.
     // Since this SCM will be a follower, the implementation of its upgrade
     // finalization executor does not matter for this test.
@@ -324,35 +322,56 @@ private void waitForScmToFinalize(StorageContainerManager 
scm)
   private void checkMidFinalizationConditions(
       UpgradeTestInjectionPoints haltingPoint,
       List<StorageContainerManager> scms) {
-    for (StorageContainerManager scm: scms) {
-      switch (haltingPoint) {
-      case BEFORE_PRE_FINALIZE_UPGRADE:
-        assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
-        assertEquals(
-            scm.getScmContext().getFinalizationCheckpoint(),
-            FinalizationCheckpoint.FINALIZATION_REQUIRED);
-        break;
-      case AFTER_PRE_FINALIZE_UPGRADE:
-        assertTrue(scm.getPipelineManager().isPipelineCreationFrozen());
-        assertEquals(
-            scm.getScmContext().getFinalizationCheckpoint(),
-            FinalizationCheckpoint.FINALIZATION_STARTED);
-        break;
-      case AFTER_COMPLETE_FINALIZATION:
-        assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
-        assertEquals(
-            scm.getScmContext().getFinalizationCheckpoint(),
-            FinalizationCheckpoint.MLV_EQUALS_SLV);
-        break;
-      case AFTER_POST_FINALIZE_UPGRADE:
-        assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
-        assertEquals(
-            scm.getScmContext().getFinalizationCheckpoint(),
-            FinalizationCheckpoint.FINALIZATION_COMPLETE);
-        break;
-      default:
-        fail("Unknown halting point in test: " + haltingPoint);
-      }
+
+    // Ratis only makes sure that the Leader has processed the finalization,
+    // the followers might have this in the Raft Log and not yet processed it.
+    switch (haltingPoint) {
+    case BEFORE_PRE_FINALIZE_UPGRADE:
+      // At least one node (leader) should be in the FINALIZATION_REQUIRED 
stage.
+      assertTrue(scms.stream().anyMatch(scm ->
+          scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.FINALIZATION_REQUIRED));
+      // Pipeline creation should not be frozen at this point, even on leader.
+      assertTrue(scms.stream().noneMatch(scm ->
+          scm.getPipelineManager().isPipelineCreationFrozen()));
+      break;
+    case AFTER_PRE_FINALIZE_UPGRADE:
+      // At least one node (leader) should be in the FINALIZATION_STARTED 
stage.
+      assertTrue(scms.stream().anyMatch(scm ->
+          scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.FINALIZATION_STARTED));
+      // Pipeline creation should be frozen on nodes where the finalization 
checkpoint is FINALIZATION_STARTED,
+      // this should include the leader SCM.
+      assertTrue(scms.stream()
+          .filter(scm ->
+              scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.FINALIZATION_STARTED)
+          .allMatch(scm ->
+              scm.getPipelineManager().isPipelineCreationFrozen()));
+      break;
+    case AFTER_COMPLETE_FINALIZATION:
+      // At least one node (leader) should be in the MLV_EQUALS_SLV stage.
+      assertTrue(scms.stream().anyMatch(scm ->
+          scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.MLV_EQUALS_SLV));
+      // Pipeline creation should not be frozen on nodes where the 
finalization checkpoint is MLV_EQUALS_SLV,
+      // this should include the leader SCM.
+      assertTrue(scms.stream()
+          .filter(scm ->
+              scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.MLV_EQUALS_SLV)
+          .noneMatch(scm ->
+              scm.getPipelineManager().isPipelineCreationFrozen()));
+      break;
+    case AFTER_POST_FINALIZE_UPGRADE:
+      // At least one node (leader) should be in the FINALIZATION_COMPLETE 
stage.
+      assertTrue(scms.stream().anyMatch(scm ->
+          scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.FINALIZATION_COMPLETE));
+      // Pipeline creation should not be frozen on nodes where the 
finalization checkpoint is FINALIZATION_COMPLETE,
+      // this should include the leader SCM.
+      assertTrue(scms.stream()
+          .filter(scm ->
+              scm.getScmContext().getFinalizationCheckpoint() == 
FinalizationCheckpoint.FINALIZATION_COMPLETE)
+          .noneMatch(scm ->
+              scm.getPipelineManager().isPipelineCreationFrozen()));
+      break;
+    default:
+      fail("Unknown halting point in test: " + haltingPoint);
     }
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index fd7439a3878..2ca65335851 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -382,7 +382,7 @@ public void testOzoneTenantBasicOperations() throws 
IOException {
     checkOutput(lines.get(lines.size() - 1), "ret=SUCCESS", false);
 
     // Check volume creation
-    OmVolumeArgs volArgs = cluster.getOzoneManager().getVolumeInfo("finance");
+    OmVolumeArgs volArgs = cluster.getOMLeader().getVolumeInfo("finance");
     assertEquals("finance", volArgs.getVolume());
 
     // Creating the tenant with the same name again should fail
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 93070fcbe05..24ae4e03ce1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -910,7 +910,15 @@ public void snapshotLimitCheck() throws IOException, 
OMException {
   }
 
   public void decrementInFlightSnapshotCount() {
-    inFlightSnapshotCount.decrementAndGet();
+    // TODO this is a work around for the accounting logic of 
`inFlightSnapshotCount`.
+    //    - It incorrectly assumes that LeaderReady means that there are no 
inflight snapshot requests.
+    //    We may consider fixing it by waiting all the pending requests in 
notifyLeaderReady().
+    //    - Also, it seems to have another bug that the PrepareState could 
disallow snapshot requests.
+    //    In such case, `inFlightSnapshotCount` won't be decremented.
+    int result = inFlightSnapshotCount.decrementAndGet();
+    if (result < 0) {
+      resetInFlightSnapshotCount();
+    }
   }
 
   public void resetInFlightSnapshotCount() {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index afa3151a8c3..76ca50b4124 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -722,6 +722,11 @@ private static void setRaftLogProperties(RaftProperties 
properties,
         OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
         OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT));
 
+    // This avoids writing commit metadata to Raft Log, which can be used to 
recover the
+    // commit index even if a majority of servers are dead. We don't need this 
for OzoneManager,
+    // disabling this will avoid the additional disk IO.
+    RaftServerConfigKeys.Log.setLogMetadataEnabled(properties, false);
+
     // Set the number of maximum cached segments
     RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index ef5ee2a4ea4..f349e74e541 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -167,7 +167,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager 
ozoneManager, Execut
    * - the applied index is updated after the transaction is flushed to db.
    * - after a transaction (i) is committed, ratis will append another 
ratis-metadata transaction (i+1).
    *
-   * @return the last Ratis commit index
+   * @return the last Ratis applied index
    */
   private static long waitForLogIndex(long minOMDBFlushIndex,
       OzoneManager om, OzoneManagerStateMachine stateMachine,
@@ -179,20 +179,18 @@ private static long waitForLogIndex(long 
minOMDBFlushIndex,
     boolean omDBFlushed = false;
     boolean ratisStateMachineApplied = false;
 
-    // Wait for Ratis commit index after the specified index to be applied to
-    // Ratis' state machine. This index will not appear in the OM DB until a
+    // Wait for the given Ratis commit index to be applied to Ratis'
+    // state machine. This index will not appear in the OM DB until a
     // snapshot is taken.
     // If we purge logs without waiting for this index, it may not make it to
     // the RocksDB snapshot, and then the log entry is lost on this OM.
-    long minRatisStateMachineIndex = minOMDBFlushIndex + 1; // for the 
ratis-metadata transaction
-    long lastRatisCommitIndex = RaftLog.INVALID_LOG_INDEX;
 
     // Wait OM state machine to apply the given index.
     long lastOMDBFlushIndex = RaftLog.INVALID_LOG_INDEX;
+    long lastRatisAppliedIndex = RaftLog.INVALID_LOG_INDEX;
 
-    LOG.info("{} waiting for index {} to flush to OM DB and index {} to flush" 
+
-            " to Ratis state machine.", om.getOMNodeId(), minOMDBFlushIndex,
-        minRatisStateMachineIndex);
+    LOG.info("{} waiting for index {} to flush to OM DB and flush" +
+            " to Ratis state machine.", om.getOMNodeId(), minOMDBFlushIndex);
     while (!(omDBFlushed && ratisStateMachineApplied) &&
         Time.monotonicNow() < endTime) {
       // Check OM DB.
@@ -202,11 +200,10 @@ private static long waitForLogIndex(long 
minOMDBFlushIndex,
           lastOMDBFlushIndex);
 
       // Check ratis state machine.
-      lastRatisCommitIndex = 
stateMachine.getLastNotifiedTermIndex().getIndex();
-      ratisStateMachineApplied = (lastRatisCommitIndex >=
-          minRatisStateMachineIndex);
+      lastRatisAppliedIndex = 
stateMachine.getLastAppliedTermIndex().getIndex();
+      ratisStateMachineApplied = lastRatisAppliedIndex >= minOMDBFlushIndex;
       LOG.debug("{} Current Ratis state machine transaction index {}.",
-          om.getOMNodeId(), lastRatisCommitIndex);
+          om.getOMNodeId(), lastRatisAppliedIndex);
 
       if (!(omDBFlushed && ratisStateMachineApplied)) {
         Thread.sleep(flushCheckInterval.toMillis());
@@ -225,10 +222,10 @@ private static long waitForLogIndex(long 
minOMDBFlushIndex,
       throw new IOException(String.format("After waiting for %d seconds, " +
               "Ratis state machine applied index %d which is less than" +
               " the minimum required index %d.",
-          flushTimeout.getSeconds(), lastRatisCommitIndex,
-          minRatisStateMachineIndex));
+          flushTimeout.getSeconds(), lastRatisAppliedIndex,
+          minOMDBFlushIndex));
     }
-    return lastRatisCommitIndex;
+    return lastRatisAppliedIndex;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to