This is an automated email from the ASF dual-hosted git repository.
arafat2198 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a55efa17c9b HDDS-13573. Intermittent failure in TestNSSummaryUnifiedControl.testMultipleConcurrentAttempts (#9416).
a55efa17c9b is described below
commit a55efa17c9b622d98d98b35783f45df73182c3d4
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Wed Dec 10 12:30:03 2025 +0530
HDDS-13573. Intermittent failure in TestNSSummaryUnifiedControl.testMultipleConcurrentAttempts (#9416).
---
.../recon/tasks/TestNSSummaryUnifiedControl.java | 805 ++++++++++++++-------
1 file changed, 552 insertions(+), 253 deletions(-)
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
index e87852ebc0f..415c3952d33 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java
@@ -17,20 +17,28 @@
package org.apache.hadoop.ozone.recon.tasks;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -44,13 +52,21 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState;
-import org.apache.hadoop.ozone.recon.tasks.ReconOmTask.TaskResult;
-import org.apache.ozone.test.tag.Flaky;
+import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
+import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,67 +74,133 @@
import org.slf4j.LoggerFactory;
/**
- * Integration tests for HDDS-13443: Unified and controlled sync access to
- * retrigger of build of NSSummary tree.
- *
- * <p>These tests verify that the unified control mechanism prevents concurrent
- * rebuilds and properly manages state transitions across all entry points.
+ * Unified and controlled sync access to
+ * retrigger of build of NSSummary tree using queue-based architecture.
+ *
+ * <p>These tests verify that the queue-based unified control mechanism
+ * correctly handles concurrent queueReInitializationEvent() calls in
production.
+ *
+ * <p>Execution Flow:
+ * <pre>
+ * Multiple Concurrent Callers
+ * ↓ ↓ ↓
+ * queueReInitializationEvent() [Thread-safe public API]
+ * ↓
+ * BlockingQueue<ReconEvent> [Serialization layer]
+ * ↓
+ * Single Async Thread [Sequential processing]
+ * ↓
+ * processReInitializationEvent()
+ * ↓
+ * reInitializeTasks()
+ * ↓
+ * task.reprocess() [Only ONE execution at a time]
+ * </pre>
*/
public class TestNSSummaryUnifiedControl {
private static final Logger LOG =
LoggerFactory.getLogger(TestNSSummaryUnifiedControl.class);
- private NSSummaryTask nsSummaryTask;
+ private ReconTaskControllerImpl taskController;
private ReconNamespaceSummaryManager mockNamespaceSummaryManager;
private ReconOMMetadataManager mockReconOMMetadataManager;
- private OMMetadataManager mockOMMetadataManager;
private OzoneConfiguration ozoneConfiguration;
@BeforeEach
- void setUp() throws IOException {
+ void setUp() throws Exception {
// Reset static state before each test
NSSummaryTask.resetRebuildState();
-
+
// Create mocks
mockNamespaceSummaryManager = mock(ReconNamespaceSummaryManager.class);
mockReconOMMetadataManager = mock(ReconOMMetadataManager.class);
- mockOMMetadataManager = mock(OMMetadataManager.class);
ozoneConfiguration = new OzoneConfiguration();
- // Create NSSummaryTask instance that will use mocked sub-tasks
- nsSummaryTask = createTestableNSSummaryTask();
+ // Configure small buffer for easier testing
+
ozoneConfiguration.setInt(ReconServerConfigKeys.OZONE_RECON_OM_EVENT_BUFFER_CAPACITY,
100);
+
+ // Setup task controller
+ ReconTaskStatusUpdaterManager mockTaskStatusUpdaterManager =
mock(ReconTaskStatusUpdaterManager.class);
+ ReconTaskStatusUpdater mockTaskStatusUpdater =
mock(ReconTaskStatusUpdater.class);
+
when(mockTaskStatusUpdaterManager.getTaskStatusUpdater(any())).thenReturn(mockTaskStatusUpdater);
+
+ ReconDBProvider reconDbProvider = mock(ReconDBProvider.class);
+ when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class));
+
when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider);
+
+ ReconContainerMetadataManager reconContainerMgr =
mock(ReconContainerMetadataManager.class);
+ ReconGlobalStatsManager reconGlobalStatsManager =
mock(ReconGlobalStatsManager.class);
+ ReconFileMetadataManager reconFileMetadataManager =
mock(ReconFileMetadataManager.class);
+
+ taskController = new ReconTaskControllerImpl(ozoneConfiguration, new
HashSet<>(),
+ mockTaskStatusUpdaterManager, reconDbProvider, reconContainerMgr,
mockNamespaceSummaryManager,
+ reconGlobalStatsManager, reconFileMetadataManager);
+
+ // Register testable NSSummaryTask instance
+ taskController.registerTask(createTestableNSSummaryTask());
+
+ // Setup mock OM metadata manager with checkpoint support
+ setupMockOMMetadataManager();
+ taskController.updateOMMetadataManager(mockReconOMMetadataManager);
+
+ // Setup successful rebuild by default
+ doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+ // Start async processing
+ taskController.start();
}
@AfterEach
void tearDown() {
- // Reset static state after each test to ensure test isolation
+ // Reset static state after each test
NSSummaryTask.resetRebuildState();
+
+ // Shutdown task controller
+ if (taskController != null) {
+ taskController.stop();
+ }
}
-
- /**
- * Create a testable NSSummaryTask that uses mocked sub-tasks for successful
execution.
- */
+
+ private void setupMockOMMetadataManager() throws IOException {
+ DBStore mockDBStore = mock(DBStore.class);
+ File mockDbLocation = mock(File.class);
+ DBCheckpoint mockCheckpoint = mock(DBCheckpoint.class);
+ Path mockCheckpointPath = Paths.get("/tmp/test/checkpoint");
+
+ when(mockReconOMMetadataManager.getStore()).thenReturn(mockDBStore);
+ when(mockDBStore.getDbLocation()).thenReturn(mockDbLocation);
+ when(mockDbLocation.getParent()).thenReturn("/tmp/test");
+ when(mockDBStore.getCheckpoint(anyString(),
any(Boolean.class))).thenReturn(mockCheckpoint);
+
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);
+
+ ReconOMMetadataManager mockCheckpointedManager =
mock(ReconOMMetadataManager.class);
+ when(mockCheckpointedManager.getStore()).thenReturn(mockDBStore);
+
when(mockReconOMMetadataManager.createCheckpointReconMetadataManager(any(),
any()))
+ .thenReturn(mockCheckpointedManager);
+ }
+
private NSSummaryTask createTestableNSSummaryTask() {
return new NSSummaryTask(
- mockNamespaceSummaryManager,
- mockReconOMMetadataManager,
+ mockNamespaceSummaryManager,
+ mockReconOMMetadataManager,
ozoneConfiguration) {
-
+
@Override
public TaskResult buildTaskResult(boolean success) {
return super.buildTaskResult(success);
}
-
- @Override
+
+ @Override
+ public NSSummaryTask getStagedTask(ReconOMMetadataManager
stagedOmMetadataManager, DBStore stagedReconDbStore)
+ throws IOException {
+ return this;
+ }
+
+ @Override
protected TaskResult executeReprocess(OMMetadataManager
omMetadataManager, long startTime) {
- // Simplified test implementation that mimics the real execution flow
- // but bypasses the complex sub-task execution while maintaining
proper state management
-
- // Initialize a list of tasks to run in parallel (empty for testing)
Collection<Callable<Boolean>> tasks = new ArrayList<>();
try {
- // This will call the mocked clearNSSummaryTable (might throw
Exception for failure tests)
getReconNamespaceSummaryManager().clearNSSummaryTable();
} catch (IOException ioEx) {
LOG.error("Unable to clear NSSummary table in Recon DB. ", ioEx);
@@ -126,10 +208,9 @@ protected TaskResult executeReprocess(OMMetadataManager
omMetadataManager, long
return buildTaskResult(false);
}
- // Add mock sub-tasks that always succeed
- tasks.add(() -> true); // Mock FSO task
- tasks.add(() -> true); // Mock Legacy task
- tasks.add(() -> true); // Mock OBS task
+ tasks.add(() -> true);
+ tasks.add(() -> true);
+ tasks.add(() -> true);
List<Future<Boolean>> results;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
@@ -137,7 +218,7 @@ protected TaskResult executeReprocess(OMMetadataManager
omMetadataManager, long
.build();
ExecutorService executorService = Executors.newFixedThreadPool(2,
threadFactory);
boolean success = false;
-
+
try {
results = executorService.invokeAll(tasks);
for (Future<Boolean> result : results) {
@@ -148,23 +229,21 @@ protected TaskResult executeReprocess(OMMetadataManager
omMetadataManager, long
}
}
success = true;
-
+
} catch (InterruptedException | ExecutionException ex) {
LOG.error("Error while reprocessing NSSummary table in Recon DB.",
ex);
NSSummaryTask.setRebuildStateToFailed();
return buildTaskResult(false);
-
+
} finally {
executorService.shutdown();
-
long endTime = System.nanoTime();
long durationInMillis = TimeUnit.NANOSECONDS.toMillis(endTime -
startTime);
LOG.info("Test NSSummary reprocess execution time: {} milliseconds",
durationInMillis);
-
- // Reset state to IDLE on successful completion
+
if (success) {
NSSummaryTask.resetRebuildState();
- LOG.info("Test NSSummary tree reprocess completed successfully
with unified control.");
+ LOG.info("Test NSSummary tree reprocess completed successfully.");
}
}
@@ -183,215 +262,392 @@ void testInitialState() {
}
/**
- * Test successful single rebuild operation.
+ * Test single successful rebuild via queue.
*/
@Test
void testSingleSuccessfulRebuild() throws Exception {
- // Setup successful rebuild
- // Setup successful rebuild by default - no exception thrown
- doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ AtomicBoolean rebuildExecuted = new AtomicBoolean(false);
+ CountDownLatch rebuildLatch = new CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ rebuildExecuted.set(true);
+ rebuildLatch.countDown();
+ return null;
+ }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+ // Queue rebuild via production API
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+ "Rebuild should be queued successfully");
- // Execute rebuild
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+ // Wait for async processing
+ assertTrue(rebuildLatch.await(10, TimeUnit.SECONDS),
+ "Rebuild should execute");
+ assertTrue(rebuildExecuted.get(), "Rebuild should have executed");
- // Verify results
- assertTrue(result.isTaskSuccess(), "Rebuild should succeed");
+ // wait for 5 secs for state to return to IDLE
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() ==
RebuildState.IDLE,
+ 100, 5000);
assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
"State should return to IDLE after successful rebuild");
-
- // Verify interactions
- verify(mockNamespaceSummaryManager, times(1)).clearNSSummaryTable();
}
/**
- * Test rebuild failure sets state to FAILED.
+ * Test rebuild failure sets proper state.
*/
@Test
- void testRebuildFailure() throws IOException {
- // Setup failure scenario
- doThrow(new IOException("Test
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ void testRebuildFailure() throws Exception {
+ CountDownLatch failureLatch = new CountDownLatch(1);
- // Execute rebuild
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+ doAnswer(invocation -> {
+ failureLatch.countDown();
+ throw new IOException("Test failure");
+ }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+ "Rebuild should be queued successfully");
+
+ assertTrue(failureLatch.await(10, TimeUnit.SECONDS),
+ "Rebuild should be attempted");
- // Verify results
- assertFalse(result.isTaskSuccess(), "Rebuild should fail");
+ // Wait for 5 secs time for state update
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() ==
RebuildState.FAILED,
+ 100, 5000);
assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
"State should be FAILED after rebuild failure");
}
/**
- * Test concurrent rebuild attempts - second call should be rejected.
+ * Test rebuild can be triggered again after failure.
*/
@Test
- void testConcurrentRebuildPrevention() throws Exception {
- CountDownLatch startLatch = new CountDownLatch(1);
- CountDownLatch finishLatch = new CountDownLatch(1);
- AtomicBoolean firstRebuildStarted = new AtomicBoolean(false);
- AtomicBoolean secondRebuildRejected = new AtomicBoolean(false);
+ void testRebuildAfterFailure() throws Exception {
+ CountDownLatch firstAttempt = new CountDownLatch(1);
+ CountDownLatch secondAttempt = new CountDownLatch(1);
+ AtomicInteger attemptCount = new AtomicInteger(0);
- // Setup first rebuild to block until we signal
+ // Setup mock to fail first time, succeed second time
doAnswer(invocation -> {
- firstRebuildStarted.set(true);
- startLatch.countDown();
- // Wait for test to signal completion
- boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
- if (!awaitSuccess) {
- LOG.warn("finishLatch.await() timed out");
+ int attempt = attemptCount.incrementAndGet();
+ LOG.info("clearNSSummaryTable attempt #{}", attempt);
+ if (attempt == 1) {
+ firstAttempt.countDown();
+ throw new IOException("First failure");
+ } else {
+ secondAttempt.countDown();
+ return null;
}
- return null;
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
- ExecutorService executor = Executors.newFixedThreadPool(2);
-
- try {
- // Start first rebuild asynchronously
- CompletableFuture<TaskResult> firstRebuild =
CompletableFuture.supplyAsync(() -> {
- LOG.info("Starting first rebuild");
- return nsSummaryTask.reprocess(mockOMMetadataManager);
- }, executor);
-
- // Wait for first rebuild to start
- assertTrue(startLatch.await(5, TimeUnit.SECONDS),
- "First rebuild should start within timeout");
- assertTrue(firstRebuildStarted.get(), "First rebuild should have
started");
- assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
- "State should be RUNNING during first rebuild");
-
- // Attempt second rebuild - should be rejected immediately
- CompletableFuture<TaskResult> secondRebuild =
CompletableFuture.supplyAsync(() -> {
- LOG.info("Attempting second rebuild");
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
- secondRebuildRejected.set(!result.isTaskSuccess());
- return result;
- }, executor);
-
- // Get second rebuild result quickly (should be immediate rejection)
- TaskResult secondResult = secondRebuild.get(2, TimeUnit.SECONDS);
- assertFalse(secondResult.isTaskSuccess(),
- "Second rebuild should be rejected");
- assertTrue(secondRebuildRejected.get(), "Second rebuild should have been
rejected");
-
- // Signal first rebuild to complete
- finishLatch.countDown();
- TaskResult firstResult = firstRebuild.get(5, TimeUnit.SECONDS);
- assertTrue(firstResult.isTaskSuccess(), "First rebuild should succeed");
-
- // Verify final state
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
- "State should return to IDLE after first rebuild completes");
-
- } finally {
- finishLatch.countDown(); // Ensure cleanup
- executor.shutdown();
- executor.awaitTermination(5, TimeUnit.SECONDS);
- }
- }
-
- /**
- * Test that rebuild can be triggered again after failure.
- */
- @Test
- void testRebuildAfterFailure() throws Exception {
// First rebuild fails
- doThrow(new IOException("Test
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
-
- TaskResult failedResult = nsSummaryTask.reprocess(mockOMMetadataManager);
- assertFalse(failedResult.isTaskSuccess(), "First rebuild should fail");
+ ReconTaskController.ReInitializationResult result1 =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result1,
+ "First event should be queued successfully");
+
+ assertTrue(firstAttempt.await(10, TimeUnit.SECONDS), "First rebuild should
be attempted");
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() ==
RebuildState.FAILED,
+ 100, 5000);
assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
"State should be FAILED after first rebuild");
- // Second rebuild succeeds
- // Setup successful rebuild by default - no exception thrown
- doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
-
- TaskResult successResult = nsSummaryTask.reprocess(mockOMMetadataManager);
- assertTrue(successResult.isTaskSuccess(), "Second rebuild should succeed");
+ // Second rebuild succeeds - must wait for time-based retry delay to expire
+ // This is a deliberate time-based test of the retry mechanism
+ int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+ long deadline = System.currentTimeMillis() + retryDelayMs;
+ GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+ 100, retryDelayMs + 1000);
+
+ ReconTaskController.ReInitializationResult result2 =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result2,
+ "Second event should be queued successfully after retry delay");
+
+ assertTrue(secondAttempt.await(10, TimeUnit.SECONDS), "Second rebuild
should be attempted");
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() ==
RebuildState.IDLE,
+ 100, 5000);
assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
"State should be IDLE after successful rebuild");
}
/**
- * Test multiple concurrent attempts - only one should succeed, others
rejected.
+ * Test multiple concurrent queueReInitializationEvent() calls.
+ * <p>
+ * This is the KEY test for production behavior - multiple threads
+ * simultaneously calling queueReInitializationEvent(), which is what
+ * actually happens in production (not direct reprocess() calls).
+ *
+ * <p>Important: The queue-based architecture provides SEQUENTIAL processing,
+ * not event deduplication. Multiple successfully queued events will execute
+ * sequentially (not concurrently). The AtomicReference in NSSummaryTask
+ * prevents concurrent execution within a single reprocess() call.
*/
@Test
- @Flaky("HDDS-13573")
void testMultipleConcurrentAttempts() throws Exception {
- int threadCount = 5;
- CountDownLatch startLatch = new CountDownLatch(1);
- CountDownLatch finishLatch = new CountDownLatch(1);
- AtomicInteger successCount = new AtomicInteger(0);
- AtomicInteger rejectedCount = new AtomicInteger(0);
- AtomicInteger clearTableCallCount = new AtomicInteger(0);
-
- // Ensure clean initial state
+ ConcurrentTestContext ctx = new ConcurrentTestContext(5);
+
NSSummaryTask.resetRebuildState();
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
- "Initial state must be IDLE");
+ assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(), "Initial
state must be IDLE");
+
+ setupConcurrentExecutionTracking(ctx);
+
+ ExecutorService executor =
Executors.newFixedThreadPool(ctx.getThreadCount());
+ List<CompletableFuture<ReconTaskController.ReInitializationResult>>
futures =
+ launchConcurrentQueueRequests(ctx, executor);
+
+ try {
+ coordinateConcurrentExecution(ctx, futures);
+ verifyConcurrentExecutionResults(ctx, futures);
+ } finally {
+ ctx.getFirstRebuildCanComplete().countDown();
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ }
+ }
- // Setup rebuild to block and count calls
+ private void setupConcurrentExecutionTracking(ConcurrentTestContext ctx)
throws IOException {
doAnswer(invocation -> {
- int callNum = clearTableCallCount.incrementAndGet();
- LOG.info("clearNSSummaryTable called #{}, current state: {}", callNum,
NSSummaryTask.getRebuildState());
-
- if (callNum == 1) {
- startLatch.countDown();
- boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
- if (!awaitSuccess) {
- LOG.warn("finishLatch.await() timed out");
+ int callNum = ctx.getClearTableCallCount().incrementAndGet();
+ int currentConcurrent = ctx.getConcurrentExecutions().incrementAndGet();
+
+ ctx.getMaxConcurrentExecutions().updateAndGet(max -> Math.max(max,
currentConcurrent));
+ LOG.info("clearNSSummaryTable call #{}, concurrent executions: {},
state: {}",
+ callNum, currentConcurrent, NSSummaryTask.getRebuildState());
+
+ try {
+ if (callNum == 1) {
+ ctx.getFirstRebuildStarted().countDown();
+ boolean awaitSuccess = ctx.getFirstRebuildCanComplete().await(15,
TimeUnit.SECONDS);
+ if (!awaitSuccess) {
+ LOG.error("firstRebuildCanComplete.await() timed out");
+ }
+ } else {
+ Thread.sleep(100); // Simulate operation time (acceptable use in
mock)
}
+ } finally {
+ ctx.getConcurrentExecutions().decrementAndGet();
}
return null;
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ }
+
+ private List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+ launchConcurrentQueueRequests(ConcurrentTestContext ctx, ExecutorService
executor) {
+ List<CompletableFuture<ReconTaskController.ReInitializationResult>>
futures = new ArrayList<>();
- ExecutorService executor = Executors.newFixedThreadPool(threadCount);
- CompletableFuture<Void>[] futures = new CompletableFuture[threadCount];
+ for (int i = 0; i < ctx.threadCount; i++) {
+ final int threadId = i;
+ futures.add(CompletableFuture.supplyAsync(() ->
+ executeQueueRequest(ctx, threadId), executor));
+ }
+ return futures;
+ }
+ private ReconTaskController.ReInitializationResult executeQueueRequest(
+ ConcurrentTestContext ctx, int threadId) {
try {
- // Launch multiple concurrent rebuilds
- for (int i = 0; i < threadCount; i++) {
- final int threadId = i;
- futures[i] = CompletableFuture.runAsync(() -> {
- LOG.info("Thread {} attempting rebuild, current state: {}",
threadId, NSSummaryTask.getRebuildState());
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
- if (result.isTaskSuccess()) {
- int count = successCount.incrementAndGet();
- LOG.info("Thread {} rebuild succeeded (success #{})", threadId,
count);
- } else {
- int count = rejectedCount.incrementAndGet();
- LOG.info("Thread {} rebuild rejected (rejection #{})", threadId,
count);
- }
- }, executor);
+ ctx.getAllThreadsReady().countDown();
+ LOG.info("Thread {} ready, waiting for all threads", threadId);
+
+ if (!ctx.getAllThreadsReady().await(10, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Not all threads ready in time");
}
- // Wait for first rebuild to start
- assertTrue(startLatch.await(5, TimeUnit.SECONDS),
- "At least one rebuild should start");
- assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
- "State should be RUNNING");
-
- // Let rebuilds complete
- finishLatch.countDown();
- CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
-
- // Debug output
- LOG.info("Final counts - Success: {}, Rejected: {}, ClearTable calls:
{}, Final state: {}",
- successCount.get(), rejectedCount.get(), clearTableCallCount.get(),
NSSummaryTask.getRebuildState());
-
- // Verify results - only one thread should have successfully executed
the rebuild
- assertEquals(1, clearTableCallCount.get(),
- "clearNSSummaryTable should only be called once due to unified
control");
- assertEquals(1, successCount.get(),
- "Exactly one rebuild should succeed");
- assertEquals(threadCount - 1, rejectedCount.get(),
- "All other rebuilds should be rejected");
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
- "Final state should be IDLE");
+ Thread.sleep(threadId * 10L); // Staggered delay for race conditions
- } finally {
- finishLatch.countDown(); // Ensure cleanup
- executor.shutdown();
- executor.awaitTermination(5, TimeUnit.SECONDS);
+ LOG.info("Thread {} calling queueReInitializationEvent()", threadId);
+ ctx.getTotalQueueAttempts().incrementAndGet();
+
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+ if (result == ReconTaskController.ReInitializationResult.SUCCESS) {
+ ctx.getSuccessfulQueueCount().incrementAndGet();
+ }
+
+ LOG.info("Thread {} completed with result={}", threadId, result);
+ return result;
+ } catch (InterruptedException e) {
+ LOG.error("Thread {} interrupted", threadId, e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void coordinateConcurrentExecution(ConcurrentTestContext ctx,
+
List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+ futures)
+ throws Exception {
+ assertTrue(ctx.getFirstRebuildStarted().await(15, TimeUnit.SECONDS),
"First rebuild should start");
+ LOG.info("First rebuild started, state: {}",
NSSummaryTask.getRebuildState());
+
+ GenericTestUtils.waitFor(() -> ctx.getTotalQueueAttempts().get() >=
ctx.getThreadCount() / 2,
+ 100, 5000);
+
+ ctx.getFirstRebuildCanComplete().countDown();
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(20,
TimeUnit.SECONDS);
+
+ // Wait for all queued events to be processed and state to return to IDLE
+ // Use longer timeout and smaller check interval for more robust waiting
+ GenericTestUtils.waitFor(() -> {
+ RebuildState state = NSSummaryTask.getRebuildState();
+ LOG.info("Current state: {}, clearTableCallCount: {}", state,
ctx.getClearTableCallCount().get());
+ return state == RebuildState.IDLE;
+ }, 100, 20000);
+ }
+
+ private void verifyConcurrentExecutionResults(ConcurrentTestContext ctx,
+
List<CompletableFuture<ReconTaskController.ReInitializationResult>>
+ futures)
+ throws Exception {
+ ResultCounts counts = collectResultCounts(futures);
+
+ LOG.info("Test completed - Total queue attempts: {}, Successful queues:
{}, " +
+ "Result breakdown: SUCCESS={}, RETRY_LATER={}, MAX_RETRIES={}, " +
+ "ClearTable calls: {}, Max concurrent: {}, Final state: {}",
+ ctx.getTotalQueueAttempts().get(), ctx.getSuccessfulQueueCount().get(),
+ counts.getSuccessCount(), counts.getRetryLaterCount(),
counts.getMaxRetriesCount(),
+ ctx.getClearTableCallCount().get(),
ctx.getMaxConcurrentExecutions().get(),
+ NSSummaryTask.getRebuildState());
+
+ assertEquals(1, ctx.getMaxConcurrentExecutions().get(),
+ "Should never have concurrent executions - queue provides
serialization");
+ assertEquals(ctx.getThreadCount(), ctx.getTotalQueueAttempts().get(),
+ "All threads should have attempted to queue events");
+ assertTrue(ctx.getSuccessfulQueueCount().get() >= 1,
+ "At least one thread should have successfully queued rebuild");
+ assertThat(ctx.getClearTableCallCount().get())
+ .as("At least one rebuild should execute and not exceed successfully
queued events")
+ .isPositive()
+ .isLessThanOrEqualTo(ctx.getSuccessfulQueueCount().get());
+
+ // Final verification that state is IDLE - wait one more time to ensure
stability
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() ==
RebuildState.IDLE,
+ 100, 5000);
+ assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
+ "Final state should be IDLE after all rebuilds complete");
+
+ LOG.info("VERIFIED: Queue architecture prevents concurrent executions. " +
+ "Multiple events can be queued but execute sequentially.");
+ }
+
+ private ResultCounts collectResultCounts(
+ List<CompletableFuture<ReconTaskController.ReInitializationResult>>
futures)
+ throws Exception {
+ ResultCounts counts = new ResultCounts();
+ for (CompletableFuture<ReconTaskController.ReInitializationResult> future
: futures) {
+ ReconTaskController.ReInitializationResult result = future.get();
+ switch (result) {
+ case SUCCESS:
+ counts.successCount++;
+ break;
+ case RETRY_LATER:
+ counts.retryLaterCount++;
+ break;
+ case MAX_RETRIES_EXCEEDED:
+ counts.maxRetriesCount++;
+ break;
+ default:
+ LOG.warn("Unexpected result: {}", result);
+ }
+ }
+ return counts;
+ }
+
+ /**
+ * Context for concurrent test execution tracking.
+ */
+ private static class ConcurrentTestContext {
+ private final int threadCount;
+ private final CountDownLatch allThreadsReady;
+ private final CountDownLatch firstRebuildStarted;
+ private final CountDownLatch firstRebuildCanComplete;
+ private final AtomicInteger clearTableCallCount;
+ private final AtomicInteger concurrentExecutions;
+ private final AtomicInteger maxConcurrentExecutions;
+ private final AtomicInteger successfulQueueCount;
+ private final AtomicInteger totalQueueAttempts;
+
+ ConcurrentTestContext(int threadCount) {
+ this.threadCount = threadCount;
+ this.allThreadsReady = new CountDownLatch(threadCount);
+ this.firstRebuildStarted = new CountDownLatch(1);
+ this.firstRebuildCanComplete = new CountDownLatch(1);
+ this.clearTableCallCount = new AtomicInteger(0);
+ this.concurrentExecutions = new AtomicInteger(0);
+ this.maxConcurrentExecutions = new AtomicInteger(0);
+ this.successfulQueueCount = new AtomicInteger(0);
+ this.totalQueueAttempts = new AtomicInteger(0);
+ }
+
+ public int getThreadCount() {
+ return threadCount;
+ }
+
+ public CountDownLatch getAllThreadsReady() {
+ return allThreadsReady;
+ }
+
+ public CountDownLatch getFirstRebuildStarted() {
+ return firstRebuildStarted;
+ }
+
+ public CountDownLatch getFirstRebuildCanComplete() {
+ return firstRebuildCanComplete;
+ }
+
+ public AtomicInteger getClearTableCallCount() {
+ return clearTableCallCount;
+ }
+
+ public AtomicInteger getConcurrentExecutions() {
+ return concurrentExecutions;
+ }
+
+ public AtomicInteger getMaxConcurrentExecutions() {
+ return maxConcurrentExecutions;
+ }
+
+ public AtomicInteger getSuccessfulQueueCount() {
+ return successfulQueueCount;
+ }
+
+ public AtomicInteger getTotalQueueAttempts() {
+ return totalQueueAttempts;
+ }
+ }
+
+ /**
+ * Result counts from concurrent execution.
+ */
+ private static class ResultCounts {
+ private long successCount = 0;
+ private long retryLaterCount = 0;
+ private long maxRetriesCount = 0;
+
+ public long getSuccessCount() {
+ return successCount;
+ }
+
+ public long getRetryLaterCount() {
+ return retryLaterCount;
+ }
+
+ public long getMaxRetriesCount() {
+ return maxRetriesCount;
}
}
@@ -400,38 +656,32 @@ void testMultipleConcurrentAttempts() throws Exception {
*/
@Test
void testReconUtilsIntegration() throws Exception {
- // Test initial state access via ReconUtils
assertEquals(RebuildState.IDLE, ReconUtils.getNSSummaryRebuildState(),
"Initial state should be IDLE via ReconUtils");
- // Start a rebuild to test RUNNING state
CountDownLatch rebuildStarted = new CountDownLatch(1);
CountDownLatch rebuildCanFinish = new CountDownLatch(1);
- // Setup rebuild to block so we can test state
doAnswer(invocation -> {
rebuildStarted.countDown();
- boolean awaitSuccess = rebuildCanFinish.await(5, TimeUnit.SECONDS);
+ boolean awaitSuccess = rebuildCanFinish.await(10, TimeUnit.SECONDS);
if (!awaitSuccess) {
LOG.warn("rebuildCanFinish.await() timed out");
}
return null;
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
- // Start rebuild in background to test state transitions
- CompletableFuture<TaskResult> rebuild = CompletableFuture.supplyAsync(() ->
- nsSummaryTask.reprocess(mockOMMetadataManager));
+ taskController.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
- // Wait for rebuild to start and verify RUNNING state
- assertTrue(rebuildStarted.await(5, TimeUnit.SECONDS),
- "Rebuild should start within timeout");
+ assertTrue(rebuildStarted.await(10, TimeUnit.SECONDS),
+ "Rebuild should start");
assertEquals(RebuildState.RUNNING, ReconUtils.getNSSummaryRebuildState(),
"State should be RUNNING during rebuild");
- // Complete rebuild and verify IDLE state
rebuildCanFinish.countDown();
- TaskResult result = rebuild.get(5, TimeUnit.SECONDS);
- assertTrue(result.isTaskSuccess(), "Rebuild should succeed");
+ GenericTestUtils.waitFor(() -> ReconUtils.getNSSummaryRebuildState() == RebuildState.IDLE,
+ 100, 5000);
assertEquals(RebuildState.IDLE, ReconUtils.getNSSummaryRebuildState(),
"State should return to IDLE after completion");
}
@@ -441,65 +691,114 @@ void testReconUtilsIntegration() throws Exception {
*/
@Test
void testStateTransitionsDuringExceptions() throws Exception {
- // Test exception during clearNSSummaryTable
- doThrow(new RuntimeException("Unexpected error"))
- .when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ CountDownLatch exceptionLatch = new CountDownLatch(1);
+ CountDownLatch recoveryLatch = new CountDownLatch(1);
+ AtomicInteger callCount = new AtomicInteger(0);
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
-
- assertFalse(result.isTaskSuccess(), "Rebuild should fail on exception");
+ // Setup mock to throw exception first time, succeed second time
+ doAnswer(invocation -> {
+ int call = callCount.incrementAndGet();
+ LOG.info("clearNSSummaryTable call #{}", call);
+ if (call == 1) {
+ exceptionLatch.countDown();
+ throw new RuntimeException("Unexpected error");
+ } else {
+ recoveryLatch.countDown();
+ return null;
+ }
+ }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+ // First rebuild throws exception
+ ReconTaskController.ReInitializationResult result1 =
+ taskController.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result1,
+ "First event should be queued successfully");
+
+ assertTrue(exceptionLatch.await(10, TimeUnit.SECONDS),
+ "Exception should occur");
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == RebuildState.FAILED,
+ 100, 5000);
assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
"State should be FAILED after exception");
- // Verify we can recover from FAILED state
- // Setup successful rebuild by default - no exception thrown
- doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
-
- TaskResult recoveryResult = nsSummaryTask.reprocess(mockOMMetadataManager);
- assertTrue(recoveryResult.isTaskSuccess(), "Recovery rebuild should succeed");
+ // Second rebuild succeeds (recovery) - must wait for time-based retry delay to expire
+ // This is a deliberate time-based test of the retry mechanism
+ int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+ long deadline = System.currentTimeMillis() + retryDelayMs;
+ GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+ 100, retryDelayMs + 1000);
+
+ ReconTaskController.ReInitializationResult result2 =
+ taskController.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result2,
+ "Second event should be queued successfully after retry delay");
+
+ assertTrue(recoveryLatch.await(10, TimeUnit.SECONDS), "Recovery should execute");
+ GenericTestUtils.waitFor(() -> NSSummaryTask.getRebuildState() == RebuildState.IDLE,
+ 100, 5000);
assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
"State should be IDLE after recovery");
}
/**
- * Test that interrupted threads are handled properly.
+ * Test checkpoint creation failure and retry mechanism.
*/
@Test
- void testInterruptedThreadHandling() throws Exception {
- CountDownLatch rebuildStarted = new CountDownLatch(1);
- AtomicBoolean wasInterrupted = new AtomicBoolean(false);
+ void testCheckpointCreationFailureRetry() throws Exception {
+ ReconTaskControllerImpl controllerSpy = spy(taskController);
+ doThrow(new IOException("Checkpoint creation failed"))
+ .when(controllerSpy).createOMCheckpoint(any());
+
+ // First few attempts should return RETRY_LATER
+ ReconTaskController.ReInitializationResult result1 =
+ controllerSpy.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW);
+ assertEquals(ReconTaskController.ReInitializationResult.RETRY_LATER, result1,
+ "First attempt should return RETRY_LATER due to checkpoint failure");
+
+ // After delay, try again - must wait for time-based retry delay to expire
+ int retryDelayMs = 2100; // RETRY_DELAY_MS (2000ms) + buffer
+ long deadline = System.currentTimeMillis() + retryDelayMs;
+ GenericTestUtils.waitFor(() -> System.currentTimeMillis() >= deadline,
+ 100, retryDelayMs + 1000);
+ ReconTaskController.ReInitializationResult result2 =
+ controllerSpy.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW);
+ assertEquals(ReconTaskController.ReInitializationResult.RETRY_LATER, result2,
+ "Second attempt should also return RETRY_LATER");
+
+ verify(controllerSpy, times(2)).createOMCheckpoint(any());
+ }
- // Setup rebuild to detect interruption
+ /**
+ * Test event buffer integration with concurrent queueing.
+ */
+ @Test
+ void testEventBufferWithConcurrentQueueing() throws Exception {
+ int initialBufferSize = taskController.getEventBufferSize();
+ LOG.info("Initial buffer size: {}", initialBufferSize);
+
+ CountDownLatch queuedLatch = new CountDownLatch(1);
doAnswer(invocation -> {
- rebuildStarted.countDown();
- try {
- Thread.sleep(5000); // Long sleep to ensure interruption
- } catch (InterruptedException e) {
- wasInterrupted.set(true);
- throw new RuntimeException("Interrupted", e);
- }
+ queuedLatch.countDown();
+ Thread.sleep(100);
return null;
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
- // Start rebuild in separate thread
- Thread rebuildThread = new Thread(() -> {
- nsSummaryTask.reprocess(mockOMMetadataManager);
- });
- rebuildThread.start();
+ // Queue an event
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+ ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
- // Wait for rebuild to start
- assertTrue(rebuildStarted.await(5, TimeUnit.SECONDS),
- "Rebuild should start");
- assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
- "State should be RUNNING");
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+ "Event should be successfully queued");
- // Interrupt the thread
- rebuildThread.interrupt();
- rebuildThread.join(5000);
+ // Event should be in buffer or being processed
+ assertTrue(queuedLatch.await(10, TimeUnit.SECONDS),
+ "Event should be processed");
- // Verify interruption was handled
- assertTrue(wasInterrupted.get(), "Thread should have been interrupted");
- assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
- "State should be FAILED after interruption");
+ LOG.info("Event buffer integration test completed successfully");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]