This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ba3e886f486e3f6bea60352f4bab56d243665926 Merge: bef5c7d076 b5237ac40a Author: Dom Garguilo <[email protected]> AuthorDate: Thu Oct 16 14:14:36 2025 -0400 Merge remote-tracking branch 'upstream/2.1' .../apache/accumulo/test/functional/BulkNewIT.java | 84 +++++++++++ .../org/apache/accumulo/test/shell/ShellIT.java | 162 ++++++++++++--------- 2 files changed, 174 insertions(+), 72 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 95e5ca2dee,eb52ec2c81..782beaa3a4 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@@ -54,7 -53,8 +54,9 @@@ import java.util.concurrent.ExecutionEx import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; +import java.util.function.Function; + import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@@ -120,14 -109,20 +122,18 @@@ import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +import com.google.common.collect.MoreCollectors; +import com.google.common.util.concurrent.MoreExecutors; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -/** - * Tests new bulk import technique. 
For the old technique see {@link BulkOldIT} - * - * @since 2.0.0 - */ public class BulkNewIT extends SharedMiniClusterBase { + private static final Logger LOG = LoggerFactory.getLogger(BulkNewIT.class); + @Override protected Duration defaultTimeout() { return Duration.ofMinutes(4); @@@ -229,9 -216,87 +235,87 @@@ } } + @Test + public void testConcurrentImportSameDirectory() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + final int numTasks = 16; + final int iterations = 3; + final int startRow = 0; + final int endRow = 199; + + ExecutorService pool = Executors.newFixedThreadPool(numTasks); + + try { + for (int i = 0; i < iterations; i++) { + LOG.debug("Running concurrent import iteration {}/{}", i + 1, iterations); + final String table = getUniqueNames(1)[0] + i; + client.tableOperations().create(table); + + Path sourceDir = new Path(rootPath + "/concurrent/" + table + "_sourceDir"); + assertTrue(fs.mkdirs(sourceDir), "Failed to create " + sourceDir); + + writeData(fs, sourceDir + "/f.", aconf, startRow, endRow); + + CountDownLatch startSignal = new CountDownLatch(numTasks); + List<Future<Boolean>> futures = new ArrayList<>(numTasks); + Predicate<Throwable> expectedConcurrentFailure = throwable -> { + if (throwable instanceof IOException || throwable instanceof AccumuloException) { + LOG.debug("Concurrent import attempt ({}) failed as expected with {}: {}", + Thread.currentThread().getName(), throwable.getClass().getSimpleName(), + throwable.getMessage()); + return true; + } + return false; + }; + + for (int task = 0; task < numTasks; task++) { + futures.add(pool.submit(() -> { + final var importMappingOptions = + client.tableOperations().importDirectory(sourceDir.toString()).to(table); + try { + startSignal.countDown(); + startSignal.await(); + importMappingOptions.load(); + return true; + } catch (Exception e) { + if (expectedConcurrentFailure.test(e)) { + return false; + } + throw e; + } + })); + } + 
assertEquals(numTasks, futures.size()); + + int success = 0; + int failures = 0; + for (Future<Boolean> future : futures) { + if (future.get()) { + success++; + } else { + failures++; + } + } + assertEquals(1, success, "Expected exactly one successful bulk import"); + assertEquals(numTasks - 1, failures, + "Expected all other attempts to fail with a concurrency related exception"); + + try (var scanner = client.createScanner(table, Authorizations.EMPTY)) { + long count = scanner.stream().count(); + assertEquals(endRow - startRow + 1, count); + } + client.tableOperations().delete(table); + } + } finally { + pool.shutdownNow(); + pool.awaitTermination(30, TimeUnit.SECONDS); + } + } + } + @Test public void testSetTime() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + try (var client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { tableName = "testSetTime_table1"; NewTableConfiguration newTableConf = new NewTableConfiguration(); // set logical time type so we can set time on bulk import diff --cc test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java index 9a5bccafe4,bea0a69623..acf1f6be89 --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellIT.java @@@ -342,10 -356,13 +352,10 @@@ public class ShellIT extends SharedMini // delete will show the timestamp exec("deletemany -r 1 -f -st", true, "[DELETED] 1 1:1 [] 1"); - - // DeleteManyCommand has its own Formatter (DeleterFormatter), so it does not honor the -fm flag - exec("deletemany -r 2 -f -st -fm org.apache.accumulo.core.util.format.DateStringFormatter", - true, "[DELETED] 2 2:2 [] 2"); + exec("deletemany -r 2 -f -st", true, "[DELETED] 2 2:2 [] 2"); exec("setauths -c ", true); - exec("deletetable test -f", true, "Table: [test] has been deleted"); + exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted"); } @Test @@@ 
-423,10 -445,36 +438,10 @@@ String expectedFew = "1 123:12345 [12345678] 123456789\t12345"; exec("scan -st", true, expected); exec("scan -st -f 5", true, expectedFew); - // also prove that BinaryFormatter behaves same as the default - exec("scan -st -fm org.apache.accumulo.core.util.format.BinaryFormatter", true, expected); - exec("scan -st -f 5 -fm org.apache.accumulo.core.util.format.BinaryFormatter", true, - expectedFew); exec("setauths -c", true); - exec("deletetable " + name + " -f", true, "Table: [" + name + "] has been deleted"); + exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted"); } - @Test - public void scanDateStringFormatterTest() throws IOException { - Shell.log.debug("Starting scan dateStringFormatter test --------------------------"); - String table = getUniqueNames(1)[0]; - exec("createtable " + table, true); - exec("insert r f q v -ts 0", true); - @SuppressWarnings("deprecation") - DateFormat dateFormat = - new SimpleDateFormat(org.apache.accumulo.core.util.format.DateStringFormatter.DATE_FORMAT); - String expected = String.format("r f:q [] %s\tv", dateFormat.format(new Date(0))); - // historically, showing few did not pertain to ColVis or Timestamp - String expectedNoTimestamp = "r f:q []\tv"; - exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st", true, expected); - exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st -f 1000", true, - expected); - exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter -st -f 5", true, - expected); - exec("scan -fm org.apache.accumulo.core.util.format.DateStringFormatter", true, - expectedNoTimestamp); - exec("deletetable " + table + " -f", true, "Table: [" + table + "] has been deleted"); - } - @Test public void grepTest() throws IOException { Shell.log.debug("Starting grep test --------------------------"); @@@ -625,11 -667,11 +641,10 @@@ @Test public void propFileNotFoundTest() throws IOException { - - String 
fileName = new File(tempDir, "propFile.shellit").getAbsolutePath(); + Path fileName = Path.of(tempDir.getPath(), "propFile.shellit"); Shell.log.debug("Starting prop file not found test --------------------------"); - exec("config --propFile " + fileName, false, - "FileNotFoundException: " + fileName + " (No such file or directory)"); + exec("config --propFile " + fileName, false, "NoSuchFileException: " + fileName); } @Test
