This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 20a13da5ef HDDS-12615. Failure of any OM task during bootstrapping of 
Recon needs to be handled (#8098)
20a13da5ef is described below

commit 20a13da5ef648c92dfd8d82e419d06c72f17fcf8
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Tue Apr 1 11:28:23 2025 +0530

    HDDS-12615. Failure of any OM task during bootstrapping of Recon needs to 
be handled (#8098)
---
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 66 +++++++++++++---------
 .../impl/TestOzoneManagerServiceProviderImpl.java  | 19 +++++--
 2 files changed, 52 insertions(+), 33 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 8dd33fdb39..ee492e2916 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -281,9 +281,11 @@ public void start() {
           String taskName = entry.getKey();
           ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
 
-          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && 
 // Condition 1
-              !taskStatusUpdater.getLastUpdatedSeqNumber()
-                  .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());  
// Condition 2
+          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name())  
// Condition 1
+              && !taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name()) 
 // Condition 2
+              &&
+              taskStatusUpdater.getLastUpdatedSeqNumber().compareTo(
+                  deltaTaskStatusUpdater.getLastUpdatedSeqNumber()) < 0; // 
Condition 3
         })
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  
// Collect into desired Map
     if (!reconOmTaskMap.isEmpty()) {
@@ -570,7 +572,10 @@ fromSequenceNumber, getCurrentOMDBSequenceNumber(), 
numUpdates,
    */
   @VisibleForTesting
   public boolean syncDataFromOM() {
-    ReconTaskStatusUpdater reconTaskUpdater;
+    ReconTaskStatusUpdater fullSnapshotReconTaskUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(
+        OmSnapshotTaskName.OmSnapshotRequest.name());
+    ReconTaskStatusUpdater deltaReconTaskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(
+        OmSnapshotTaskName.OmDeltaRequest.name());
     if (isSyncDataFromOMRunning.compareAndSet(false, true)) {
       try {
         long currentSequenceNumber = getCurrentOMDBSequenceNumber();
@@ -580,11 +585,8 @@ public boolean syncDataFromOM() {
         if (currentSequenceNumber <= 0) {
           fullSnapshot = true;
         } else {
-          reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
-              OmSnapshotTaskName.OmDeltaRequest.name());
-
           // Get updates from OM and apply to local Recon OM DB and update 
task status in table
-          reconTaskUpdater.recordRunStart();
+          deltaReconTaskStatusUpdater.recordRunStart();
           int loopCount = 0;
           long fromSequenceNumber = currentSequenceNumber;
           long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
@@ -606,9 +608,15 @@ public boolean syncDataFromOM() {
               }
               diffBetweenOMDbAndReconDBSeqNumber =
                   getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, 
omdbUpdatesHandler);
-              reconTaskUpdater.setLastTaskRunStatus(0);
-              
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              reconTaskUpdater.recordRunCompletion();
+              deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
+              // Keep the last updated sequence number the same for the full snapshot and delta
+              // tasks: both tasks update the same Recon copy of the OM DB, only under different
+              // conditions, so this sequence number describes how far that one copy is in sync
+              // with the actual OM DB.
+              
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+              
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+              deltaReconTaskStatusUpdater.recordRunCompletion();
+              fullSnapshotReconTaskUpdater.updateDetails();
               // Pass on DB update events to tasks that are listening.
               reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
                   omdbUpdatesHandler.getEvents(), 
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
@@ -619,16 +627,16 @@ public boolean syncDataFromOM() {
               LOG.error("OM DB Delta update sync thread was interrupted and 
delta sync failed.");
               // We are updating the table even if it didn't run i.e. got 
interrupted beforehand
               // to indicate that a task was supposed to run, but it didn't.
-              reconTaskUpdater.setLastTaskRunStatus(-1);
-              reconTaskUpdater.recordRunCompletion();
+              deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
+              deltaReconTaskStatusUpdater.recordRunCompletion();
               Thread.currentThread().interrupt();
               // Since thread is interrupted, we do not fall back to snapshot 
sync.
               // Return with sync failed status.
               return false;
             } catch (Exception e) {
               metrics.incrNumDeltaRequestsFailed();
-              reconTaskUpdater.setLastTaskRunStatus(-1);
-              reconTaskUpdater.recordRunCompletion();
+              deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
+              deltaReconTaskStatusUpdater.recordRunCompletion();
               LOG.warn("Unable to get and apply delta updates from OM: {}, 
falling back to full snapshot",
                   e.getMessage());
               fullSnapshot = true;
@@ -642,8 +650,6 @@ public boolean syncDataFromOM() {
         }
 
         if (fullSnapshot) {
-          reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
-              OmSnapshotTaskName.OmSnapshotRequest.name());
           try {
             metrics.incrNumSnapshotRequests();
             LOG.info("Obtaining full snapshot from Ozone Manager");
@@ -655,13 +661,19 @@ public boolean syncDataFromOM() {
             }
 
             // Update local Recon OM DB to new snapshot.
-            reconTaskUpdater.recordRunStart();
+            fullSnapshotReconTaskUpdater.recordRunStart();
             boolean success = updateReconOmDBWithNewSnapshot();
             // Update timestamp of successful delta updates query.
             if (success) {
-              
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              reconTaskUpdater.setLastTaskRunStatus(0);
-              reconTaskUpdater.recordRunCompletion();
+              // Keep the last updated sequence number the same for the full snapshot and delta
+              // tasks: both tasks update the same Recon copy of the OM DB, only under different
+              // conditions, so this sequence number describes how far that one copy is in sync
+              // with the actual OM DB.
+              
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+              
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+              fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0);
+              fullSnapshotReconTaskUpdater.recordRunCompletion();
+              deltaReconTaskStatusUpdater.updateDetails();
 
               // Reinitialize tasks that are listening.
               LOG.info("Calling reprocess on Recon tasks.");
@@ -672,23 +684,23 @@ public boolean syncDataFromOM() {
               
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
             } else {
               metrics.incrNumSnapshotRequestsFailed();
-              reconTaskUpdater.setLastTaskRunStatus(-1);
-              reconTaskUpdater.recordRunCompletion();
+              fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+              fullSnapshotReconTaskUpdater.recordRunCompletion();
               // Update health status in ReconContext
               reconContext.updateHealthStatus(new AtomicBoolean(false));
               
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
             }
           } catch (InterruptedException intEx) {
             LOG.error("OM DB Snapshot update sync thread was interrupted.");
-            reconTaskUpdater.setLastTaskRunStatus(-1);
-            reconTaskUpdater.recordRunCompletion();
+            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+            fullSnapshotReconTaskUpdater.recordRunCompletion();
             Thread.currentThread().interrupt();
             // Mark sync status as failed.
             return false;
           } catch (Exception e) {
             metrics.incrNumSnapshotRequestsFailed();
-            reconTaskUpdater.setLastTaskRunStatus(-1);
-            reconTaskUpdater.recordRunCompletion();
+            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+            fullSnapshotReconTaskUpdater.recordRunCompletion();
             LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
             // Update health status in ReconContext
             reconContext.updateHealthStatus(new AtomicBoolean(false));
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index 6771f727f8..6eef1d4c1e 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -50,6 +50,7 @@
 import java.net.HttpURLConnection;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -491,8 +492,10 @@ public void testSyncDataFromOMFullSnapshot(
     ozoneManagerServiceProvider.syncDataFromOM();
 
     ArgumentCaptor<String> taskNameCaptor = 
ArgumentCaptor.forClass(String.class);
-    
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture());
-    assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue());
+    verify(reconTaskStatusUpdaterManager, 
times(2)).getTaskStatusUpdater(taskNameCaptor.capture());
+    List<String> capturedValues = taskNameCaptor.getAllValues();
+    assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+    assertTrue(capturedValues.contains(OmDeltaRequest.name()));
     verify(reconTaskControllerMock, times(1))
         .reInitializeTasks(omMetadataManager, null);
     assertEquals(1, metrics.getNumSnapshotRequests());
@@ -524,8 +527,10 @@ public void testSyncDataFromOMDeltaUpdates(
 
     ArgumentCaptor<String> captor =
         ArgumentCaptor.forClass(String.class);
-    
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
-    assertEquals(OmDeltaRequest.name(), captor.getValue());
+    verify(reconTaskStatusUpdaterManager, 
times(2)).getTaskStatusUpdater(captor.capture());
+    List<String> capturedValues = captor.getAllValues();
+    assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+    assertTrue(capturedValues.contains(OmDeltaRequest.name()));
 
     verify(reconTaskControllerMock, times(1))
         .consumeOMEvents(any(OMUpdateEventBatch.class),
@@ -559,8 +564,10 @@ public void testSyncDataFromOMFullSnapshotForSNNFE(
 
     ArgumentCaptor<String> captor =
         ArgumentCaptor.forClass(String.class);
-    
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
-    assertEquals(OmSnapshotRequest.name(), captor.getValue());
+    verify(reconTaskStatusUpdaterManager, 
times(2)).getTaskStatusUpdater(captor.capture());
+    List<String> capturedValues = captor.getAllValues();
+    assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+    assertTrue(capturedValues.contains(OmDeltaRequest.name()));
     verify(reconTaskControllerMock, times(1))
         .reInitializeTasks(omMetadataManager, null);
     assertEquals(1, metrics.getNumSnapshotRequests());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to