This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new f810a3b  add ranges to bulk imports in Bulk RW test (#299)
f810a3b is described below

commit f810a3b1ec728d1e27aba4d7e2f5806c9319fcf7
Author: Keith Turner <[email protected]>
AuthorDate: Thu Mar 12 15:34:39 2026 -0400

    add ranges to bulk imports in Bulk RW test (#299)
---
 conf/log4j2.properties                             |   2 +-
 .../testing/randomwalk/bulk/BulkMinusOne.java      |   5 +-
 .../testing/randomwalk/bulk/BulkPlusOne.java       | 172 ++++++++++++++++++---
 .../accumulo/testing/randomwalk/bulk/Setup.java    |  10 +-
 .../accumulo/testing/randomwalk/bulk/Verify.java   |  15 +-
 5 files changed, 173 insertions(+), 31 deletions(-)

diff --git a/conf/log4j2.properties b/conf/log4j2.properties
index fcd0f45..94c52b3 100644
--- a/conf/log4j2.properties
+++ b/conf/log4j2.properties
@@ -27,7 +27,7 @@ appender.console.type = Console
 appender.console.name = STDERR
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} [%c{3}] %-5p: %m%n
+appender.console.layout.pattern = %d{ISO8601} %T [%c{3}] %-5p: %m%n
 
 loggers = accumulotesting, accumulo, hadooptest, hadoopmapred, hadooputil, zookeepertest, curatortest
 logger.accumulotesting.name = org.apache.accumulo.testing
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
index b217f16..a1de666 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
@@ -30,8 +30,7 @@ public class BulkMinusOne extends BulkImportTest {
 
   @Override
   protected void runLater(State state, RandWalkEnv env) throws Exception {
-    log.info("Decrementing");
-    BulkPlusOne.bulkLoadLots(log, state, env, negOne);
+    var bulkRange = BulkPlusOne.rangeExchange.nextDecrementRange(env);
+    BulkPlusOne.bulkLoadLots(log, state, env, bulkRange, negOne);
   }
-
 }
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 7287da2..de716b4 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -18,8 +18,12 @@
  */
 package org.apache.accumulo.testing.randomwalk.bulk;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
 import java.util.List;
+import java.util.Random;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,9 +43,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 
+import com.google.common.base.Preconditions;
+
 public class BulkPlusOne extends BulkImportTest {
 
   public static final int LOTS = 100000;
+  public static final int ZONES = 50;
+  public static final int ZONE_SIZE = LOTS / ZONES;
   public static final int COLS = 10;
   public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16));
   public static final String FMT = "r%0" + HEX_SIZE + "x";
