This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1d0c9ba5c4a HDDS-14572. ReInit DeletedBlocksTransactionSummary after
SCM leader transfer (#9720)
1d0c9ba5c4a is described below
commit 1d0c9ba5c4aea1d759303dc733c7add6824c4112
Author: Sammi Chen <[email protected]>
AuthorDate: Mon Feb 9 15:51:17 2026 +0800
HDDS-14572. ReInit DeletedBlocksTransactionSummary after SCM leader
transfer (#9720)
---
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 2 +-
.../SCMDeletedBlockTransactionStatusManager.java | 18 ++++--
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 6 ++
.../TestScmDataDistributionFinalization.java | 64 +++++++++++++++++-----
4 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index d54cd9a243d..99a1a6b4dd0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -209,7 +209,7 @@ public void reinitialize(
* leader.
*/
public void onBecomeLeader() {
- transactionStatusManager.clear();
+ transactionStatusManager.onBecomeLeader();
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
index 42df0591bcb..66c9d2070be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
@@ -419,11 +419,19 @@ void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet)
.putIfAbsent(txId, new LinkedHashSet<>()));
}
- public void clear() {
+ public void onBecomeLeader() {
transactionToRetryCountMap.clear();
scmDeleteBlocksCommandStatusManager.clear();
transactionToDNsCommitMap.clear();
txSizeMap.clear();
+ try {
+ initDataDistributionData();
+ } catch (IOException e) {
+ LOG.warn("Failed to initialize Storage space distribution data. The feature will continue with current " +
+ "totalTxCount {}, totalBlockCount {}, totalBlocksSize {} and totalReplicatedBlocksSize {}. " +
+ "There is a high chance that the real data and current data has a gap.",
+ totalTxCount.get(), totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
+ }
}
public void cleanAllTimeoutSCMCommand(long timeoutMs) {
@@ -672,8 +680,9 @@ private void initDataDistributionData() throws IOException {
totalBlockCount.set(summary.getTotalBlockCount());
totalBlocksSize.set(summary.getTotalBlockSize());
totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize());
- LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {}",
- totalBlockCount.get(), totalBlocksSize.get());
+ LOG.info("Storage space distribution is initialized with totalTxCount {}, totalBlockCount {}, " +
+ "totalBlocksSize {} and totalReplicatedBlocksSize {}", totalTxCount.get(),
+ totalBlockCount.get(), totalBlocksSize.get(), totalReplicatedBlocksSize.get());
}
}
@@ -688,8 +697,7 @@ private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOExce
}
return DeletedBlocksTransactionSummary.parseFrom(byteString);
} catch (IOException e) {
- LOG.error("Failed to get property {} for service {}. DataDistribution function will be disabled.",
- propertyName, SERVICE_NAME, e);
+ LOG.error("Failed to get property {} for service {}.", propertyName, SERVICE_NAME, e);
throw new IOException("Failed to get property " + propertyName, e);
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index e157a430033..d702bb2a5d4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -304,6 +304,12 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
currentLeaderTerm.get());
scm.getSequenceIdGen().invalidateBatch();
+ try {
+ transactionBuffer.flush();
+ } catch (Exception ex) {
+ ExitUtils.terminate(1, "Failed to flush transactionBuffer", ex, StateMachine.LOG);
+ }
+
DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
.getDeletedBlockLog();
Preconditions.checkArgument(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java
index efc9523ab9c..f98685f0ff7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.hdds.upgrade;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
@@ -109,12 +114,14 @@ public void init(OzoneConfiguration conf,
configurator.setUpgradeFinalizationExecutor(executor);
conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
HDDSLayoutFeature.HBASE_SUPPORT.layoutVersion());
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
- 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, MILLISECONDS);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS);
+
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
conf.setFromObject(scmConfig);
@@ -123,6 +130,7 @@ public void init(OzoneConfiguration conf,
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
+ dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
conf.setFromObject(dnConf);
MiniOzoneHAClusterImpl.Builder clusterBuilder =
MiniOzoneCluster.newHABuilder(conf);
@@ -344,13 +352,39 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());
+ // transfer SCM leader
+ String newLeaderScmId = null;
+ for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
+ if (scm != activeSCM) {
+ newLeaderScmId = scm.getScmId();
+ break;
+ }
+ }
+ cluster.getStorageContainerLocationClient().transferLeadership(newLeaderScmId);
+ StorageContainerManager newActiveSCM = cluster.getActiveSCM();
+ deletedBlockLog = (DeletedBlockLogImpl) newActiveSCM.getScmBlockManager().getDeletedBlockLog();
+ SCMDeletedBlockTransactionStatusManager newStatusManager =
+ deletedBlockLog.getSCMDeletedBlockTransactionStatusManager();
+ // new leader SCM should have the right deletion tx summary
+ summary = newStatusManager.getTransactionSummary();
+ assertEquals(1, summary.getTotalTransactionCount());
+ assertEquals(1, summary.getTotalBlockCount());
+ assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize());
+ assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize());
+
+ // flush buffer and start SCMBlockDeletingService
+ for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
+ flushDBTransactionBuffer(scm);
+ scm.getScmBlockManager().getSCMBlockDeletingService().start();
+ }
+
// force close the container so that block can be deleted
- activeSCM.getClientProtocolServer().closeContainer(
+ newActiveSCM.getClientProtocolServer().closeContainer(
keyDetails.getOzoneKeyLocations().get(0).getContainerID());
// wait for container to be closed
GenericTestUtils.waitFor(() -> {
try {
- return activeSCM.getClientProtocolServer().getContainer(
+ return newActiveSCM.getClientProtocolServer().getContainer(
keyDetails.getOzoneKeyLocations().get(0).getContainerID())
.getState() == HddsProtos.LifeCycleState.CLOSED;
} catch (IOException e) {
@@ -359,15 +393,15 @@ public void testFinalizationNonEmptyClusterDataDistribution() throws Exception {
}
}, 100, 5000);
- // flush buffer and start SCMBlockDeletingService
- for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) {
- flushDBTransactionBuffer(scm);
- scm.getScmBlockManager().getSCMBlockDeletingService().start();
- }
-
// wait for block deletion transactions to be confirmed by DN
GenericTestUtils.waitFor(
- () -> statusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);
+ () -> newStatusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 30000);
+
+ // transfer leader back to old SCM and verify
+ cluster.getStorageContainerLocationClient().transferLeadership(activeSCM.getScmId());
+ deletedBlockLog = (DeletedBlockLogImpl) activeSCM.getScmBlockManager().getDeletedBlockLog();
+ summary = deletedBlockLog.getSCMDeletedBlockTransactionStatusManager().getTransactionSummary();
+ assertEquals(EMPTY_SUMMARY, summary);
}
private Map<Long, List<DeletedBlock>> generateDeletedBlocks(int dataSize, boolean withSize) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]