This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 03cede1f3d Test concurrent bulk imports from the same source directory
(#5930)
03cede1f3d is described below
commit 03cede1f3d713ace689fd21ba39c2250b66ded37
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 30 09:13:57 2025 -0400
Test concurrent bulk imports from the same source directory (#5930)
---
.../apache/accumulo/test/functional/BulkNewIT.java | 88 ++++++++++++++++++++++
1 file changed, 88 insertions(+)
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 4ce809be4d..eb52ec2c81 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -37,6 +37,7 @@ import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,8 +48,13 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -103,6 +109,8 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -113,6 +121,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
*/
public class BulkNewIT extends SharedMiniClusterBase {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkNewIT.class);
+
@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(4);
@@ -206,6 +216,84 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ @Test
+ public void testConcurrentImportSameDirectory() throws Exception {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ final int numTasks = 16;
+ final int iterations = 3;
+ final int startRow = 0;
+ final int endRow = 199;
+
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+ try {
+ for (int i = 0; i < iterations; i++) {
+ LOG.debug("Running concurrent import iteration {}/{}", i + 1,
iterations);
+ final String table = getUniqueNames(1)[0] + i;
+ client.tableOperations().create(table);
+
+ Path sourceDir = new Path(rootPath + "/concurrent/" + table +
"_sourceDir");
+ assertTrue(fs.mkdirs(sourceDir), "Failed to create " + sourceDir);
+
+ writeData(fs, sourceDir + "/f.", aconf, startRow, endRow);
+
+ CountDownLatch startSignal = new CountDownLatch(numTasks);
+ List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+ Predicate<Throwable> expectedConcurrentFailure = throwable -> {
+ if (throwable instanceof IOException || throwable instanceof
AccumuloException) {
+ LOG.debug("Concurrent import attempt ({}) failed as expected
with {}: {}",
+ Thread.currentThread().getName(),
throwable.getClass().getSimpleName(),
+ throwable.getMessage());
+ return true;
+ }
+ return false;
+ };
+
+ for (int task = 0; task < numTasks; task++) {
+ futures.add(pool.submit(() -> {
+ final var importMappingOptions =
+
client.tableOperations().importDirectory(sourceDir.toString()).to(table);
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ importMappingOptions.load();
+ return true;
+ } catch (Exception e) {
+ if (expectedConcurrentFailure.test(e)) {
+ return false;
+ }
+ throw e;
+ }
+ }));
+ }
+ assertEquals(numTasks, futures.size());
+
+ int success = 0;
+ int failures = 0;
+ for (Future<Boolean> future : futures) {
+ if (future.get()) {
+ success++;
+ } else {
+ failures++;
+ }
+ }
+ assertEquals(1, success, "Expected exactly one successful bulk
import");
+ assertEquals(numTasks - 1, failures,
+ "Expected all other attempts to fail with a concurrency related
exception");
+
+ try (var scanner = client.createScanner(table,
Authorizations.EMPTY)) {
+ long count = scanner.stream().count();
+ assertEquals(endRow - startRow + 1, count);
+ }
+ client.tableOperations().delete(table);
+ }
+ } finally {
+ pool.shutdownNow();
+ pool.awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+ }
+
@Test
public void testSetTime() throws Exception {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {