This is an automated email from the ASF dual-hosted git repository.

agupta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 64a49585542 HDDS-13410. Control block deletion for each DN from SCM. 
(#8767)
64a49585542 is described below

commit 64a495855428874590f4bebf1455e8bd106881ee
Author: Ashish Kumar <[email protected]>
AuthorDate: Mon Jul 21 11:10:24 2025 +0530

    HDDS-13410. Control block deletion for each DN from SCM. (#8767)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  5 +++
 .../common/src/main/resources/ozone-default.xml    | 10 ++++++
 .../block/DatanodeDeletedBlockTransactions.java    |  8 +++++
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 31 ++++++++++++++++--
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 37 ++++++++++++++++++++++
 5 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 44229338ec6..30e2f5e73d4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -456,6 +456,11 @@ public final class ScmConfigKeys {
       "ozone.scm.block.deletion.max.retry";
   public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
 
+  public static final String 
OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR =
+      "ozone.scm.block.deletion.per.dn.distribution.factor";
+
+  public static final int 
OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR_DEFAULT = 8;
+
   public static final String OZONE_SCM_SEQUENCE_ID_BATCH_SIZE =
       "ozone.scm.sequence.id.batch.size";
   public static final int OZONE_SCM_SEQUENCE_ID_BATCH_SIZE_DEFAULT = 1000;
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 16387561c2d..816d20296d8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -778,6 +778,16 @@
       times SCM is going to retry sending a deletion operation to the data 
node.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.block.deletion.per.dn.distribution.factor</name>
+    <value>8</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Factor that controls the maximum number of delete blocks sent to each
+      datanode in every interval. For example, if the total number of DNs is 100
+      and hdds.scm.block.deletion.per-interval.max is 500000, then at most
+      500000 / (100 / 8) = 500000 / 12 = 41666 blocks (using integer division)
+      will be sent to each DN in every interval.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.block.size</name>
     <value>256MB</value>
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 989971492a8..06a003f1866 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -68,6 +68,14 @@ List<String> getTransactionIDList(DatanodeID dnId) {
         .collect(Collectors.toList());
   }
 
+  public int getNumberOfBlocksForDatanode(DatanodeID dnId) {
+    return Optional.ofNullable(transactions.get(dnId))
+        .orElse(new LinkedList<>())
+        .stream()
+        .mapToInt(DeletedBlocksTransaction::getLocalIDCount)
+        .sum();
+  }
+
   boolean isEmpty() {
     return transactions.isEmpty();
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 3d4509b9d87..48ffadfe76d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -19,6 +19,8 @@
 
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
 import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.DEL_TXN_ID;
 
@@ -93,6 +95,7 @@ public class DeletedBlockLogImpl
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
   private long lastProcessedTransactionId = -1;
   private final int logAppenderQueueByteLimit;
+  private int deletionFactorPerDatanode;
 
   public DeletedBlockLogImpl(ConfigurationSource conf,
       StorageContainerManager scm,
@@ -123,6 +126,10 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
         ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
         StorageUnit.BYTES);
     this.logAppenderQueueByteLimit = (int) (limit * 0.9);
+    int deletionFactor = 
conf.getInt(OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR,
+        OZONE_SCM_BLOCK_DELETION_PER_DN_DISTRIBUTION_FACTOR_DEFAULT);
+    this.deletionFactorPerDatanode = deletionFactor <= 0 ? 1 : deletionFactor;
+
   }
 
   @VisibleForTesting
@@ -130,6 +137,11 @@ void 
setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) {
     this.deletedBlockLogStateManager = manager;
   }
 
+  @VisibleForTesting
+  void setDeleteBlocksFactorPerDatanode(int deleteBlocksFactorPerDatanode) {
+    this.deletionFactorPerDatanode = deleteBlocksFactorPerDatanode;
+  }
+
   @Override
   public List<DeletedBlocksTransaction> getFailedTransactions(int count,
       long startTxId) throws IOException {
@@ -328,7 +340,16 @@ public void close() {
   private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
       Set<ContainerReplica> replicas,
-      Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
+      Map<DatanodeID, Map<Long, CmdStatus>> commandStatus,
+      int maxDeleteBlocksPerDatanode) {
+    // Ensure all DNs for this transaction are below their max block limit.
+    if (!replicas.stream().allMatch(replica -> {
+      final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
+      return transactions.getNumberOfBlocksForDatanode(datanodeID) < 
maxDeleteBlocksPerDatanode;
+    })) {
+      return;
+    }
+
     DeletedBlocksTransaction updatedTxn =
         DeletedBlocksTransaction.newBuilder(tx)
             .setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
@@ -430,6 +451,12 @@ public DatanodeDeletedBlockTransactions getTransactions(
         ArrayList<Long> txIDs = new ArrayList<>();
         metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
         Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
+
+        int factor = dnList.size() / deletionFactorPerDatanode;
+        int maxDeleteBlocksPerDatanode = (factor > 0)
+            ? Math.min(blockDeletionLimit, blockDeletionLimit / factor)
+            : blockDeletionLimit;
+
         // Here takes block replica count as the threshold to avoid the case
         // that part of replicas committed the TXN and recorded in the
         // SCMDeletedBlockTransactionStatusManager, while they are counted
@@ -455,7 +482,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
                 metrics.incrSkippedTransaction();
                 continue;
               }
-              getTransaction(txn, transactions, replicas, commandStatus);
+              getTransaction(txn, transactions, replicas, commandStatus, 
maxDeleteBlocksPerDatanode);
             } else if (txn.getCount() >= maxRetry || 
containerManager.getContainer(id).isOpen()) {
               metrics.incrSkippedTransaction();
             }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 0f15c9aa365..eba8a8eeb18 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -82,6 +82,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests for DeletedBlockLog.
@@ -880,6 +882,41 @@ public void testDeletedBlockTransactions() throws 
IOException {
     assertEquals(2, blocks.size());
   }
 
+  @ParameterizedTest
+  @ValueSource(ints = {30, 45})
+  public void testGetTransactionsWithMaxBlocksPerDatanode(int 
maxAllowedBlockNum) throws IOException {
+    int deleteBlocksFactorPerDatanode = 1;
+    
deletedBlockLog.setDeleteBlocksFactorPerDatanode(deleteBlocksFactorPerDatanode);
+    mockContainerHealthResult(true);
+    int txNum = 10;
+    DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
+
+    // Creates {TXNum} TX in the log.
+    Map<Long, List<Long>> deletedBlocks = generateData(txNum);
+    addTransactions(deletedBlocks, true);
+    List<Long> containerIds = new ArrayList<>(deletedBlocks.keySet());
+    for (int i = 0; i < containerIds.size(); i++) {
+      DatanodeDetails assignedDn = (i % 2 == 0) ? dnId1 : dnId2;
+      mockStandAloneContainerInfo(containerIds.get(i), assignedDn);
+    }
+
+    int blocksPerDataNode = maxAllowedBlockNum / (dnList.size() / 
deleteBlocksFactorPerDatanode);
+    DatanodeDeletedBlockTransactions transactions =
+        deletedBlockLog.getTransactions(maxAllowedBlockNum, new 
HashSet<>(dnList));
+
+    Map<DatanodeID, Integer> datanodeBlockCountMap =  
transactions.getDatanodeTransactionMap()
+        .entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()
+                .stream()
+                .mapToInt(tx -> tx.getLocalIDList().size())
+                .sum()
+        ));
+    // Transactions should have blocksPerDataNode for both DNs
+    assertEquals(datanodeBlockCountMap.get(dnId1.getID()), blocksPerDataNode);
+    assertEquals(datanodeBlockCountMap.get(dnId2.getID()), blocksPerDataNode);
+  }
+
   @Test
   public void testDeletedBlockTransactionsOfDeletedContainer() throws 
IOException {
     int txNum = 10;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to