This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d79ea9c47a5 HDDS-13281. Disable Ratis metadata write to Raft Log on OM & SCM. (#8637)
d79ea9c47a5 is described below
commit d79ea9c47a5aaf1be8c1388dcfc2af4bd7ff3020
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed Jul 2 16:29:30 2025 +0530
HDDS-13281. Disable Ratis metadata write to Raft Log on OM & SCM. (#8637)
---
.../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 6 ++
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 4 +-
...estStorageContainerManagerHAWithAllRunning.java | 2 +-
.../hadoop/hdds/upgrade/TestScmHAFinalization.java | 87 +++++++++++++---------
.../hadoop/ozone/shell/TestOzoneTenantShell.java | 2 +-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 10 ++-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 5 ++
.../ozone/om/request/upgrade/OMPrepareRequest.java | 27 +++----
8 files changed, 90 insertions(+), 53 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index a482ae34f23..a921ffe7167 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -196,6 +196,12 @@ private static int setRaftLogProperties(final RaftProperties properties,
ozoneConf.getInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP,
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP_DEFAULT));
Log.setSegmentCacheNumMax(properties, 2);
+
+ // This avoids writing commit metadata to the Raft Log, which can be used to recover the
+ // commit index even if a majority of servers are dead. We don't need this for
+ // StorageContainerManager, so disabling it avoids the additional disk IO.
+ Log.setLogMetadataEnabled(properties, false);
+
return logAppenderQueueByteLimit;
}
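For context, a minimal standalone sketch of how these Raft log properties are assembled before the Ratis server is built. The class and helper names below are made up for illustration; only the two RaftServerConfigKeys.Log setters come from the patch above.

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;

public final class RaftLogPropertiesSketch {

  // Hypothetical helper mirroring RatisUtil.setRaftLogProperties: it collects the
  // Raft log settings on a RaftProperties instance before the RaftServer is created.
  static RaftProperties buildScmRaftProperties() {
    final RaftProperties properties = new RaftProperties();

    // Keep at most two log segments cached in memory (same value as the patch).
    RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);

    // Skip writing commit metadata to the Raft log; SCM/OM do not rely on it for
    // recovery, so this saves one extra disk write per commit.
    RaftServerConfigKeys.Log.setLogMetadataEnabled(properties, false);

    return properties;
  }

  public static void main(String[] args) {
    RaftProperties properties = buildScmRaftProperties();
    System.out.println("Raft log properties configured: " + properties);
  }
}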
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index a948b9a7a87..ef9ffc03f26 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -167,7 +167,9 @@ public CompletableFuture<Message> applyTransaction(
if (scm.isInSafeMode() && refreshedAfterLeaderReady.get()) {
scm.getScmSafeModeManager().refreshAndValidate();
}
- transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(TermIndex.valueOf(trx.getLogEntry())));
+ final TermIndex appliedTermIndex = TermIndex.valueOf(trx.getLogEntry());
+ transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(appliedTermIndex));
+ updateLastAppliedTermIndex(appliedTermIndex);
} catch (Exception ex) {
applyTransactionFuture.completeExceptionally(ex);
ExitUtils.terminate(1, ex.getMessage(), ex, StateMachine.LOG);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
index bfc3d61953d..9a3722c2528 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHAWithAllRunning.java
@@ -145,7 +145,7 @@ private void doPutKey() throws Exception {
.setReplicationConfig(replication)
.setKeyName(keyName)
.build();
- final OmKeyInfo keyInfo = cluster().getOzoneManager().lookupKey(keyArgs);
+ final OmKeyInfo keyInfo = cluster().getOMLeader().lookupKey(keyArgs);
final List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
long index = -1;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
index b55c55fe6a6..e4960cce160 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -19,8 +19,6 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -162,13 +160,13 @@ public void testFinalizationWithLeaderChange(
oldLeaderScm.getSCMNodeId());
cluster.shutdownStorageContainerManager(oldLeaderScm);
+ // Wait for the remaining two SCMs to elect a new leader.
+ cluster.waitForClusterToBeReady();
+
// While finalization is paused, check its state on the remaining SCMs.
checkMidFinalizationConditions(haltingPoint,
cluster.getStorageContainerManagersList());
- // Wait for the remaining two SCMs to elect a new leader.
- cluster.waitForClusterToBeReady();
-
// Restart actually creates a new SCM.
// Since this SCM will be a follower, the implementation of its upgrade
// finalization executor does not matter for this test.
@@ -324,35 +322,56 @@ private void waitForScmToFinalize(StorageContainerManager scm)
private void checkMidFinalizationConditions(
UpgradeTestInjectionPoints haltingPoint,
List<StorageContainerManager> scms) {
- for (StorageContainerManager scm: scms) {
- switch (haltingPoint) {
- case BEFORE_PRE_FINALIZE_UPGRADE:
- assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
- assertEquals(
- scm.getScmContext().getFinalizationCheckpoint(),
- FinalizationCheckpoint.FINALIZATION_REQUIRED);
- break;
- case AFTER_PRE_FINALIZE_UPGRADE:
- assertTrue(scm.getPipelineManager().isPipelineCreationFrozen());
- assertEquals(
- scm.getScmContext().getFinalizationCheckpoint(),
- FinalizationCheckpoint.FINALIZATION_STARTED);
- break;
- case AFTER_COMPLETE_FINALIZATION:
- assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
- assertEquals(
- scm.getScmContext().getFinalizationCheckpoint(),
- FinalizationCheckpoint.MLV_EQUALS_SLV);
- break;
- case AFTER_POST_FINALIZE_UPGRADE:
- assertFalse(scm.getPipelineManager().isPipelineCreationFrozen());
- assertEquals(
- scm.getScmContext().getFinalizationCheckpoint(),
- FinalizationCheckpoint.FINALIZATION_COMPLETE);
- break;
- default:
- fail("Unknown halting point in test: " + haltingPoint);
- }
+
+ // Ratis only guarantees that the Leader has processed the finalization;
+ // the followers might still have it in the Raft Log and not have processed it yet.
+ switch (haltingPoint) {
+ case BEFORE_PRE_FINALIZE_UPGRADE:
+ // At least one node (leader) should be in the FINALIZATION_REQUIRED stage.
+ assertTrue(scms.stream().anyMatch(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_REQUIRED));
+ // Pipeline creation should not be frozen at this point, even on the leader.
+ assertTrue(scms.stream().noneMatch(scm ->
+ scm.getPipelineManager().isPipelineCreationFrozen()));
+ break;
+ case AFTER_PRE_FINALIZE_UPGRADE:
+ // At least one node (leader) should be in the FINALIZATION_STARTED stage.
+ assertTrue(scms.stream().anyMatch(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_STARTED));
+ // Pipeline creation should be frozen on nodes where the finalization checkpoint is
+ // FINALIZATION_STARTED, which should include the leader SCM.
+ assertTrue(scms.stream()
+ .filter(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_STARTED)
+ .allMatch(scm ->
+ scm.getPipelineManager().isPipelineCreationFrozen()));
+ break;
+ case AFTER_COMPLETE_FINALIZATION:
+ // At least one node (leader) should be in the MLV_EQUALS_SLV stage.
+ assertTrue(scms.stream().anyMatch(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.MLV_EQUALS_SLV));
+ // Pipeline creation should not be frozen on nodes where the finalization checkpoint is
+ // MLV_EQUALS_SLV, which should include the leader SCM.
+ assertTrue(scms.stream()
+ .filter(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.MLV_EQUALS_SLV)
+ .noneMatch(scm ->
+ scm.getPipelineManager().isPipelineCreationFrozen()));
+ break;
+ case AFTER_POST_FINALIZE_UPGRADE:
+ // At least one node (leader) should be in the FINALIZATION_COMPLETE stage.
+ assertTrue(scms.stream().anyMatch(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_COMPLETE));
+ // Pipeline creation should not be frozen on nodes where the finalization checkpoint is
+ // FINALIZATION_COMPLETE, which should include the leader SCM.
+ assertTrue(scms.stream()
+ .filter(scm ->
+ scm.getScmContext().getFinalizationCheckpoint() == FinalizationCheckpoint.FINALIZATION_COMPLETE)
+ .noneMatch(scm ->
+ scm.getPipelineManager().isPipelineCreationFrozen()));
+ break;
+ default:
+ fail("Unknown halting point in test: " + haltingPoint);
}
}
}
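The rewritten checks above all follow one stream pattern: at least one SCM (the leader) must have reached the expected finalization checkpoint, and every SCM that has reached it must also show the matching pipeline-freeze state. Below is a self-contained sketch of that pattern; the Node type and the sample values are made up for illustration and are not part of the Ozone test.

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public final class QuorumAssertions {

  // Illustrative stand-in for an SCM node; just the two fields the checks look at.
  static final class Node {
    final String checkpoint;
    final boolean pipelineCreationFrozen;

    Node(String checkpoint, boolean pipelineCreationFrozen) {
      this.checkpoint = checkpoint;
      this.pipelineCreationFrozen = pipelineCreationFrozen;
    }
  }

  // The pattern the test uses: since followers may lag behind the leader, assert that
  // (a) at least one node has reached the expected checkpoint, and
  // (b) every node that has reached it also shows the expected side effect.
  static boolean checkpointReachedConsistently(List<Node> nodes, String checkpoint,
      Predicate<Node> expectedSideEffect) {
    boolean reachedSomewhere = nodes.stream()
        .anyMatch(n -> n.checkpoint.equals(checkpoint));
    boolean sideEffectConsistent = nodes.stream()
        .filter(n -> n.checkpoint.equals(checkpoint))
        .allMatch(expectedSideEffect);
    return reachedSomewhere && sideEffectConsistent;
  }

  public static void main(String[] args) {
    List<Node> scms = Arrays.asList(
        new Node("FINALIZATION_STARTED", true),   // leader, already applied the transaction
        new Node("FINALIZATION_REQUIRED", false), // follower, still catching up
        new Node("FINALIZATION_STARTED", true));
    System.out.println(checkpointReachedConsistently(
        scms, "FINALIZATION_STARTED", n -> n.pipelineCreationFrozen)); // prints true
  }
}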
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index fd7439a3878..2ca65335851 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -382,7 +382,7 @@ public void testOzoneTenantBasicOperations() throws IOException {
checkOutput(lines.get(lines.size() - 1), "ret=SUCCESS", false);
// Check volume creation
- OmVolumeArgs volArgs = cluster.getOzoneManager().getVolumeInfo("finance");
+ OmVolumeArgs volArgs = cluster.getOMLeader().getVolumeInfo("finance");
assertEquals("finance", volArgs.getVolume());
// Creating the tenant with the same name again should fail
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 93070fcbe05..24ae4e03ce1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -910,7 +910,15 @@ public void snapshotLimitCheck() throws IOException, OMException {
}
public void decrementInFlightSnapshotCount() {
- inFlightSnapshotCount.decrementAndGet();
+ // TODO this is a workaround for the accounting logic of `inFlightSnapshotCount`.
+ //  - It incorrectly assumes that LeaderReady means that there are no inflight snapshot requests.
+ //    We may consider fixing it by waiting for all the pending requests in notifyLeaderReady().
+ //  - It also seems to have another bug: the PrepareState could disallow snapshot requests,
+ //    in which case `inFlightSnapshotCount` won't be decremented.
+ int result = inFlightSnapshotCount.decrementAndGet();
+ if (result < 0) {
+ resetInFlightSnapshotCount();
+ }
}
public void resetInFlightSnapshotCount() {
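A tiny standalone sketch of the clamp-to-zero behavior the patched decrementInFlightSnapshotCount() now has; the class and method names below are illustrative, and only the "reset when the counter goes negative" logic comes from the change above.

import java.util.concurrent.atomic.AtomicInteger;

public final class InFlightCounterSketch {

  private final AtomicInteger inFlight = new AtomicInteger(0);

  void increment() {
    inFlight.incrementAndGet();
  }

  // Mirrors the patched decrementInFlightSnapshotCount(): if an extra decrement (or a
  // missed increment) ever drives the counter negative, clamp it back to zero so the
  // snapshot limit check does not operate on a nonsensical value.
  void decrement() {
    if (inFlight.decrementAndGet() < 0) {
      inFlight.set(0); // stand-in for resetInFlightSnapshotCount()
    }
  }

  int current() {
    return inFlight.get();
  }

  public static void main(String[] args) {
    InFlightCounterSketch counter = new InFlightCounterSketch();
    counter.decrement();                    // extra decrement, e.g. after a leader change
    System.out.println(counter.current());  // prints 0, not -1
  }
}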
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index afa3151a8c3..76ca50b4124 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -722,6 +722,11 @@ private static void setRaftLogProperties(RaftProperties properties,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT));
+ // This avoids writing commit metadata to the Raft Log, which can be used to recover the
+ // commit index even if a majority of servers are dead. We don't need this for
+ // OzoneManager, so disabling it avoids the additional disk IO.
+ RaftServerConfigKeys.Log.setLogMetadataEnabled(properties, false);
+
// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index ef5ee2a4ea4..f349e74e541 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -167,7 +167,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
* - the applied index is updated after the transaction is flushed to db.
* - after a transaction (i) is committed, ratis will append another ratis-metadata transaction (i+1).
*
- * @return the last Ratis commit index
+ * @return the last Ratis applied index
*/
private static long waitForLogIndex(long minOMDBFlushIndex,
OzoneManager om, OzoneManagerStateMachine stateMachine,
@@ -179,20 +179,18 @@ private static long waitForLogIndex(long minOMDBFlushIndex,
boolean omDBFlushed = false;
boolean ratisStateMachineApplied = false;
- // Wait for Ratis commit index after the specified index to be applied to
- // Ratis' state machine. This index will not appear in the OM DB until a
+ // Wait for the given Ratis commit index to be applied to Ratis'
+ // state machine. This index will not appear in the OM DB until a
// snapshot is taken.
// If we purge logs without waiting for this index, it may not make it to
// the RocksDB snapshot, and then the log entry is lost on this OM.
- long minRatisStateMachineIndex = minOMDBFlushIndex + 1; // for the ratis-metadata transaction
- long lastRatisCommitIndex = RaftLog.INVALID_LOG_INDEX;
// Wait OM state machine to apply the given index.
long lastOMDBFlushIndex = RaftLog.INVALID_LOG_INDEX;
+ long lastRatisAppliedIndex = RaftLog.INVALID_LOG_INDEX;
- LOG.info("{} waiting for index {} to flush to OM DB and index {} to flush" +
- " to Ratis state machine.", om.getOMNodeId(), minOMDBFlushIndex,
- minRatisStateMachineIndex);
+ LOG.info("{} waiting for index {} to flush to OM DB and flush" +
+ " to Ratis state machine.", om.getOMNodeId(), minOMDBFlushIndex);
while (!(omDBFlushed && ratisStateMachineApplied) &&
Time.monotonicNow() < endTime) {
// Check OM DB.
@@ -202,11 +200,10 @@ private static long waitForLogIndex(long minOMDBFlushIndex,
lastOMDBFlushIndex);
// Check ratis state machine.
- lastRatisCommitIndex = stateMachine.getLastNotifiedTermIndex().getIndex();
- ratisStateMachineApplied = (lastRatisCommitIndex >=
- minRatisStateMachineIndex);
+ lastRatisAppliedIndex = stateMachine.getLastAppliedTermIndex().getIndex();
+ ratisStateMachineApplied = lastRatisAppliedIndex >= minOMDBFlushIndex;
LOG.debug("{} Current Ratis state machine transaction index {}.",
- om.getOMNodeId(), lastRatisCommitIndex);
+ om.getOMNodeId(), lastRatisAppliedIndex);
if (!(omDBFlushed && ratisStateMachineApplied)) {
Thread.sleep(flushCheckInterval.toMillis());
@@ -225,10 +222,10 @@ private static long waitForLogIndex(long minOMDBFlushIndex,
throw new IOException(String.format("After waiting for %d seconds, " +
"Ratis state machine applied index %d which is less than" +
" the minimum required index %d.",
- flushTimeout.getSeconds(), lastRatisCommitIndex,
- minRatisStateMachineIndex));
+ flushTimeout.getSeconds(), lastRatisAppliedIndex,
+ minOMDBFlushIndex));
}
- return lastRatisCommitIndex;
+ return lastRatisAppliedIndex;
}
/**
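A simplified, standalone sketch of the polling pattern waitForLogIndex() now follows: compare the state machine's last applied index directly against the minimum OM DB flush index (no extra +1 for a ratis-metadata transaction) and time out if it never catches up. The class, method names, and the simulated index supplier below are illustrative, not Ozone code.

import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

public final class WaitForIndexSketch {

  // Illustrative version of the waitForLogIndex() polling loop: keep reading the
  // current applied index until it reaches the minimum required index, sleeping a
  // fixed interval between checks, and fail with a timeout otherwise.
  static long waitForAppliedIndex(LongSupplier appliedIndexSupplier, long minIndex,
      long timeoutMillis, long pollIntervalMillis)
      throws InterruptedException, TimeoutException {
    final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    long applied = appliedIndexSupplier.getAsLong();
    while (applied < minIndex) {
      if (System.nanoTime() >= deadline) {
        throw new TimeoutException("Applied index " + applied
            + " did not reach required index " + minIndex
            + " within " + timeoutMillis + " ms");
      }
      Thread.sleep(pollIntervalMillis);
      applied = appliedIndexSupplier.getAsLong();
    }
    return applied;
  }

  public static void main(String[] args) throws Exception {
    // Simulated state machine whose applied index advances on every read.
    final long[] fakeIndex = {95};
    long result = waitForAppliedIndex(() -> ++fakeIndex[0], 100, 1_000, 10);
    System.out.println("Reached applied index " + result);
  }
}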
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]