@@ -51,61 +59,183 @@ public class BulkPlusOne extends BulkImportTest {
           .map(t -> new Column(CHECK_COLUMN_FAMILY, t)).collect(Collectors.toList());
 
   public static final Text MARKER_CF = new Text("marker");
-  static final AtomicLong counter = new AtomicLong();
+
+  /**
+   * Inclusive start exclusive end zone range.
+   */
+  record BulkRange(int startZone, int endZone) {
+    public BulkRange {
+      Preconditions.checkArgument(startZone >= 0 && startZone < endZone && endZone <= ZONES,
+          "startZone:%s endZone:%s", startZone, endZone);
+    }
+
+    static BulkRange randomRange(Random random) {
+      int start = random.nextInt(ZONES);
+      int end = random.nextInt(ZONES);
+      if (end < start) {
+        int tmp = end;
+        end = start;
+        start = tmp;
+      }
+      if (end == start) {
+        end++;
+      }
+      return new BulkRange(start, end);
+    }
+  }
+
+  /**
+   * Every increment range must also be decremented and visa versa. This ensures that happens as
+   * threads doing bulk imports request ranges.
+   */
+  static class RangeExchange {
+    private Deque<BulkRange> incrementRanges = new ArrayDeque<>();
+    private Deque<BulkRange> decrementRanges = new ArrayDeque<>();
+
+    synchronized BulkRange nextIncrementRange(RandWalkEnv env) {
+      var next = incrementRanges.poll();
+      if (next == null) {
+        next = BulkRange.randomRange(env.getRandom());
+        decrementRanges.push(next);
+      }
+      return next;
+    }
+
+    synchronized BulkRange nextDecrementRange(RandWalkEnv env) {
+      var next = decrementRanges.poll();
+      if (next == null) {
+        next = BulkRange.randomRange(env.getRandom());
+        incrementRanges.push(next);
+      }
+      return next;
+    }
+
+    synchronized boolean isEmpty() {
+      return incrementRanges.isEmpty() && decrementRanges.isEmpty();
+    }
+
+    synchronized void clear() {
+      incrementRanges.clear();
+      decrementRanges.clear();
+    }
+  }
+
+  static final RangeExchange rangeExchange = new RangeExchange();
+  static final AtomicLong perZoneCounters[] = new AtomicLong[ZONES];
+  static {
+    for (int i = 0; i < perZoneCounters.length; i++) {
+      perZoneCounters[i] = new AtomicLong(0);
+    }
+  }
 
   private static final Value ONE = new Value("1".getBytes());
 
-  static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
-    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
-    String markerLog = "marker:" + markerColumnQualifier;
+  /**
+   * Load a plus one or minus one into a random range of the table. Overall this test should load a
+   * minus one into the same range for every plus one loaded (or visa versa) and at the end of the
+   * test the sum should be zero. In order to aid with debugging data loss, this test loads markers
+   * along with the plus one and minus ones. These markers help pin point which bulk load operation
+   * was missing. The tables row range is divided into zones and each zone has a one up counter that
+   * is used to generate markers. At the end of the test each zone in the table should have a
+   * contiguous set of markers. If a marker is missing, then the logging from this method should be
+   * consulted to determine the corresponding bulk import operation. Once the bulk import operation
+   * is found it can be followed in the Accumulo server logs. The test does analysis to find missing
+   * markers and prints holes. Like if the test prints that it saw marker 97 and 99 in zone 3, then
+   * that means marker 98 is missing and the bulk import operation related to 98 needs to be found
+   * in the test logs. When looking for missing markers, look for the correct zone in the test logs.
+   *
+   * All the logging in this method includes the bulk import directory uuid. If there is a problem,
+   * then this directory uuid can be used to find the corresponding fate uuid in the accumulo server
+   * logs. There should be a log message in the server logs that includes the bulk directory and the
+   * fate uuid.
+   */
+  static void bulkLoadLots(Logger log, State state, RandWalkEnv env, BulkRange bulkRange,
+      Value value) throws Exception {
+
+    long[] markers = new long[ZONES];
+    Arrays.fill(markers, -1);
+
+    // Allocate a marker for each zone and build a log message that links the marker for each zone
+    // to this bulk import directory name uuid.
+    StringBuilder makersBuilder = new StringBuilder("[");
+    String sep = "";
+    for (int z = bulkRange.startZone; z < bulkRange.endZone; z++) {
+      long zoneMarker = perZoneCounters[z].incrementAndGet();
+      markers[z] = zoneMarker;
+      makersBuilder.append(sep).append(z).append(":").append(String.format("%07d", zoneMarker));
+      sep = ",";
+    }
+    makersBuilder.append("]");
+    String markersLog = makersBuilder.toString();
 
+    final UUID uuid = UUID.randomUUID();
     final FileSystem fs = (FileSystem) state.get("fs");
-    final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
-    log.debug("{} bulk loading from {}", markerLog, dir);
+    final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + uuid);
+    log.debug("{} bulk loading {} over {} from {}", uuid, value, bulkRange, dir);
+    log.debug("{} zone markers:{}", uuid, markersLog);
     final int parts = env.getRandom().nextInt(10) + 1;
 
+    // Must mutate all rows in the zone, so expand the start and end to encompass all the rows in
+    // the zone.
+    final int start = bulkRange.startZone * ZONE_SIZE;
+    final int end = bulkRange.endZone * ZONE_SIZE;
+
     // The set created below should always contain 0. So its very important that zero is first in
     // concat below.
-    TreeSet<Integer> startRows =
-        Stream.concat(Stream.of(0), Stream.generate(() -> env.getRandom().nextInt(LOTS))).distinct()
-            .limit(parts).collect(Collectors.toCollection(TreeSet::new));
+    TreeSet<Integer> startRows = Stream
+        .concat(Stream.of(start),
+            Stream.generate(() -> env.getRandom().nextInt(end - start) + start))
+        .distinct().limit(parts).collect(Collectors.toCollection(TreeSet::new));
 
     List<String> printRows =
         startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());
 
