This is an automated email from the ASF dual-hosted git repository.

ashishkr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8efc0cd65a HDDS-11633. Delete message body too large, causing SCM to 
fail writing raft log (#8354)
8efc0cd65a is described below

commit 8efc0cd65a6318ccb45df76747336fc327b3ad84
Author: Aryan Gupta <[email protected]>
AuthorDate: Wed Apr 30 17:02:54 2025 +0530

    HDDS-11633. Delete message body too large, causing SCM to fail writing raft 
log (#8354)
---
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 34 ++++++++++++++++++----
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 19 ++++++++++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index bc675d0ff9..f6fd621393 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -36,10 +36,12 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -61,7 +63,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * A implement class of {@link DeletedBlockLog}, and it uses
+ * An implement class of {@link DeletedBlockLog}, and it uses
  * K/V db to maintain block deletion transactions between scm and datanode.
  * This is a very basic implementation, it simply scans the log and
  * memorize the position that scanned by last time, and uses this to
@@ -81,7 +83,7 @@ public class DeletedBlockLogImpl
   private final Lock lock;
   // The access to DeletedBlocksTXTable is protected by
   // DeletedBlockLogStateManager.
-  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private DeletedBlockLogStateManager deletedBlockLogStateManager;
   private final SCMContext scmContext;
   private final SequenceIdGenerator sequenceIdGen;
   private final ScmBlockDeletingServiceMetrics metrics;
@@ -91,6 +93,7 @@ public class DeletedBlockLogImpl
 
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
   private long lastProcessedTransactionId = -1;
+  private final int logAppenderQueueByteLimit;
 
   public DeletedBlockLogImpl(ConfigurationSource conf,
       StorageContainerManager scm,
@@ -116,6 +119,16 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
     this.transactionStatusManager =
         new 
SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
             containerManager, metrics, scmCommandTimeoutMs);
+    int limit = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+        ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    this.logAppenderQueueByteLimit = (int) (limit * 0.9);
+  }
+
+  @VisibleForTesting
+  void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) {
+    this.deletedBlockLogStateManager = manager;
   }
 
   @Override
@@ -285,16 +298,27 @@ public void addTransactions(Map<Long, List<Long>> 
containerBlocksMap)
     lock.lock();
     try {
       ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
+      long currentBatchSizeBytes = 0;
       for (Map.Entry< Long, List< Long > > entry :
           containerBlocksMap.entrySet()) {
         long nextTXID = sequenceIdGen.getNextId(DEL_TXN_ID);
         DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
             entry.getKey(), entry.getValue());
         txsToBeAdded.add(tx);
+        long txSize = tx.getSerializedSize();
+        currentBatchSizeBytes += txSize;
+
+        if (currentBatchSizeBytes >= logAppenderQueueByteLimit) {
+          deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
+          metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
+          txsToBeAdded.clear();
+          currentBatchSizeBytes = 0;
+        }
+      }
+      if (!txsToBeAdded.isEmpty()) {
+        deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
+        metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
       }
-
-      deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
-      metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
     } finally {
       lock.unlock();
     }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 4725b30ab8..f2289ecb2f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -21,9 +21,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -48,6 +50,7 @@
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -57,6 +60,7 @@
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -434,6 +438,21 @@ public void testResetCount() throws Exception {
     assertEquals(30 * THREE, blocks.size());
   }
 
+  @Test
+  public void testAddTransactionsIsBatched() throws Exception {
+    
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
 1, StorageUnit.KB);
+
+    DeletedBlockLogStateManager mockStateManager = 
mock(DeletedBlockLogStateManager.class);
+    DeletedBlockLogImpl log = new DeletedBlockLogImpl(conf, scm, 
containerManager, scmHADBTransactionBuffer, metrics);
+
+    log.setDeletedBlockLogStateManager(mockStateManager);
+
+    Map<Long, List<Long>> containerBlocksMap = generateData(100);
+    log.addTransactions(containerBlocksMap);
+
+    verify(mockStateManager, atLeast(2)).addTransactionsToDB(any());
+  }
+
   @Test
   public void testSCMDelIteratorProgress() throws Exception {
     int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to