This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
commit 506b583ca2dd7a02c4c3b190859240f77fd8d4d3 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sun Nov 17 22:41:55 2024 +0000 consolidate getting table splits in the code --- .../testing/continuous/BulkBatchWriter.java | 14 ++------ .../testing/continuous/ContinuousIngest.java | 39 +++++++++++++--------- .../accumulo/testing/continuous/ManySplits.java | 6 ++-- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java index fe50575..6e763b3 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; public class BulkBatchWriter implements BatchWriter { @@ -61,21 +59,13 @@ public class BulkBatchWriter implements BatchWriter { private boolean closed = false; public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem, - Path workPath, long memLimit) { + Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) { this.client = client; this.tableName = tableName; this.fileSystem = fileSystem; this.workPath = workPath; this.memLimit = memLimit; - this.splitSupplier = Suppliers.memoizeWithExpiration(() -> { - try { - var splits = client.tableOperations().listSplits(tableName); - return new TreeSet<>(splits); - } catch (Exception e) { - throw new IllegalStateException(e); - } - - }, 10, TimeUnit.MINUTES); + this.splitSupplier = splitSupplier; } @Override diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index d775785..b15cc3c 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -75,7 +75,8 @@ public class ContinuousIngest { private static RandomDataGenerator rnd; public interface RandomGeneratorFactory extends Supplier<LongSupplier> { - static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, Random random) { + static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, + Supplier<SortedSet<Text>> splitSupplier, Random random) { final long rowMin = env.getRowMin(); final long rowMax = env.getRowMax(); Properties testProps = env.getTestProperties(); @@ -86,15 +87,6 @@ public class ContinuousIngest { return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random); } else { var tableName = env.getAccumuloTableName(); - Supplier<SortedSet<Text>> splitSupplier = Suppliers.memoizeWithExpiration(() -> { - try { - var splits = client.tableOperations().listSplits(tableName); - return new TreeSet<>(splits); - } catch (Exception e) { - throw new IllegalStateException(e); - } - - }, 10, TimeUnit.MINUTES); return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier, random); } @@ -142,7 +134,7 @@ public class ContinuousIngest { public LongSupplier get() { var splits = splitSupplier.get(); if (splits.size() < maxTablets) { - // There are less tablets so generate within the tablet range + // There are less tablets so generate within the entire range return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get(); } else { long prev = minRow; @@ -186,7 +178,8 @@ public class ContinuousIngest { public interface BatchWriterFactory { BatchWriter create(String tableName) throws TableNotFoundException; - static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) { + static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env, + Supplier<SortedSet<Text>> splitSupplier) { Properties testProps = env.getTestProperties(); final String bulkWorkDir = testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR); if (bulkWorkDir == null || bulkWorkDir.isBlank()) { @@ -197,7 +190,8 @@ public class ContinuousIngest { var workDir = new Path(bulkWorkDir); var filesystem = workDir.getFileSystem(conf); var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT)); - return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit); + return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit, + splitSupplier); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -247,6 +241,20 @@ public class ContinuousIngest { } } + static Supplier<SortedSet<Text>> createSplitSupplier(AccumuloClient client, String tableName) { + + Supplier<SortedSet<Text>> splitSupplier = Suppliers.memoizeWithExpiration(() -> { + try { + var splits = client.tableOperations().listSplits(tableName); + return new TreeSet<>(splits); + } catch (Exception e) { + throw new IllegalStateException(e); + } + + }, 10, TimeUnit.MINUTES); + return splitSupplier; + } + public static void main(String[] args) throws Exception { try (ContinuousEnv env = new ContinuousEnv(args)) { @@ -263,8 +271,9 @@ public class ContinuousIngest { final boolean checksum = Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM)); - var randomFactory = RandomGeneratorFactory.create(env, client, random); - var batchWriterFactory = BatchWriterFactory.create(client, env); + var splitSupplier = createSplitSupplier(client, tableName); + var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random); + var batchWriterFactory = BatchWriterFactory.create(client, env, splitSupplier); doIngest(client, randomFactory, batchWriterFactory, tableName, testProps, maxColF, maxColQ, numEntries, checksum, random); } diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java index b590e1a..d28f557 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java @@ -109,8 +109,10 @@ public class ManySplits { Map.of()); log.info("Ingesting {} entries into first table, {}.", initialData, firstTable); - var randomFactory = RandomGeneratorFactory.create(env, client, random); - var batchWriterFactory = ContinuousIngest.BatchWriterFactory.create(client, env); + var splitSupplier = ContinuousIngest.createSplitSupplier(client, firstTable); + var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random); + var batchWriterFactory = + ContinuousIngest.BatchWriterFactory.create(client, env, splitSupplier); ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps, maxColF, maxColQ, initialData, false, random);