This is an automated email from the ASF dual-hosted git repository.

arafat2198 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a55efa17c9b HDDS-13573. Intermittent failure in 
TestNSSummaryUnifiedControl.testMultipleConcurrentAttempts (#9416).
a55efa17c9b is described below

commit a55efa17c9b622d98d98b35783f45df73182c3d4
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Wed Dec 10 12:30:03 2025 +0530

    HDDS-13573. Intermittent failure in 
TestNSSummaryUnifiedControl.testMultipleConcurrentAttempts (#9416).
---
 .../recon/tasks/TestNSSummaryUnifiedControl.java   | 805 ++++++++++++++-------
 1 file changed, 552 insertions(+), 253 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
index e87852ebc0f..415c3952d33 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
@@ -17,20 +17,28 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -44,13 +52,21 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState;
-import org.apache.hadoop.ozone.recon.tasks.ReconOmTask.TaskResult;
-import org.apache.ozone.test.tag.Flaky;
+import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
+import 
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,67 +74,133 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Integration tests for HDDS-13443: Unified and controlled sync access to 
- * retrigger of build of NSSummary tree.
- * 
- * <p>These tests verify that the unified control mechanism prevents concurrent
- * rebuilds and properly manages state transitions across all entry points.
+ * Unified and controlled sync access to
+ * retrigger of build of NSSummary tree using queue-based architecture.
+ *
+ * <p>These tests verify that the queue-based unified control mechanism
+ * correctly handles concurrent queueReInitializationEvent() calls in 
production.
+ *
+ * <p>Execution Flow:
+ * <pre>
+ * Multiple Concurrent Callers
+ *         ↓  ↓  ↓
+ * queueReInitializationEvent()  [Thread-safe public API]
+ *         ↓
+ * BlockingQueue&lt;ReconEvent&gt;     [Serialization layer]
+ *         ↓
+ * Single Async Thread           [Sequential processing]
+ *         ↓
+ * processReInitializationEvent()
+ *         ↓
+ * reInitializeTasks()
+ *         ↓
+ * task.reprocess()              [Only ONE execution at a time]
+ * </pre>
  */
 public class TestNSSummaryUnifiedControl {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestNSSummaryUnifiedControl.class);
 
-  private NSSummaryTask nsSummaryTask;
+  private ReconTaskControllerImpl taskController;
   private ReconNamespaceSummaryManager mockNamespaceSummaryManager;
   private ReconOMMetadataManager mockReconOMMetadataManager;
-  private OMMetadataManager mockOMMetadataManager;
   private OzoneConfiguration ozoneConfiguration;
 
   @BeforeEach
-  void setUp() throws IOException {
+  void setUp() throws Exception {
     // Reset static state before each test
     NSSummaryTask.resetRebuildState();
-    
+
     // Create mocks
     mockNamespaceSummaryManager = mock(ReconNamespaceSummaryManager.class);
     mockReconOMMetadataManager = mock(ReconOMMetadataManager.class);
-    mockOMMetadataManager = mock(OMMetadataManager.class);
     ozoneConfiguration = new OzoneConfiguration();
 
-    // Create NSSummaryTask instance that will use mocked sub-tasks
-    nsSummaryTask = createTestableNSSummaryTask();
+    // Configure small buffer for easier testing
+    
ozoneConfiguration.setInt(ReconServerConfigKeys.OZONE_RECON_OM_EVENT_BUFFER_CAPACITY,
 100);
+
+    // Setup task controller
+    ReconTaskStatusUpdaterManager mockTaskStatusUpdaterManager = 
mock(ReconTaskStatusUpdaterManager.class);
+    ReconTaskStatusUpdater mockTaskStatusUpdater = 
mock(ReconTaskStatusUpdater.class);
+    
when(mockTaskStatusUpdaterManager.getTaskStatusUpdater(any())).thenReturn(mockTaskStatusUpdater);
+
+    ReconDBProvider reconDbProvider = mock(ReconDBProvider.class);
+    when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class));
+    
when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider);
+
+    ReconContainerMetadataManager reconContainerMgr = 
mock(ReconContainerMetadataManager.class);
+    ReconGlobalStatsManager reconGlobalStatsManager = 
mock(ReconGlobalStatsManager.class);
+    ReconFileMetadataManager reconFileMetadataManager = 
mock(ReconFileMetadataManager.class);
+
+    taskController = new ReconTaskControllerImpl(ozoneConfiguration, new 
HashSet<>(),
+        mockTaskStatusUpdaterManager, reconDbProvider, reconContainerMgr, 
mockNamespaceSummaryManager,
+        reconGlobalStatsManager, reconFileMetadataManager);
+
+    // Register testable NSSummaryTask instance
+    taskController.registerTask(createTestableNSSummaryTask());
+
+    // Setup mock OM metadata manager with checkpoint support
+    setupMockOMMetadataManager();
+    taskController.updateOMMetadataManager(mockReconOMMetadataManager);
+
+    // Setup successful rebuild by default
+    doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+    // Start async processing
+    taskController.start();
   }
 
   @AfterEach
   void tearDown() {
-    // Reset static state after each test to ensure test isolation
+    // Reset static state after each test
     NSSummaryTask.resetRebuildState();
+
+    // Shutdown task controller
+    if (taskController != null) {
+      taskController.stop();
+    }
   }
