This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
commit 9f597c67ad13907454d5f899b2724aa54b15b866
Merge: ed97a2f 8a76d06
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Tue Dec 10 15:49:38 2024 -0500

    Merge remote-tracking branch 'upstream/3.1' into main

 conf/accumulo-testing.properties                   |  8 ++++
 .../testing/continuous/ContinuousIngest.java       | 52 +++++++++++++++++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --cc conf/accumulo-testing.properties
index 1872491,49aa611..8faab5d
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@@ -84,11 -89,14 +84,19 @@@ test.ci.ingest.pause.duration.max=12
 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
 # To disable deletes, set probability to 0.0
 test.ci.ingest.delete.probability=0.1
+# If set to a path in hdfs will use bulk import instead of batch writer to ingest data
+test.ci.ingest.bulk.workdir=
+# When using bulk import to ingest data this determines how much memory can be used to buffer mutations before creating
+# rfiles and importing them.
+test.ci.ingest.bulk.memory.limit=512000000
+ # Enables Zipfian distribution for value size. If set to true, the value will have random bytes inserted into it with a size generated based on a Zipfian distribution.
+ test.ci.ingest.zipfian.enabled=true
+ # Minimum size to insert into the value when Zipfian distribution is enabled
+ test.ci.ingest.zipfian.min.size=0
+ # Maximum size to insert into the value when Zipfian distribution is enabled
+ test.ci.ingest.zipfian.max.size=10000
+ # Exponent of the Zipfian distribution
+ test.ci.ingest.zipfian.exponent=1.5

 # Batch walker
 # ------------

diff --cc src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 776f330,bb93ca7..b4c3c9f
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@@ -43,9 -38,7 +43,10 @@@ import org.apache.accumulo.core.data.Mu
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.accumulo.testing.util.FastFormat;
+ import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@@ -66,130 -57,13 +67,137 @@@ public class ContinuousIngest
   private static int pauseMin;
   private static int pauseMax;

+  private static boolean zipfianEnabled;
+  private static int minSize;
+  private static int maxSize;
+  private static double exponent;
+
+  private static RandomDataGenerator rnd;
+
+  public interface RandomGeneratorFactory extends Supplier<LongSupplier> {
+    static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client,
+        Supplier<SortedSet<Text>> splitSupplier, Random random) {
+      final long rowMin = env.getRowMin();
+      final long rowMax = env.getRowMax();
+      Properties testProps = env.getTestProperties();
+      final int maxTablets =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_TABLETS));
+
+      if (maxTablets == Integer.MAX_VALUE) {
+        return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random);
+      } else {
+        return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier,
+            random);
+      }
+    }
+  }
+
+  public static class MinMaxRandomGeneratorFactory implements RandomGeneratorFactory {
+    private final LongSupplier generator;
+
+    public MinMaxRandomGeneratorFactory(long rowMin, long rowMax, Random random) {
+      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
+      generator = () -> ContinuousIngest.genLong(rowMin, rowMax, random);
+    }
+
+    @Override
+    public LongSupplier get() {
+      return generator;
+    }
+  }
+
+  /**
+   * Chooses X random tablets and only generates random rows that fall within those tablets.
+   */
+  public static class MaxTabletsRandomGeneratorFactory implements RandomGeneratorFactory {
+    private final int maxTablets;
+    private final Supplier<SortedSet<Text>> splitSupplier;
+    private final Random random;
+    private final long minRow;
+    private final long maxRow;
+
+    public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int maxTablets,
+        Supplier<SortedSet<Text>> splitSupplier, Random random) {
+      // writing to a single tablet does not make much sense because this test it predicated on
+      // having rows in tablets point to rows in other tablet to detect errors
+      Preconditions.checkState(maxTablets > 1, "max tablets config must be > 1");
+      this.maxTablets = maxTablets;
+      this.splitSupplier = splitSupplier;
+      this.random = random;
+      this.minRow = minRow;
+      this.maxRow = maxRow;
+    }
+
+    @Override
+    public LongSupplier get() {
+      var splits = splitSupplier.get();
+      if (splits.size() < maxTablets) {
+        // There are less tablets so generate within the entire range
+        return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get();
+      } else {
+        long prev = minRow;
+        List<LongSupplier> allGenerators = new ArrayList<>(splits.size() + 1);
+        for (var split : splits) {
+          // splits are derived from inspecting rfile indexes and rfile indexes can shorten rows
+          // introducing non-hex chars so need to handle non-hex chars in the splits
+          // TODO this handling may not be correct, but it will not introduce errors but may cause
+          // writing a small amount of data to an extra tablet.
+          byte[] bytes = split.copyBytes();
+          int len = bytes.length;
+          int last = bytes.length - 1;
+          if (bytes[last] < '0') {
+            len = last;
+          } else if (bytes[last] > '9' && bytes[last] < 'a') {
+            bytes[last] = '9';
+          } else if (bytes[last] > 'f') {
+            bytes[last] = 'f';
+          }
+
+          var splitStr = new String(bytes, 0, len, UTF_8);
+          var splitNum = Long.parseLong(splitStr, 16) << (64 - splitStr.length() * 4);
+          allGenerators.add(new MinMaxRandomGeneratorFactory(prev, splitNum, random).get());
+          prev = splitNum;
+        }
+        allGenerators.add(new MinMaxRandomGeneratorFactory(prev, maxRow, random).get());
+
+        Collections.shuffle(allGenerators, random);
+        var generators = List.copyOf(allGenerators.subList(0, maxTablets));
+
+        return () -> {
+          // pick a generator for random tablet
+          var generator = generators.get(random.nextInt(generators.size()));
+          // pick a random long that falls within that tablet
+          return generator.getAsLong();
+        };
+      }
+    }
+  }
+
+  public interface BatchWriterFactory {
+    BatchWriter create(String tableName) throws TableNotFoundException;
+
+    static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env,
+        Supplier<SortedSet<Text>> splitSupplier) {
+      Properties testProps = env.getTestProperties();
+      final String bulkWorkDir = testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR);
+      if (bulkWorkDir == null || bulkWorkDir.isBlank()) {
+        return client::createBatchWriter;
+      } else {
+        try {
+          var conf = new Configuration();
+          var workDir = new Path(bulkWorkDir);
+          var filesystem = workDir.getFileSystem(conf);
+          var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
+          return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
+              splitSupplier);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+  }
+
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
   }
@@@ -320,7 -181,19 +328,20 @@@
     log.info("DELETES will occur with a probability of {}",
         String.format("%.02f", deleteProbability));

-    zipfianEnabled = Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+    try (BatchWriter bw = batchWriterFactory.create(tableName)) {
++      zipfianEnabled =
++          Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+
-    if (zipfianEnabled) {
-      minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
-      maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
-      exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
-      rnd = new RandomDataGenerator();
++      if (zipfianEnabled) {
++        minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
++        maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
++        exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
++        rnd = new RandomDataGenerator();
+
-      log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
-          minSize, maxSize, exponent);
-    }
++        log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
++            minSize, maxSize, exponent);
++      }
+
-    try (BatchWriter bw = client.createBatchWriter(tableName)) {

       out: while (true) {
         ColumnVisibility cv = getVisibility(random);
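
A note on the test.ci.ingest.zipfian.* settings merged above: the diff imports org.apache.commons.math3.random.RandomDataGenerator, whose nextZipf(numberOfElements, exponent) method draws Zipfian-distributed integers. The sketch below is only an illustration of how a value size bounded by the min and max properties could be sampled; it is not the exact logic in ContinuousIngest, and the class name and hard-coded property values are assumptions for the example.

    import org.apache.commons.math3.random.RandomDataGenerator;

    // Illustrative sketch only (not the code in ContinuousIngest): sample a value
    // size in [minSize, maxSize] from a Zipfian distribution with the given exponent.
    public class ZipfSizeSketch {
      public static void main(String[] args) {
        int minSize = 0;       // test.ci.ingest.zipfian.min.size
        int maxSize = 10000;   // test.ci.ingest.zipfian.max.size
        double exponent = 1.5; // test.ci.ingest.zipfian.exponent

        RandomDataGenerator rnd = new RandomDataGenerator();
        // nextZipf(n, s) returns an int in [1, n]; shift it so the result
        // lands in [minSize, maxSize].
        int size = minSize + rnd.nextZipf(maxSize - minSize + 1, exponent) - 1;
        System.out.println("sampled value size: " + size);
      }
    }

With an exponent of 1.5, small sizes are drawn far more often than sizes near the maximum.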
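
MaxTabletsRandomGeneratorFactory, also shown above, turns each hex split point into a 64-bit row boundary by parsing the (possibly shortened) split as hex and left-shifting it so the prefix is left-aligned against full-length rows. A standalone illustration of that expression follows, using made-up split strings rather than anything from the commit.

    // Standalone illustration of the boundary math used above; the split values
    // here are made up for the example.
    public class SplitBoundarySketch {
      static long toRowBoundary(String splitStr) {
        // A split such as "3a7f" is only a prefix of a 16-digit hex row, so the
        // parsed value is shifted left to align it with full-length rows.
        return Long.parseLong(splitStr, 16) << (64 - splitStr.length() * 4);
      }

      public static void main(String[] args) {
        System.out.printf("%016x%n", toRowBoundary("3a7f")); // 3a7f000000000000
        System.out.printf("%016x%n", toRowBoundary("7ff"));  // 7ff0000000000000
      }
    }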