This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e6f3642ca Add CountDownLatch to concurrent tests (#5853)
2e6f3642ca is described below

commit 2e6f3642ca6a93e6250a969f96734c714357d231
Author: Dom G. <[email protected]>
AuthorDate: Fri Sep 12 14:10:16 2025 -0400

    Add CountDownLatch to concurrent tests (#5853)
    
    * Add a CountDownLatch to concurrent tests to increase concurrency and to
      control where the threads sync up, so each test better stresses the code
      it is meant to exercise
    * Add assertions and shared variables to verify that counts and other
      values the test logic depends on are handled correctly in the changed
      tests
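    
    The changes all apply the same start-gate idiom: every submitted task
    counts the latch down and then awaits it, so no task begins its real
    work until the expected number of tasks are running concurrently. A
    minimal sketch of the idiom, assuming the usual java.util.concurrent
    imports and a fixed pool sized to the task count; doContendedWork() is
    a hypothetical stand-in for the code being stressed:
    
        final int numTasks = 32;
        ExecutorService executor = Executors.newFixedThreadPool(numTasks);
        CountDownLatch startLatch = new CountDownLatch(numTasks);
        List<Future<Boolean>> futures = new ArrayList<>(numTasks);
        for (int i = 0; i < numTasks; i++) {
          futures.add(executor.submit(() -> {
            startLatch.countDown(); // signal that this task is ready
            startLatch.await();     // block until all tasks are ready
            return doContendedWork();
          }));
        }
        for (Future<Boolean> future : futures) {
          assertTrue(future.get()); // also surfaces any task exception
        }
    
    Note the latch count must not exceed the number of submitted tasks (and,
    for a fixed pool, the number of threads available to run them), otherwise
    await() never returns; the assertTrue(numTasks >= startLatch.getCount(),
    ...) checks added throughout guard against that.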
---
 .../core/clientImpl/ClientTabletCacheImplTest.java | 10 +++-
 .../apache/accumulo/core/crypto/CryptoTest.java    | 12 ++++-
 .../core/file/rfile/bcfile/CompressionTest.java    | 62 +++++++++++++---------
 .../org/apache/accumulo/core/util/LockMapTest.java | 19 +++++--
 .../java/org/apache/accumulo/test/LargeReadIT.java | 16 ++++--
 .../accumulo/test/ScanServerMultipleScansIT.java   | 56 +++++++++++--------
 .../accumulo/test/UniqueNameAllocatorIT.java       | 20 +++++--
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 25 +++++++--
 .../test/conf/PropStoreConfigIT_SimpleSuite.java   | 38 +++++++++----
 .../apache/accumulo/test/fate/FateStoreITBase.java | 21 +++++---
 .../accumulo/test/fate/meta/ZooMutatorIT.java      | 38 ++++++++-----
 .../apache/accumulo/test/functional/BulkNewIT.java | 26 +++++++--
 .../accumulo/test/functional/CompactionIT.java     | 19 ++++---
 .../test/functional/ConcurrentDeleteTableIT.java   |  5 +-
 .../accumulo/test/functional/FateStarvationIT.java | 15 +++++-
 15 files changed, 274 insertions(+), 108 deletions(-)

diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
index 372af80083..9c35fd312b 100644
--- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImplTest.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1989,8 +1990,12 @@ public class ClientTabletCacheImplTest {
     var executor = Executors.newCachedThreadPool();
     final int lookupCount = 128;
     final int roundCount = 8;
+    final int numTasks = roundCount * lookupCount;
 
-    List<Future<CachedTablet>> futures = new ArrayList<>(roundCount * lookupCount);
+    List<Future<CachedTablet>> futures = new ArrayList<>(numTasks);
+    CountDownLatch startLatch = new CountDownLatch(32); // start a portion of threads at once
+    assertTrue(numTasks >= startLatch.getCount(),
+        "Not enough tasks to satisfy latch count - deadlock risk");
 
     // multiple rounds to increase the chance of contention
     for (int round = 0; round < roundCount; round++) {
@@ -2008,6 +2013,8 @@ public class ClientTabletCacheImplTest {
 
       for (var lookup : rowsToLookup) {
         var future = executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
           if (RANDOM.get().nextInt(10) < 3) {
             Thread.yield();
           }
@@ -2024,6 +2031,7 @@ public class ClientTabletCacheImplTest {
         futures.add(future);
       }
     }
+    assertEquals(numTasks, futures.size());
 
     for (var future : futures) {
       assertNotNull(future.get());
diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
index d56b4b0993..ae9cc8e4a6 100644
--- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
@@ -49,6 +49,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -539,14 +540,20 @@ public class CryptoTest {
 
     var executor = Executors.newCachedThreadPool();
 
-    List<Future<Boolean>> verifyFutures = new ArrayList<>();
+    final int numTasks = 32;
+    List<Future<Boolean>> verifyFutures = new ArrayList<>(numTasks);
+    CountDownLatch startLatch = new CountDownLatch(numTasks);
+    assertTrue(numTasks >= startLatch.getCount(),
+        "Not enough tasks to satisfy latch count - deadlock risk");
 
     FileDecrypter decrypter = cs.getFileDecrypter(new CryptoEnvironmentImpl(scope, null, params));
 
     // verify that each input stream returned by decrypter.decryptStream() is independent when used
     // by multiple threads
-    for (int i = 0; i < 32; i++) {
+    for (int i = 0; i < numTasks; i++) {
       var future = executor.submit(() -> {
+        startLatch.countDown();
+        startLatch.await();
         try (ByteArrayInputStream in = new ByteArrayInputStream(cipherText);
             DataInputStream decrypted = new DataInputStream(decrypter.decryptStream(in))) {
           byte[] dataRead = new byte[plainText.length];
@@ -556,6 +563,7 @@ public class CryptoTest {
       });
       verifyFutures.add(future);
     }
+    assertEquals(numTasks, verifyFutures.size());
 
     for (var future : verifyFutures) {
       assertTrue(future.get());
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
index 8f38b1b30e..32d6737000 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -187,16 +188,23 @@ public class CompressionTest {
 
         assertNotNull(codec, al + " should not be null");
 
-        ExecutorService service = Executors.newFixedThreadPool(10);
+        final int numTasks = 32;
+        ExecutorService service = Executors.newFixedThreadPool(numTasks);
 
-        ArrayList<Future<Boolean>> results = new ArrayList<>();
+        ArrayList<Future<Boolean>> results = new ArrayList<>(numTasks);
+        CountDownLatch startLatch = new CountDownLatch(numTasks);
+        assertTrue(numTasks >= startLatch.getCount(),
+            "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
-        for (int i = 0; i < 30; i++) {
+        for (int i = 0; i < numTasks; i++) {
           results.add(service.submit(() -> {
+            startLatch.countDown();
+            startLatch.await();
             assertNotNull(al.getCodec(), al + " should not be null");
             return true;
           }));
         }
+        assertEquals(numTasks, results.size());
 
         service.shutdown();
 
@@ -227,17 +235,24 @@ public class CompressionTest {
         // first call to isSupported should be true
         assertTrue(al.isSupported(), al + " is not supported, but should be");
 
-        ExecutorService service = Executors.newFixedThreadPool(10);
+        final int numTasks = 32;
+        ExecutorService service = Executors.newFixedThreadPool(numTasks);
 
-        ArrayList<Future<Boolean>> results = new ArrayList<>();
+        ArrayList<Future<Boolean>> results = new ArrayList<>(numTasks);
+        CountDownLatch startLatch = new CountDownLatch(numTasks);
+        assertTrue(numTasks >= startLatch.getCount(),
+            "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
-        for (int i = 0; i < 30; i++) {
+        for (int i = 0; i < numTasks; i++) {
 
           results.add(service.submit(() -> {
+            startLatch.countDown();
+            startLatch.await();
             assertNotNull(al.getCodec(), al + " should have a non-null codec");
             return true;
           }));
         }
+        assertEquals(numTasks, results.size());
 
         service.shutdown();
 
@@ -265,40 +280,39 @@ public class CompressionTest {
         // first call to isSupported should be true
         assertTrue(al.isSupported(), al + " is not supported, but should be");
 
-        ExecutorService service = Executors.newFixedThreadPool(20);
+        final int numTasks = 32;
+        ExecutorService service = Executors.newFixedThreadPool(numTasks);
 
-        ArrayList<Callable<Boolean>> list = new ArrayList<>();
+        ArrayList<Callable<Integer>> list = new ArrayList<>(numTasks);
 
-        ArrayList<Future<Boolean>> results = new ArrayList<>();
+        CountDownLatch startLatch = new CountDownLatch(numTasks);
+        assertTrue(numTasks >= startLatch.getCount(),
+            "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
         // keep track of the system's identity hashcodes.
-        final HashSet<Integer> testSet = new HashSet<>();
 
-        for (int i = 0; i < 40; i++) {
+        for (int i = 0; i < numTasks; i++) {
           list.add(() -> {
+            startLatch.countDown();
+            startLatch.await();
             CompressionCodec codec = al.getCodec();
             assertNotNull(codec, al + " resulted in a non-null codec");
-            // add the identity hashcode to the set.
-            synchronized (testSet) {
-              testSet.add(System.identityHashCode(codec));
-            }
-            return true;
+            return System.identityHashCode(codec);
           });
         }
+        assertEquals(numTasks, list.size());
+
+        final HashSet<Integer> hashCodes = new HashSet<>();
+        for (Future<Integer> result : service.invokeAll(list)) {
+          hashCodes.add(result.get());
+        }
+        assertEquals(1, hashCodes.size(), al + " created too many codecs");
 
-        results.addAll(service.invokeAll(list));
-        // ensure that we
-        assertEquals(1, testSet.size(), al + " created too many codecs");
         service.shutdown();
 
         while (!service.awaitTermination(1, SECONDS)) {
           // wait
         }
-
-        for (Future<Boolean> result : results) {
-          assertTrue(result.get(),
-              al + " resulted in a failed call to getcodec within the thread pool");
-        }
       }
     }
   }
diff --git a/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java b/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java
index 151e40ac7e..d678285098 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/LockMapTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.accumulo.core.util;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -88,7 +90,9 @@ public class LockMapTest {
    */
   @Test
   public void testConcurrency() throws Exception {
-    var executor = Executors.newFixedThreadPool(32);
+    final int numThreads = 32;
+    var executor = Executors.newFixedThreadPool(numThreads);
+    final int numTasks = numThreads * 4;
 
     try {
       var lockMap = new LockMap<Integer>();
@@ -97,13 +101,21 @@ public class LockMapTest {
       var booleans = new AtomicBoolean[] {new AtomicBoolean(), new AtomicBoolean(),
           new AtomicBoolean(), new AtomicBoolean(), new AtomicBoolean()};
 
-      var futures = new ArrayList<Future<Boolean>>();
+      var futures = new ArrayList<Future<Boolean>>(numTasks);
+      // start a portion of threads at the same time
+      CountDownLatch startLatch = new CountDownLatch(numThreads);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
+      assertTrue(numThreads >= startLatch.getCount(),
+          "Not enough threads to satisfy latch count - deadlock risk");
 
       var maxLocked = new AtomicLong(0);
 
-      for (int i = 0; i < 100; i++) {
+      for (int i = 0; i < numTasks; i++) {
         int key = random.nextInt(booleans.length);
         var future = executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
           try (var unused = lockMap.lock(key)) {
             var set1 = booleans[key].compareAndSet(false, true);
             // maxLocked is used to check that at some point we see another thread w/ a lock for a
@@ -121,6 +133,7 @@ public class LockMapTest {
         });
         futures.add(future);
       }
+      assertEquals(numTasks, futures.size());
 
       for (var future : futures) {
         assertTrue(future.get());
diff --git a/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java b/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
index c81e673553..ca3184ee54 100644
--- a/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/LargeReadIT.java
@@ -19,11 +19,13 @@
 package org.apache.accumulo.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -123,9 +125,14 @@ public class LargeReadIT extends AccumuloClusterHarness {
         }
       }
 
-      final int numThreads = 64;
-      var executor = Executors.newFixedThreadPool(numThreads);
+      final int numTasks = 64;
+      var executor = Executors.newFixedThreadPool(numTasks);
+      CountDownLatch startLatch = new CountDownLatch(numTasks);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks/threads to satisfy latch count - deadlock risk");
       Callable<Long> scanTask = () -> {
+        startLatch.countDown();
+        startLatch.await();
         try (var scanner = client.createScanner(tableName)) {
           scannerConfigurer.accept(scanner);
           return scanner.stream().count();
@@ -135,9 +142,8 @@ public class LargeReadIT extends AccumuloClusterHarness {
       // Run lots of concurrent task that should only read the small data, if they read the big
       // column family then it will exceed the tablet server memory and cause it to die and the test
       // to timeout.
-      var tasks =
-          Stream.iterate(scanTask, t -> t).limit(numThreads * 5).collect(Collectors.toList());
-      assertEquals(numThreads * 5, tasks.size());
+      var tasks = Stream.iterate(scanTask, t -> t).limit(numTasks * 5).collect(Collectors.toList());
+      assertEquals(numTasks * 5, tasks.size());
       for (var future : executor.invokeAll(tasks)) {
         assertEquals(100, future.get());
       }
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
index 735dd7f7d0..b5eb4e9dbd 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.test;
 import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.apache.accumulo.test.ScanServerIT.createTableAndIngest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
@@ -117,15 +118,18 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
 
       final int ingestedEntryCount = createTableAndIngest(client, tableName, null, 10, 10, "colf");
 
-      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch startLatch = new CountDownLatch(NUM_SCANS);
+      assertTrue(NUM_SCANS >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
 
       List<Future<?>> futures = new ArrayList<>(NUM_SCANS);
       for (int i = 0; i < NUM_SCANS; i++) {
         var future = executor.submit(() -> {
           try {
-            latch.await();
+            startLatch.countDown();
+            startLatch.await();
           } catch (InterruptedException e1) {
-            fail("InterruptedException waiting for latch");
+            fail("InterruptedException waiting for startLatch");
           }
           try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
             scanner.setRange(new Range());
@@ -138,7 +142,7 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
 
         futures.add(future);
       }
-      latch.countDown();
+      assertEquals(NUM_SCANS, futures.size());
       for (Future<?> future : futures) {
         future.get();
       }
@@ -181,7 +185,9 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
       assertEquals(splitPoints, new TreeSet<>(splitsFound));
       log.debug("Splits found: {}", splitsFound);
 
-      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch startLatch = new CountDownLatch(NUM_SCANS);
+      assertTrue(NUM_SCANS >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
 
       final AtomicLong counter = new AtomicLong(0);
 
@@ -191,9 +197,10 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
         final int threadNum = i;
         var future = executor.submit(() -> {
           try {
-            latch.await();
+            startLatch.countDown();
+            startLatch.await();
           } catch (InterruptedException e1) {
-            fail("InterruptedException waiting for latch");
+            fail("InterruptedException waiting for startLatch");
           }
           try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
             switch (threadNum) {
@@ -223,7 +230,7 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
 
         futures.add(future);
       }
-      latch.countDown();
+      assertEquals(NUM_SCANS, futures.size());
       for (Future<?> future : futures) {
         future.get();
       }
@@ -239,14 +246,17 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
 
       final int ingestedEntryCount = createTableAndIngest(client, tableName, null, 10, 10, "colf");
 
-      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch startLatch = new CountDownLatch(NUM_SCANS);
+      assertTrue(NUM_SCANS >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
 
       List<Future<?>> futures = new ArrayList<>(NUM_SCANS);
 
       for (int i = 0; i < NUM_SCANS; i++) {
         var future = executor.submit(() -> {
           try {
-            latch.await();
+            startLatch.countDown();
+            startLatch.await();
           } catch (InterruptedException e1) {
             fail("InterruptedException waiting for latch");
           }
@@ -260,7 +270,7 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
         });
         futures.add(future);
       }
-      latch.countDown();
+      assertEquals(NUM_SCANS, futures.size());
       for (Future<?> future : futures) {
         future.get();
       }
@@ -304,16 +314,16 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
       assertEquals(splitPoints, new TreeSet<>(splitsFound));
       log.debug("Splits found: {}", splitsFound);
 
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      final AtomicLong counter = new AtomicLong(0);
-
-      List<Future<?>> futures = new ArrayList<>(NUM_SCANS);
+      final CountDownLatch startLatch = new CountDownLatch(NUM_SCANS);
+      assertTrue(NUM_SCANS >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
+      List<Future<Long>> futures = new ArrayList<>(NUM_SCANS);
       for (int i = 0; i < NUM_SCANS; i++) {
         final int threadNum = i;
         var future = executor.submit(() -> {
           try {
-            latch.await();
+            startLatch.countDown();
+            startLatch.await();
           } catch (InterruptedException e1) {
             fail("InterruptedException waiting for latch");
           }
@@ -339,19 +349,21 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
             }
             scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
 
-            counter.addAndGet(scanner.stream().count());
+            return scanner.stream().count();
           } catch (TableNotFoundException e) {
             fail("Table not found");
+            return 0L;
           }
         });
         futures.add(future);
       }
-      latch.countDown();
-      for (Future<?> future : futures) {
-        future.get();
+      assertEquals(NUM_SCANS, futures.size());
+      long total = 0;
+      for (Future<Long> future : futures) {
+        total += future.get();
       }
 
-      assertEquals(ingestedEntryCount, counter.get());
+      assertEquals(ingestedEntryCount, total);
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/UniqueNameAllocatorIT.java b/test/src/main/java/org/apache/accumulo/test/UniqueNameAllocatorIT.java
index 58a56eed09..4b3b082851 100644
--- a/test/src/main/java/org/apache/accumulo/test/UniqueNameAllocatorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/UniqueNameAllocatorIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,6 +30,7 @@ import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -67,11 +69,20 @@ class UniqueNameAllocatorIT extends SharedMiniClusterBase {
     Set<String> namesSeen = ConcurrentHashMap.newKeySet();
 
     var executorService = Executors.newCachedThreadPool();
-    List<Future<Integer>> futures = new ArrayList<>();
+    final int numLargeTasks = 64;
+    final int numSmallTasks = 10;
+    final int numTasks = numLargeTasks + numSmallTasks;
+    List<Future<Integer>> futures = new ArrayList<>(numTasks);
+    // start a portion of threads at the same time
+    CountDownLatch startLatch = new CountDownLatch(32);
+    assertTrue(numTasks >= startLatch.getCount(),
+        "Not enough tasks to satisfy latch count - deadlock risk");
 
     // create threads that are allocating large random chunks
-    for (int i = 0; i < 64; i++) {
+    for (int i = 0; i < numLargeTasks; i++) {
       var future = executorService.submit(() -> {
+        startLatch.countDown();
+        startLatch.await();
         int added = 0;
         while (namesSeen.size() < 1_000_000) {
           var allocator = allocators[random.nextInt(allocators.length)];
@@ -85,9 +96,11 @@ class UniqueNameAllocatorIT extends SharedMiniClusterBase {
     }
 
     // create threads that are always allocating a small amount
-    for (int i = 1; i <= 10; i++) {
+    for (int i = 1; i <= numSmallTasks; i++) {
       int needed = i;
       var future = executorService.submit(() -> {
+        startLatch.countDown();
+        startLatch.await();
         int added = 0;
         while (namesSeen.size() < 1_000_000) {
           var allocator = allocators[random.nextInt(allocators.length)];
@@ -98,6 +111,7 @@ class UniqueNameAllocatorIT extends SharedMiniClusterBase {
       });
       futures.add(future);
     }
+    assertEquals(numTasks, futures.size());
 
     for (var future : futures) {
       // expect all threads to add some names
diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
index 6892a35e34..4a2e18a8df 100644
--- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -141,12 +142,26 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
     try (AccumuloClient c = Accumulo.newClient().from(props).build()) {
       c.tableOperations().create(table, ntc);
 
-      List<Future<?>> futures = new ArrayList<>();
-
-      for (int i = 0; i < 100; i++) {
-        futures.add(
-            executor.submit(createWriteTask(i * 1000, c, table, timeout, useConditionalWriter)));
+      int numTasks = 100;
+      List<Future<?>> futures = new ArrayList<>(numTasks);
+      // synchronize start of a portion of the tasks
+      CountDownLatch startLatch = new CountDownLatch(32);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
+
+      for (int i = 0; i < numTasks; i++) {
+        final int row = i * 1000;
+        futures.add(executor.submit(() -> {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            createWriteTask(row, c, table, timeout, useConditionalWriter).call();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }));
       }
+      assertEquals(numTasks, futures.size());
 
       if (killTservers) {
         Thread.sleep(250);
diff --git a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT_SimpleSuite.java
index c73341940a..08b053197c 100644
--- a/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT_SimpleSuite.java
+++ b/test/src/main/java/org/apache/accumulo/test/conf/PropStoreConfigIT_SimpleSuite.java
@@ -29,11 +29,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -574,11 +575,18 @@ public class PropStoreConfigIT_SimpleSuite extends SharedMiniClusterBase {
    * if any single modification is lost it can be detected.
    */
   private static void runConcurrentPropsModificationTest(PropertyShim propShim) throws Exception {
-    ExecutorService executor = Executors.newFixedThreadPool(4);
+    final int numTasks = 4;
+    ExecutorService executor = Executors.newFixedThreadPool(numTasks);
+    CountDownLatch startLatch = new CountDownLatch(numTasks);
+    assertTrue(numTasks >= startLatch.getCount(),
+        "Not enough tasks/threads to satisfy latch count - deadlock risk");
+    var tasks = new ArrayList<Callable<Void>>(numTasks);
 
     final int iterations = 151;
 
-    Callable<Void> task1 = () -> {
+    tasks.add(() -> {
+      startLatch.countDown();
+      startLatch.await();
       for (int i = 0; i < iterations; i++) {
 
         Map<String,String> prevProps = null;
@@ -619,9 +627,11 @@ public class PropStoreConfigIT_SimpleSuite extends SharedMiniClusterBase {
         }
       }
       return null;
-    };
+    });
 
-    Callable<Void> task2 = () -> {
+    tasks.add(() -> {
+      startLatch.countDown();
+      startLatch.await();
       for (int i = 0; i < iterations; i++) {
         propShim.modifyProperties(tableProps -> {
          int B = Integer.parseInt(tableProps.getOrDefault("general.custom.B", "0"));
@@ -632,9 +642,11 @@ public class PropStoreConfigIT_SimpleSuite extends SharedMiniClusterBase {
         });
       }
       return null;
-    };
+    });
 
-    Callable<Void> task3 = () -> {
+    tasks.add(() -> {
+      startLatch.countDown();
+      startLatch.await();
       for (int i = 0; i < iterations; i++) {
         propShim.modifyProperties(tableProps -> {
          int B = Integer.parseInt(tableProps.getOrDefault("general.custom.B", "0"));
@@ -643,9 +655,11 @@ public class PropStoreConfigIT_SimpleSuite extends SharedMiniClusterBase {
         });
       }
       return null;
-    };
+    });
 
-    Callable<Void> task4 = () -> {
+    tasks.add(() -> {
+      startLatch.countDown();
+      startLatch.await();
       for (int i = 0; i < iterations; i++) {
         propShim.modifyProperties(tableProps -> {
          int E = Integer.parseInt(tableProps.getOrDefault("general.custom.E", "0"));
@@ -653,10 +667,12 @@ public class PropStoreConfigIT_SimpleSuite extends SharedMiniClusterBase {
         });
       }
       return null;
-    };
+    });
+
+    assertEquals(numTasks, tasks.size());
 
     // run all of the above task concurrently
-    for (Future<Void> future : executor.invokeAll(List.of(task1, task2, task3, task4))) {
+    for (Future<Void> future : executor.invokeAll(tasks)) {
       // see if there were any exceptions in the background thread and wait for it to finish
       future.get();
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
index e1891bc3e9..2198b73767 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -606,14 +607,22 @@ public abstract class FateStoreITBase extends SharedMiniClusterBase
         new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa"));
     FateKey fateKey = FateKey.forSplit(ke);
 
-    var executor = Executors.newFixedThreadPool(10);
+    final int numTasks = 10;
+    var executor = Executors.newFixedThreadPool(numTasks);
+    List<Future<Optional<FateId>>> futures = new ArrayList<>(numTasks);
+    CountDownLatch startLatch = new CountDownLatch(numTasks);
+    assertTrue(numTasks >= startLatch.getCount(),
+        "Not enough tasks/threads to satisfy latch count - deadlock risk");
     try {
-      // have 10 threads all try to seed the same fate key, only one should succeed.
-      List<Future<Optional<FateId>>> futures = new ArrayList<>(10);
-      for (int i = 0; i < 10; i++) {
-        futures.add(executor
-            .submit(() -> seedTransaction(store, TEST_FATE_OP, fateKey, new TestRepo(), true)));
+      // have all threads try to seed the same fate key, only one should succeed.
+      for (int i = 0; i < numTasks; i++) {
+        futures.add(executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
+          return seedTransaction(store, TEST_FATE_OP, fateKey, new TestRepo(), true);
+        }));
       }
+      assertEquals(numTasks, futures.size());
 
       int idsSeen = 0;
       for (var future : futures) {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/ZooMutatorIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/ZooMutatorIT.java
index aba0eb0f49..278996fd21 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/ZooMutatorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/ZooMutatorIT.java
@@ -26,8 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -87,20 +89,22 @@ public class ZooMutatorIT extends WithTestNames {
     try (var testZk = new ZooKeeperTestingServer(newFolder.toFile()); var zk = testZk.newClient()) {
       var zrw = zk.asReaderWriter();
 
-      var executor = Executors.newFixedThreadPool(16);
+      final int numTasks = 16;
+      var executor = Executors.newFixedThreadPool(numTasks);
 
       String initialData = hash("Accumulo Zookeeper Mutator test data") + " 0";
 
-      List<Future<?>> futures = new ArrayList<>();
+      List<Future<List<Integer>>> futures = new ArrayList<>(numTasks);
+      CountDownLatch startLatch = new CountDownLatch(numTasks);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
-      // This map is used to ensure multiple threads do not successfully write the same value and no
-      // values are skipped. The hash in the value also verifies similar things in a different way.
-      ConcurrentHashMap<Integer,Integer> countCounts = new ConcurrentHashMap<>();
-
-      for (int i = 0; i < 16; i++) {
+      for (int i = 0; i < numTasks; i++) {
         futures.add(executor.submit(() -> {
+          List<Integer> observedCounts = new ArrayList<>();
           try {
-
+            startLatch.countDown();
+            startLatch.await();
             int count = -1;
             while (count < 200) {
               byte[] val =
@@ -108,18 +112,24 @@ public class ZooMutatorIT extends WithTestNames {
               int nextCount = getCount(val);
               assertTrue(nextCount > count, "nextCount <= count " + nextCount + " " + count);
               count = nextCount;
-              countCounts.merge(count, 1, Integer::sum);
+              observedCounts.add(count);
             }
-
+            return observedCounts;
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
         }));
       }
+      assertEquals(numTasks, futures.size());
+
+      // collect observed counts from all threads to ensure no values are duplicated or skipped
+      Map<Integer,Integer> countCounts = new HashMap<>();
 
-      // wait and check for errors in background threads
-      for (Future<?> future : futures) {
-        future.get();
+      for (Future<List<Integer>> future : futures) {
+        List<Integer> observedCounts = future.get();
+        for (Integer count : observedCounts) {
+          countCounts.put(count, countCounts.getOrDefault(count, 0) + 1);
+        }
       }
       executor.shutdown();
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index a7c812d0d4..95e5ca2dee 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -49,6 +49,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -936,9 +937,15 @@ public class BulkNewIT extends SharedMiniClusterBase {
       final int N = 100;
 
       ExecutorService executor;
+      CountDownLatch startLatch;
       if (parallelBulkImports) {
-        executor = Executors.newFixedThreadPool(16);
+        final int numThreads = 16;
+        executor = Executors.newFixedThreadPool(numThreads);
+        startLatch = new CountDownLatch(numThreads); // wait for a portion of the tasks to be ready
+        assertTrue(N >= startLatch.getCount(),
+            "Not enough tasks/threads to satisfy latch count - deadlock risk");
       } else {
+        startLatch = null;
         // execute the bulk imports in the current thread which will cause them to run serially
         executor = MoreExecutors.newDirectExecutorService();
       }
@@ -951,12 +958,17 @@ public class BulkNewIT extends SharedMiniClusterBase {
           for (int f = 0; f < 10; f++) {
             writeData(fs, iterationDir + "/f" + f + ".", aconf, f * 1000, (f + 1) * 1000 - 1);
           }
+          if (parallelBulkImports) {
+            startLatch.countDown();
+            startLatch.await();
+          }
           c.tableOperations().importDirectory(iterationDir).to(tableName).tableTime(true).load();
           getCluster().getFileSystem().delete(new Path(iterationDir), true);
         } catch (Exception e) {
           throw new IllegalStateException(e);
         }
       })).collect(Collectors.toList());
+      assertEquals(N, futures.size());
 
       // wait for all bulk imports and check for errors in background threads
       for (var future : futures) {
@@ -1102,8 +1114,13 @@ public class BulkNewIT extends SharedMiniClusterBase {
           .collect(Collectors.toCollection(TreeSet::new));
       c.tableOperations().addSplits(tableName, splits);
 
-      var executor = Executors.newFixedThreadPool(16);
-      var futures = new ArrayList<Future<?>>();
+      final int numTasks = 16;
+      var executor = Executors.newFixedThreadPool(numTasks);
+      var futures = new ArrayList<Future<?>>(numTasks);
+      // wait for a portion of the tasks to be ready
+      CountDownLatch startLatch = new CountDownLatch(numTasks);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
       var loadPlanBuilder = LoadPlan.builder();
       var rowsExpected = new HashSet<>();
@@ -1115,12 +1132,15 @@ public class BulkNewIT extends SharedMiniClusterBase {
         loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1),
             row(data));
         var future = executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
           writeData(fs, dir + "/" + filename, aconf, data, data);
           return null;
         });
         futures.add(future);
         rowsExpected.add(row(data));
       }
+      assertEquals(imports.size(), futures.size());
 
       for (var future : futures) {
         future.get();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index c3e4e1df40..20187c78d1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -1067,21 +1067,28 @@ public class CompactionIT extends CompactionITBase {
 
       // start a bunch of compactions in the background
       var executor = Executors.newCachedThreadPool();
-      List<Future<?>> futures = new ArrayList<>();
+      final int numTasks = 20;
+      List<Future<?>> futures = new ArrayList<>(numTasks);
+      CountDownLatch startLatch = new CountDownLatch(numTasks);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks/threads to satisfy latch count - deadlock risk");
       // start user compactions on a subset of the tables tablets, system compactions should attempt
       // to run on all tablets. With concurrency should get a mix.
-      for (int i = 1; i < 20; i++) {
+      for (int i = 1; i < numTasks + 1; i++) {
         var startRow = new Text(String.format("r:%04d", i - 1));
         var endRow = new Text(String.format("r:%04d", i));
+        final CompactionConfig config = new CompactionConfig();
+        config.setWait(true);
+        config.setStartRow(startRow);
+        config.setEndRow(endRow);
         futures.add(executor.submit(() -> {
-          CompactionConfig config = new CompactionConfig();
-          config.setWait(true);
-          config.setStartRow(startRow);
-          config.setEndRow(endRow);
+          startLatch.countDown();
+          startLatch.await();
           client.tableOperations().compact(table, config);
           return null;
         }));
       }
+      assertEquals(numTasks, futures.size());
 
       log.debug("Waiting for offline");
       // take tablet offline while there are concurrent compactions
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index fab780f121..7072581acd 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.test.functional;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -83,8 +84,10 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
         count++;
 
         final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
+        assertTrue(numDeleteOps >= cdl.getCount(),
+            "Not enough tasks/threads to satisfy latch count - deadlock risk");
 
-        List<Future<?>> futures = new ArrayList<>();
+        List<Future<?>> futures = new ArrayList<>(numDeleteOps);
 
         for (int i = 0; i < numDeleteOps; i++) {
           futures.add(es.submit(() -> {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 69afdfe943..7e2df20c76 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -19,10 +19,13 @@
 package org.apache.accumulo.test.functional;
 
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -83,20 +86,28 @@ public class FateStarvationIT extends AccumuloClusterHarness {
 
       List<Text> splits = new ArrayList<>(TestIngest.getSplitPoints(0, 100000, 67));
 
-      List<Future<?>> futures = new ArrayList<>();
+      int numTasks = 100;
+      List<Future<?>> futures = new ArrayList<>(numTasks);
       var executor = Executors.newCachedThreadPool();
+      // wait for a portion of the tasks to be ready
+      CountDownLatch startLatch = new CountDownLatch(32);
+      assertTrue(numTasks >= startLatch.getCount(),
+          "Not enough tasks to satisfy latch count - deadlock risk");
 
-      for (int i = 0; i < 100; i++) {
+      for (int i = 0; i < numTasks; i++) {
         int idx1 = RANDOM.get().nextInt(splits.size() - 1);
         int idx2 = RANDOM.get().nextInt(splits.size() - (idx1 + 1)) + idx1 + 1;
 
         var future = executor.submit(() -> {
+          startLatch.countDown();
+          startLatch.await();
           c.tableOperations().compact(tableName, splits.get(idx1), splits.get(idx2), false, true);
           return null;
         });
 
         futures.add(future);
       }
+      assertEquals(numTasks, futures.size());
 
       log.debug("Started compactions");
 

