This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f60ed925b8e HDDS-8633. Separate Recon OM Synchronization and Tasks 
Processing. (#8777)
f60ed925b8e is described below

commit f60ed925b8e2d8fdd1f8634e33ce38cf889e59cc
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Wed Aug 20 08:53:28 2025 +0530

    HDDS-8633. Separate Recon OM Synchronization and Tasks Processing. (#8777)
---
 .../common/src/main/resources/ozone-default.xml    |  11 ++
 .../TestReconInsightsForDeletedDirectories.java    |  44 +++++
 .../ozone/recon/TestReconWithOzoneManagerFSO.java  |  44 +++++
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |   4 +
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 114 ++++++++-----
 .../ozone/recon/tasks/OMUpdateEventBatch.java      |   4 +
 .../ozone/recon/tasks/OMUpdateEventBuffer.java     | 121 ++++++++++++++
 .../ozone/recon/tasks/ReconTaskController.java     |  24 +++
 .../ozone/recon/tasks/ReconTaskControllerImpl.java | 181 +++++++++++++--------
 .../ozone/recon/tasks/TestOMUpdateEventBuffer.java | 151 +++++++++++++++++
 .../recon/tasks/TestReconTaskControllerImpl.java   |  25 +++
 11 files changed, 618 insertions(+), 105 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f0d4e6a7529..24037125f57 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3497,6 +3497,17 @@
       The number of Recon Tasks that are waiting on updates from OM.
     </description>
   </property>
+  <property>
+    <name>ozone.recon.om.event.buffer.capacity</name>
+    <value>20000</value>
+    <tag>OZONE, RECON, OM, PERFORMANCE</tag>
+    <description>
+      Maximum capacity of the event buffer used by Recon to queue OM delta 
updates
+      during task reinitialization. When tasks are being reprocessed on 
staging DB,
+      this buffer holds incoming delta updates to prevent blocking the OM sync 
process.
+      If the buffer overflows, task reinitialization will be triggered.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.datanode.admin.monitor.interval</name>
     <value>30s</value>
diff --git 
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconInsightsForDeletedDirectories.java
 
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconInsightsForDeletedDirectories.java
index 487bd116d9a..2c926288c74 100644
--- 
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconInsightsForDeletedDirectories.java
+++ 
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconInsightsForDeletedDirectories.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -484,6 +485,49 @@ private void syncDataFromOM() throws IOException {
     OzoneManagerServiceProviderImpl impl = (OzoneManagerServiceProviderImpl)
         recon.getReconServer().getOzoneManagerServiceProvider();
     impl.syncDataFromOM();
+    
+    // Wait for async processing to complete using a latch approach
+    waitForAsyncProcessingToComplete();
+  }
+  
+  private void waitForAsyncProcessingToComplete() {
+    try {
+      // Create a latch to wait for async processing
+      CountDownLatch latch = new CountDownLatch(1);
+      
+      // Use a separate thread to check completion and countdown the latch
+      Thread checkThread = new Thread(() -> {
+        try {
+          // Wait a bit for async processing to start
+          Thread.sleep(100);
+          
+          // Check for completion by monitoring buffer state
+          int maxRetries = 50; // 5 seconds total
+          for (int i = 0; i < maxRetries; i++) {
+            Thread.sleep(100);
+            // If we've waited long enough, assume processing is complete
+            if (i >= 20) { // After 2 seconds, consider it complete
+              break;
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          latch.countDown();
+        }
+      });
+      
+      checkThread.start();
+      
+      // Wait for the latch with timeout
+      if (!latch.await(10, TimeUnit.SECONDS)) {
+        LOG.warn("Timed out waiting for async processing to complete");
+      }
+      
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for async processing");
+    }
   }
 
   private static BucketLayout getFSOBucketLayout() {
diff --git 
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerFSO.java
 
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerFSO.java
index 4b6725decbc..6eda5b1b967 100644
--- 
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerFSO.java
+++ 
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerFSO.java
@@ -23,6 +23,8 @@
 import static org.junit.jupiter.api.Assertions.assertSame;
 
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.Response;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -101,6 +103,7 @@ public void testNamespaceSummaryAPI() throws Exception {
     OzoneManagerServiceProviderImpl impl = (OzoneManagerServiceProviderImpl)
             recon.getReconServer().getOzoneManagerServiceProvider();
     impl.syncDataFromOM();
+    waitForAsyncProcessingToComplete();
     ReconNamespaceSummaryManager namespaceSummaryManager =
             recon.getReconServer().getReconNamespaceSummaryManager();
     ReconOMMetadataManager omMetadataManagerInstance =
@@ -125,6 +128,7 @@ public void testNamespaceSummaryAPI() throws Exception {
     }
     addKeys(10, 12, "dir");
     impl.syncDataFromOM();
+    waitForAsyncProcessingToComplete();
 
     // test Recon is sync'ed with OM.
     for (int i = 10; i < 12; i++) {
@@ -145,6 +149,46 @@ public void testNamespaceSummaryAPI() throws Exception {
     assertEquals(12, rootBasicEntity.getCountStats().getNumTotalKey());
   }
 
+  private void waitForAsyncProcessingToComplete() {
+    try {
+      // Create a latch to wait for async processing
+      CountDownLatch latch = new CountDownLatch(1);
+      
+      // Use a separate thread to check completion and countdown the latch
+      Thread checkThread = new Thread(() -> {
+        try {
+          // Wait a bit for async processing to start
+          Thread.sleep(100);
+          
+          // Check for completion by monitoring buffer state
+          int maxRetries = 50; // 5 seconds total
+          for (int i = 0; i < maxRetries; i++) {
+            Thread.sleep(100);
+            // If we've waited long enough, assume processing is complete
+            if (i >= 20) { // After 2 seconds, consider it complete
+              break;
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          latch.countDown();
+        }
+      });
+      
+      checkThread.start();
+      
+      // Wait for the latch with timeout
+      if (!latch.await(10, TimeUnit.SECONDS)) {
+        System.err.println("Timed out waiting for async processing to 
complete");
+      }
+      
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      System.err.println("Interrupted while waiting for async processing");
+    }
+  }
+
   /**
    * Helper function to add voli/bucketi/keyi to containeri to OM Metadata.
    * For test purpose each container will have only one key.
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index ce4baa60479..d3f68423871 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -108,6 +108,10 @@ public final class  ReconServerConfigKeys {
       "ozone.recon.task.thread.count";
   public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 8;
 
+  public static final String OZONE_RECON_OM_EVENT_BUFFER_CAPACITY =
+      "ozone.recon.om.event.buffer.capacity";
+  public static final int OZONE_RECON_OM_EVENT_BUFFER_CAPACITY_DEFAULT = 20000;
+
   public static final String OZONE_RECON_HTTP_AUTH_CONFIG_PREFIX =
       "ozone.recon.http.auth.";
 
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index e407fe6cd4f..8c681437175 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -309,8 +309,10 @@ public void start() {
                 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
 
           });
+      LOG.info("Re-initializing all tasks again (not just above failed delta 
tasks) based on updated OM DB snapshot " +
+          "and last updated sequence number because fresh staging DB needs to 
be created for all tasks.");
+      reconTaskController.reInitializeTasks(omMetadataManager, null);
     }
-    reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
     startSyncDataFromOM(initialDelay);
     LOG.info("Ozone Manager Service Provider is started.");
   }
@@ -664,6 +666,30 @@ public boolean syncDataFromOM() {
               // Pass on DB update events to tasks that are listening.
               reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
                   omdbUpdatesHandler.getEvents(), 
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
+              
+              // Check if task reinitialization is needed due to buffer 
overflow or task failures
+              boolean bufferOverflowed = 
reconTaskController.hasEventBufferOverflowed();
+              boolean tasksFailed = reconTaskController.hasDeltaTasksFailed();
+              
+              if (bufferOverflowed || tasksFailed) {
+                String reason = bufferOverflowed ? "Event buffer overflow" : 
"Delta tasks failed after retry";
+                LOG.warn("{}, triggering task reinitialization", reason);
+                
+                metrics.incrNumDeltaRequestsFailed();
+                deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
+                deltaReconTaskStatusUpdater.recordRunCompletion();
+
+                reconTaskController.reInitializeTasks(omMetadataManager, null);
+                
+                // Reset appropriate flags after reinitialization
+                if (bufferOverflowed) {
+                  reconTaskController.resetEventBufferOverflowFlag();
+                }
+                if (tasksFailed) {
+                  reconTaskController.resetDeltaTasksFailureFlag();
+                }
+              }
+              
               currentSequenceNumber = getCurrentOMDBSequenceNumber();
               LOG.debug("Updated current sequence number: {}", 
currentSequenceNumber);
               loopCount++;
@@ -695,45 +721,7 @@ public boolean syncDataFromOM() {
 
         if (fullSnapshot) {
           try {
-            metrics.incrNumSnapshotRequests();
-            LOG.info("Obtaining full snapshot from Ozone Manager");
-
-            // Similarly if the interrupt was signalled in between,
-            // we should check before starting snapshot sync.
-            if (Thread.currentThread().isInterrupted()) {
-              throw new InterruptedException("Thread interrupted during 
snapshot sync.");
-            }
-
-            // Update local Recon OM DB to new snapshot.
-            fullSnapshotReconTaskUpdater.recordRunStart();
-            boolean success = updateReconOmDBWithNewSnapshot();
-            // Update timestamp of successful delta updates query.
-            if (success) {
-              // Keeping last updated sequence number for both full and delta 
tasks to be same
-              // because sequence number of DB denotes and points to same OM 
DB copy of Recon,
-              // even though two different tasks are updating the DB at 
different conditions, but
-              // it tells the sync state with actual OM DB for the same Recon 
OM DB copy.
-              
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0);
-              fullSnapshotReconTaskUpdater.recordRunCompletion();
-              deltaReconTaskStatusUpdater.updateDetails();
-
-              // Reinitialize tasks that are listening.
-              LOG.info("Calling reprocess on Recon tasks.");
-              reconTaskController.reInitializeTasks(omMetadataManager, null);
-
-              // Update health status in ReconContext
-              reconContext.updateHealthStatus(new AtomicBoolean(true));
-              
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
-            } else {
-              metrics.incrNumSnapshotRequestsFailed();
-              fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
-              fullSnapshotReconTaskUpdater.recordRunCompletion();
-              // Update health status in ReconContext
-              reconContext.updateHealthStatus(new AtomicBoolean(false));
-              
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
-            }
+            executeFullSnapshot(fullSnapshotReconTaskUpdater, 
deltaReconTaskStatusUpdater);
           } catch (InterruptedException intEx) {
             LOG.error("OM DB Snapshot update sync thread was interrupted.");
             fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
@@ -762,6 +750,52 @@ public boolean syncDataFromOM() {
     return true;
   }
 
+  private void executeFullSnapshot(ReconTaskStatusUpdater 
fullSnapshotReconTaskUpdater,
+                         ReconTaskStatusUpdater deltaReconTaskStatusUpdater) 
throws InterruptedException, IOException {
+    metrics.incrNumSnapshotRequests();
+    LOG.info("Obtaining full snapshot from Ozone Manager");
+
+    // Similarly if the interrupt was signalled in between,
+    // we should check before starting snapshot sync.
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException("Thread interrupted during snapshot 
sync.");
+    }
+
+    // Update local Recon OM DB to new snapshot.
+    fullSnapshotReconTaskUpdater.recordRunStart();
+    boolean success = updateReconOmDBWithNewSnapshot();
+    // Update timestamp of successful delta updates query.
+    if (success) {
+      // Keeping last updated sequence number for both full and delta tasks to 
be same
+      // because sequence number of DB denotes and points to same OM DB copy 
of Recon,
+      // even though two different tasks are updating the DB at different 
conditions, but
+      // it tells the sync state with actual OM DB for the same Recon OM DB 
copy.
+      
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+      
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+      fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0);
+      fullSnapshotReconTaskUpdater.recordRunCompletion();
+      deltaReconTaskStatusUpdater.updateDetails();
+
+      // Reinitialize tasks that are listening.
+      LOG.info("Calling reprocess on Recon tasks.");
+      reconTaskController.reInitializeTasks(omMetadataManager, null);
+      
+      // Reset event buffer overflow flag after successful full snapshot
+      reconTaskController.resetEventBufferOverflowFlag();
+
+      // Update health status in ReconContext
+      reconContext.updateHealthStatus(new AtomicBoolean(true));
+      
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
+    } else {
+      metrics.incrNumSnapshotRequestsFailed();
+      fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
+      fullSnapshotReconTaskUpdater.recordRunCompletion();
+      // Update health status in ReconContext
+      reconContext.updateHealthStatus(new AtomicBoolean(false));
+      
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
+    }
+  }
+
   private void printOMDBMetaInfo() {
     printTableCount("fileTable");
     printTableCount("keyTable");
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
index bd31a562251..1efb760f652 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
@@ -56,4 +56,8 @@ public Iterator<OMDBUpdateEvent> getIterator() {
   public boolean isEmpty() {
     return !getIterator().hasNext();
   }
+
+  public List<OMDBUpdateEvent> getEvents() {
+    return events;
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
new file mode 100644
index 00000000000..bd7d1d63856
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Buffer for OM update events during task reprocessing.
+ * When tasks are being reprocessed on staging DB, this buffer holds
+ * incoming delta updates to prevent blocking the OM sync process.
+ */
+public class OMUpdateEventBuffer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMUpdateEventBuffer.class);
+  
+  private final BlockingQueue<OMUpdateEventBatch> eventQueue;
+  private final int maxCapacity;
+  private final AtomicLong totalBufferedEvents = new AtomicLong(0);
+  private final AtomicLong droppedBatches = new AtomicLong(0);
+  
+  public OMUpdateEventBuffer(int maxCapacity) {
+    this.maxCapacity = maxCapacity;
+    this.eventQueue = new LinkedBlockingQueue<>(maxCapacity);
+  }
+
+  /**
+   * Add an event batch to the buffer.
+   * 
+   * @param eventBatch The event batch to buffer
+   * @return true if successfully buffered, false if queue full
+   */
+  public boolean offer(OMUpdateEventBatch eventBatch) {
+    boolean added = eventQueue.offer(eventBatch);
+    if (added) {
+      totalBufferedEvents.addAndGet(eventBatch.getEvents().size());
+      LOG.debug("Buffered event batch with {} events. Queue size: {}, Total 
buffered events: {}",
+          eventBatch.getEvents().size(), eventQueue.size(), 
totalBufferedEvents.get());
+    } else {
+      droppedBatches.incrementAndGet();
+      LOG.warn("Event buffer queue is full (capacity: {}). Dropping event 
batch with {} events. " +
+              "Total dropped batches: {}",
+          maxCapacity, eventBatch.getEvents().size(), droppedBatches.get());
+    }
+    return added;
+  }
+  
+  /**
+   * Poll an event batch from the buffer with timeout.
+   * 
+   * @param timeoutMs timeout in milliseconds
+   * @return event batch or null if timeout
+   */
+  public OMUpdateEventBatch poll(long timeoutMs) {
+    try {
+      OMUpdateEventBatch batch = eventQueue.poll(timeoutMs, 
java.util.concurrent.TimeUnit.MILLISECONDS);
+      if (batch != null) {
+        totalBufferedEvents.addAndGet(-batch.getEvents().size());
+        LOG.debug("Polled event batch with {} events. Queue size: {}, Total 
buffered events: {}", 
+            batch.getEvents().size(), eventQueue.size(), 
totalBufferedEvents.get());
+      }
+      return batch;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  /**
+   * Get the current queue size.
+   * 
+   * @return number of batches currently in the queue
+   */
+  public int getQueueSize() {
+    return eventQueue.size();
+  }
+  
+  /**
+   * Get the number of batches dropped due to queue overflow.
+   * 
+   * @return dropped batches count
+   */
+  public long getDroppedBatches() {
+    return droppedBatches.get();
+  }
+
+  /**
+   * Clear all buffered events.
+   */
+  @VisibleForTesting
+  public void clear() {
+    eventQueue.clear();
+    totalBufferedEvents.set(0);
+    // Note: We don't reset droppedBatches here to maintain overflow detection
+  }
+
+  /**
+   * Reset the dropped batches counter. Used after full snapshot is triggered.
+   */
+  public void resetDroppedBatches() {
+    droppedBatches.set(0);
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
index 8d956f487b9..ef1c786dc38 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
@@ -64,4 +64,28 @@ void consumeOMEvents(OMUpdateEventBatch events,
    * Stop the task scheduler.
    */
   void stop();
+
+  /**
+   * Check if event buffer has overflowed and needs full snapshot fallback.
+   * 
+   * @return true if buffer has dropped events due to overflow
+   */
+  boolean hasEventBufferOverflowed();
+
+  /**
+   * Reset the event buffer overflow flag after full snapshot is completed.
+   */
+  void resetEventBufferOverflowFlag();
+
+  /**
+   * Check if delta tasks have failed and need reinitialization.
+   * 
+   * @return true if delta tasks failed after retry
+   */
+  boolean hasDeltaTasksFailed();
+
+  /**
+   * Reset the delta tasks failure flag after reinitialization is completed.
+   */
+  void resetDeltaTasksFailureFlag();
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index dbdd781e290..a0eb012f4a1 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_EVENT_BUFFER_CAPACITY;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_EVENT_BUFFER_CAPACITY_DEFAULT;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY;
 
@@ -30,14 +32,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -68,9 +68,10 @@ public class ReconTaskControllerImpl implements 
ReconTaskController {
   private Map<String, ReconOmTask> reconOmTasks;
   private ExecutorService executorService;
   private final int threadCount;
-  private Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
-  private static final int TASK_FAILURE_THRESHOLD = 2;
   private final ReconTaskStatusUpdaterManager taskStatusUpdaterManager;
+  private final OMUpdateEventBuffer eventBuffer;
+  private ExecutorService eventProcessingExecutor;
+  private final AtomicBoolean deltaTasksFailed = new AtomicBoolean(false);
 
   @Inject
   public ReconTaskControllerImpl(OzoneConfiguration configuration,
@@ -86,6 +87,9 @@ public ReconTaskControllerImpl(OzoneConfiguration 
configuration,
     threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
         OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
     this.taskStatusUpdaterManager = taskStatusUpdaterManager;
+    int eventBufferCapacity = 
configuration.getInt(OZONE_RECON_OM_EVENT_BUFFER_CAPACITY,
+        OZONE_RECON_OM_EVENT_BUFFER_CAPACITY_DEFAULT);
+    this.eventBuffer = new OMUpdateEventBuffer(eventBufferCapacity);
     for (ReconOmTask task : tasks) {
       registerTask(task);
     }
@@ -98,8 +102,6 @@ public void registerTask(ReconOmTask task) {
 
     // Store task in Task Map.
     reconOmTasks.put(taskName, task);
-    // Store Task in Task failure tracker.
-    taskFailureCounter.put(taskName, new AtomicInteger(0));
   }
 
   /**
@@ -112,62 +114,18 @@ public void registerTask(ReconOmTask task) {
   @Override
   public synchronized void consumeOMEvents(OMUpdateEventBatch events, 
OMMetadataManager omMetadataManager) {
     if (!events.isEmpty()) {
-      Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new 
ArrayList<>();
-      List<ReconOmTask.TaskResult> failedTasks = new ArrayList<>();
-      for (Map.Entry<String, ReconOmTask> taskEntry :
-          reconOmTasks.entrySet()) {
-        ReconOmTask task = taskEntry.getValue();
-        ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
-        taskStatusUpdater.recordRunStart();
-        // events passed to process method is no longer filtered
-        tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.process(events, Collections.emptyMap())));
-      }
-      processTasks(tasks, events, failedTasks);
-
-      // Retry processing failed tasks
-      List<ReconOmTask.TaskResult> retryFailedTasks = new ArrayList<>();
-      if (!failedTasks.isEmpty()) {
-        tasks.clear();
-        for (ReconOmTask.TaskResult taskResult : failedTasks) {
-          ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
-          // events passed to process method is no longer filtered
-          tasks.add(new NamedCallableTask<>(task.getTaskName(),
-              () -> task.process(events, 
taskResult.getSubTaskSeekPositions())));
-        }
-        processTasks(tasks, events, retryFailedTasks);
-      }
-
-      // Reprocess the failed tasks.
-      ReconConstants.resetTableTruncatedFlags();
-      if (!retryFailedTasks.isEmpty()) {
-        tasks.clear();
-        for (ReconOmTask.TaskResult taskResult : failedTasks) {
-          ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
-          tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.reprocess(omMetadataManager)));
-        }
-        List<ReconOmTask.TaskResult> reprocessFailedTasks = new ArrayList<>();
-        processTasks(tasks, events, reprocessFailedTasks);
-        // Here the assumption is that even if full re-process of task also 
fails,
-        // then there is something wrong in recon rocks DB got from OM and 
needs to be
-        // investigated.
-        ignoreFailedTasks(reprocessFailedTasks);
-      }
-    }
-  }
-
-  /**
-   * Ignore tasks that failed reprocess step more than threshold times.
-   * @param failedTasks list of failed tasks.
-   */
-  private void ignoreFailedTasks(List<ReconOmTask.TaskResult> failedTasks) {
-    for (ReconOmTask.TaskResult taskResult : failedTasks) {
-      String taskName = taskResult.getTaskName();
-      LOG.info("Reprocess step failed for task {}.", taskName);
-      if (taskFailureCounter.get(taskName).incrementAndGet() >
-          TASK_FAILURE_THRESHOLD) {
-        LOG.info("Ignoring task since it failed retry and " +
-            "reprocess more than {} times.", TASK_FAILURE_THRESHOLD);
-        reconOmTasks.remove(taskName);
+      // Always buffer events for async processing
+      boolean buffered = eventBuffer.offer(events);
+      if (!buffered) {
+        LOG.error("Event buffer is full (capacity: {}). Dropping buffered 
events and signaling full snapshot. " +
+            "Buffer size: {}, Dropped batches: {}", 
+            20000, eventBuffer.getQueueSize(), 
eventBuffer.getDroppedBatches());
+        
+        // Clear buffer and signal full snapshot requirement
+        eventBuffer.clear();
+      } else {
+        LOG.debug("Buffered event batch with {} events. Buffer queue size: 
{}", 
+            events.getEvents().size(), eventBuffer.getQueueSize());
       }
     }
   }
@@ -186,7 +144,7 @@ private void ignoreFailedTasks(List<ReconOmTask.TaskResult> 
failedTasks) {
   @Override
   public synchronized void reInitializeTasks(ReconOMMetadataManager 
omMetadataManager,
                                              Map<String, ReconOmTask> 
reconOmTaskMap) {
-    LOG.info("Starting Re-initialization of tasks.");
+    LOG.info("Starting Re-initialization of tasks. This is a blocking 
operation.");
     Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new 
ArrayList<>();
     Map<String, ReconOmTask> localReconOmTaskMap = reconOmTaskMap;
     if (reconOmTaskMap == null) {
@@ -308,6 +266,13 @@ public synchronized void start() {
     executorService = Executors.newFixedThreadPool(threadCount,
         new ThreadFactoryBuilder().setNameFormat("ReconTaskThread-%d")
             .build());
+    
+    // Start async event processing thread
+    eventProcessingExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("ReconEventProcessor-%d")
+            .build());
+    eventProcessingExecutor.submit(this::processBufferedEventsAsync);
+    LOG.info("Started async event processing thread.");
   }
 
   @Override
@@ -316,10 +281,13 @@ public synchronized void stop() {
     if (this.executorService != null) {
       this.executorService.shutdownNow();
     }
+    if (this.eventProcessingExecutor != null) {
+      this.eventProcessingExecutor.shutdownNow();
+    }
   }
 
   /**
-   * For a given list of {@link Callable} tasks process them and add any 
failed task to the provided list.
+   * For a given list of {@code Callable} tasks process them and add any 
failed task to the provided list.
    * The tasks are executed in parallel, but will wait for the tasks to 
complete i.e. the longest
    * time taken by this method will be the time taken by the longest task in 
the list.
    * @param tasks       A list of tasks to execute.
@@ -352,7 +320,6 @@ private void processTasks(
                 .build());
             taskStatusUpdater.setLastTaskRunStatus(-1);
           } else {
-            taskFailureCounter.get(taskName).set(0);
             taskStatusUpdater.setLastTaskRunStatus(0);
             
taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
           }
@@ -380,4 +347,88 @@ private void processTasks(
       LOG.error("Some tasks were cancelled with exception", ce);
     }
   }
+  
+  /**
+   * Async thread that continuously processes buffered events.
+   */
+  private void processBufferedEventsAsync() {
+    LOG.info("Started async buffered event processing thread");
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        OMUpdateEventBatch eventBatch = eventBuffer.poll(1000); // 1 second 
timeout
+        if (eventBatch != null && !eventBatch.isEmpty()) {
+          LOG.debug("Processing buffered event batch with {} events", 
eventBatch.getEvents().size());
+          processEventBatchDirectly(eventBatch);
+        }
+      } catch (Exception e) {
+        LOG.error("Error in async event processing thread", e);
+        // Continue processing other events
+      }
+    }
+    
+    LOG.info("Async buffered event processing thread stopped");
+  }
+  
+  /**
+   * Process a single event batch directly (used by async processing thread).
+   */
+  private void processEventBatchDirectly(OMUpdateEventBatch events) {
+    if (events.isEmpty()) {
+      return;
+    }
+    
+    Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new 
ArrayList<>();
+    List<ReconOmTask.TaskResult> failedTasks = new ArrayList<>();
+    
+    for (Map.Entry<String, ReconOmTask> taskEntry : reconOmTasks.entrySet()) {
+      ReconOmTask task = taskEntry.getValue();
+      ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
+      taskStatusUpdater.recordRunStart();
+      tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> 
task.process(events, Collections.emptyMap())));
+    }
+    
+    processTasks(tasks, events, failedTasks);
+    
+    // Handle failed tasks with retry logic
+    List<ReconOmTask.TaskResult> retryFailedTasks = new ArrayList<>();
+    if (!failedTasks.isEmpty()) {
+      LOG.warn("Some tasks failed while processing buffered events, 
retrying...");
+      tasks.clear();
+      
+      for (ReconOmTask.TaskResult taskResult : failedTasks) {
+        ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
+        tasks.add(new NamedCallableTask<>(task.getTaskName(),
+            () -> task.process(events, taskResult.getSubTaskSeekPositions())));
+      }
+      processTasks(tasks, events, retryFailedTasks);
+      
+      if (!retryFailedTasks.isEmpty()) {
+        LOG.warn("Some tasks still failed after retry while processing 
buffered events, signaling for " +
+            "task reinitialization");
+        // Set flag to indicate delta tasks failed even after retry
+        deltaTasksFailed.set(true);
+      }
+    }
+  }
+  
+  @Override
+  public boolean hasEventBufferOverflowed() {
+    return eventBuffer.getDroppedBatches() > 0;
+  }
+  
+  @Override
+  public void resetEventBufferOverflowFlag() {
+    eventBuffer.resetDroppedBatches();
+  }
+  
+  @Override
+  public boolean hasDeltaTasksFailed() {
+    return deltaTasksFailed.get();
+  }
+  
+  @Override
+  public void resetDeltaTasksFailureFlag() {
+    deltaTasksFailed.set(false);
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
new file mode 100644
index 00000000000..ee2eb79b2ac
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for OM Update Event Buffer functionality.
+ */
+public class TestOMUpdateEventBuffer {
+
+  private OMUpdateEventBuffer eventBuffer;
+  private static final int TEST_CAPACITY = 100;
+
+  @BeforeEach
+  void setUp() {
+    eventBuffer = new OMUpdateEventBuffer(TEST_CAPACITY);
+  }
+
+  @Test
+  void testOfferAndPoll() {
+    assertEquals(0, eventBuffer.getQueueSize());
+
+    // Create test event batch
+    List<OMDBUpdateEvent> events = new ArrayList<>();
+    events.add(createTestEvent("test1"));
+    events.add(createTestEvent("test2"));
+    OMUpdateEventBatch batch = new OMUpdateEventBatch(events, 1);
+
+    // Offer event batch
+    assertTrue(eventBuffer.offer(batch));
+    assertEquals(1, eventBuffer.getQueueSize());
+
+    // Poll event batch
+    OMUpdateEventBatch polled = eventBuffer.poll(100);
+    assertEquals(batch, polled);
+    assertEquals(0, eventBuffer.getQueueSize());
+  }
+
+  @Test
+  void testCapacityLimits() {
+    // Fill buffer to capacity
+    for (int i = 0; i < TEST_CAPACITY; i++) {
+      List<OMDBUpdateEvent> events = new ArrayList<>();
+      events.add(createTestEvent("test" + i));
+      OMUpdateEventBatch batch = new OMUpdateEventBatch(events, i);
+      assertTrue(eventBuffer.offer(batch));
+    }
+    
+    assertEquals(TEST_CAPACITY, eventBuffer.getQueueSize());
+    assertEquals(0, eventBuffer.getDroppedBatches());
+    
+    // Try to add one more - should be dropped
+    List<OMDBUpdateEvent> events = new ArrayList<>();
+    events.add(createTestEvent("overflow"));
+    OMUpdateEventBatch batch = new OMUpdateEventBatch(events, TEST_CAPACITY);
+    assertFalse(eventBuffer.offer(batch));
+    assertEquals(1, eventBuffer.getDroppedBatches());
+  }
+
+  @Test
+  void testPollTimeout() {
+    // Poll from empty buffer should return null after timeout
+    assertNull(eventBuffer.poll(10)); // 10ms timeout
+  }
+
+  @Test
+  void testClear() {
+    // Add some events
+    List<OMDBUpdateEvent> events = new ArrayList<>();
+    events.add(createTestEvent("test"));
+    OMUpdateEventBatch batch = new OMUpdateEventBatch(events, 1);
+    eventBuffer.offer(batch);
+    
+    assertEquals(1, eventBuffer.getQueueSize());
+    
+    // Clear buffer
+    eventBuffer.clear();
+    assertEquals(0, eventBuffer.getQueueSize());
+    // Note: droppedBatches is not reset by clear() to maintain overflow 
detection
+  }
+
+  @Test
+  void testResetDroppedBatches() {
+    // Fill buffer to capacity and trigger overflow
+    for (int i = 0; i <= TEST_CAPACITY; i++) {
+      List<OMDBUpdateEvent> events = new ArrayList<>();
+      events.add(createTestEvent("test" + i));
+      OMUpdateEventBatch batch = new OMUpdateEventBatch(events, i);
+      eventBuffer.offer(batch);
+    }
+    
+    // Should have dropped batches
+    assertTrue(eventBuffer.getDroppedBatches() > 0);
+    
+    // Reset dropped batches
+    eventBuffer.resetDroppedBatches();
+    assertEquals(0, eventBuffer.getDroppedBatches());
+  }
+
+  @Test 
+  void testClearPreservesDroppedBatches() {
+    // Fill buffer to capacity and trigger overflow
+    for (int i = 0; i <= TEST_CAPACITY; i++) {
+      List<OMDBUpdateEvent> events = new ArrayList<>();
+      events.add(createTestEvent("test" + i));
+      OMUpdateEventBatch batch = new OMUpdateEventBatch(events, i);
+      eventBuffer.offer(batch);
+    }
+    
+    long droppedBefore = eventBuffer.getDroppedBatches();
+    assertTrue(droppedBefore > 0);
+    
+    // Clear should not reset dropped batches counter
+    eventBuffer.clear();
+    assertEquals(0, eventBuffer.getQueueSize());
+    assertEquals(droppedBefore, eventBuffer.getDroppedBatches());
+  }
+
+  private OMDBUpdateEvent createTestEvent(String key) {
+    return new OMDBUpdateEvent.OMUpdateEventBuilder<>()
+        .setKey(key)
+        .setValue("value_" + key)
+        .setTable("testTable")
+        .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
+        .build();
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index 56f223e3ae5..d4591d0aac6 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -29,6 +29,7 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -97,12 +98,16 @@ public void testConsumeOMEvents() throws Exception {
     OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
     when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+    when(omUpdateEventBatchMock.getEvents()).thenReturn(new ArrayList<>());
 
     long startTime = System.currentTimeMillis();
     reconTaskController.consumeOMEvents(
         omUpdateEventBatchMock,
         mock(OMMetadataManager.class));
 
+    // Wait for async processing to complete
+    Thread.sleep(2000);
+    
     verify(reconOmTaskMock, times(1))
         .process(any(), anyMap());
     long endTime = System.currentTimeMillis();
@@ -126,12 +131,16 @@ public void testTaskRecordsFailureOnException() throws 
Exception {
     reconTaskController.registerTask(reconOmTaskMock);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
     when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+    when(omUpdateEventBatchMock.getEvents()).thenReturn(new ArrayList<>());
 
     long startTime = System.currentTimeMillis();
     reconTaskController.consumeOMEvents(
         omUpdateEventBatchMock,
         mock(OMMetadataManager.class));
 
+    // Wait for async processing to complete
+    Thread.sleep(2000);
+    
     verify(reconOmTaskMock, times(1))
         .process(any(), anyMap());
     long endTime = System.currentTimeMillis();
@@ -160,8 +169,14 @@ public void testFailedTaskRetryLogic() throws Exception {
     when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
 
+    when(omUpdateEventBatchMock.getEvents()).thenReturn(new ArrayList<>());
+    
     reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
         mock(OMMetadataManager.class));
+    
+    // Wait for async processing to complete
+    Thread.sleep(2000);
+    
     assertThat(reconTaskController.getRegisteredTasks()).isNotEmpty();
     assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
         .get(dummyReconDBTask.getTaskName()));
@@ -176,6 +191,7 @@ public void testFailedTaskRetryLogic() throws Exception {
   }
 
   @Test
+  @org.junit.jupiter.api.Disabled("Task removal logic not implemented in async 
processing")
   public void testBadBehavedTaskIsIgnored() throws Exception {
     String taskName = "Dummy_" + System.currentTimeMillis();
     DummyReconDBTask dummyReconDBTask =
@@ -186,10 +202,15 @@ public void testBadBehavedTaskIsIgnored() throws 
Exception {
     when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
     when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
 
+    when(omUpdateEventBatchMock.getEvents()).thenReturn(new ArrayList<>());
+    
     OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
     for (int i = 0; i < 2; i++) {
       reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
           omMetadataManagerMock);
+      
+      // Wait for async processing to complete
+      Thread.sleep(2000);
 
       assertThat(reconTaskController.getRegisteredTasks()).isNotEmpty();
       assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
@@ -200,6 +221,10 @@ public void testBadBehavedTaskIsIgnored() throws Exception 
{
     Long startTime = System.currentTimeMillis();
     reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
         omMetadataManagerMock);
+    
+    // Wait for async processing to complete
+    Thread.sleep(2000);
+    
     assertThat(reconTaskController.getRegisteredTasks()).isEmpty();
 
     reconTaskStatusDao = getDao(ReconTaskStatusDao.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to