This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a4a09f  Create new test that creates lots of splits (#267)
3a4a09f is described below

commit 3a4a09f62a0a5d60400b82b094faf5ba970bcd4f
Author: Dom G <domgargu...@apache.org>
AuthorDate: Fri Jan 5 16:14:43 2024 -0500

    Create new test that creates lots of splits (#267)
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 bin/cingest                                        |   9 +-
 conf/accumulo-testing.properties                   |  23 ++
 .../org/apache/accumulo/testing/TestProps.java     |  14 +
 .../testing/continuous/ContinuousIngest.java       | 197 +++++++-------
 .../accumulo/testing/continuous/CreateTable.java   |  63 +++--
 .../accumulo/testing/continuous/ManySplits.java    | 289 +++++++++++++++++++++
 6 files changed, 474 insertions(+), 121 deletions(-)

diff --git a/bin/cingest b/bin/cingest
index 43328ea..4ef312c 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -28,14 +28,16 @@ Usage: cingest <application> {-o test.<prop>=<value>}
 
 Available applications:
 
-    createtable   Creates Accumulo table for continous ingest
+    createtable   Creates Accumulo table for continuous ingest
     ingest        Inserts data into Accumulo that will form random graph.
     walk          Randomly walks the graph using a scanner
     batchwalk     Randomly walks the graph using a batch scanner
     scan          Scans the graph
-    verify        Verifies continous ingest test. Stop ingest before running.
+    verify        Verifies continuous ingest test. Stop ingest before running.
     moru          Stresses Accumulo by reading and writing to the ingest table.
                   Stop ingest before running.
+    manysplits    Repeatedly lowers the split threshold on a table to create
+                  many splits in order to test split performance
     bulk          Create RFiles in a Map Reduce job and calls importDirectory if successful
 EOF
 }
@@ -69,6 +71,9 @@ case "$1" in
   moru)
     ci_main="${ci_package}.ContinuousMoru"
     ;;
+  manysplits)
+    ci_main="${ci_package}.ManySplits"
+    ;;
   bulk)
     if [ "$#" -ne 2 ]; then
       echo "Usage : $0 $1 <bulk dir>"
diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index b2cbb5e..93c3227 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -140,6 +140,29 @@ test.ci.bulk.map.nodes=1000000
 # produce a bulk import file.
 test.ci.bulk.reducers.max=1024
 
+# Splits Scaling
+# -----------
+# The number of tables to create
+test.ci.split.table.count=3
+# Minimum random row to generate
+test.ci.split.ingest.row.min=0
+# Maximum random row to generate
+test.ci.split.ingest.row.max=9223372036854775807
+# Maximum number of random column families to generate
+test.ci.split.ingest.max.cf=32767
+# Maximum number of random column qualifiers to generate
+test.ci.split.ingest.max.cq=32767
+# The number of tablets to create on each table on table creation
+test.ci.split.initial.tablets=1
+# The amount of data to write to each table
+test.ci.split.write.size=10000000
+# The split threshold to set for each table on creation
+test.ci.split.threshold=1G
+# The factor to reduce the split threshold by for each iteration of the test
+test.ci.split.threshold.reduction.factor=10
+# Number of rounds to run the test
+test.ci.split.test.rounds=3
+
 ###############################
 # Garbage Collection Simulation
 ###############################
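
To make the new defaults concrete: as implemented in ManySplits.java further below,
each test round divides the previous split threshold by
test.ci.split.threshold.reduction.factor and then waits until every tablet's summed
file size is at or below the new value, so with the defaults above (1G threshold,
factor 10, 3 rounds) the threshold steps from 1G to roughly 100M, then 10M, then 1M.
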
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 8d094bf..aa8e9e6 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -41,6 +41,7 @@ public class TestProps {
   private static final String CI_SCANNER = CI + "scanner.";
   private static final String CI_VERIFY = CI + "verify.";
   private static final String CI_BULK = CI + "bulk.";
+  private static final String CI_SPLIT = CI + "split.";
   public static final String TERASORT = PREFIX + "terasort.";
   public static final String ROWHASH = PREFIX + "rowhash.";
 
@@ -148,6 +149,19 @@ public class TestProps {
   public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
   public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
+  /** Split **/
+  public static final String CI_SPLIT_TABLE_COUNT = CI_SPLIT + "table.count";
+  public static final String CI_SPLIT_INGEST_ROW_MIN = CI_SPLIT + "ingest.row.min";
+  public static final String CI_SPLIT_INGEST_ROW_MAX = CI_SPLIT + "ingest.row.max";
+  public static final String CI_SPLIT_INGEST_MAX_CF = CI_SPLIT + "ingest.max.cf";
+  public static final String CI_SPLIT_INGEST_MAX_CQ = CI_SPLIT + "ingest.max.cq";
+  public static final String CI_SPLIT_INITIAL_TABLETS = CI_SPLIT + "initial.tablets";
+  public static final String CI_SPLIT_WRITE_SIZE = CI_SPLIT + "write.size";
+  public static final String CI_SPLIT_THRESHOLD = CI_SPLIT + "threshold";
+  public static final String CI_SPLIT_THRESHOLD_REDUCTION_FACTOR =
+      CI_SPLIT + "threshold.reduction.factor";
+  public static final String CI_SPLIT_TEST_ROUNDS = CI_SPLIT + "test.rounds";
+
   /** TeraSort **/
   public static final String TERASORT_TABLE = TERASORT + "table";
   public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 34043af..1bb32a5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -106,125 +106,134 @@ public class ContinuousIngest {
 
       final long rowMin = env.getRowMin();
       final long rowMax = env.getRowMax();
-      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
-          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
-
       String tableName = env.getAccumuloTableName();
-      if (!client.tableOperations().exists(tableName)) {
-        throw new TableNotFoundException(null, tableName,
-            "Consult the README and create the table before starting ingest.");
-      }
+      Properties testProps = env.getTestProperties();
+      final int maxColF = env.getMaxColF();
+      final int maxColQ = env.getMaxColQ();
+      Random random = env.getRandom();
+      final long numEntries =
+          Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      final boolean checksum =
+          Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
 
-      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
-      log.info("Ingest instance ID: {} current time: {}ms", new 
String(ingestInstanceId, UTF_8),
-          System.currentTimeMillis());
+      doIngest(client, rowMin, rowMax, tableName, testProps, maxColF, maxColQ, numEntries, checksum,
+          random);
+    }
+  }
 
-      Properties testProps = env.getTestProperties();
+  protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, String tableName,
+      Properties testProps, int maxColF, int maxColQ, long numEntries, boolean checksum,
+      Random random)
+      throws TableNotFoundException, MutationsRejectedException, InterruptedException {
+    Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+        "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 
-      long entriesWritten = 0L;
-      long entriesDeleted = 0L;
-      final int flushInterval = getFlushEntries(testProps);
-      log.info("A flush will occur after every {} entries written", 
flushInterval);
-      final int maxDepth = 25;
+    if (!client.tableOperations().exists(tableName)) {
+      throw new TableNotFoundException(null, tableName,
+          "Consult the README and create the table before starting ingest.");
+    }
 
-      // always want to point back to flushed data. This way the previous item should
-      // always exist in accumulo when verifying data. To do this make insert N point
-      // back to the row from insert (N - flushInterval). The array below is used to keep
-      // track of all inserts.
-      MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
+    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+    log.info("Ingest instance ID: {} current time: {}ms", new 
String(ingestInstanceId, UTF_8),
+        System.currentTimeMillis());
 
-      long lastFlushTime = System.currentTimeMillis();
+    long entriesWritten = 0L;
+    long entriesDeleted = 0L;
+    final int flushInterval = getFlushEntries(testProps);
+    log.info("A flush will occur after every {} entries written", 
flushInterval);
+    final int maxDepth = 25;
 
-      final int maxColF = env.getMaxColF();
-      final int maxColQ = env.getMaxColQ();
-      final boolean checksum =
-          Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
-      final long numEntries =
-          Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
-      log.info("Total entries to be written: {}", numEntries);
+    // always want to point back to flushed data. This way the previous item should
+    // always exist in accumulo when verifying data. To do this make insert N point
+    // back to the row from insert (N - flushInterval). The array below is used to keep
+    // track of all inserts.
+    MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
 
-      visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
+    long lastFlushTime = System.currentTimeMillis();
 
-      pauseEnabled = pauseEnabled(testProps);
+    log.info("Total entries to be written: {}", numEntries);
 
-      pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
-      pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
-      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
-          "Bad pause wait min/max, must conform to: 0 < min <= max");
+    visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
 
-      if (pauseEnabled) {
-        lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(env.getRandom());
-        log.info("PAUSING enabled");
-        log.info("INGESTING for {}s", pauseWaitSec);
-      }
+    pauseEnabled = pauseEnabled(testProps);
 
-      final float deleteProbability = getDeleteProbability(testProps);
-      log.info("DELETES will occur with a probability of {}",
-          String.format("%.02f", deleteProbability));
+    pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+    pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+    Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
+        "Bad pause wait min/max, must conform to: 0 < min <= max");
+
+    if (pauseEnabled) {
+      lastPauseNs = System.nanoTime();
+      pauseWaitSec = getPause(random);
+      log.info("PAUSING enabled");
+      log.info("INGESTING for {}s", pauseWaitSec);
+    }
 
-      try (BatchWriter bw = client.createBatchWriter(tableName)) {
-        out: while (true) {
-          ColumnVisibility cv = getVisibility(env.getRandom());
+    final float deleteProbability = getDeleteProbability(testProps);
+    log.info("DELETES will occur with a probability of {}",
+        String.format("%.02f", deleteProbability));
 
-          // generate sets nodes that link to previous set of nodes
-          for (int depth = 0; depth < maxDepth; depth++) {
-            for (int index = 0; index < flushInterval; index++) {
-              long rowLong = genLong(rowMin, rowMax, env.getRandom());
+    try (BatchWriter bw = client.createBatchWriter(tableName)) {
+      out: while (true) {
+        ColumnVisibility cv = getVisibility(random);
 
-              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
+        // generate sets nodes that link to previous set of nodes
+        for (int depth = 0; depth < maxDepth; depth++) {
+          for (int index = 0; index < flushInterval; index++) {
+            long rowLong = genLong(rowMin, rowMax, random);
 
-              int cfInt = env.getRandom().nextInt(maxColF);
-              int cqInt = env.getRandom().nextInt(maxColQ);
+            byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
 
-              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
-              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
-                  prevRow, checksum);
-              entriesWritten++;
-              bw.addMutation(m);
-            }
+            int cfInt = random.nextInt(maxColF);
+            int cqInt = random.nextInt(maxColQ);
 
-            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
-            if (entriesWritten >= numEntries)
-              break out;
-            pauseCheck(env.getRandom());
+            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
+                prevRow, checksum);
+            entriesWritten++;
+            bw.addMutation(m);
           }
 
-          // random chance that the entries will be deleted
-          final boolean delete = env.getRandom().nextFloat() < deleteProbability;
-
-          // if the previously written entries are scheduled to be deleted
-          if (delete) {
-            log.info("Deleting last portion of written entries");
-            // add delete mutations in the reverse order in which they were written
-            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
-              for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
-                MutationInfo currentNode = nodeMap[depth][index];
-                Mutation m = new Mutation(genRow(currentNode.row));
-                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
-                entriesDeleted++;
-                bw.addMutation(m);
-              }
-              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
-              pauseCheck(env.getRandom());
-            }
-          } else {
-            // create one big linked list, this makes all the first inserts point to something
-            for (int index = 0; index < flushInterval - 1; index++) {
-              MutationInfo firstEntry = nodeMap[0][index];
-              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
-              Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
-                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
-              entriesWritten++;
+          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+          if (entriesWritten >= numEntries)
+            break out;
+          pauseCheck(random);
+        }
+
+        // random chance that the entries will be deleted
+        final boolean delete = random.nextFloat() < deleteProbability;
+
+        // if the previously written entries are scheduled to be deleted
+        if (delete) {
+          log.info("Deleting last portion of written entries");
+          // add delete mutations in the reverse order in which they were written
+          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
+              MutationInfo currentNode = nodeMap[depth][index];
+              Mutation m = new Mutation(genRow(currentNode.row));
+              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+              entriesDeleted++;
               bw.addMutation(m);
             }
             lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+            pauseCheck(random);
           }
-
-          if (entriesWritten >= numEntries)
-            break out;
-          pauseCheck(env.getRandom());
+        } else {
+          // create one big linked list, this makes all the first inserts point to something
+          for (int index = 0; index < flushInterval - 1; index++) {
+            MutationInfo firstEntry = nodeMap[0][index];
+            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+            Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
+                ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
+            entriesWritten++;
+            bw.addMutation(m);
+          }
+          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
         }
+
+        if (entriesWritten >= numEntries)
+          break out;
+        pauseCheck(random);
       }
     }
   }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 061c9a0..ab952cc 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -41,54 +41,67 @@ public class CreateTable {
 
     try (ContinuousEnv env = new ContinuousEnv(args)) {
       AccumuloClient client = env.getAccumuloClient();
-
       String tableName = env.getAccumuloTableName();
-      if (client.tableOperations().exists(tableName)) {
-        log.error("Accumulo table {} already exists", tableName);
-        System.exit(-1);
-      }
-
       int numTablets = Integer.parseInt(env.getTestProperty(CI_COMMON_ACCUMULO_NUM_TABLETS));
+      long rowMin = env.getRowMin();
+      long rowMax = env.getRowMax();
+      Map<String,String> serverProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
+      Map<String,String> tableProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS);
 
-      if (numTablets < 1) {
-        log.error("numTablets < 1");
-        System.exit(-1);
-      }
-      if (env.getRowMin() >= env.getRowMax()) {
-        log.error("min >= max");
-        System.exit(-1);
-      }
+      createTable(client, tableName, numTablets, rowMin, rowMax, serverProps, tableProps);
+    }
+  }
+
+  public static void createTable(AccumuloClient client, String tableName, int numTablets,
+      long rowMin, long rowMax, Map<String,String> serverProps, Map<String,String> tableProps)
+      throws Exception {
+    if (client.tableOperations().exists(tableName)) {
+      log.error("Accumulo table {} already exists", tableName);
+      System.exit(-1);
+    }
 
-      // retrieve and set tserver props
-      Map<String,String> props = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
+    if (numTablets < 1) {
+      log.error("numTablets < 1");
+      System.exit(-1);
+    }
+    if (rowMin >= rowMax) {
+      log.error("min >= max");
+      System.exit(-1);
+    }
+
+    // set tserver props
+    if (!serverProps.isEmpty()) {
       try {
-        client.instanceOperations().modifyProperties(properties -> properties.putAll(props));
+        client.instanceOperations().modifyProperties(properties -> properties.putAll(serverProps));
       } catch (AccumuloException | AccumuloSecurityException e) {
         log.error("Failed to set tserver props");
         throw new Exception(e);
       }
+    }
+
+    NewTableConfiguration ntc = new NewTableConfiguration();
 
+    if (numTablets > 1) {
       SortedSet<Text> splits = new TreeSet<>();
       final int numSplits = numTablets - 1;
-      final long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
+      final long distance = ((rowMax - rowMin) / numTablets) + 1;
       long split = distance;
       for (int i = 0; i < numSplits; i++) {
-        String s = String.format("%016x", split + env.getRowMin());
+        String s = String.format("%016x", split + rowMin);
         while (s.charAt(s.length() - 1) == '0') {
           s = s.substring(0, s.length() - 1);
         }
         splits.add(new Text(s));
         split += distance;
       }
-
-      NewTableConfiguration ntc = new NewTableConfiguration();
       ntc.withSplits(splits);
-      ntc.setProperties(getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS));
+    }
 
-      client.tableOperations().create(tableName, ntc);
+    ntc.setProperties(tableProps);
 
-      log.info("Created Accumulo table {} with {} tablets", tableName, 
numTablets);
-    }
+    client.tableOperations().create(tableName, ntc);
+
+    log.info("Created Accumulo table {} with {} tablets", tableName, 
numTablets);
   }
 
   private static Map<String,String> getProps(ContinuousEnv env, String propType) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
new file mode 100644
index 0000000..6a96574
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.NamespaceExistsException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.testing.TestProps;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class ManySplits {
+  private static final Logger log = LoggerFactory.getLogger(ManySplits.class);
+
+  private static final String NAMESPACE = "manysplits";
+
+  public static void main(String[] args) throws Exception {
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+
+      AccumuloClient client = env.getAccumuloClient();
+      Properties testProps = env.getTestProperties();
+      final int tableCount =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
+      final long rowMin = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MIN));
+      final long rowMax = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MAX));
+      final int maxColF = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CF));
+      final int maxColQ = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CQ));
+      final int initialTabletCount =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_TABLETS));
+      final int initialData =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
+      String initialSplitThresholdStr = testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD);
+      final long initialSplitThreshold =
+          ConfigurationTypeHelper.getFixedMemoryAsBytes(initialSplitThresholdStr);
+      final int splitThresholdReductionFactor =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD_REDUCTION_FACTOR));
+      final int testRounds =
+          Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TEST_ROUNDS));
+
+      // disable deletes for ingest
+      testProps.setProperty(TestProps.CI_INGEST_DELETE_PROBABILITY, "0.0");
+
+      final Random random = env.getRandom();
+
+      Preconditions.checkArgument(tableCount > 0, "Test cannot run without any tables");
+
+      final List<String> tableNames = IntStream.range(1, tableCount + 1)
+          .mapToObj(i -> NAMESPACE + ".table" + i).collect(Collectors.toList());
+
+      try {
+        client.namespaceOperations().create(NAMESPACE);
+      } catch (NamespaceExistsException e) {
+        log.warn("The namespace '{}' already exists. Continuing with existing 
namespace.",
+            NAMESPACE);
+      }
+
+      final String firstTable = tableNames.get(0);
+
+      Map<String,String> tableProps =
+          Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr);
+
+      log.info("Properties being used to create tables for this test: {}", 
tableProps);
+
+      log.info("Creating initial table: {}", firstTable);
+      CreateTable.createTable(client, firstTable, initialTabletCount, rowMin, rowMax, tableProps,
+          Map.of());
+
+      log.info("Ingesting {} entries into first table, {}.", initialData, 
firstTable);
+      ContinuousIngest.doIngest(client, rowMin, rowMax, firstTable, testProps, maxColF, maxColQ,
+          initialData, false, random);
+
+      client.tableOperations().flush(firstTable);
+
+      // clone tables instead of ingesting into each. it's a lot quicker
+      log.info("Creating {} more tables by cloning the first", tableCount - 1);
+      tableNames.stream().parallel().skip(1).forEach(tableName -> {
+        try {
+          client.tableOperations().clone(firstTable, tableName, true, null, null);
+        } catch (TableExistsException e) {
+          log.warn(
+              "table {} already exists. Continuing with existing table. 
Previous data will affect splits",
+              tableName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      StringBuilder testResults = new StringBuilder();
+      testResults.append("Test results:\n");
+      testResults.append("Total test rounds: 
").append(testRounds).append("\n");
+      testResults.append("Table count: ").append(tableCount).append("\n");
+
+      SECONDS.sleep(5);
+
+      // main loop
+      // reduce the split threshold then wait for the expected file size per tablet to be reached
+      long previousSplitThreshold = initialSplitThreshold;
+      for (int i = 0; i < testRounds; i++) {
+
+        // apply the reduction factor to the previous threshold
+        final long splitThreshold = previousSplitThreshold / splitThresholdReductionFactor;
+        final String splitThresholdStr = bytesToMemoryString(splitThreshold);
+        final int totalSplitCountBefore = getTotalSplitCount(client, tableNames);
+
+        log.info("Changing split threshold on all tables from {} to {}",
+            bytesToMemoryString(previousSplitThreshold), splitThresholdStr);
+
+        long beforeThresholdUpdate = System.nanoTime();
+
+        // update the split threshold on all tables
+        tableNames.stream().parallel().forEach(tableName -> {
+          try {
+            client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
+                splitThresholdStr);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        log.info("Waiting for each tablet to have a sum file size <= {}", 
splitThresholdStr);
+
+        // wait for all tablets to reach the expected sum file size
+        tableNames.stream().parallel().forEach(tableName -> {
+          int elapsedMillis = 0;
+          long sleepMillis = SECONDS.toMillis(1);
+          try {
+            // wait for each tablet to reach the expected sum file size
+            while (true) {
+              Collection<Long> tabletFileSizes = getTabletFileSizes(client, tableName).values();
+              // filter out the tablets that are already the expected size
+              Set<Long> offendingTabletSizes =
+                  tabletFileSizes.stream().filter(tabletFileSize -> tabletFileSize > splitThreshold)
+                      .collect(Collectors.toSet());
+              // if all tablets are good, move on
+              if (offendingTabletSizes.isEmpty()) {
+                break;
+              }
+
+              elapsedMillis += sleepMillis;
+              // log every 3 seconds
+              if (elapsedMillis % SECONDS.toMillis(3) == 0) {
+                double averageFileSize =
+                    offendingTabletSizes.stream().mapToLong(l -> l).average().orElse(0);
+                long diff = (long) (averageFileSize - splitThreshold);
+                log.info(
+                    "{} tablets have file sizes not yet <= {} on table {}. 
Diff of avg offending file(s): {}",
+                    offendingTabletSizes.size(), splitThresholdStr, tableName,
+                    bytesToMemoryString(diff));
+              }
+              MILLISECONDS.sleep(sleepMillis);
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        long timeTakenNanos = System.nanoTime() - beforeThresholdUpdate;
+        long seconds = NANOSECONDS.toSeconds(timeTakenNanos);
+        long millis = NANOSECONDS.toMillis(timeTakenNanos);
+
+        final int splitCountAfter = getTotalSplitCount(client, tableNames);
+        final int splitCountThisRound = splitCountAfter - totalSplitCountBefore;
+
+        log.info(
+            "Time taken for all tables to reach expected total file size ({}): 
{} seconds ({}ms)",
+            splitThresholdStr, seconds, millis);
+
+        testResults.append("Test round ").append(i).append(":\n");
+        testResults.append("TABLE_SPLIT_THRESHOLD ")
+            .append(bytesToMemoryString(previousSplitThreshold)).append(" -> ")
+            .append(splitThresholdStr).append("\n");
+        testResults.append("Splits count:         
").append(totalSplitCountBefore).append(" -> ")
+            .append(splitCountAfter).append("\n");
+        String splitsPerSecond = String.format("%.2f", (double) splitCountThisRound / seconds);
+        testResults.append("Splits per second:    
").append(splitsPerSecond).append("\n");
+
+        previousSplitThreshold = splitThreshold;
+      }
+
+      log.info("Test completed successfully.");
+      log.info(testResults.toString());
+      log.info("Deleting tables");
+      tableNames.stream().parallel().forEach(tableName -> {
+        try {
+          client.tableOperations().delete(tableName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      log.info("Deleting namespace");
+      client.namespaceOperations().delete(NAMESPACE);
+
+    }
+  }
+
+  /**
+   * @return a map of tablets to the sum of their file size
+   */
+  private static Map<Text,Long> getTabletFileSizes(AccumuloClient client, String tableName)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+    try (Scanner scanner = client.createScanner("accumulo.metadata")) {
+      scanner.fetchColumnFamily("file");
+      scanner.setRange(getMetaRangeForTable(tableId.canonical()));
+
+      Map<Text,Long> result = new HashMap<>();
+      for (var entry : scanner) {
+        String encodedDFV = new String(entry.getValue().get(), UTF_8);
+        String[] ba = encodedDFV.split(",", 2);
+        long tabletFileSize = Long.parseLong(ba[0]);
+        result.merge(entry.getKey().getRow(), tabletFileSize, Long::sum);
+      }
+
+      return result;
+    }
+  }
+
+  public static String bytesToMemoryString(long bytes) {
+    if (bytes < 1024) {
+      return bytes + "B"; // Bytes
+    } else if (bytes < 1024 * 1024) {
+      return (bytes / 1024) + "K"; // Kilobytes
+    } else if (bytes < 1024 * 1024 * 1024) {
+      return (bytes / (1024 * 1024)) + "M"; // Megabytes
+    } else {
+      return (bytes / (1024 * 1024 * 1024)) + "G"; // Gigabytes
+    }
+  }
+
+  private static Range getMetaRangeForTable(String tableId) {
+    return new Range(tableId + ";", false, tableId + "<", true);
+  }
+
+  /**
+   * @return the total number of splits across all given tables
+   */
+  private static int getTotalSplitCount(AccumuloClient client, List<String> tableNames) {
+    return tableNames.stream().parallel().mapToInt(tableName -> {
+      try {
+        return client.tableOperations().listSplits(tableName).size();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }).sum();
+  }
+
+}