-    log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog,
-        printRows, String.format(FMT, LOTS - 1));
+    log.debug("{} preparing bulk files with start rows {} last row {}", uuid, printRows,
+        String.format(FMT, end));
 
+    startRows.add(end);
     List<Integer> rows = new ArrayList<>(startRows);
-    rows.add(LOTS);
+
+    long currentZone = -1;
+    Text markerColumnQualifier = null;
 
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.rf", i);
 
-      log.debug("{} creating {}", markerLog, fileName);
+      log.trace("creating {}", fileName);
       try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
         writer.startDefaultLocalityGroup();
-        int start = rows.get(i);
-        int end = rows.get(i + 1);
-        for (int j = start; j < end; j++) {
+        int partStart = rows.get(i);
+        int partEnd = rows.get(i + 1);
+        int eCount = 0;
+        for (int j = partStart; j < partEnd; j++) {
+          int zone = j / ZONE_SIZE;
+          if (currentZone != zone) {
+            Preconditions.checkState(markers[zone] > 0, "%s %s %s", zone, j, bulkRange);
+            markerColumnQualifier = new Text(String.format("%07d", markers[zone]));
+            currentZone = zone;
+          }
+
           Text row = new Text(String.format(FMT, j));
           for (Column col : COLNAMES) {
             writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
+            eCount++;
           }
-          writer.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
+          writer.append(new Key(row, MARKER_CF, markerColumnQualifier), ONE);
+          eCount++;
         }
+        log.debug("{} created {} with {} entries", uuid, fileName, eCount);
       }
     }
     env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
         .to(Setup.getTableName()).tableTime(true).load();
     fs.delete(dir, true);
-    log.debug("{} Finished bulk import", markerLog);
+    log.debug("{} Finished bulk import", uuid);
   }
 
   @Override
   protected void runLater(State state, RandWalkEnv env) throws Exception {
-    log.info("Incrementing");
-    bulkLoadLots(log, state, env, ONE);
+    var bulkRange = rangeExchange.nextIncrementRange(env);
+    bulkLoadLots(log, state, env, bulkRange, ONE);
   }
-
 }
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3fd2d8d..6b193e5 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 
 public class Setup extends Test {
 
-  private static final int MAX_POOL_SIZE = 8;
+  private static final int MAX_POOL_SIZE = 16;
   static String tableName = null;
 
   @Override
@@ -54,7 +54,8 @@ public class Setup extends Test {
         IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
         SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
         SummingCombiner.setCombineAllColumns(is, true);
-        var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");
+        var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000",
+            Property.TABLE_BULK_MAX_TABLETS.getKey(), "1000");
 
         tableOps.create(getTableName(),
             new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
@@ -65,7 +66,10 @@ public class Setup extends Test {
     state.setRandom(env.getRandom());
     state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
     state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
-    BulkPlusOne.counter.set(0L);
+    BulkPlusOne.rangeExchange.clear();
+    for (int i = 0; i < BulkPlusOne.perZoneCounters.length; i++) {
+      BulkPlusOne.perZoneCounters[i].set(0);
+    }
     ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
         .numCoreThreads(MAX_POOL_SIZE).build();
     state.set("pool", e);
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 6db6ab5..9c465f1 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -67,6 +67,11 @@ public class Verify extends Test {
       errorFound = true;
     }
 
+    if (!BulkPlusOne.rangeExchange.isEmpty()) {
+      log.error("BulkPlusOne.rangeExchange is not empty");
+      errorFound = true;
+    }
+
     String user = env.getAccumuloClient().whoami();
     Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
     RowIterator rowIter;
@@ -112,9 +117,13 @@ public class Verify extends Test {
           prev = curr;
         }
 
-        if (BulkPlusOne.counter.get() != prev) {
-          log.error("Row {} does not have all markers. Current marker: {}, Previous marker:{}",
-              rowText, BulkPlusOne.counter.get(), prev);
+        long parsedRow = Long.parseLong(rowText.toString().substring(1), 16);
+        int zone = (int) (parsedRow / BulkPlusOne.ZONE_SIZE);
+
+        if (BulkPlusOne.perZoneCounters[zone].get() != prev) {
+          log.error(
+              "Row {} does not have all markers. Current marker: {}, Previous marker:{} zone:{}",
+              rowText, BulkPlusOne.perZoneCounters[zone].get(), prev, zone);
           errorFound = true;
         }
       }

Reply via email to