This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

commit 9f597c67ad13907454d5f899b2724aa54b15b866
Merge: ed97a2f 8a76d06
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Tue Dec 10 15:49:38 2024 -0500

    Merge remote-tracking branch 'upstream/3.1' into main

 conf/accumulo-testing.properties                   |  8 ++++
 .../testing/continuous/ContinuousIngest.java       | 52 +++++++++++++++++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --cc conf/accumulo-testing.properties
index 1872491,49aa611..8faab5d
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@@ -84,11 -89,14 +84,19 @@@ test.ci.ingest.pause.duration.max=12
  # The probability (between 0.0 and 1.0) that a set of entries will be deleted 
during continuous ingest
  # To disable deletes, set probability to 0.0
  test.ci.ingest.delete.probability=0.1
 +# If set to a path in HDFS, bulk import will be used instead of a batch writer to ingest data
 +test.ci.ingest.bulk.workdir=
 +# When using bulk import to ingest data, this determines how much memory can be used to buffer mutations before creating
 +# rfiles and importing them.
 +test.ci.ingest.bulk.memory.limit=512000000
+ # Enables Zipfian distribution for value size. If set to true, the value will 
have random bytes inserted into it with a size generated based on a Zipfian 
distribution.
+ test.ci.ingest.zipfian.enabled=true
+ # Minimum size to insert into the value when Zipfian distribution is enabled
+ test.ci.ingest.zipfian.min.size=0
+ # Maximum size to insert into the value when Zipfian distribution is enabled
+ test.ci.ingest.zipfian.max.size=10000
+ # Exponent of the Zipfian distribution
+ test.ci.ingest.zipfian.exponent=1.5
  
  # Batch walker
  # ------------
diff --cc 
src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 776f330,bb93ca7..b4c3c9f
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@@ -43,9 -38,7 +43,10 @@@ import org.apache.accumulo.core.data.Mu
  import org.apache.accumulo.core.security.ColumnVisibility;
  import org.apache.accumulo.testing.TestProps;
  import org.apache.accumulo.testing.util.FastFormat;
+ import org.apache.commons.math3.random.RandomDataGenerator;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -66,130 -57,13 +67,137 @@@ public class ContinuousIngest 
    private static int pauseMin;
    private static int pauseMax;
  
+   private static boolean zipfianEnabled;
+   private static int minSize;
+   private static int maxSize;
+   private static double exponent;
+ 
+   private static RandomDataGenerator rnd;
+ 
 +  public interface RandomGeneratorFactory extends Supplier<LongSupplier> {
 +    static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient 
client,
 +        Supplier<SortedSet<Text>> splitSupplier, Random random) {
 +      final long rowMin = env.getRowMin();
 +      final long rowMax = env.getRowMax();
 +      Properties testProps = env.getTestProperties();
 +      final int maxTablets =
 +          
Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_TABLETS));
 +
 +      if (maxTablets == Integer.MAX_VALUE) {
 +        return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random);
 +      } else {
 +        return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, 
maxTablets, splitSupplier,
 +            random);
 +      }
 +    }
 +  }
 +
 +  public static class MinMaxRandomGeneratorFactory implements 
RandomGeneratorFactory {
 +    private final LongSupplier generator;
 +
 +    public MinMaxRandomGeneratorFactory(long rowMin, long rowMax, Random 
random) {
 +      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
 +          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 +      generator = () -> ContinuousIngest.genLong(rowMin, rowMax, random);
 +    }
 +
 +    @Override
 +    public LongSupplier get() {
 +      return generator;
 +    }
 +  }
 +
 +  /**
 +   * Chooses X random tablets and only generates random rows that fall within 
those tablets.
 +   */
 +  public static class MaxTabletsRandomGeneratorFactory implements 
