This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 20a13da5ef HDDS-12615. Failure of any OM task during bootstrapping of
Recon needs to be handled (#8098)
20a13da5ef is described below
commit 20a13da5ef648c92dfd8d82e419d06c72f17fcf8
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Tue Apr 1 11:28:23 2025 +0530
HDDS-12615. Failure of any OM task during bootstrapping of Recon needs to
be handled (#8098)
---
.../spi/impl/OzoneManagerServiceProviderImpl.java | 66 +++++++++++++---------
.../impl/TestOzoneManagerServiceProviderImpl.java | 19 +++++--
2 files changed, 52 insertions(+), 33 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 8dd33fdb39..ee492e2916 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -281,9 +281,11 @@ public void start() {
String taskName = entry.getKey();
ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
- return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&
// Condition 1
- !taskStatusUpdater.getLastUpdatedSeqNumber()
- .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
// Condition 2
+ return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name())
// Condition 1
+ && !taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name())
// Condition 2
+ &&
+ taskStatusUpdater.getLastUpdatedSeqNumber().compareTo(
+ deltaTaskStatusUpdater.getLastUpdatedSeqNumber()) < 0; //
Condition 3
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Collect into desired Map
if (!reconOmTaskMap.isEmpty()) {
@@ -570,7 +572,10 @@ fromSequenceNumber, getCurrentOMDBSequenceNumber(),
numUpdates,
*/
@VisibleForTesting
public boolean syncDataFromOM() {
- ReconTaskStatusUpdater reconTaskUpdater;
+ ReconTaskStatusUpdater fullSnapshotReconTaskUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(
+ OmSnapshotTaskName.OmSnapshotRequest.name());
+ ReconTaskStatusUpdater deltaReconTaskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(
+ OmSnapshotTaskName.OmDeltaRequest.name());
if (isSyncDataFromOMRunning.compareAndSet(false, true)) {
try {
long currentSequenceNumber = getCurrentOMDBSequenceNumber();
@@ -580,11 +585,8 @@ public boolean syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
- reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
- OmSnapshotTaskName.OmDeltaRequest.name());
-
// Get updates from OM and apply to local Recon OM DB and update
task status in table
- reconTaskUpdater.recordRunStart();
+ deltaReconTaskStatusUpdater.recordRunStart();
int loopCount = 0;
long fromSequenceNumber = currentSequenceNumber;
long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
@@ -606,9 +608,15 @@ public boolean syncDataFromOM() {
}
diffBetweenOMDbAndReconDBSeqNumber =
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
omdbUpdatesHandler);
- reconTaskUpdater.setLastTaskRunStatus(0);
-
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
- reconTaskUpdater.recordRunCompletion();
+ deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
+ // Keeping last updated sequence number for both full and delta
tasks to be same
+ // because sequence number of DB denotes and points to same OM
DB copy of Recon,
+ // even though two different tasks are updating the DB at
different conditions, but
+ // it tells the sync state with actual OM DB for the same Recon
OM DB copy.
+
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ deltaReconTaskStatusUpdater.recordRunCompletion();
+ fullSnapshotReconTaskUpdater.updateDetails();
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents(),
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
@@ -619,16 +627,16 @@ public boolean syncDataFromOM() {
LOG.error("OM DB Delta update sync thread was interrupted and
delta sync failed.");
// We are updating the table even if it didn't run i.e. got
interrupted beforehand
// to indicate that a task was supposed to run, but it didn't.
- reconTaskUpdater.setLastTaskRunStatus(-1);
- reconTaskUpdater.recordRunCompletion();
+ deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
+ deltaReconTaskStatusUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Since thread is interrupted, we do not fall back to snapshot
sync.
// Return with sync failed status.
return false;
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
- reconTaskUpdater.setLastTaskRunStatus(-1);
- reconTaskUpdater.recordRunCompletion();
+ deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
+ deltaReconTaskStatusUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM: {},
falling back to full snapshot",
e.getMessage());
fullSnapshot = true;
@@ -642,8 +650,6 @@ public boolean syncDataFromOM() {
}
if (fullSnapshot) {
- reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
- OmSnapshotTaskName.OmSnapshotRequest.name());
try {
metrics.incrNumSnapshotRequests();
LOG.info("Obtaining full snapshot from Ozone Manager");
@@ -655,13 +661,19 @@ public boolean syncDataFromOM() {
}
// Update local Recon OM DB to new snapshot.
- reconTaskUpdater.recordRunStart();
+ fullSnapshotReconTaskUpdater.recordRunStart();
boolean success = updateReconOmDBWithNewSnapshot();
// Update timestamp of successful delta updates query.
if (success) {
-
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
- reconTaskUpdater.setLastTaskRunStatus(0);
- reconTaskUpdater.recordRunCompletion();
+ // Keeping last updated sequence number for both full and delta
tasks to be same
+ // because sequence number of DB denotes and points to same OM
DB copy of Recon,
+ // even though two different tasks are updating the DB at
different conditions, but
+ // it tells the sync state with actual OM DB for the same Recon
OM DB copy.
+
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0);
+ fullSnapshotReconTaskUpdater.recordRunCompletion();
+ deltaReconTaskStatusUpdater.updateDetails();
// Reinitialize tasks that are listening.
LOG.info("Calling reprocess on Recon tasks.");
@@ -672,23 +684,23 @@ public boolean syncDataFromOM() {
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
} else {
metrics.incrNumSnapshotRequestsFailed();
- reconTaskUpdater.setLastTaskRunStatus(-1);
- reconTaskUpdater.recordRunCompletion();
+ fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+ fullSnapshotReconTaskUpdater.recordRunCompletion();
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
}
} catch (InterruptedException intEx) {
LOG.error("OM DB Snapshot update sync thread was interrupted.");
- reconTaskUpdater.setLastTaskRunStatus(-1);
- reconTaskUpdater.recordRunCompletion();
+ fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+ fullSnapshotReconTaskUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Mark sync status as failed.
return false;
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
- reconTaskUpdater.setLastTaskRunStatus(-1);
- reconTaskUpdater.recordRunCompletion();
+ fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+ fullSnapshotReconTaskUpdater.recordRunCompletion();
LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index 6771f727f8..6eef1d4c1e 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -50,6 +50,7 @@
import java.net.HttpURLConnection;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -491,8 +492,10 @@ public void testSyncDataFromOMFullSnapshot(
ozoneManagerServiceProvider.syncDataFromOM();
ArgumentCaptor<String> taskNameCaptor =
ArgumentCaptor.forClass(String.class);
-
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture());
- assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue());
+ verify(reconTaskStatusUpdaterManager,
times(2)).getTaskStatusUpdater(taskNameCaptor.capture());
+ List<String> capturedValues = taskNameCaptor.getAllValues();
+ assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+ assertTrue(capturedValues.contains(OmDeltaRequest.name()));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
@@ -524,8 +527,10 @@ public void testSyncDataFromOMDeltaUpdates(
ArgumentCaptor<String> captor =
ArgumentCaptor.forClass(String.class);
-
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
- assertEquals(OmDeltaRequest.name(), captor.getValue());
+ verify(reconTaskStatusUpdaterManager,
times(2)).getTaskStatusUpdater(captor.capture());
+ List<String> capturedValues = captor.getAllValues();
+ assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+ assertTrue(capturedValues.contains(OmDeltaRequest.name()));
verify(reconTaskControllerMock, times(1))
.consumeOMEvents(any(OMUpdateEventBatch.class),
@@ -559,8 +564,10 @@ public void testSyncDataFromOMFullSnapshotForSNNFE(
ArgumentCaptor<String> captor =
ArgumentCaptor.forClass(String.class);
-
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
- assertEquals(OmSnapshotRequest.name(), captor.getValue());
+ verify(reconTaskStatusUpdaterManager,
times(2)).getTaskStatusUpdater(captor.capture());
+ List<String> capturedValues = captor.getAllValues();
+ assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
+ assertTrue(capturedValues.contains(OmDeltaRequest.name()));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]