-  
-  /**
-   * Create a testable NSSummaryTask that uses mocked sub-tasks for successful 
execution.
-   */
+
+  private void setupMockOMMetadataManager() throws IOException {
+    DBStore mockDBStore = mock(DBStore.class);
+    File mockDbLocation = mock(File.class);
+    DBCheckpoint mockCheckpoint = mock(DBCheckpoint.class);
+    Path mockCheckpointPath = Paths.get("/tmp/test/checkpoint");
+
+    when(mockReconOMMetadataManager.getStore()).thenReturn(mockDBStore);
+    when(mockDBStore.getDbLocation()).thenReturn(mockDbLocation);
+    when(mockDbLocation.getParent()).thenReturn("/tmp/test");
+    when(mockDBStore.getCheckpoint(anyString(), 
any(Boolean.class))).thenReturn(mockCheckpoint);
+    
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);
+
+    ReconOMMetadataManager mockCheckpointedManager = 
mock(ReconOMMetadataManager.class);
+    when(mockCheckpointedManager.getStore()).thenReturn(mockDBStore);
+    
when(mockReconOMMetadataManager.createCheckpointReconMetadataManager(any(), 
any()))
+        .thenReturn(mockCheckpointedManager);
+  }
+
   private NSSummaryTask createTestableNSSummaryTask() {
     return new NSSummaryTask(
-        mockNamespaceSummaryManager, 
-        mockReconOMMetadataManager, 
+        mockNamespaceSummaryManager,
+        mockReconOMMetadataManager,
         ozoneConfiguration) {
-      
+
       @Override
       public TaskResult buildTaskResult(boolean success) {
         return super.buildTaskResult(success);
       }
-      
-      @Override 
+
+      @Override
+      public NSSummaryTask getStagedTask(ReconOMMetadataManager 
stagedOmMetadataManager, DBStore stagedReconDbStore)
+          throws IOException {
+        return this;
+      }
+
+      @Override
       protected TaskResult executeReprocess(OMMetadataManager 
omMetadataManager, long startTime) {
-        // Simplified test implementation that mimics the real execution flow
-        // but bypasses the complex sub-task execution while maintaining 
proper state management
-        
-        // Initialize a list of tasks to run in parallel (empty for testing)
         Collection<Callable<Boolean>> tasks = new ArrayList<>();
 
         try {
-          // This will call the mocked clearNSSummaryTable (might throw 
Exception for failure tests)
           getReconNamespaceSummaryManager().clearNSSummaryTable();
         } catch (IOException ioEx) {
           LOG.error("Unable to clear NSSummary table in Recon DB. ", ioEx);
@@ -126,10 +208,9 @@ protected TaskResult executeReprocess(OMMetadataManager 
omMetadataManager, long
           return buildTaskResult(false);
         }
 
-        // Add mock sub-tasks that always succeed
-        tasks.add(() -> true); // Mock FSO task
-        tasks.add(() -> true); // Mock Legacy task  
-        tasks.add(() -> true); // Mock OBS task
+        tasks.add(() -> true);
+        tasks.add(() -> true);
+        tasks.add(() -> true);
 
         List<Future<Boolean>> results;
         ThreadFactory threadFactory = new ThreadFactoryBuilder()
@@ -137,7 +218,7 @@ protected TaskResult executeReprocess(OMMetadataManager 
omMetadataManager, long
             .build();
         ExecutorService executorService = Executors.newFixedThreadPool(2, 
threadFactory);
         boolean success = false;
-        
+
         try {
           results = executorService.invokeAll(tasks);
           for (Future<Boolean> result : results) {
@@ -148,23 +229,21 @@ protected TaskResult executeReprocess(OMMetadataManager 
omMetadataManager, long
             }
           }
           success = true;
-          
+
         } catch (InterruptedException | ExecutionException ex) {
           LOG.error("Error while reprocessing NSSummary table in Recon DB.", 
ex);
           NSSummaryTask.setRebuildStateToFailed();
           return buildTaskResult(false);
-          
+
         } finally {
           executorService.shutdown();
-
           long endTime = System.nanoTime();
           long durationInMillis = TimeUnit.NANOSECONDS.toMillis(endTime - 
startTime);
           LOG.info("Test NSSummary reprocess execution time: {} milliseconds", 
durationInMillis);
-          
-          // Reset state to IDLE on successful completion
+
           if (success) {
             NSSummaryTask.resetRebuildState();
-            LOG.info("Test NSSummary tree reprocess completed successfully 
with unified control.");
+            LOG.info("Test NSSummary tree reprocess completed successfully.");
           }
         }
 
@@ -183,215 +262,392 @@ void testInitialState() {
   }
 
   /**
-   * Test successful single rebuild operation.
+   * Test single successful rebuild via queue.
    */
   @Test
   void testSingleSuccessfulRebuild() throws Exception {
-    // Setup successful rebuild
-    // Setup successful rebuild by default - no exception thrown
-    doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+    AtomicBoolean rebuildExecuted = new AtomicBoolean(false);
+    CountDownLatch rebuildLatch = new CountDownLatch(1);
+
+    doAnswer(invocation -> {
+      rebuildExecuted.set(true);
+      rebuildLatch.countDown();
+      return null;
+    }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+    // Queue rebuild via production API
+    ReconTaskController.ReInitializationResult result =
+        taskController.queueReInitializationEvent(
+            
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+        "Rebuild should be queued successfully");
 
-    // Execute rebuild
-    TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+    // Wait for async processing
+    assertTrue(rebuildLatch.await(10, TimeUnit.SECONDS),
+        "Rebuild should execute");
+    assertTrue(rebuildExecuted.get(), "Rebuild should have executed");
 
-    // Verify results
-    assertTrue(result.isTaskSuccess(), "Rebuild should succeed");
+    // wait for 5 secs for state to return to IDLE
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == 
RebuildState.IDLE,
+        100, 5000);
     assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
         "State should return to IDLE after successful rebuild");
-    
-    // Verify interactions
-    verify(mockNamespaceSummaryManager, times(1)).clearNSSummaryTable();
   }
 
   /**
-   * Test rebuild failure sets state to FAILED.
+   * Test rebuild failure sets proper state.
    */
   @Test
-  void testRebuildFailure() throws IOException {
-    // Setup failure scenario
-    doThrow(new IOException("Test 
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+  void testRebuildFailure() throws Exception {
+    CountDownLatch failureLatch = new CountDownLatch(1);
 
-    // Execute rebuild
-    TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+    doAnswer(invocation -> {
+      failureLatch.countDown();
+      throw new IOException("Test failure");
+    }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+    ReconTaskController.ReInitializationResult result =
+        taskController.queueReInitializationEvent(
+            
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+        "Rebuild should be queued successfully");
+
+    assertTrue(failureLatch.await(10, TimeUnit.SECONDS),
+        "Rebuild should be attempted");
 
-    // Verify results
-    assertFalse(result.isTaskSuccess(), "Rebuild should fail");
+    // Wait for 5 secs time for state update
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == 
RebuildState.FAILED,
+        100, 5000);
     assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
         "State should be FAILED after rebuild failure");
   }
 
   /**
-   * Test concurrent rebuild attempts - second call should be rejected.
+   * Test rebuild can be triggered again after failure.
    */
   @Test
-  void testConcurrentRebuildPrevention() throws Exception {
-    CountDownLatch startLatch = new CountDownLatch(1);
-    CountDownLatch finishLatch = new CountDownLatch(1);
-    AtomicBoolean firstRebuildStarted = new AtomicBoolean(false);
-    AtomicBoolean secondRebuildRejected = new AtomicBoolean(false);
+  void testRebuildAfterFailure() throws Exception {
+    CountDownLatch firstAttempt = new CountDownLatch(1);
+    CountDownLatch secondAttempt = new CountDownLatch(1);
+    AtomicInteger attemptCount = new AtomicInteger(0);
 
-    // Setup first rebuild to block until we signal
+    // Setup mock to fail first time, succeed second time
     doAnswer(invocation -> {
-      firstRebuildStarted.set(true);
-      startLatch.countDown();
-      // Wait for test to signal completion
-      boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
-      if (!awaitSuccess) {
-        LOG.warn("finishLatch.await() timed out");
+      int attempt = attemptCount.incrementAndGet();
+      LOG.info("clearNSSummaryTable attempt #{}", attempt);
+      if (attempt == 1) {
+        firstAttempt.countDown();
+        throw new IOException("First failure");
+      } else {
+        secondAttempt.countDown();
+        return null;
       }
-      return null;
     }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
 
-    ExecutorService executor = Executors.newFixedThreadPool(2);
-
-    try {
-      // Start first rebuild asynchronously
-      CompletableFuture<TaskResult> firstRebuild = 
CompletableFuture.supplyAsync(() -> {
-        LOG.info("Starting first rebuild");
-        return nsSummaryTask.reprocess(mockOMMetadataManager);
-      }, executor);
-
-      // Wait for first rebuild to start
-      assertTrue(startLatch.await(5, TimeUnit.SECONDS), 
-          "First rebuild should start within timeout");
-      assertTrue(firstRebuildStarted.get(), "First rebuild should have 
started");
-      assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
-          "State should be RUNNING during first rebuild");
-
-      // Attempt second rebuild - should be rejected immediately
-      CompletableFuture<TaskResult> secondRebuild = 
CompletableFuture.supplyAsync(() -> {
-        LOG.info("Attempting second rebuild");
-        TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
-        secondRebuildRejected.set(!result.isTaskSuccess());
-        return result;
-      }, executor);
-
-      // Get second rebuild result quickly (should be immediate rejection)
-      TaskResult secondResult = secondRebuild.get(2, TimeUnit.SECONDS);
-      assertFalse(secondResult.isTaskSuccess(), 
-          "Second rebuild should be rejected");
-      assertTrue(secondRebuildRejected.get(), "Second rebuild should have been 
rejected");
-
-      // Signal first rebuild to complete
-      finishLatch.countDown();
-      TaskResult firstResult = firstRebuild.get(5, TimeUnit.SECONDS);
-      assertTrue(firstResult.isTaskSuccess(), "First rebuild should succeed");
-
-      // Verify final state
-      assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
-          "State should return to IDLE after first rebuild completes");
-
-    } finally {
-      finishLatch.countDown(); // Ensure cleanup
-      executor.shutdown();
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-  }
-
-  /**
-   * Test that rebuild can be triggered again after failure.
-   */
-  @Test
-  void testRebuildAfterFailure() throws Exception {
     // First rebuild fails
-    doThrow(new IOException("Test 
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
-    
-    TaskResult failedResult = nsSummaryTask.reprocess(mockOMMetadataManager);
-    assertFalse(failedResult.isTaskSuccess(), "First rebuild should fail");
+    ReconTaskController.ReInitializationResult result1 =
+        taskController.queueReInitializationEvent(
+            
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result1,
+        "First event should be queued successfully");
+
+    assertTrue(firstAttempt.await(10, TimeUnit.SECONDS), "First rebuild should 
be attempted");
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == 
RebuildState.FAILED,
+        100, 5000);
     assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
         "State should be FAILED after first rebuild");
 
-    // Second rebuild succeeds
-    // Setup successful rebuild by default - no exception thrown
-    doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
-    
-    TaskResult successResult = nsSummaryTask.reprocess(mockOMMetadataManager);
-    assertTrue(successResult.isTaskSuccess(), "Second rebuild should succeed");
+    // Second rebuild succeeds - must wait for time-based retry delay to expire
+    // This is a deliberate time-based test of the retry mechanism
+    int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+    long deadline = System.currentTimeMillis() + retryDelayMs;
+    GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+        100, retryDelayMs + 1000);
+
+    ReconTaskController.ReInitializationResult result2 =
+        taskController.queueReInitializationEvent(
+            
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result2,
+        "Second event should be queued successfully after retry delay");
+
+    assertTrue(secondAttempt.await(10, TimeUnit.SECONDS), "Second rebuild 
should be attempted");
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == 
RebuildState.IDLE,
+        100, 5000);
     assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
         "State should be IDLE after successful rebuild");
   }
 
   /**
-   * Test multiple concurrent attempts - only one should succeed, others 
rejected.
+   * Test multiple concurrent queueReInitializationEvent() calls.
+   * <p>
+   * This is the KEY test for production behavior - multiple threads
+   * simultaneously calling queueReInitializationEvent(), which is what
+   * actually happens in production (not direct reprocess() calls).
+   *
+   * <p>Important: The queue-based architecture provides SEQUENTIAL processing,
+   * not event deduplication. Multiple successfully queued events will execute
+   * sequentially (not concurrently). The AtomicReference in NSSummaryTask
+   * prevents concurrent execution within a single reprocess() call.
    */
   @Test
-  @Flaky("HDDS-13573")
   void testMultipleConcurrentAttempts() throws Exception {
-    int threadCount = 5;
-    CountDownLatch startLatch = new CountDownLatch(1);
-    CountDownLatch finishLatch = new CountDownLatch(1);
-    AtomicInteger successCount = new AtomicInteger(0);
-    AtomicInteger rejectedCount = new AtomicInteger(0);
-    AtomicInteger clearTableCallCount = new AtomicInteger(0);
-
-    // Ensure clean initial state
+    ConcurrentTestContext ctx = new ConcurrentTestContext(5);
+
     NSSummaryTask.resetRebuildState();
-    assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(), 
-        "Initial state must be IDLE");
+    assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(), "Initial 
state must be IDLE");
+
+    setupConcurrentExecutionTracking(ctx);
+
+    ExecutorService executor = 
Executors.newFixedThreadPool(ctx.getThreadCount());
+    List<CompletableFuture<ReconTaskController.ReInitializationResult>> 
futures =
+        launchConcurrentQueueRequests(ctx, executor);
+
+    try {
+      coordinateConcurrentExecution(ctx, futures);
+      verifyConcurrentExecutionResults(ctx, futures);
+    } finally {
+      ctx.getFirstRebuildCanComplete().countDown();
+      executor.shutdown();
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+    }
+  }
 
-    // Setup rebuild to block and count calls
+  private void setupConcurrentExecutionTracking(ConcurrentTestContext ctx) 
throws IOException {
     doAnswer(invocation -> {
-      int callNum = clearTableCallCount.incrementAndGet();
-      LOG.info("clearNSSummaryTable called #{}, current state: {}", callNum, 
NSSummaryTask.getRebuildState());
-      
-      if (callNum == 1) {
-        startLatch.countDown();
-        boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
-        if (!awaitSuccess) {
-          LOG.warn("finishLatch.await() timed out");
+      int callNum = ctx.getClearTableCallCount().incrementAndGet();
+      int currentConcurrent = ctx.getConcurrentExecutions().incrementAndGet();
+
+      ctx.getMaxConcurrentExecutions().updateAndGet(max -> Math.max(max, 
currentConcurrent));
+      LOG.info("clearNSSummaryTable call #{}, concurrent executions: {}, 
state: {}",
+          callNum, currentConcurrent, NSSummaryTask.getRebuildState());
+
+      try {
+        if (callNum == 1) {
+          ctx.getFirstRebuildStarted().countDown();
+          boolean awaitSuccess = ctx.getFirstRebuildCanComplete().await(15, 
TimeUnit.SECONDS);
+          if (!awaitSuccess) {
+            LOG.error("firstRebuildCanComplete.await() timed out");
+          }
+        } else {
+          Thread.sleep(100); // Simulate operation time (acceptable use in 
mock)
         }
+      } finally {
+        ctx.getConcurrentExecutions().decrementAndGet();
       }
       return null;
     }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+  }
+
+  private List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+      launchConcurrentQueueRequests(ConcurrentTestContext ctx, ExecutorService 
executor) {
+    List<CompletableFuture<ReconTaskController.ReInitializationResult>> 
futures = new ArrayList<>();
 
-    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
-    CompletableFuture<Void>[] futures = new CompletableFuture[threadCount];
+    for (int i = 0; i < ctx.threadCount; i++) {
+      final int threadId = i;
+      futures.add(CompletableFuture.supplyAsync(() ->
+          executeQueueRequest(ctx, threadId), executor));
+    }
+    return futures;
+  }
 
+  private ReconTaskController.ReInitializationResult executeQueueRequest(
+      ConcurrentTestContext ctx, int threadId) {
     try {
-      // Launch multiple concurrent rebuilds
-      for (int i = 0; i < threadCount; i++) {
-        final int threadId = i;
-        futures[i] = CompletableFuture.runAsync(() -> {
-          LOG.info("Thread {} attempting rebuild, current state: {}", 
threadId, NSSummaryTask.getRebuildState());
-          TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
-          if (result.isTaskSuccess()) {
-            int count = successCount.incrementAndGet();
-            LOG.info("Thread {} rebuild succeeded (success #{})", threadId, 
count);
-          } else {
-            int count = rejectedCount.incrementAndGet();
-            LOG.info("Thread {} rebuild rejected (rejection #{})", threadId, 
count);
-          }
-        }, executor);
+      ctx.getAllThreadsReady().countDown();
+      LOG.info("Thread {} ready, waiting for all threads", threadId);
+
+      if (!ctx.getAllThreadsReady().await(10, TimeUnit.SECONDS)) {
+        throw new RuntimeException("Not all threads ready in time");
       }
 
-      // Wait for first rebuild to start
-      assertTrue(startLatch.await(5, TimeUnit.SECONDS), 
-          "At least one rebuild should start");
-      assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
-          "State should be RUNNING");
-
-      // Let rebuilds complete
-      finishLatch.countDown();
-      CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
-
-      // Debug output
-      LOG.info("Final counts - Success: {}, Rejected: {}, ClearTable calls: 
{}, Final state: {}", 
-          successCount.get(), rejectedCount.get(), clearTableCallCount.get(), 
NSSummaryTask.getRebuildState());
-
-      // Verify results - only one thread should have successfully executed 
the rebuild
-      assertEquals(1, clearTableCallCount.get(), 
-          "clearNSSummaryTable should only be called once due to unified 
control");
-      assertEquals(1, successCount.get(), 
-          "Exactly one rebuild should succeed");
-      assertEquals(threadCount - 1, rejectedCount.get(), 
-          "All other rebuilds should be rejected");
-      assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
-          "Final state should be IDLE");
+      Thread.sleep(threadId * 10L); // Staggered delay for race conditions
 
-    } finally {
-      finishLatch.countDown(); // Ensure cleanup
-      executor.shutdown();
-      executor.awaitTermination(5, TimeUnit.SECONDS);
+      LOG.info("Thread {} calling queueReInitializationEvent()", threadId);
+      ctx.getTotalQueueAttempts().incrementAndGet();
+
+      ReconTaskController.ReInitializationResult result =
+          taskController.queueReInitializationEvent(
+              
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+      if (result == ReconTaskController.ReInitializationResult.SUCCESS) {
+        ctx.getSuccessfulQueueCount().incrementAndGet();
+      }
+
+      LOG.info("Thread {} completed with result={}", threadId, result);
+      return result;
+    } catch (InterruptedException e) {
+      LOG.error("Thread {} interrupted", threadId, e);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void coordinateConcurrentExecution(ConcurrentTestContext ctx,
+                                             
List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+                                                 futures)
+      throws Exception {
+    assertTrue(ctx.getFirstRebuildStarted().await(15, TimeUnit.SECONDS), 
"First rebuild should start");
+    LOG.info("First rebuild started, state: {}", 
NSSummaryTask.getRebuildState());
+
+    GenericTestUtils.waitFor(() -> ctx.getTotalQueueAttempts().get() >= 
ctx.getThreadCount() / 2,
+        100, 5000);
+
+    ctx.getFirstRebuildCanComplete().countDown();
+
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(20, 
TimeUnit.SECONDS);
+
+    // Wait for all queued events to be processed and state to return to IDLE
+    // Use longer timeout and smaller check interval for more robust waiting
+    GenericTestUtils.waitFor(() -> {
+      RebuildState state = NSSummaryTask.getRebuildState();
+      LOG.info("Current state: {}, clearTableCallCount: {}", state, 
ctx.getClearTableCallCount().get());
+      return state == RebuildState.IDLE;
+    }, 100, 20000);
+  }
+
+  private void verifyConcurrentExecutionResults(ConcurrentTestContext ctx,
+                                                
List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+                                                    futures)
+      throws Exception {
+    ResultCounts counts = collectResultCounts(futures);
+
+    LOG.info("Test completed - Total queue attempts: {}, Successful queues: 
{}, " +
+            "Result breakdown: SUCCESS={}, RETRY_LATER={}, MAX_RETRIES={}, " +
+            "ClearTable calls: {}, Max concurrent: {}, Final state: {}",
+        ctx.getTotalQueueAttempts().get(), ctx.getSuccessfulQueueCount().get(),
+        counts.getSuccessCount(), counts.getRetryLaterCount(), 
counts.getMaxRetriesCount(),
+        ctx.getClearTableCallCount().get(), 
ctx.getMaxConcurrentExecutions().get(),
+        NSSummaryTask.getRebuildState());
+
+    assertEquals(1, ctx.getMaxConcurrentExecutions().get(),
+        "Should never have concurrent executions - queue provides 
serialization");
+    assertEquals(ctx.getThreadCount(), ctx.getTotalQueueAttempts().get(),
+        "All threads should have attempted to queue events");
+    assertTrue(ctx.getSuccessfulQueueCount().get() >= 1,
+        "At least one thread should have successfully queued rebuild");
+    assertThat(ctx.getClearTableCallCount().get())
+        .as("At least one rebuild should execute and not exceed successfully 
queued events")
+        .isPositive()
+        .isLessThanOrEqualTo(ctx.getSuccessfulQueueCount().get());
+
+    // Final verification that state is IDLE - wait one more time to ensure 
stability
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == 
RebuildState.IDLE,
+        100, 5000);
+    assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
+        "Final state should be IDLE after all rebuilds complete");
+
+    LOG.info("VERIFIED: Queue architecture prevents concurrent executions. " +
+        "Multiple events can be queued but execute sequentially.");
+  }
+
+  private ResultCounts collectResultCounts(
+      List<CompletableFuture<ReconTaskController.ReInitializationResult>> 
futures)
+      throws Exception {
+    ResultCounts counts = new ResultCounts();
+    for (CompletableFuture<ReconTaskController.ReInitializationResult> future 
: futures) {
+      ReconTaskController.ReInitializationResult result = future.get();
+      switch (result) {
+      case SUCCESS:
+        counts.successCount++;
+        break;
+      case RETRY_LATER:
+        counts.retryLaterCount++;
+        break;
+      case MAX_RETRIES_EXCEEDED:
+        counts.maxRetriesCount++;
+        break;
+      default:
+        LOG.warn("Unexpected result: {}", result);
+      }
+    }
+    return counts;
+  }
+
+  /**
+   * Context for concurrent test execution tracking.
+   */
+  private static class ConcurrentTestContext {
+    private final int threadCount;
+    private final CountDownLatch allThreadsReady;
+    private final CountDownLatch firstRebuildStarted;
+    private final CountDownLatch firstRebuildCanComplete;
+    private final AtomicInteger clearTableCallCount;
+    private final AtomicInteger concurrentExecutions;
+    private final AtomicInteger maxConcurrentExecutions;
+    private final AtomicInteger successfulQueueCount;
+    private final AtomicInteger totalQueueAttempts;
+
+    ConcurrentTestContext(int threadCount) {
+      this.threadCount = threadCount;
+      this.allThreadsReady = new CountDownLatch(threadCount);
+      this.firstRebuildStarted = new CountDownLatch(1);
+      this.firstRebuildCanComplete = new CountDownLatch(1);
+      this.clearTableCallCount = new AtomicInteger(0);
+      this.concurrentExecutions = new AtomicInteger(0);
+      this.maxConcurrentExecutions = new AtomicInteger(0);
+      this.successfulQueueCount = new AtomicInteger(0);
+      this.totalQueueAttempts = new AtomicInteger(0);
+    }
+
+    public int getThreadCount() {
+      return threadCount;
+    }
+
+    public CountDownLatch getAllThreadsReady() {
+      return allThreadsReady;
+    }
+
+    public CountDownLatch getFirstRebuildStarted() {
+      return firstRebuildStarted;
+    }
+
+    public CountDownLatch getFirstRebuildCanComplete() {
+      return firstRebuildCanComplete;
+    }
+
+    public AtomicInteger getClearTableCallCount() {
+      return clearTableCallCount;
+    }
+
+    public AtomicInteger getConcurrentExecutions() {
+      return concurrentExecutions;
+    }
+
+    public AtomicInteger getMaxConcurrentExecutions() {
+      return maxConcurrentExecutions;
+    }
+
+    public AtomicInteger getSuccessfulQueueCount() {
+      return successfulQueueCount;
+    }
+
+    public AtomicInteger getTotalQueueAttempts() {
+      return totalQueueAttempts;
+    }
+  }
+
+  /**
+   * Result counts from concurrent execution.
+   */
+  private static class ResultCounts {
+    private long successCount = 0;
+    private long retryLaterCount = 0;
+    private long maxRetriesCount = 0;
+
+    public long getSuccessCount() {
+      return successCount;
+    }
+
+    public long getRetryLaterCount() {
+      return retryLaterCount;
+    }
+
+    public long getMaxRetriesCount() {
+      return maxRetriesCount;
     }
   }
 
@@ -400,38 +656,32 @@ void testMultipleConcurrentAttempts() throws Exception {
    */
   @Test
   void testReconUtilsIntegration() throws Exception {
-    // Test initial state access via ReconUtils
     assertEquals(RebuildState.IDLE, ReconUtils.getNSSummaryRebuildState(),
         "Initial state should be IDLE via ReconUtils");
 
-    // Start a rebuild to test RUNNING state
     CountDownLatch rebuildStarted = new CountDownLatch(1);
     CountDownLatch rebuildCanFinish = new CountDownLatch(1);
 
-    // Setup rebuild to block so we can test state
     doAnswer(invocation -> {
       rebuildStarted.countDown();
-      boolean awaitSuccess = rebuildCanFinish.await(5, TimeUnit.SECONDS);
+      boolean awaitSuccess = rebuildCanFinish.await(10, TimeUnit.SECONDS);
       if (!awaitSuccess) {
         LOG.warn("rebuildCanFinish.await() timed out");
       }
       return null;
     }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
 
-    // Start rebuild in background to test state transitions
-    CompletableFuture<TaskResult> rebuild = CompletableFuture.supplyAsync(() ->
-        nsSummaryTask.reprocess(mockOMMetadataManager));
+    taskController.queueReInitializationEvent(
+        ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
 
-    // Wait for rebuild to start and verify RUNNING state
-    assertTrue(rebuildStarted.await(5, TimeUnit.SECONDS), 
-        "Rebuild should start within timeout");
+    assertTrue(rebuildStarted.await(10, TimeUnit.SECONDS),
+        "Rebuild should start");
     assertEquals(RebuildState.RUNNING, ReconUtils.getNSSummaryRebuildState(),
         "State should be RUNNING during rebuild");
 
-    // Complete rebuild and verify IDLE state
     rebuildCanFinish.countDown();
-    TaskResult result = rebuild.get(5, TimeUnit.SECONDS);
-    assertTrue(result.isTaskSuccess(), "Rebuild should succeed");
+    GenericTestUtils.waitFor(() -> ReconUtils.getNSSummaryRebuildState() == RebuildState.IDLE,
+        100, 5000);
     assertEquals(RebuildState.IDLE, ReconUtils.getNSSummaryRebuildState(),
         "State should return to IDLE after completion");
   }
@@ -441,65 +691,114 @@ void testReconUtilsIntegration() throws Exception {
    */
   @Test
   void testStateTransitionsDuringExceptions() throws Exception {
-    // Test exception during clearNSSummaryTable
-    doThrow(new RuntimeException("Unexpected error"))
-        .when(mockNamespaceSummaryManager).clearNSSummaryTable();
+    CountDownLatch exceptionLatch = new CountDownLatch(1);
+    CountDownLatch recoveryLatch = new CountDownLatch(1);
+    AtomicInteger callCount = new AtomicInteger(0);
 
-    TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
-    
-    assertFalse(result.isTaskSuccess(), "Rebuild should fail on exception");
+    // Setup mock to throw exception first time, succeed second time
+    doAnswer(invocation -> {
+      int call = callCount.incrementAndGet();
+      LOG.info("clearNSSummaryTable call #{}", call);
+      if (call == 1) {
+        exceptionLatch.countDown();
+        throw new RuntimeException("Unexpected error");
+      } else {
+        recoveryLatch.countDown();
+        return null;
+      }
+    }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+    // First rebuild throws exception
+    ReconTaskController.ReInitializationResult result1 =
+        taskController.queueReInitializationEvent(
+            ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result1,
+        "First event should be queued successfully");
+
+    assertTrue(exceptionLatch.await(10, TimeUnit.SECONDS),
+        "Exception should occur");
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == RebuildState.FAILED,
+        100, 5000);
     assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
         "State should be FAILED after exception");
 
-    // Verify we can recover from FAILED state
-    // Setup successful rebuild by default - no exception thrown
-    doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
-    
-    TaskResult recoveryResult = nsSummaryTask.reprocess(mockOMMetadataManager);
-    assertTrue(recoveryResult.isTaskSuccess(), "Recovery rebuild should succeed");
+    // Second rebuild succeeds (recovery) - must wait for time-based retry delay to expire
+    // This is a deliberate time-based test of the retry mechanism
+    int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+    long deadline = System.currentTimeMillis() + retryDelayMs;
+    GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+        100, retryDelayMs + 1000);
+
+    ReconTaskController.ReInitializationResult result2 =
+        taskController.queueReInitializationEvent(
+            ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result2,
+        "Second event should be queued successfully after retry delay");
+
+    assertTrue(recoveryLatch.await(10, TimeUnit.SECONDS), "Recovery should execute");
+    GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == RebuildState.IDLE,
+        100, 5000);
     assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
         "State should be IDLE after recovery");
   }
 
   /**
-   * Test that interrupted threads are handled properly.
+   * Test checkpoint creation failure and retry mechanism.
    */
   @Test
-  void testInterruptedThreadHandling() throws Exception {
-    CountDownLatch rebuildStarted = new CountDownLatch(1);
-    AtomicBoolean wasInterrupted = new AtomicBoolean(false);
+  void testCheckpointCreationFailureRetry() throws Exception {
+    ReconTaskControllerImpl controllerSpy = spy(taskController);
+    doThrow(new IOException("Checkpoint creation failed"))
+        .when(controllerSpy).createOMCheckpoint(any());
+
+    // First few attempts should return RETRY_LATER
+    ReconTaskController.ReInitializationResult result1 =
+        controllerSpy.queueReInitializationEvent(
+            ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW);
+    assertEquals(ReconTaskController.ReInitializationResult.RETRY_LATER, result1,
+        "First attempt should return RETRY_LATER due to checkpoint failure");
+
+    // After delay, try again - must wait for time-based retry delay to expire
+    int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+    long deadline = System.currentTimeMillis() + retryDelayMs;
+    GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+        100, retryDelayMs + 1000);
+    ReconTaskController.ReInitializationResult result2 =
+        controllerSpy.queueReInitializationEvent(
+            ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW);
+    assertEquals(ReconTaskController.ReInitializationResult.RETRY_LATER, result2,
+        "Second attempt should also return RETRY_LATER");
+
+    verify(controllerSpy, times(2)).createOMCheckpoint(any());
+  }
 
-    // Setup rebuild to detect interruption
+  /**
+   * Test event buffer integration with concurrent queueing.
+   */
+  @Test
+  void testEventBufferWithConcurrentQueueing() throws Exception {
+    int initialBufferSize = taskController.getEventBufferSize();
+    LOG.info("Initial buffer size: {}", initialBufferSize);
+
+    CountDownLatch queuedLatch = new CountDownLatch(1);
     doAnswer(invocation -> {
-      rebuildStarted.countDown();
-      try {
-        Thread.sleep(5000); // Long sleep to ensure interruption
-      } catch (InterruptedException e) {
-        wasInterrupted.set(true);
-        throw new RuntimeException("Interrupted", e);
-      }
+      queuedLatch.countDown();
+      Thread.sleep(100);
       return null;
     }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
 
-    // Start rebuild in separate thread
-    Thread rebuildThread = new Thread(() -> {
-      nsSummaryTask.reprocess(mockOMMetadataManager);
-    });
-    rebuildThread.start();
+    // Queue an event
+    ReconTaskController.ReInitializationResult result =
+        taskController.queueReInitializationEvent(
+            ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
 
-    // Wait for rebuild to start
-    assertTrue(rebuildStarted.await(5, TimeUnit.SECONDS),
-        "Rebuild should start");
-    assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
-        "State should be RUNNING");
+    assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+        "Event should be successfully queued");
 
-    // Interrupt the thread
-    rebuildThread.interrupt();
-    rebuildThread.join(5000);
+    // Event should be in buffer or being processed
+    assertTrue(queuedLatch.await(10, TimeUnit.SECONDS),
+        "Event should be processed");
 
-    // Verify interruption was handled
-    assertTrue(wasInterrupted.get(), "Thread should have been interrupted");
-    assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
-        "State should be FAILED after interruption");
+    LOG.info("Event buffer integration test completed successfully");
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to