RandomGeneratorFactory {
 +    private final int maxTablets;
 +    private final Supplier<SortedSet<Text>> splitSupplier;
 +    private final Random random;
 +    private final long minRow;
 +    private final long maxRow;
 +
 +    public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int 
maxTablets,
 +        Supplier<SortedSet<Text>> splitSupplier, Random random) {
 +      // writing to a single tablet does not make much sense because this test is predicated on
 +      // having rows in tablets point to rows in other tablets to detect errors
 +      Preconditions.checkState(maxTablets > 1, "max tablets config must be > 
1");
 +      this.maxTablets = maxTablets;
 +      this.splitSupplier = splitSupplier;
 +      this.random = random;
 +      this.minRow = minRow;
 +      this.maxRow = maxRow;
 +    }
 +
 +    @Override
 +    public LongSupplier get() {
 +      var splits = splitSupplier.get();
 +      if (splits.size() < maxTablets) {
 +        // There are fewer tablets, so generate within the entire range
 +        return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get();
 +      } else {
 +        long prev = minRow;
 +        List<LongSupplier> allGenerators = new ArrayList<>(splits.size() + 1);
 +        for (var split : splits) {
 +          // splits are derived from inspecting rfile indexes, and rfile indexes can shorten rows,
 +          // introducing non-hex chars, so non-hex chars in the splits must be handled
 +          // TODO this handling may not be correct; it will not introduce errors, but it may cause
 +          // writing a small amount of data to an extra tablet.
 +          byte[] bytes = split.copyBytes();
 +          int len = bytes.length;
 +          int last = bytes.length - 1;
 +          if (bytes[last] < '0') {
 +            len = last;
 +          } else if (bytes[last] > '9' && bytes[last] < 'a') {
 +            bytes[last] = '9';
 +          } else if (bytes[last] > 'f') {
 +            bytes[last] = 'f';
 +          }
 +
 +          var splitStr = new String(bytes, 0, len, UTF_8);
 +          var splitNum = Long.parseLong(splitStr, 16) << (64 - 
splitStr.length() * 4);
 +          allGenerators.add(new MinMaxRandomGeneratorFactory(prev, splitNum, 
random).get());
 +          prev = splitNum;
 +        }
 +        allGenerators.add(new MinMaxRandomGeneratorFactory(prev, maxRow, 
random).get());
 +
 +        Collections.shuffle(allGenerators, random);
 +        var generators = List.copyOf(allGenerators.subList(0, maxTablets));
 +
 +        return () -> {
 +          // pick a generator for a random tablet
 +          var generator = generators.get(random.nextInt(generators.size()));
 +          // pick a random long that falls within that tablet
 +          return generator.getAsLong();
 +        };
 +      }
 +    }
 +  }
 +
 +  public interface BatchWriterFactory {
 +    BatchWriter create(String tableName) throws TableNotFoundException;
 +
 +    static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env,
 +        Supplier<SortedSet<Text>> splitSupplier) {
 +      Properties testProps = env.getTestProperties();
 +      final String bulkWorkDir = 
testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR);
 +      if (bulkWorkDir == null || bulkWorkDir.isBlank()) {
 +        return client::createBatchWriter;
 +      } else {
 +        try {
 +          var conf = new Configuration();
 +          var workDir = new Path(bulkWorkDir);
 +          var filesystem = workDir.getFileSystem(conf);
 +          var memLimit = 
Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
 +          return tableName -> new BulkBatchWriter(client, tableName, 
filesystem, workDir, memLimit,
 +              splitSupplier);
 +        } catch (IOException e) {
 +          throw new UncheckedIOException(e);
 +        }
 +      }
 +    }
 +  }
 +
    private static ColumnVisibility getVisibility(Random rand) {
      return visibilities.get(rand.nextInt(visibilities.size()));
    }
@@@ -320,7 -181,19 +328,20 @@@
      log.info("DELETES will occur with a probability of {}",
          String.format("%.02f", deleteProbability));
  
 -    zipfianEnabled = 
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
 +    try (BatchWriter bw = batchWriterFactory.create(tableName)) {
++      zipfianEnabled =
++          
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+ 
 -    if (zipfianEnabled) {
 -      minSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
 -      maxSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
 -      exponent = 
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
 -      rnd = new RandomDataGenerator();
++      if (zipfianEnabled) {
++        minSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
++        maxSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
++        exponent = 
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
++        rnd = new RandomDataGenerator();
+ 
 -      log.info("Zipfian distribution enabled with min size: {}, max size: {}, 
exponent: {}",
 -          minSize, maxSize, exponent);
 -    }
++        log.info("Zipfian distribution enabled with min size: {}, max size: 
{}, exponent: {}",
++            minSize, maxSize, exponent);
++      }
+ 
 -    try (BatchWriter bw = client.createBatchWriter(tableName)) {
        out: while (true) {
          ColumnVisibility cv = getVisibility(random);
  

Reply via email to