This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
     new 3a4a09f  Create new test that creates lots of splits (#267)
3a4a09f is described below

commit 3a4a09f62a0a5d60400b82b094faf5ba970bcd4f
Author: Dom G <domgargu...@apache.org>
AuthorDate: Fri Jan 5 16:14:43 2024 -0500

    Create new test that creates lots of splits (#267)

    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 bin/cingest                                        |   9 +-
 conf/accumulo-testing.properties                   |  23 ++
 .../org/apache/accumulo/testing/TestProps.java     |  14 +
 .../testing/continuous/ContinuousIngest.java       | 197 +++++++-------
 .../accumulo/testing/continuous/CreateTable.java   |  63 +++--
 .../accumulo/testing/continuous/ManySplits.java    | 289 +++++++++++++++++++++
 6 files changed, 474 insertions(+), 121 deletions(-)

diff --git a/bin/cingest b/bin/cingest
index 43328ea..4ef312c 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -28,14 +28,16 @@ Usage: cingest <application> {-o test.<prop>=<value>}
 
 Available applications:
 
-    createtable    Creates Accumulo table for continous ingest
+    createtable    Creates Accumulo table for continuous ingest
     ingest         Inserts data into Accumulo that will form random graph.
     walk           Randomly walks the graph using a scanner
     batchwalk      Randomly walks the graph using a batch scanner
     scan           Scans the graph
-    verify         Verifies continous ingest test. Stop ingest before running.
+    verify         Verifies continuous ingest test. Stop ingest before running.
     moru           Stresses Accumulo by reading and writing to the ingest table.
                    Stop ingest before running.
+    manysplits     Repeatedly lowers the split threshold on a table to create
+                   many splits in order to test split performance
     bulk           Create RFiles in a Map Reduce job and calls importDirectory if successful
EOF
}
@@ -69,6 +71,9 @@ case "$1" in
   moru)
     ci_main="${ci_package}.ContinuousMoru"
     ;;
+  manysplits)
+    ci_main="${ci_package}.ManySplits"
+    ;;
   bulk)
     if [ "$#" -ne 2 ]; then
       echo "Usage : $0 $1 <bulk dir>"
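With the launcher change above, the new test runs like any other cingest application. Assuming conf/accumulo-testing.properties is populated as described in the repository README, an invocation looks like:

    ./bin/cingest manysplits

and any of the test.ci.split.* properties below can be overridden inline, e.g. -o test.ci.split.table.count=5, per the usage line above.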
diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index b2cbb5e..93c3227 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -140,6 +140,29 @@ test.ci.bulk.map.nodes=1000000
 # produce a bulk import file.
 test.ci.bulk.reducers.max=1024
 
+# Splits Scaling
+# -----------
+# The number of tables to create
+test.ci.split.table.count=3
+# Minimum random row to generate
+test.ci.split.ingest.row.min=0
+# Maximum random row to generate
+test.ci.split.ingest.row.max=9223372036854775807
+# Maximum number of random column families to generate
+test.ci.split.ingest.max.cf=32767
+# Maximum number of random column qualifiers to generate
+test.ci.split.ingest.max.cq=32767
+# The number of tablets to create on each table on table creation
+test.ci.split.initial.tablets=1
+# The amount of data to write to each table
+test.ci.split.write.size=10000000
+# The split threshold to set for each table on creation
+test.ci.split.threshold=1G
+# The factor to reduce the split threshold by for each iteration of the test
+test.ci.split.threshold.reduction.factor=10
+# Number of rounds to run the test
+test.ci.split.test.rounds=3
+
 ###############################
 # Garbage Collection Simulation
 ###############################
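With the defaults above, each round divides the current threshold by the reduction factor using integer math on the byte count, so a three-round run steps roughly 1G -> 102M -> 10M -> 1M. A minimal sketch of that schedule (values hard-coded from the defaults above):

    // per-round split threshold schedule, mirroring the reduction ManySplits applies
    long threshold = 1073741824L; // test.ci.split.threshold=1G
    final int factor = 10;        // test.ci.split.threshold.reduction.factor
    final int rounds = 3;         // test.ci.split.test.rounds
    for (int round = 1; round <= rounds; round++) {
      threshold /= factor; // integer division, as in the test's main loop
      System.out.println("round " + round + ": " + threshold + " bytes");
    }
    // prints 107374182, then 10737418, then 1073741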
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 8d094bf..aa8e9e6 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -41,6 +41,7 @@ public class TestProps {
   private static final String CI_SCANNER = CI + "scanner.";
   private static final String CI_VERIFY = CI + "verify.";
   private static final String CI_BULK = CI + "bulk.";
+  private static final String CI_SPLIT = CI + "split.";
 
   public static final String TERASORT = PREFIX + "terasort.";
   public static final String ROWHASH = PREFIX + "rowhash.";
@@ -148,6 +149,19 @@ public class TestProps {
   public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
   public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
+  /** Split **/
+  public static final String CI_SPLIT_TABLE_COUNT = CI_SPLIT + "table.count";
+  public static final String CI_SPLIT_INGEST_ROW_MIN = CI_SPLIT + "ingest.row.min";
+  public static final String CI_SPLIT_INGEST_ROW_MAX = CI_SPLIT + "ingest.row.max";
+  public static final String CI_SPLIT_INGEST_MAX_CF = CI_SPLIT + "ingest.max.cf";
+  public static final String CI_SPLIT_INGEST_MAX_CQ = CI_SPLIT + "ingest.max.cq";
+  public static final String CI_SPLIT_INITIAL_TABLETS = CI_SPLIT + "initial.tablets";
+  public static final String CI_SPLIT_WRITE_SIZE = CI_SPLIT + "write.size";
+  public static final String CI_SPLIT_THRESHOLD = CI_SPLIT + "threshold";
+  public static final String CI_SPLIT_THRESHOLD_REDUCTION_FACTOR =
+      CI_SPLIT + "threshold.reduction.factor";
+  public static final String CI_SPLIT_TEST_ROUNDS = CI_SPLIT + "test.rounds";
+
   /** TeraSort **/
   public static final String TERASORT_TABLE = TERASORT + "table";
   public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
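For orientation, these keys resolve to the test.ci.split.* entries shown in the properties file above. A minimal sketch of reading them outside the framework (the file path is illustrative; inside the test a loaded Properties object arrives via the test environment):

    import java.io.FileInputStream;
    import java.util.Properties;

    import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
    import org.apache.accumulo.testing.TestProps;

    public class ReadSplitProps {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // illustrative path; the framework normally loads this file for you
        try (FileInputStream in = new FileInputStream("conf/accumulo-testing.properties")) {
          props.load(in);
        }
        int tableCount = Integer.parseInt(props.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
        long thresholdBytes = ConfigurationTypeHelper
            .getFixedMemoryAsBytes(props.getProperty(TestProps.CI_SPLIT_THRESHOLD)); // "1G" -> 1073741824
        System.out.println(tableCount + " tables, threshold " + thresholdBytes + " bytes");
      }
    }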
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 34043af..1bb32a5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -106,125 +106,134 @@ public class ContinuousIngest {
       final long rowMin = env.getRowMin();
       final long rowMax = env.getRowMax();
-      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
-          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
-
       String tableName = env.getAccumuloTableName();
-      if (!client.tableOperations().exists(tableName)) {
-        throw new TableNotFoundException(null, tableName,
-            "Consult the README and create the table before starting ingest.");
-      }
+      Properties testProps = env.getTestProperties();
+      final int maxColF = env.getMaxColF();
+      final int maxColQ = env.getMaxColQ();
+      Random random = env.getRandom();
+      final long numEntries =
+          Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      final boolean checksum =
+          Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
 
-      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
-      log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
-          System.currentTimeMillis());
+      doIngest(client, rowMin, rowMax, tableName, testProps, maxColF, maxColQ, numEntries, checksum,
+          random);
+    }
+  }
 
-      Properties testProps = env.getTestProperties();
+  protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, String tableName,
+      Properties testProps, int maxColF, int maxColQ, long numEntries, boolean checksum,
+      Random random)
+      throws TableNotFoundException, MutationsRejectedException, InterruptedException {
+    Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+        "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 
-      long entriesWritten = 0L;
-      long entriesDeleted = 0L;
-      final int flushInterval = getFlushEntries(testProps);
-      log.info("A flush will occur after every {} entries written", flushInterval);
-      final int maxDepth = 25;
+    if (!client.tableOperations().exists(tableName)) {
+      throw new TableNotFoundException(null, tableName,
+          "Consult the README and create the table before starting ingest.");
+    }
 
-      // always want to point back to flushed data. This way the previous item should
-      // always exist in accumulo when verifying data. To do this make insert N point
-      // back to the row from insert (N - flushInterval). The array below is used to keep
-      // track of all inserts.
-      MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
+    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+    log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
+        System.currentTimeMillis());
 
-      long lastFlushTime = System.currentTimeMillis();
+    long entriesWritten = 0L;
+    long entriesDeleted = 0L;
+    final int flushInterval = getFlushEntries(testProps);
+    log.info("A flush will occur after every {} entries written", flushInterval);
+    final int maxDepth = 25;
 
-      final int maxColF = env.getMaxColF();
-      final int maxColQ = env.getMaxColQ();
-      final boolean checksum =
-          Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
-      final long numEntries =
-          Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
-      log.info("Total entries to be written: {}", numEntries);
+    // always want to point back to flushed data. This way the previous item should
+    // always exist in accumulo when verifying data. To do this make insert N point
+    // back to the row from insert (N - flushInterval). The array below is used to keep
+    // track of all inserts.
+    MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
 
-      visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
+    long lastFlushTime = System.currentTimeMillis();
 
-      pauseEnabled = pauseEnabled(testProps);
+    log.info("Total entries to be written: {}", numEntries);
 
-      pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
-      pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
-      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
-          "Bad pause wait min/max, must conform to: 0 < min <= max");
+    visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
 
-      if (pauseEnabled) {
-        lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(env.getRandom());
-        log.info("PAUSING enabled");
-        log.info("INGESTING for {}s", pauseWaitSec);
-      }
+    pauseEnabled = pauseEnabled(testProps);
 
-      final float deleteProbability = getDeleteProbability(testProps);
-      log.info("DELETES will occur with a probability of {}",
-          String.format("%.02f", deleteProbability));
+    pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+    pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+    Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
+        "Bad pause wait min/max, must conform to: 0 < min <= max");
+
+    if (pauseEnabled) {
+      lastPauseNs = System.nanoTime();
+      pauseWaitSec = getPause(random);
+      log.info("PAUSING enabled");
+      log.info("INGESTING for {}s", pauseWaitSec);
+    }
 
-      try (BatchWriter bw = client.createBatchWriter(tableName)) {
-        out: while (true) {
-          ColumnVisibility cv = getVisibility(env.getRandom());
+    final float deleteProbability = getDeleteProbability(testProps);
+    log.info("DELETES will occur with a probability of {}",
+        String.format("%.02f", deleteProbability));
 
-          // generate sets nodes that link to previous set of nodes
-          for (int depth = 0; depth < maxDepth; depth++) {
-            for (int index = 0; index < flushInterval; index++) {
-              long rowLong = genLong(rowMin, rowMax, env.getRandom());
+    try (BatchWriter bw = client.createBatchWriter(tableName)) {
+      out: while (true) {
+        ColumnVisibility cv = getVisibility(random);
 
-              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
+        // generate sets nodes that link to previous set of nodes
+        for (int depth = 0; depth < maxDepth; depth++) {
+          for (int index = 0; index < flushInterval; index++) {
+            long rowLong = genLong(rowMin, rowMax, random);
 
-              int cfInt = env.getRandom().nextInt(maxColF);
-              int cqInt = env.getRandom().nextInt(maxColQ);
+            byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
 
-              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
-              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
-                  prevRow, checksum);
-              entriesWritten++;
-              bw.addMutation(m);
-            }
+            int cfInt = random.nextInt(maxColF);
+            int cqInt = random.nextInt(maxColQ);
 
-            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
-            if (entriesWritten >= numEntries)
-              break out;
-            pauseCheck(env.getRandom());
+            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
+                prevRow, checksum);
+            entriesWritten++;
+            bw.addMutation(m);
           }
 
-          // random chance that the entries will be deleted
-          final boolean delete = env.getRandom().nextFloat() < deleteProbability;
-
-          // if the previously written entries are scheduled to be deleted
-          if (delete) {
-            log.info("Deleting last portion of written entries");
-            // add delete mutations in the reverse order in which they were written
-            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
-              for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
-                MutationInfo currentNode = nodeMap[depth][index];
-                Mutation m = new Mutation(genRow(currentNode.row));
-                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
-                entriesDeleted++;
-                bw.addMutation(m);
-              }
-              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
-              pauseCheck(env.getRandom());
-            }
-          } else {
-            // create one big linked list, this makes all the first inserts point to something
-            for (int index = 0; index < flushInterval - 1; index++) {
-              MutationInfo firstEntry = nodeMap[0][index];
-              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
-              Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
-                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
-              entriesWritten++;
+          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+          if (entriesWritten >= numEntries)
+            break out;
+          pauseCheck(random);
+        }
+
+        // random chance that the entries will be deleted
+        final boolean delete = random.nextFloat() < deleteProbability;
+
+        // if the previously written entries are scheduled to be deleted
+        if (delete) {
+          log.info("Deleting last portion of written entries");
+          // add delete mutations in the reverse order in which they were written
+          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
+              MutationInfo currentNode = nodeMap[depth][index];
+              Mutation m = new Mutation(genRow(currentNode.row));
+              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+              entriesDeleted++;
               bw.addMutation(m);
             }
             lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+            pauseCheck(random);
           }
-
-          if (entriesWritten >= numEntries)
-            break out;
-          pauseCheck(env.getRandom());
+        } else {
+          // create one big linked list, this makes all the first inserts point to something
+          for (int index = 0; index < flushInterval - 1; index++) {
+            MutationInfo firstEntry = nodeMap[0][index];
+            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+            Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
+                ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
+            entriesWritten++;
+            bw.addMutation(m);
+          }
+          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
         }
+
+        if (entriesWritten >= numEntries)
+          break out;
+        pauseCheck(random);
       }
     }
   }
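The point of this refactor is that ingest no longer requires a ContinuousEnv: doIngest takes the client, row bounds, table name, and tuning values directly, so other tests can drive it. The new ManySplits test (further down) calls it exactly this way; a condensed sketch, assuming client and testProps are already set up and the table exists:

    // condensed from the ManySplits usage below; values mirror the new test.ci.split defaults
    Random random = new Random();
    ContinuousIngest.doIngest(client, 0L, Long.MAX_VALUE, "manysplits.table1", testProps,
        32767, 32767, 10_000_000L, false, random);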
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 061c9a0..ab952cc 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -41,54 +41,67 @@ public class CreateTable {
     try (ContinuousEnv env = new ContinuousEnv(args)) {
       AccumuloClient client = env.getAccumuloClient();
-
       String tableName = env.getAccumuloTableName();
-      if (client.tableOperations().exists(tableName)) {
-        log.error("Accumulo table {} already exists", tableName);
-        System.exit(-1);
-      }
-
       int numTablets = Integer.parseInt(env.getTestProperty(CI_COMMON_ACCUMULO_NUM_TABLETS));
+      long rowMin = env.getRowMin();
+      long rowMax = env.getRowMax();
+      Map<String,String> serverProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
+      Map<String,String> tableProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS);
 
-      if (numTablets < 1) {
-        log.error("numTablets < 1");
-        System.exit(-1);
-      }
-      if (env.getRowMin() >= env.getRowMax()) {
-        log.error("min >= max");
-        System.exit(-1);
-      }
+      createTable(client, tableName, numTablets, rowMin, rowMax, serverProps, tableProps);
+    }
+  }
+
+  public static void createTable(AccumuloClient client, String tableName, int numTablets,
+      long rowMin, long rowMax, Map<String,String> serverProps, Map<String,String> tableProps)
+      throws Exception {
+    if (client.tableOperations().exists(tableName)) {
+      log.error("Accumulo table {} already exists", tableName);
+      System.exit(-1);
+    }
 
-      // retrieve and set tserver props
-      Map<String,String> props = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
+    if (numTablets < 1) {
+      log.error("numTablets < 1");
+      System.exit(-1);
+    }
+    if (rowMin >= rowMax) {
+      log.error("min >= max");
+      System.exit(-1);
+    }
+
+    // set tserver props
+    if (!serverProps.isEmpty()) {
       try {
-        client.instanceOperations().modifyProperties(properties -> properties.putAll(props));
+        client.instanceOperations().modifyProperties(properties -> properties.putAll(serverProps));
       } catch (AccumuloException | AccumuloSecurityException e) {
         log.error("Failed to set tserver props");
         throw new Exception(e);
       }
+    }
+
+    NewTableConfiguration ntc = new NewTableConfiguration();
 
+    if (numTablets > 1) {
       SortedSet<Text> splits = new TreeSet<>();
       final int numSplits = numTablets - 1;
-      final long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
+      final long distance = ((rowMax - rowMin) / numTablets) + 1;
       long split = distance;
       for (int i = 0; i < numSplits; i++) {
-        String s = String.format("%016x", split + env.getRowMin());
+        String s = String.format("%016x", split + rowMin);
         while (s.charAt(s.length() - 1) == '0') {
           s = s.substring(0, s.length() - 1);
         }
         splits.add(new Text(s));
         split += distance;
       }
-
-      NewTableConfiguration ntc = new NewTableConfiguration();
       ntc.withSplits(splits);
-      ntc.setProperties(getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS));
+    }
 
-      client.tableOperations().create(tableName, ntc);
+    ntc.setProperties(tableProps);
 
-      log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
-    }
+    client.tableOperations().create(tableName, ntc);
+
+    log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
   }
 
   private static Map<String,String> getProps(ContinuousEnv env, String propType) {
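Likewise, table creation is now callable with explicit arguments instead of a ContinuousEnv, and server properties are applied only when the map is non-empty. A condensed sketch of the new entry point (table name and values illustrative):

    // an empty serverProps map skips modifyProperties entirely
    Map<String,String> serverProps = Map.of();
    Map<String,String> tableProps = Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
    CreateTable.createTable(client, "manysplits.table1", 1, 0L, Long.MAX_VALUE,
        serverProps, tableProps);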
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
new file mode 100644
index 0000000..6a96574
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.NamespaceExistsException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.testing.TestProps;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class ManySplits {
+  private static final Logger log = LoggerFactory.getLogger(ManySplits.class);
+
+  private static final String NAMESPACE = "manysplits";
+
+  public static void main(String[] args) throws Exception {
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+
+      AccumuloClient client = env.getAccumuloClient();
+      Properties testProps = env.getTestProperties();
+      final int tableCount =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
+      final long rowMin = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MIN));
+      final long rowMax = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MAX));
+      final int maxColF = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CF));
+      final int maxColQ = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CQ));
+      final int initialTabletCount =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_TABLETS));
+      final int initialData =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
+      String initialSplitThresholdStr =
+          testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD);
+      final long initialSplitThreshold =
+          ConfigurationTypeHelper.getFixedMemoryAsBytes(initialSplitThresholdStr);
+      final int splitThresholdReductionFactor =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD_REDUCTION_FACTOR));
+      final int testRounds =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TEST_ROUNDS));
+
+      // disable deletes for ingest
+      testProps.setProperty(TestProps.CI_INGEST_DELETE_PROBABILITY, "0.0");
+
+      final Random random = env.getRandom();
+
+      Preconditions.checkArgument(tableCount > 0, "Test cannot run without any tables");
+
+      final List<String> tableNames = IntStream.range(1, tableCount + 1)
+          .mapToObj(i -> NAMESPACE + ".table" + i).collect(Collectors.toList());
+
+      try {
+        client.namespaceOperations().create(NAMESPACE);
+      } catch (NamespaceExistsException e) {
+        log.warn("The namespace '{}' already exists. Continuing with existing namespace.",
+            NAMESPACE);
+      }
+
+      final String firstTable = tableNames.get(0);
+
+      Map<String,String> tableProps =
+          Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr);
+
+      log.info("Properties being used to create tables for this test: {}", tableProps);
+
+      log.info("Creating initial table: {}", firstTable);
+      CreateTable.createTable(client, firstTable, initialTabletCount, rowMin, rowMax, tableProps,
+          Map.of());
+
+      log.info("Ingesting {} entries into first table, {}.", initialData, firstTable);
+      ContinuousIngest.doIngest(client, rowMin, rowMax, firstTable, testProps, maxColF, maxColQ,
+          initialData, false, random);
+
+      client.tableOperations().flush(firstTable);
+
+      // clone tables instead of ingesting into each. it's a lot quicker
+      log.info("Creating {} more tables by cloning the first", tableCount - 1);
+      tableNames.stream().parallel().skip(1).forEach(tableName -> {
+        try {
+          client.tableOperations().clone(firstTable, tableName, true, null, null);
+        } catch (TableExistsException e) {
+          log.warn(
+              "table {} already exists. Continuing with existing table. Previous data will affect splits",
+              tableName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      StringBuilder testResults = new StringBuilder();
+      testResults.append("Test results:\n");
+      testResults.append("Total test rounds: ").append(testRounds).append("\n");
+      testResults.append("Table count: ").append(tableCount).append("\n");
+
+      SECONDS.sleep(5);
+
+      // main loop
+      // reduce the split threshold then wait for the expected file size per tablet to be reached
+      long previousSplitThreshold = initialSplitThreshold;
+      for (int i = 0; i < testRounds; i++) {
+
+        // apply the reduction factor to the previous threshold
+        final long splitThreshold = previousSplitThreshold / splitThresholdReductionFactor;
+        final String splitThresholdStr = bytesToMemoryString(splitThreshold);
+        final int totalSplitCountBefore = getTotalSplitCount(client, tableNames);
+
+        log.info("Changing split threshold on all tables from {} to {}",
+            bytesToMemoryString(previousSplitThreshold), splitThresholdStr);
+
+        long beforeThresholdUpdate = System.nanoTime();
+
+        // update the split threshold on all tables
+        tableNames.stream().parallel().forEach(tableName -> {
+          try {
+            client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
+                splitThresholdStr);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        log.info("Waiting for each tablet to have a sum file size <= {}", splitThresholdStr);
+
+        // wait for all tablets to reach the expected sum file size
+        tableNames.stream().parallel().forEach(tableName -> {
+          int elapsedMillis = 0;
+          long sleepMillis = SECONDS.toMillis(1);
+          try {
+            // wait for each tablet to reach the expected sum file size
+            while (true) {
+              Collection<Long> tabletFileSizes = getTabletFileSizes(client, tableName).values();
+              // filter out the tablets that are already the expected size
+              Set<Long> offendingTabletSizes =
+                  tabletFileSizes.stream().filter(tabletFileSize -> tabletFileSize > splitThreshold)
+                      .collect(Collectors.toSet());
+              // if all tablets are good, move on
+              if (offendingTabletSizes.isEmpty()) {
+                break;
+              }
+
+              elapsedMillis += sleepMillis;
+              // log every 3 seconds
+              if (elapsedMillis % SECONDS.toMillis(3) == 0) {
+                double averageFileSize =
+                    offendingTabletSizes.stream().mapToLong(l -> l).average().orElse(0);
+                long diff = (long) (averageFileSize - splitThreshold);
+                log.info(
+                    "{} tablets have file sizes not yet <= {} on table {}. Diff of avg offending file(s): {}",
+                    offendingTabletSizes.size(), splitThresholdStr, tableName,
+                    bytesToMemoryString(diff));
+              }
+              MILLISECONDS.sleep(sleepMillis);
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        long timeTakenNanos = System.nanoTime() - beforeThresholdUpdate;
+        long seconds = NANOSECONDS.toSeconds(timeTakenNanos);
+        long millis = NANOSECONDS.toMillis(timeTakenNanos);
+
+        final int splitCountAfter = getTotalSplitCount(client, tableNames);
+        final int splitCountThisRound = splitCountAfter - totalSplitCountBefore;
+
+        log.info(
+            "Time taken for all tables to reach expected total file size ({}): {} seconds ({}ms)",
+            splitThresholdStr, seconds, millis);
+
+        testResults.append("Test round ").append(i).append(":\n");
+        testResults.append("TABLE_SPLIT_THRESHOLD ")
+            .append(bytesToMemoryString(previousSplitThreshold)).append(" -> ")
+            .append(splitThresholdStr).append("\n");
+        testResults.append("Splits count: ").append(totalSplitCountBefore).append(" -> ")
+            .append(splitCountAfter).append("\n");
+        String splitsPerSecond = String.format("%.2f", (double) splitCountThisRound / seconds);
+        testResults.append("Splits per second: ").append(splitsPerSecond).append("\n");
+
+        previousSplitThreshold = splitThreshold;
+      }
+
+      log.info("Test completed successfully.");
+      log.info(testResults.toString());
+
+      log.info("Deleting tables");
+      tableNames.stream().parallel().forEach(tableName -> {
+        try {
+          client.tableOperations().delete(tableName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      log.info("Deleting namespace");
+      client.namespaceOperations().delete(NAMESPACE);
+
+    }
+  }
+
+  /**
+   * @return a map of tablets to the sum of their file size
+   */
+  private static Map<Text,Long> getTabletFileSizes(AccumuloClient client, String tableName)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+    try (Scanner scanner = client.createScanner("accumulo.metadata")) {
+      scanner.fetchColumnFamily("file");
+      scanner.setRange(getMetaRangeForTable(tableId.canonical()));
+
+      Map<Text,Long> result = new HashMap<>();
+      for (var entry : scanner) {
+        String encodedDFV = new String(entry.getValue().get(), UTF_8);
+        String[] ba = encodedDFV.split(",", 2);
+        long tabletFileSize = Long.parseLong(ba[0]);
+        result.merge(entry.getKey().getRow(), tabletFileSize, Long::sum);
+      }
+
+      return result;
+    }
+  }
+
+  public static String bytesToMemoryString(long bytes) {
+    if (bytes < 1024) {
+      return bytes + "B"; // Bytes
+    } else if (bytes < 1024 * 1024) {
+      return (bytes / 1024) + "K"; // Kilobytes
+    } else if (bytes < 1024 * 1024 * 1024) {
+      return (bytes / (1024 * 1024)) + "M"; // Megabytes
+    } else {
+      return (bytes / (1024 * 1024 * 1024)) + "G"; // Gigabytes
+    }
+  }
+
+  private static Range getMetaRangeForTable(String tableId) {
+    return new Range(tableId + ";", false, tableId + "<", true);
+  }
+
+  /**
+   * @return the total number of splits across all given tables
+   */
+  private static int getTotalSplitCount(AccumuloClient client, List<String> tableNames) {
+    return tableNames.stream().parallel().mapToInt(tableName -> {
+      try {
+        return client.tableOperations().listSplits(tableName).size();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }).sum();
+  }
+
+}
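One reading note for the logs this test emits: bytesToMemoryString above reports whole units via truncating integer division, while thresholds are parsed exactly by ConfigurationTypeHelper, so the printed sizes are approximations of the byte counts the test actually compares. A quick illustration:

    long oneGig = ConfigurationTypeHelper.getFixedMemoryAsBytes("1G"); // 1073741824
    ManySplits.bytesToMemoryString(oneGig);      // "1G"
    ManySplits.bytesToMemoryString(oneGig / 10); // "102M" (the first-round threshold)
    ManySplits.bytesToMemoryString(1536);        // "1K" (truncates, does not round)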