This is an automated email from the ASF dual-hosted git repository.
ashishkr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8efc0cd65a HDDS-11633. Delete message body too large, causing SCM to fail writing raft log (#8354)
8efc0cd65a is described below
commit 8efc0cd65a6318ccb45df76747336fc327b3ad84
Author: Aryan Gupta <[email protected]>
AuthorDate: Wed Apr 30 17:02:54 2025 +0530
HDDS-11633. Delete message body too large, causing SCM to fail writing raft log (#8354)
---
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 34 ++++++++++++++++++----
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 19 ++++++++++++
2 files changed, 48 insertions(+), 5 deletions(-)
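For context: before this change, addTransactions() handed every pending delete transaction to the state manager in a single addTransactionsToDB() call, so the resulting Raft log entry could exceed Ratis's appender queue byte limit and SCM would fail to write the log. The patch below flushes a batch whenever the accumulated serialized size reaches 90% of that limit. A minimal standalone sketch of the same size-bounded batching pattern (SizeBoundedBatcher, sizeOf, and flush are illustrative stand-ins, not Ozone APIs):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.ToLongFunction;

    // Sketch: accumulate items, flushing whenever the running byte total
    // reaches the limit, then flush the trailing partial batch at the end.
    final class SizeBoundedBatcher<T> {
      private final long byteLimit;            // flush threshold in bytes
      private final ToLongFunction<T> sizeOf;  // serialized size of one item
      private final Consumer<List<T>> flush;   // sink, e.g. a DB write

      SizeBoundedBatcher(long byteLimit, ToLongFunction<T> sizeOf,
          Consumer<List<T>> flush) {
        this.byteLimit = byteLimit;
        this.sizeOf = sizeOf;
        this.flush = flush;
      }

      void addAll(Iterable<T> items) {
        List<T> batch = new ArrayList<>();
        long batchBytes = 0;
        for (T item : items) {
          batch.add(item);
          batchBytes += sizeOf.applyAsLong(item);
          if (batchBytes >= byteLimit) {       // same threshold check as the patch
            flush.accept(batch);
            batch = new ArrayList<>();
            batchBytes = 0;
          }
        }
        if (!batch.isEmpty()) {                // trailing partial batch
          flush.accept(batch);
        }
      }
    }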
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index bc675d0ff9..f6fd621393 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -36,10 +36,12 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -61,7 +63,7 @@
import org.slf4j.LoggerFactory;
/**
- * A implement class of {@link DeletedBlockLog}, and it uses
+ * An implement class of {@link DeletedBlockLog}, and it uses
* K/V db to maintain block deletion transactions between scm and datanode.
* This is a very basic implementation, it simply scans the log and
* memorize the position that scanned by last time, and uses this to
@@ -81,7 +83,7 @@ public class DeletedBlockLogImpl
private final Lock lock;
// The access to DeletedBlocksTXTable is protected by
// DeletedBlockLogStateManager.
- private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+ private DeletedBlockLogStateManager deletedBlockLogStateManager;
private final SCMContext scmContext;
private final SequenceIdGenerator sequenceIdGen;
private final ScmBlockDeletingServiceMetrics metrics;
@@ -91,6 +93,7 @@ public class DeletedBlockLogImpl
private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
private long lastProcessedTransactionId = -1;
+ private final int logAppenderQueueByteLimit;
public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
@@ -116,6 +119,16 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
this.transactionStatusManager =
new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
containerManager, metrics, scmCommandTimeoutMs);
+ int limit = (int) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+ ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+ StorageUnit.BYTES);
+ this.logAppenderQueueByteLimit = (int) (limit * 0.9);
+ }
+
+ @VisibleForTesting
+ void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) {
+ this.deletedBlockLogStateManager = manager;
}
@Override
@@ -285,16 +298,27 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
lock.lock();
try {
ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
+ long currentBatchSizeBytes = 0;
for (Map.Entry< Long, List< Long > > entry :
containerBlocksMap.entrySet()) {
long nextTXID = sequenceIdGen.getNextId(DEL_TXN_ID);
DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
entry.getKey(), entry.getValue());
txsToBeAdded.add(tx);
+ long txSize = tx.getSerializedSize();
+ currentBatchSizeBytes += txSize;
+
+ if (currentBatchSizeBytes >= logAppenderQueueByteLimit) {
+ deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
+ metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
+ txsToBeAdded.clear();
+ currentBatchSizeBytes = 0;
+ }
+ }
+ if (!txsToBeAdded.isEmpty()) {
+ deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
+ metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
}
-
- deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
- metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
} finally {
lock.unlock();
}
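A note on the threshold before the test diff: the batch limit is set to 90% of the configured appender queue byte limit, presumably to leave headroom for Raft entry overhead beyond the serialized transactions themselves, and it scales with the existing setting rather than adding a new knob. For illustration, mirroring the setStorageSize() usage in the test below (the 2 MB value is hypothetical):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.conf.StorageUnit;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    // Illustrative only: raising the Ratis appender byte limit also raises
    // the delete-transaction batch threshold, since the latter is 90% of it.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setStorageSize(
        ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
        2, StorageUnit.MB);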
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 4725b30ab8..f2289ecb2f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -21,9 +21,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -48,6 +50,7 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -57,6 +60,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -434,6 +438,21 @@ public void testResetCount() throws Exception {
assertEquals(30 * THREE, blocks.size());
}
+ @Test
+ public void testAddTransactionsIsBatched() throws Exception {
+ conf.setStorageSize(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, 1, StorageUnit.KB);
+
+ DeletedBlockLogStateManager mockStateManager = mock(DeletedBlockLogStateManager.class);
+ DeletedBlockLogImpl log = new DeletedBlockLogImpl(conf, scm, containerManager, scmHADBTransactionBuffer, metrics);
+
+ log.setDeletedBlockLogStateManager(mockStateManager);
+
+ Map<Long, List<Long>> containerBlocksMap = generateData(100);
+ log.addTransactions(containerBlocksMap);
+
+ verify(mockStateManager, atLeast(2)).addTransactionsToDB(any());
+ }
+
@Test
public void testSCMDelIteratorProgress() throws Exception {
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);