This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ba3e886f486e3f6bea60352f4bab56d243665926
Merge: bef5c7d076 b5237ac40a
Author: Dom Garguilo <[email protected]>
AuthorDate: Thu Oct 16 14:14:36 2025 -0400

    Merge remote-tracking branch 'upstream/2.1'

 .../apache/accumulo/test/functional/BulkNewIT.java |  84 +++++++++++
 .../org/apache/accumulo/test/shell/ShellIT.java    | 162 ++++++++++++---------
 2 files changed, 174 insertions(+), 72 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 95e5ca2dee,eb52ec2c81..782beaa3a4
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@@ -54,7 -53,8 +54,9 @@@ import java.util.concurrent.ExecutionEx
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
 +import java.util.function.Function;
+ import java.util.function.Predicate;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
  
@@@ -120,14 -109,20 +122,18 @@@ import org.junit.jupiter.api.AfterAll
  import org.junit.jupiter.api.BeforeAll;
  import org.junit.jupiter.api.BeforeEach;
  import org.junit.jupiter.api.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
 +import com.google.common.collect.MoreCollectors;
 +import com.google.common.util.concurrent.MoreExecutors;
 +
  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  
 -/**
 - * Tests new bulk import technique. For the old technique see {@link BulkOldIT}
 - *
 - * @since 2.0.0
 - */
  public class BulkNewIT extends SharedMiniClusterBase {
  
+   private static final Logger LOG = LoggerFactory.getLogger(BulkNewIT.class);
+ 
    @Override
    protected Duration defaultTimeout() {
      return Duration.ofMinutes(4);
@@@ -229,9 -216,87 +235,87 @@@
      }
    }
  
