This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new f810a3b add ranges to bulk imports in Bulk RW test (#299)
f810a3b is described below
commit f810a3b1ec728d1e27aba4d7e2f5806c9319fcf7
Author: Keith Turner <[email protected]>
AuthorDate: Thu Mar 12 15:34:39 2026 -0400
add ranges to bulk imports in Bulk RW test (#299)
---
conf/log4j2.properties | 2 +-
.../testing/randomwalk/bulk/BulkMinusOne.java | 5 +-
.../testing/randomwalk/bulk/BulkPlusOne.java | 172 ++++++++++++++++++---
.../accumulo/testing/randomwalk/bulk/Setup.java | 10 +-
.../accumulo/testing/randomwalk/bulk/Verify.java | 15 +-
5 files changed, 173 insertions(+), 31 deletions(-)
diff --git a/conf/log4j2.properties b/conf/log4j2.properties
index fcd0f45..94c52b3 100644
--- a/conf/log4j2.properties
+++ b/conf/log4j2.properties
@@ -27,7 +27,7 @@ appender.console.type = Console
appender.console.name = STDERR
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} [%c{3}] %-5p: %m%n
+appender.console.layout.pattern = %d{ISO8601} %T [%c{3}] %-5p: %m%n
loggers = accumulotesting, accumulo, hadooptest, hadoopmapred, hadooputil,
zookeepertest, curatortest
logger.accumulotesting.name = org.apache.accumulo.testing
diff --git
a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
index b217f16..a1de666 100644
---
a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
+++
b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
@@ -30,8 +30,7 @@ public class BulkMinusOne extends BulkImportTest {
@Override
protected void runLater(State state, RandWalkEnv env) throws Exception {
- log.info("Decrementing");
- BulkPlusOne.bulkLoadLots(log, state, env, negOne);
+ var bulkRange = BulkPlusOne.rangeExchange.nextDecrementRange(env);
+ BulkPlusOne.bulkLoadLots(log, state, env, bulkRange, negOne);
}
-
}
diff --git
a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 7287da2..de716b4 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -18,8 +18,12 @@
*/
package org.apache.accumulo.testing.randomwalk.bulk;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
import java.util.List;
+import java.util.Random;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,9 +43,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
+import com.google.common.base.Preconditions;
+
public class BulkPlusOne extends BulkImportTest {
public static final int LOTS = 100000;
+ public static final int ZONES = 50;
+ public static final int ZONE_SIZE = LOTS / ZONES;
public static final int COLS = 10;
public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) /
Math.log(16));
public static final String FMT = "r%0" + HEX_SIZE + "x";
@@ -51,61 +59,183 @@ public class BulkPlusOne extends BulkImportTest {
.map(t -> new Column(CHECK_COLUMN_FAMILY,
t)).collect(Collectors.toList());
public static final Text MARKER_CF = new Text("marker");
- static final AtomicLong counter = new AtomicLong();
+
+ /**
+ * Inclusive start, exclusive end zone range.
+ */
+ record BulkRange(int startZone, int endZone) {
+ public BulkRange {
+ Preconditions.checkArgument(startZone >= 0 && startZone < endZone &&
endZone <= ZONES,
+ "startZone:%s endZone:%s", startZone, endZone);
+ }
+
+ static BulkRange randomRange(Random random) {
+ int start = random.nextInt(ZONES);
+ int end = random.nextInt(ZONES);
+ if (end < start) {
+ int tmp = end;
+ end = start;
+ start = tmp;
+ }
+ if (end == start) {
+ end++;
+ }
+ return new BulkRange(start, end);
+ }
+ }
+
+ /**
+ * Every increment range must also be decremented and vice versa. This
ensures that happens as
+ * threads doing bulk imports request ranges.
+ */
+ static class RangeExchange {
+ private Deque<BulkRange> incrementRanges = new ArrayDeque<>();
+ private Deque<BulkRange> decrementRanges = new ArrayDeque<>();
+
+ synchronized BulkRange nextIncrementRange(RandWalkEnv env) {
+ var next = incrementRanges.poll();
+ if (next == null) {
+ next = BulkRange.randomRange(env.getRandom());
+ decrementRanges.push(next);
+ }
+ return next;
+ }
+
+ synchronized BulkRange nextDecrementRange(RandWalkEnv env) {
+ var next = decrementRanges.poll();
+ if (next == null) {
+ next = BulkRange.randomRange(env.getRandom());
+ incrementRanges.push(next);
+ }
+ return next;
+ }
+
+ synchronized boolean isEmpty() {
+ return incrementRanges.isEmpty() && decrementRanges.isEmpty();
+ }
+
+ synchronized void clear() {
+ incrementRanges.clear();
+ decrementRanges.clear();
+ }
+ }
+
+ static final RangeExchange rangeExchange = new RangeExchange();
+ static final AtomicLong perZoneCounters[] = new AtomicLong[ZONES];
+ static {
+ for (int i = 0; i < perZoneCounters.length; i++) {
+ perZoneCounters[i] = new AtomicLong(0);
+ }
+ }
private static final Value ONE = new Value("1".getBytes());
- static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value
value) throws Exception {
- String markerColumnQualifier = String.format("%07d",
counter.incrementAndGet());
- String markerLog = "marker:" + markerColumnQualifier;
+ /**
+ * Load a plus one or minus one into a random range of the table. Overall
this test should load a
+ * minus one into the same range for every plus one loaded (or vice versa)
and at the end of the
+ * test the sum should be zero. In order to aid with debugging data loss,
this test loads markers
+ * along with the plus ones and minus ones. These markers help pinpoint
which bulk load operation
+ * was missing. The table's row range is divided into zones and each zone has
a one up counter that
+ * is used to generate markers. At the end of the test each zone in the
table should have a
+ * contiguous set of markers. If a marker is missing, then the logging from
this method should be
+ * consulted to determine the corresponding bulk import operation. Once the
bulk import operation
+ * is found it can be followed in the Accumulo server logs. The test does
analysis to find missing
+ * markers and prints holes. For example, if the test prints that it saw markers 97
and 99 in zone 3, then
+ * that means marker 98 is missing and the bulk import operation related to
98 needs to be found
+ * in the test logs. When looking for missing markers, look for the correct
zone in the test logs.
+ *
+ * All the logging in this method includes the bulk import directory uuid.
If there is a problem,
+ * then this directory uuid can be used to find the corresponding fate uuid
in the accumulo server
+ * logs. There should be a log message in the server logs that includes the
bulk directory and the
+ * fate uuid.
+ */
+ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, BulkRange
bulkRange,
+ Value value) throws Exception {
+
+ long[] markers = new long[ZONES];
+ Arrays.fill(markers, -1);
+
+ // Allocate a marker for each zone and build a log message that links the
marker for each zone
+ // to this bulk import directory name uuid.
+ StringBuilder makersBuilder = new StringBuilder("[");
+ String sep = "";
+ for (int z = bulkRange.startZone; z < bulkRange.endZone; z++) {
+ long zoneMarker = perZoneCounters[z].incrementAndGet();
+ markers[z] = zoneMarker;
+
makersBuilder.append(sep).append(z).append(":").append(String.format("%07d",
zoneMarker));
+ sep = ",";
+ }
+ makersBuilder.append("]");
+ String markersLog = makersBuilder.toString();
+ final UUID uuid = UUID.randomUUID();
final FileSystem fs = (FileSystem) state.get("fs");
- final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" +
UUID.randomUUID());
- log.debug("{} bulk loading from {}", markerLog, dir);
+ final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + uuid);
+ log.debug("{} bulk loading {} over {} from {}", uuid, value, bulkRange,
dir);
+ log.debug("{} zone markers:{}", uuid, markersLog);
final int parts = env.getRandom().nextInt(10) + 1;
+ // Must mutate all rows in the zone, so expand the start and end to
encompass all the rows in
+ // the zone.
+ final int start = bulkRange.startZone * ZONE_SIZE;
+ final int end = bulkRange.endZone * ZONE_SIZE;
+
// The set created below should always contain 0. So its very important
that zero is first in
// concat below.
- TreeSet<Integer> startRows =
- Stream.concat(Stream.of(0), Stream.generate(() ->
env.getRandom().nextInt(LOTS))).distinct()
- .limit(parts).collect(Collectors.toCollection(TreeSet::new));
+ TreeSet<Integer> startRows = Stream
+ .concat(Stream.of(start),
+ Stream.generate(() -> env.getRandom().nextInt(end - start) +
start))
+
.distinct().limit(parts).collect(Collectors.toCollection(TreeSet::new));
List<String> printRows =
startRows.stream().map(row -> String.format(FMT,
row)).collect(Collectors.toList());
- log.debug("{} preparing bulk files with start rows {} last row {} marker
", markerLog,
- printRows, String.format(FMT, LOTS - 1));
+ log.debug("{} preparing bulk files with start rows {} last row {}", uuid,
printRows,
+ String.format(FMT, end));
+ startRows.add(end);
List<Integer> rows = new ArrayList<>(startRows);
- rows.add(LOTS);
+
+ long currentZone = -1;
+ Text markerColumnQualifier = null;
for (int i = 0; i < parts; i++) {
String fileName = dir + "/" + String.format("part_%d.rf", i);
- log.debug("{} creating {}", markerLog, fileName);
+ log.trace("creating {}", fileName);
try (RFileWriter writer =
RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
writer.startDefaultLocalityGroup();
- int start = rows.get(i);
- int end = rows.get(i + 1);
- for (int j = start; j < end; j++) {
+ int partStart = rows.get(i);
+ int partEnd = rows.get(i + 1);
+ int eCount = 0;
+ for (int j = partStart; j < partEnd; j++) {
+ int zone = j / ZONE_SIZE;
+ if (currentZone != zone) {
+ Preconditions.checkState(markers[zone] > 0, "%s %s %s", zone, j,
bulkRange);
+ markerColumnQualifier = new Text(String.format("%07d",
markers[zone]));
+ currentZone = zone;
+ }
+
Text row = new Text(String.format(FMT, j));
for (Column col : COLNAMES) {
writer.append(new Key(row, col.getColumnFamily(),
col.getColumnQualifier()), value);
+ eCount++;
}
- writer.append(new Key(row, MARKER_CF, new
Text(markerColumnQualifier)), ONE);
+ writer.append(new Key(row, MARKER_CF, markerColumnQualifier), ONE);
+ eCount++;
}
+ log.debug("{} created {} with {} entries", uuid, fileName, eCount);
}
}
env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
.to(Setup.getTableName()).tableTime(true).load();
fs.delete(dir, true);
- log.debug("{} Finished bulk import", markerLog);
+ log.debug("{} Finished bulk import", uuid);
}
@Override
protected void runLater(State state, RandWalkEnv env) throws Exception {
- log.info("Incrementing");
- bulkLoadLots(log, state, env, ONE);
+ var bulkRange = rangeExchange.nextIncrementRange(env);
+ bulkLoadLots(log, state, env, bulkRange, ONE);
}
-
}
diff --git
a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3fd2d8d..6b193e5 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
public class Setup extends Test {
- private static final int MAX_POOL_SIZE = 8;
+ private static final int MAX_POOL_SIZE = 16;
static String tableName = null;
@Override
@@ -54,7 +54,8 @@ public class Setup extends Test {
IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
SummingCombiner.setCombineAllColumns(is, true);
- var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(),
"1000");
+ var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(),
"1000",
+ Property.TABLE_BULK_MAX_TABLETS.getKey(), "1000");
tableOps.create(getTableName(),
new
NewTableConfiguration().attachIterator(is).setProperties(tableProps));
@@ -65,7 +66,10 @@ public class Setup extends Test {
state.setRandom(env.getRandom());
state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
- BulkPlusOne.counter.set(0L);
+ BulkPlusOne.rangeExchange.clear();
+ for (int i = 0; i < BulkPlusOne.perZoneCounters.length; i++) {
+ BulkPlusOne.perZoneCounters[i].set(0);
+ }
ThreadPoolExecutor e =
ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
.numCoreThreads(MAX_POOL_SIZE).build();
state.set("pool", e);
diff --git
a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 6db6ab5..9c465f1 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -67,6 +67,11 @@ public class Verify extends Test {
errorFound = true;
}
+ if (!BulkPlusOne.rangeExchange.isEmpty()) {
+ log.error("BulkPlusOne.rangeExchange is not empty");
+ errorFound = true;
+ }
+
String user = env.getAccumuloClient().whoami();
Authorizations auths =
env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
RowIterator rowIter;
@@ -112,9 +117,13 @@ public class Verify extends Test {
prev = curr;
}
- if (BulkPlusOne.counter.get() != prev) {
- log.error("Row {} does not have all markers. Current marker: {},
Previous marker:{}",
- rowText, BulkPlusOne.counter.get(), prev);
+ long parsedRow = Long.parseLong(rowText.toString().substring(1), 16);
+ int zone = (int) (parsedRow / BulkPlusOne.ZONE_SIZE);
+
+ if (BulkPlusOne.perZoneCounters[zone].get() != prev) {
+ log.error(
+ "Row {} does not have all markers. Current marker: {}, Previous
marker:{} zone:{}",
+ rowText, BulkPlusOne.perZoneCounters[zone].get(), prev, zone);
errorFound = true;
}
}