This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 03cede1f3d Test concurrent bulk imports from the same source directory 
(#5930)
03cede1f3d is described below

commit 03cede1f3d713ace689fd21ba39c2250b66ded37
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 30 09:13:57 2025 -0400

    Test concurrent bulk imports from the same source directory (#5930)
---
 .../apache/accumulo/test/functional/BulkNewIT.java | 88 ++++++++++++++++++++++
 1 file changed, 88 insertions(+)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 4ce809be4d..eb52ec2c81 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -37,6 +37,7 @@ import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -47,8 +48,13 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -103,6 +109,8 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
@@ -113,6 +121,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  */
 public class BulkNewIT extends SharedMiniClusterBase {
 
+  private static final Logger LOG = LoggerFactory.getLogger(BulkNewIT.class);
+
   @Override
   protected Duration defaultTimeout() {
     return Duration.ofMinutes(4);
@@ -206,6 +216,84 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testConcurrentImportSameDirectory() throws Exception {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      final int numTasks = 16;
+      final int iterations = 3;
+      final int startRow = 0;
+      final int endRow = 199;
+
+      ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+      try {
+        for (int i = 0; i < iterations; i++) {
+          LOG.debug("Running concurrent import iteration {}/{}", i + 1, 
iterations);
+          final String table = getUniqueNames(1)[0] + i;
+          client.tableOperations().create(table);
+
+          Path sourceDir = new Path(rootPath + "/concurrent/" + table + 
"_sourceDir");
+          assertTrue(fs.mkdirs(sourceDir), "Failed to create " + sourceDir);
+
+          writeData(fs, sourceDir + "/f.", aconf, startRow, endRow);
+
+          CountDownLatch startSignal = new CountDownLatch(numTasks);
+          List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+          Predicate<Throwable> expectedConcurrentFailure = throwable -> {
+            if (throwable instanceof IOException || throwable instanceof 
AccumuloException) {
+              LOG.debug("Concurrent import attempt ({}) failed as expected 
with {}: {}",
+                  Thread.currentThread().getName(), 
throwable.getClass().getSimpleName(),
+                  throwable.getMessage());
+              return true;
+            }
+            return false;
+          };
+
+          for (int task = 0; task < numTasks; task++) {
+            futures.add(pool.submit(() -> {
+              final var importMappingOptions =
+                  
client.tableOperations().importDirectory(sourceDir.toString()).to(table);
+              try {
+                startSignal.countDown();
+                startSignal.await();
+                importMappingOptions.load();
+                return true;
+              } catch (Exception e) {
+                if (expectedConcurrentFailure.test(e)) {
+                  return false;
+                }
+                throw e;
+              }
+            }));
+          }
+          assertEquals(numTasks, futures.size());
+
+          int success = 0;
+          int failures = 0;
+          for (Future<Boolean> future : futures) {
+            if (future.get()) {
+              success++;
+            } else {
+              failures++;
+            }
+          }
+          assertEquals(1, success, "Expected exactly one successful bulk 
import");
+          assertEquals(numTasks - 1, failures,
+              "Expected all other attempts to fail with a concurrency related 
exception");
+
+          try (var scanner = client.createScanner(table, 
Authorizations.EMPTY)) {
+            long count = scanner.stream().count();
+            assertEquals(endRow - startRow + 1, count);
+          }
+          client.tableOperations().delete(table);
+        }
+      } finally {
+        pool.shutdownNow();
+        pool.awaitTermination(30, TimeUnit.SECONDS);
+      }
+    }
+  }
+
   @Test
   public void testSetTime() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {

Reply via email to