+   @Test
+   public void testConcurrentImportSameDirectory() throws Exception {
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       final int numTasks = 16;
+       final int iterations = 3;
+       final int startRow = 0;
+       final int endRow = 199;
+ 
+       ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+ 
+       try {
+         for (int i = 0; i < iterations; i++) {
+           LOG.debug("Running concurrent import iteration {}/{}", i + 1, 
iterations);
+           final String table = getUniqueNames(1)[0] + i;
+           client.tableOperations().create(table);
+ 
+           Path sourceDir = new Path(rootPath + "/concurrent/" + table + 
"_sourceDir");
+           assertTrue(fs.mkdirs(sourceDir), "Failed to create " + sourceDir);
+ 
+           writeData(fs, sourceDir + "/f.", aconf, startRow, endRow);
+ 
+           CountDownLatch startSignal = new CountDownLatch(numTasks);
+           List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+           Predicate<Throwable> expectedConcurrentFailure = throwable -> {
+             if (throwable instanceof IOException || throwable instanceof 
AccumuloException) {
+               LOG.debug("Concurrent import attempt ({}) failed as expected with {}: {}",
+                   Thread.currentThread().getName(), 
throwable.getClass().getSimpleName(),
+                   throwable.getMessage());
+               return true;
+             }
+             return false;
+           };
+ 
+           for (int task = 0; task < numTasks; task++) {
+             futures.add(pool.submit(() -> {
+               final var importMappingOptions =
+                   
client.tableOperations().importDirectory(sourceDir.toString()).to(table);
+               try {
+                 startSignal.countDown();
+                 startSignal.await();
+                 importMappingOptions.load();
+                 return true;
+               } catch (Exception e) {
+                 if (expectedConcurrentFailure.test(e)) {
+                   return false;
+                 }
+                 throw e;
+               }
+             }));
+           }
+           assertEquals(numTasks, futures.size());
+ 
+           int success = 0;
+           int failures = 0;
+           for (Future<Boolean> future : futures) {
+             if (future.get()) {
+               success++;
+             } else {
+               failures++;
+             }
+           }
+           assertEquals(1, success, "Expected exactly one successful bulk import");
+           assertEquals(numTasks - 1, failures,
+               "Expected all other attempts to fail with a concurrency related exception");
+ 
+           try (var scanner = client.createScanner(table, 
Authorizations.EMPTY)) {
+             long count = scanner.stream().count();
+             assertEquals(endRow - startRow + 1, count);
+           }
+           client.tableOperations().delete(table);
+         }
+       } finally {
+         pool.shutdownNow();
+         pool.awaitTermination(30, TimeUnit.SECONDS);
+       }
+     }
+   }
+ 
    @Test
    public void testSetTime() throws Exception {
 -    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +    try (var client = (ClientContext) 
Accumulo.newClient().from(getClientProps()).build()) {
        tableName = "testSetTime_table1";
        NewTableConfiguration newTableConf = new NewTableConfiguration();
        // set logical time type so we can set time on bulk import
diff --cc test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java
index 9a5bccafe4,bea0a69623..acf1f6be89
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java
@@@ -342,10 -356,13 +352,10 @@@ public class ShellIT extends SharedMini
  
      // delete will show the timestamp
      exec("deletemany -r 1 -f -st", true, "[DELETED] 1 1:1 [] 1");
 -
 -    // DeleteManyCommand has its own Formatter (DeleterFormatter), so it does not honor the -fm flag
 -    exec("deletemany -r 2 -f -st -fm org.apache.accumulo.core.util.format.DateStringFormatter",
 -        true, "[DELETED] 2 2:2 [] 2");
 +    exec("deletemany -r 2 -f -st", true, "[DELETED] 2 2:2 [] 2");
  
      exec("setauths -c ", true);
-     exec("deletetable test -f", true, "Table: [test] has been deleted");
+     exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted");
    }
  
    @Test
@@@ -423,10 -445,36 +438,10 @@@
      String expectedFew = "1 123:12345 [12345678] 123456789\t12345";
      exec("scan -st", true, expected);
      exec("scan -st -f 5", true, expectedFew);
 -    // also prove that BinaryFormatter behaves same as the default
 -    exec("scan -st -fm org.apache.accumulo.core.util.format.BinaryFormatter", 
true, expected);
 -    exec("scan -st -f 5 -fm 
org.apache.accumulo.core.util.format.BinaryFormatter", true,
 -        expectedFew);
      exec("setauths -c", true);
-     exec("deletetable " + name + " -f", true, "Table: [" + name + "] has been deleted");
+     exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted");
    }
  
 -  @Test
 -  public void scanDateStringFormatterTest() throws IOException {
 -    Shell.log.debug("Starting scan dateStringFormatter test --------------------------");
 -    String table = getUniqueNames(1)[0];
 -    exec("createtable " + table, true);
 -    exec("insert r f q v -ts 0", true);
 -    @SuppressWarnings("deprecation")
 -    DateFormat dateFormat =
 -        new 
SimpleDateFormat(org.apache.accumulo.core.util.format.DateStringFormatter.DATE_FORMAT);
 -    String expected = String.format("r f:q [] %s\tv", dateFormat.format(new 
Date(0)));
 -    // historically, showing few did not pertain to ColVis or Timestamp
 -    String expectedNoTimestamp = "r f:q []\tv";
 -    exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st", true, expected);
 -    exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st -f 1000", true,
 -        expected);
 -    exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st -f 5", true,
 -        expected);
 -    exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter", 
true,
 -        expectedNoTimestamp);
 -    exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted");
 -  }
 -
    @Test
    public void grepTest() throws IOException {
      Shell.log.debug("Starting grep test --------------------------");
@@@ -625,11 -667,11 +641,10 @@@
  
    @Test
    public void propFileNotFoundTest() throws IOException {
- 
 -    String fileName = new File(tempDir, "propFile.shellit").getAbsolutePath();
 +    Path fileName = Path.of(tempDir.getPath(), "propFile.shellit");
  
     Shell.log.debug("Starting prop file not found test --------------------------");
 -    exec("config --propFile " + fileName, false,
 -        "FileNotFoundException: " + fileName + " (No such file or directory)");
 +    exec("config --propFile " + fileName, false, "NoSuchFileException: " + 
fileName);
    }
  
    @Test

Reply via email to