This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new c309590  fixes file count issue w/ bulk import random walk test (#297)
c309590 is described below

commit c309590f43a3eaaea37149d0d2d66611153bed4a
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 29 16:00:08 2025 -0400

    fixes file count issue w/ bulk import random walk test (#297)
    
    Running bulk import test failed with the following error.
    
    ```
    2025-08-29T00:44:43,285 [randomwalk.bulk.BulkMinusOne] ERROR: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
    java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:591) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeMappingFromFiles(BulkImport.java:499) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.load(BulkImport.java:155) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.testing.randomwalk.bulk.BulkPlusOne.bulkLoadLots(BulkPlusOne.java:98) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.testing.randomwalk.bulk.BulkMinusOne.runLater(BulkMinusOne.java:34) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.testing.randomwalk.bulk.BulkTest.lambda$visit$0(BulkTest.java:33) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
            at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
            at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
            at java.lang.Thread.run(Thread.java:840) [?:?]
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
            at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:585) ~[accumulo-testing-shaded.jar:?]
            ... 13 more
    Caused by: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.checkTabletCount(BulkImport.java:627) ~[accumulo-testing-shaded.jar:?]
            at org.apache.accumulo.core.clientImpl.bulk.BulkImport.lambda$computeFileToTabletMappings$7(BulkImport.java:563) ~[accumulo-testing-shaded.jar:?]
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
            ... 5 more
    ```
    
    To avoid this, adjusted the table property for the max number of files
    that can go to a single tablet, setting it to 1000. Also fixed the
    following problems with the test code that made this problem harder to
    debug.
    
     * The test schedules lots of background tasks, but kept running when
       they failed. Made the test exit when it notices a background task
       failed. This change required making the state map synchronized.
     * Some log messages related to the test bulk import code logged a
       marker count that correlates the bulk import w/ data in the table.
       This marker count is really important for tracking down bugs w/
       missing data, but only a subset of log messages included it. Added
       the marker count to all log messages, since log messages from
       multiple threads are interleaved in the test output.
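
    A minimal JDK-only sketch of the fail-fast pattern from the first
    bullet, assuming nothing from Accumulo. The class FailFastDemo, its
    visit and snapshot methods, and the daemon thread pool are hypothetical
    stand-ins for BulkTest, State, and the bulk import pool; only the key
    name mirrors BACKGROUND_FAILURE_KEY in the diff. snapshot() holds the
    map's own lock while copying because, per the Collections.synchronizedMap
    Javadoc, iterating a synchronized map is not atomic.

    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the walker, its State, and the background pool.
    class FailFastDemo {
      static final String FAILURE_KEY = "sawBackgroundFailure";

      // Written by background threads, read by the walker thread.
      private final Map<String,Object> state = Collections.synchronizedMap(new HashMap<>());
      private final AtomicLong counter = new AtomicLong();
      // Daemon threads so a forgotten shutdown cannot keep the JVM alive.
      private final ExecutorService pool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
      });

      FailFastDemo() {
        state.put(FAILURE_KEY, Boolean.FALSE);
      }

      // One step of the walk: refuse to schedule more work once any background
      // task has failed, rather than silently continuing.
      CompletableFuture<Void> visit(Runnable work) {
        if (Boolean.TRUE.equals(state.get(FAILURE_KEY))) {
          throw new IllegalStateException(
              "One or more previous background tasks failed, aborting test");
        }
        // Every log line for this task carries the same marker.
        String marker = "marker:" + String.format("%07d", counter.incrementAndGet());
        return CompletableFuture.runAsync(() -> {
          try {
            work.run();
          } catch (Throwable t) {
            state.put(FAILURE_KEY, Boolean.TRUE); // makes the next visit() fail fast
            System.err.println(marker + " background task failed: " + t);
          }
        }, pool);
      }

      // A single get() is atomic, but copying iterates the map, so hold its lock.
      Map<String,Object> snapshot() {
        synchronized (state) {
          return new TreeMap<>(state);
        }
      }

      void shutdown() {
        pool.shutdown();
      }
    }
    ```

    Once a failing task has completed, the next visit() throws
    IllegalStateException instead of scheduling more work, so a failed bulk
    import surfaces immediately rather than as apparent data loss later.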
    
    Since the test did not immediately fail when the bulk import call
    failed, I assumed all bulk imports were successful and that Accumulo had
    lost data. Looked down the wrong path for a bit; hopefully these changes
    help avoid that in the future.
---
 .../org/apache/accumulo/testing/randomwalk/Module.java |  6 ++++--
 .../org/apache/accumulo/testing/randomwalk/State.java  | 18 ++++++++++++------
 .../accumulo/testing/randomwalk/bulk/BulkPlusOne.java  | 15 ++++++++-------
 .../accumulo/testing/randomwalk/bulk/BulkTest.java     | 10 ++++++++++
 .../apache/accumulo/testing/randomwalk/bulk/Setup.java |  9 +++++++--
 .../accumulo/testing/randomwalk/bulk/Verify.java       | 10 ++++++----
 6 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
index aec30a3..bb78a5b 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -342,8 +343,9 @@ public class Module extends Node {
             log.debug("  " + entry.getKey() + ": " + entry.getValue());
           }
           log.debug("State information");
-          for (String key : new TreeSet<>(state.getMap().keySet())) {
-            Object value = state.getMap().get(key);
+          var stateSnapshot = new TreeMap<>(state.getMap());
+          for (String key : stateSnapshot.keySet()) {
+            Object value = stateSnapshot.get(key);
             String logMsg = "  " + key + ": ";
             if (value == null)
               logMsg += "null";
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
index c79a725..3125e86 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
@@ -19,8 +19,10 @@
 package org.apache.accumulo.testing.randomwalk;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /**
@@ -28,7 +30,7 @@ import java.util.Random;
  */
 public class State {
 
-  private final HashMap<String,Object> stateMap = new HashMap<>();
+  private final Map<String,Object> stateMap = Collections.synchronizedMap(new HashMap<>());
   private final List<String> tables = new ArrayList<>();
   private final List<String> namespaces = new ArrayList<>();
   private final List<String> users = new ArrayList<>();
@@ -80,10 +82,12 @@ public class State {
    * @throws RuntimeException if state object is not present
    */
   public Object get(String key) {
-    if (!stateMap.containsKey(key)) {
-      throw new RuntimeException("State does not contain " + key);
+    synchronized (stateMap) {
+      if (!stateMap.containsKey(key)) {
+        throw new RuntimeException("State does not contain " + key);
+      }
+      return stateMap.get(key);
     }
-    return stateMap.get(key);
   }
 
   public List<String> getTableNames() {
@@ -135,8 +139,10 @@ public class State {
    *
    * @return state map
    */
-  HashMap<String,Object> getMap() {
-    return stateMap;
+  Map<String,Object> getMap() {
+    synchronized (stateMap) {
+      return Map.copyOf(stateMap);
+    }
   }
 
   /**
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 39d3f5f..7287da2 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -56,9 +56,12 @@ public class BulkPlusOne extends BulkImportTest {
   private static final Value ONE = new Value("1".getBytes());
 
   static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    String markerLog = "marker:" + markerColumnQualifier;
+
     final FileSystem fs = (FileSystem) state.get("fs");
     final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
-    log.debug("Bulk loading from {}", dir);
+    log.debug("{} bulk loading from {}", markerLog, dir);
     final int parts = env.getRandom().nextInt(10) + 1;
 
     // The set created below should always contain 0. So its very important that zero is first in
@@ -70,9 +73,8 @@ public class BulkPlusOne extends BulkImportTest {
     List<String> printRows =
         startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());
 
-    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
-    log.debug("preparing bulk files with start rows " + printRows + " last row "
-        + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+    log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog,
+        printRows, String.format(FMT, LOTS - 1));
 
     List<Integer> rows = new ArrayList<>(startRows);
     rows.add(LOTS);
@@ -80,7 +82,7 @@ public class BulkPlusOne extends BulkImportTest {
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.rf", i);
 
-      log.debug("Creating {}", fileName);
+      log.debug("{} creating {}", markerLog, fileName);
       try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
         writer.startDefaultLocalityGroup();
         int start = rows.get(i);
@@ -97,8 +99,7 @@ public class BulkPlusOne extends BulkImportTest {
     env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
         .to(Setup.getTableName()).tableTime(true).load();
     fs.delete(dir, true);
-    log.debug("Finished bulk import, start rows " + printRows + " last row "
-        + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+    log.debug("{} Finished bulk import", markerLog);
   }
 
   @Override
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
index 26e8c20..f584b5d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
@@ -26,12 +26,22 @@ import org.apache.accumulo.testing.randomwalk.Test;
 
 public abstract class BulkTest extends Test {
 
+  public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure";
+
   @Override
   public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+
+    if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) {
+      // fail the test early because a previous background task failed
+      throw new IllegalArgumentException(
+          "One or more previous background task failed, aborting test");
+    }
+
     Setup.run(state, () -> {
       try {
         runLater(state, env);
       } catch (Throwable ex) {
+        state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE);
         log.error(ex.toString(), ex);
       }
     });
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3c6d46a..3fd2d8d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.testing.randomwalk.bulk;
 
 import java.net.InetAddress;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -26,6 +27,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.iterators.LongCombiner;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -52,14 +54,17 @@ public class Setup extends Test {
         IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
         SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
         SummingCombiner.setCombineAllColumns(is, true);
-        tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is));
+        var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");
+
+        tableOps.create(getTableName(),
+            new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
       }
     } catch (TableExistsException ex) {
       // expected if there are multiple walkers
     }
     state.setRandom(env.getRandom());
     state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
-    state.set("bulkImportSuccess", "true");
+    state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
     BulkPlusOne.counter.set(0L);
     ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
         .numCoreThreads(MAX_POOL_SIZE).build();
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 8d61149..6db6ab5 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -59,15 +59,17 @@ public class Verify extends Test {
       lastSize = size;
       threadPool.awaitTermination(10, TimeUnit.SECONDS);
     }
-    if (!"true".equals(state.get("bulkImportSuccess"))) {
-      log.info("Not verifying bulk import test due to import failures");
-      return;
+
+    boolean errorFound = false;
+
+    if ((Boolean) state.get(BulkTest.BACKGROUND_FAILURE_KEY)) {
+      log.error("One or more background task failed");
+      errorFound = true;
     }
 
     String user = env.getAccumuloClient().whoami();
     Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
     RowIterator rowIter;
-    boolean errorFound = false;
     try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
       scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
       for (Entry<Key,Value> entry : scanner) {
