This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bb0587d37 HDDS-11712. Process other DeletedBlocksTransaction before retrying failed one. (#7532)
8bb0587d37 is described below

commit 8bb0587d37435b32177f20b842063e80a378f296
Author: Ashish Kumar <[email protected]>
AuthorDate: Tue Dec 17 12:31:46 2024 +0530

    HDDS-11712. Process other DeletedBlocksTransaction before retrying failed one. (#7532)
---
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 49 ++++++++++++++-
 .../scm/block/DeletedBlockLogStateManagerImpl.java |  3 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 69 ++++++++++++++++++++++
 3 files changed, 119 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 45d6a02493..226489482f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
   private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
 
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
+  private long lastProcessedTransactionId = -1;
 
   public DeletedBlockLogImpl(ConfigurationSource conf,
       StorageContainerManager scm,
@@ -344,6 +345,34 @@ public class DeletedBlockLogImpl
       try (TableIterator<Long,
           ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
                deletedBlockLogStateManager.getReadOnlyIterator()) {
+        if (lastProcessedTransactionId != -1) {
+          iter.seek(lastProcessedTransactionId);
+          /*
+           * We should start from (lastProcessedTransactionId + 1) transaction.
+           * Now the iterator (iter.next call) is pointing at
+           * lastProcessedTransactionId, read the current value to move
+           * the cursor.
+           */
+          if (iter.hasNext()) {
+            /*
+             * There is a possibility that the lastProcessedTransactionId got
+             * deleted from the table, in that case we have to set
+             * lastProcessedTransactionId to next available transaction in the 
table.
+             *
+             * By doing this there is a chance that we will skip processing 
the new
+             * lastProcessedTransactionId, that should be ok. We can get to it 
in the
+             * next run.
+             */
+            lastProcessedTransactionId = iter.next().getKey();
+          }
+
+          // If we have reached the end, go to beginning.
+          if (!iter.hasNext()) {
+            iter.seekToFirst();
+            lastProcessedTransactionId = -1;
+          }
+        }
+
         // Get the CmdStatus status of the aggregation, so that the current
         // status of the specified transaction can be found faster
         Map<UUID, Map<Long, CmdStatus>> commandStatus =
@@ -352,13 +381,14 @@ public class DeletedBlockLogImpl
                 map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
         ArrayList<Long> txIDs = new ArrayList<>();
         metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
+        Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
         // Here takes block replica count as the threshold to avoid the case
         // that part of replicas committed the TXN and recorded in the
         // SCMDeletedBlockTransactionStatusManager, while they are counted
         // in the threshold.
         while (iter.hasNext() &&
             transactions.getBlocksDeleted() < blockDeletionLimit) {
-          Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = 
iter.next();
+          keyValue = iter.next();
           DeletedBlocksTransaction txn = keyValue.getValue();
           final ContainerID id = ContainerID.valueOf(txn.getContainerID());
           try {
@@ -386,7 +416,24 @@ public class DeletedBlockLogImpl
             LOG.warn("Container: {} was not found for the transaction: {}.", 
id, txn);
             txIDs.add(txn.getTxID());
           }
+
+          if (lastProcessedTransactionId == keyValue.getKey()) {
+            // We have circled back to the last transaction.
+            break;
+          }
+
+          if (!iter.hasNext() && lastProcessedTransactionId != -1) {
+            /*
+             * We started from in-between and reached end of the table,
+             * now we should go to the start of the table and process
+             * the transactions.
+             */
+            iter.seekToFirst();
+          }
         }
+
+        lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;
+
         if (!txIDs.isEmpty()) {
           deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
           metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
index 6e6440c324..43809acf4b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
@@ -133,7 +133,8 @@ public class DeletedBlockLogStateManagerImpl
 
       @Override
       public void seekToFirst() {
-        throw new UnsupportedOperationException("seekToFirst");
+        iter.seekToFirst();
+        findNext();
       }
 
       @Override
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index c8e2f267af..2a012cbe18 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -441,6 +441,75 @@ public class TestDeletedBlockLog {
     assertEquals(30 * THREE, blocks.size());
   }
 
+  /**
+   * Verifies that getTransactions resumes after the last transaction it returned, so a transaction
+   * that cannot be processed in one run does not block later ones and is retried after wrap-around.
+   */
+  @Test
+  public void testSCMDelIteratorProgress() throws Exception {
+    int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+    // CASE1: All transactions are valid and available. Create 8 TXs in the log.
+    int noOfTransactions = 8;
+    addTransactions(generateData(noOfTransactions), true);
+    mockContainerHealthResult(true);
+    List<DeletedBlocksTransaction> blocks;
+
+    int i = 1;
+    while (i < noOfTransactions) {
+      // In each iteration read two transactions; the API returns all transactions in order.
+      // 1st iteration: {1, 2}
+      // 2nd iteration: {3, 4}
+      // 3rd iteration: {5, 6}
+      // 4th iteration: {7, 8}
+      blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+      assertEquals(i++, blocks.get(0).getTxID());
+      assertEquals(i++, blocks.get(1).getTxID());
+    }
+
+    // CASE2: Some transactions are not available for delete in the current iteration,
+    // either because max retry was reached or due to some other issue.
+    // New transaction IDs are {9, 10, 11, 12, 13, 14, 15, 16}.
+    addTransactions(generateData(noOfTransactions), true);
+    mockContainerHealthResult(true);
+
+    // Push transaction 11's retry count beyond maxRetry so it is ignored while fetching for delete.
+    int ignoreTransactionId = 11;
+    List<Long> txIDs = new ArrayList<>();
+    txIDs.add((long) ignoreTransactionId);
+    for (i = 0; i < maxRetry; i++) {
+      incrementCount(txIDs);
+    }
+    incrementCount(txIDs);
+
+    i = 9;
+    while (true) {
+      // In each iteration read two transactions. A transaction that is not available for delete
+      // in the current iteration is skipped and re-checked only after the rest of the table is read.
+      // 1st iteration: {9, 10}
+      // 2nd iteration: {12, 13} Transaction 11 is ignored here.
+      // 3rd iteration: {14, 15} Transaction 11 is available again here,
+      // but it is read only after all DB records up to the end of the table have been read.
+      // 4th iteration: {16, 11} After reading transaction 16 the iterator reaches the end of
+      // the table, starts again from the beginning and also returns transaction 11.
+      blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+      if (i == ignoreTransactionId) {
+        i++;
+      }
+      assertEquals(i++, blocks.get(0).getTxID());
+      if (i == 17) {
+        assertEquals(ignoreTransactionId, blocks.get(1).getTxID());
+        break;
+      }
+      assertEquals(i++, blocks.get(1).getTxID());
+
+      if (i == 14) {
+        // Reset transaction 11 so that the SCM deleting service can pick it up in later iterations.
+        resetCount(txIDs);
+      }
+    }
+  }
+
   @Test
   public void testCommitTransactions() throws Exception {
     deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to