ACCUMULO-4510 Moved remaining external test code from Accumulo

Project: http://git-wip-us.apache.org/repos/asf/accumulo-testing/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-testing/commit/0d97273c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-testing/tree/0d97273c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-testing/diff/0d97273c

Branch: refs/heads/master
Commit: 0d97273cc18be723cb5ce84c91c329dc1885274b
Parents: fc3ddfc
Author: Mike Walch <mwa...@apache.org>
Authored: Tue Jan 24 17:27:50 2017 -0500
Committer: Mike Walch <mwa...@apache.org>
Committed: Wed Jan 25 13:15:43 2017 -0500

----------------------------------------------------------------------
 .../core/continuous/ContinuousIngest.java       |    4 +-
 .../core/ingest/BulkImportDirectory.java        |   68 ++
 .../testing/core/ingest/TestIngest.java         |  344 ++++++
 .../testing/core/ingest/VerifyIngest.java       |  237 +++++
 .../testing/core/mapreduce/RowHash.java         |   95 ++
 .../testing/core/mapreduce/TeraSortIngest.java  |  399 +++++++
 .../testing/core/merkle/MerkleTree.java         |   92 ++
 .../testing/core/merkle/MerkleTreeNode.java     |  131 +++
 .../testing/core/merkle/RangeSerialization.java |   72 ++
 .../testing/core/merkle/cli/CompareTables.java  |  176 +++
 .../core/merkle/cli/ComputeRootHash.java        |  100 ++
 .../testing/core/merkle/cli/GenerateHashes.java |  287 +++++
 .../core/merkle/cli/ManualComparison.java       |   95 ++
 .../core/merkle/ingest/RandomWorkload.java      |  120 +++
 .../testing/core/merkle/package-info.java       |   39 +
 .../core/merkle/skvi/DigestIterator.java        |  149 +++
 .../testing/core/scalability/Ingest.java        |  143 +++
 .../accumulo/testing/core/scalability/Run.java  |   97 ++
 .../testing/core/scalability/ScaleTest.java     |   88 ++
 .../testing/core/stress/DataWriter.java         |   50 +
 .../testing/core/stress/IntArgValidator.java    |   34 +
 .../testing/core/stress/RandomByteArrays.java   |   33 +
 .../testing/core/stress/RandomMutations.java    |   56 +
 .../testing/core/stress/RandomWithinRange.java  |   58 +
 .../accumulo/testing/core/stress/Scan.java      |  121 +++
 .../accumulo/testing/core/stress/ScanOpts.java  |   46 +
 .../accumulo/testing/core/stress/Stream.java    |   40 +
 .../accumulo/testing/core/stress/Write.java     |   77 ++
 .../testing/core/stress/WriteOptions.java       |  169 +++
 .../testing/core/stress/package-info.java       |   36 +
 test/agitator/.gitignore                        |    3 +
 test/agitator/README.md                         |   39 +
 test/agitator/agitator.ini.example              |   56 +
 test/agitator/agitator.py                       |  241 +++++
 test/agitator/hosts.example                     |   16 +
 test/bench/README.md                            |   61 ++
 test/bench/cloudstone1/__init__.py              |   15 +
 test/bench/cloudstone1/cloudstone1.py           |   44 +
 test/bench/cloudstone2/__init__.py              |   15 +
 test/bench/cloudstone2/cloudstone2.py           |   49 +
 test/bench/cloudstone3/__init__.py              |   15 +
 test/bench/cloudstone3/cloudstone3.py           |   50 +
 test/bench/cloudstone4/__init__.py              |   15 +
 test/bench/cloudstone4/cloudstone4.py           |   29 +
 test/bench/cloudstone5/__init__.py              |   15 +
 test/bench/cloudstone5/cloudstone5.py           |   29 +
 test/bench/cloudstone6/__init__.py              |   15 +
 test/bench/cloudstone6/cloudstone6.py           |   29 +
 test/bench/cloudstone7/__init__.py              |   15 +
 test/bench/cloudstone7/cloudstone7.py           |   29 +
 test/bench/cloudstone8/__init__.py              |   15 +
 test/bench/cloudstone8/cloudstone8.py           |   64 ++
 test/bench/lib/Benchmark.py                     |  115 ++
 test/bench/lib/CreateTablesBenchmark.py         |   78 ++
 test/bench/lib/IngestBenchmark.py               |   94 ++
 test/bench/lib/RowHashBenchmark.py              |  136 +++
 test/bench/lib/TableSplitsBenchmark.py          |   76 ++
 test/bench/lib/TeraSortBenchmark.py             |  110 ++
 test/bench/lib/__init__.py                      |   15 +
 test/bench/lib/cloudshell.py                    |   33 +
 test/bench/lib/fastsplits                       |  300 ++++++
 test/bench/lib/mediumsplits                     |  650 ++++++++++++
 test/bench/lib/options.py                       |   39 +
 test/bench/lib/path.py                          |   38 +
 test/bench/lib/runner.py                        |   28 +
 test/bench/lib/slowsplits                       | 1000 ++++++++++++++++++
 test/bench/lib/splits                           |  190 ++++
 test/bench/lib/tservers.py                      |   89 ++
 test/bench/lib/util.py                          |   20 +
 test/bench/run.py                               |  116 ++
 test/compat/diffAPI.pl                          |  104 ++
 test/compat/japi-compliance/README              |   53 +
 test/compat/japi-compliance/exclude_classes.txt |    1 +
 .../japi-compliance/japi-accumulo-1.5.0.xml     |   36 +
 .../japi-compliance/japi-accumulo-1.5.1.xml     |   36 +
 .../japi-compliance/japi-accumulo-1.5.2.xml     |   36 +
 .../japi-compliance/japi-accumulo-1.6.0.xml     |   38 +
 .../japi-compliance/japi-accumulo-1.6.1.xml     |   38 +
 .../japi-compliance/japi-accumulo-1.6.2.xml     |   38 +
 .../japi-compliance/japi-accumulo-1.7.0.xml     |   38 +
 .../japi-compliance/japi-accumulo-master.xml    |   38 +
 test/merkle-replication/README                  |   65 ++
 .../merkle-replication/configure-replication.sh |   99 ++
 test/merkle-replication/ingest-data.sh          |   39 +
 test/merkle-replication/merkle-env.sh           |   48 +
 test/merkle-replication/verify-data.sh          |   91 ++
 test/scalability/README.md                      |   57 +
 test/scalability/conf/Ingest.conf.example       |   27 +
 test/scalability/conf/site.conf.example         |   27 +
 test/scalability/run.py                         |  228 ++++
 test/scale/agitator.txt                         |   27 +
 test/scale/catastrophic.txt                     |   24 +
 test/scale/deleteLargeTable.txt                 |   16 +
 test/scale/restart.txt                          |   19 +
 test/stress/README.md                           |  105 ++
 test/stress/reader.sh                           |   39 +
 test/stress/readers                             |   17 +
 test/stress/start-readers.sh                    |   40 +
 test/stress/start-writers.sh                    |   40 +
 test/stress/stop-readers.sh                     |   36 +
 test/stress/stop-writers.sh                     |   36 +
 test/stress/stress-env.sh.example               |   60 ++
 test/stress/writer.sh                           |   44 +
 test/stress/writers                             |   17 +
 test/test1/README.md                            |   46 +
 test/test1/ingest_test.sh                       |   22 +
 test/test1/ingest_test_2.sh                     |   22 +
 test/test1/ingest_test_3.sh                     |   22 +
 test/test1/verify_test.sh                       |   22 +
 test/test1/verify_test_2.sh                     |   22 +
 test/test2/README.md                            |   27 +
 test/test2/concurrent.sh                        |   99 ++
 test/test3/README.md                            |   22 +
 test/test3/bigrow.sh                            |   27 +
 test/test4/README.md                            |   26 +
 test/test4/bulk_import_test.sh                  |   72 ++
 test/upgrade/upgrade_test.sh                    |   77 ++
 117 files changed, 9603 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
index f260e78..d583e32 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
@@ -177,7 +177,7 @@ public class ContinuousIngest {
     return lastFlushTime;
   }
 
-  static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv,
+  public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv,
                               byte[] ingestInstanceId, long count, byte[] prevRow,
                               boolean checksum) {
     // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
@@ -202,7 +202,7 @@ public class ContinuousIngest {
     return m;
   }
 
-  static long genLong(long min, long max, Random r) {
+  public static long genLong(long min, long max, Random r) {
     return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
   }
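
The two hunks above simply widen genMutation and genLong from package-private to public so other test modules can reuse them. A minimal, hedged sketch of calling the now-public genLong (the accumulo-testing core module on the classpath and the seed value are assumptions of this sketch):

import java.util.Random;
import org.apache.accumulo.testing.core.continuous.ContinuousIngest;

public class GenLongDemo {
  public static void main(String[] args) {
    Random r = new Random(42L); // arbitrary seed, only for the demo
    // genLong masks the sign bit before taking the modulus, so every result lies in [min, max)
    for (int i = 0; i < 3; i++) {
      System.out.println(ContinuousIngest.genLong(0L, 1_000_000_000L, r));
    }
  }
}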
 

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java
new file mode 100644
index 0000000..074bd8b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.ingest;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.cli.ClientOnRequiredTable;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.Parameter;
+
+public class BulkImportDirectory {
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = {"-s", "--source"}, description = "directory to import from")
+    String source = null;
+    @Parameter(names = {"-f", "--failures"}, description = "directory to copy failures into: will be deleted before the bulk import")
+    String failures = null;
+    @Parameter(description = "<username> <password> <tablename> <sourcedir> <failuredir>")
+    List<String> args = new ArrayList<>();
+  }
+
+  public static void main(String[] args) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    final FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    Opts opts = new Opts();
+    if (args.length == 5) {
+      System.err.println("Deprecated syntax for BulkImportDirectory, please use the new style (see --help)");
+      final String user = args[0];
+      final byte[] pass = args[1].getBytes(UTF_8);
+      final String tableName = args[2];
+      final String dir = args[3];
+      final String failureDir = args[4];
+      final Path failureDirPath = new Path(failureDir);
+      fs.delete(failureDirPath, true);
+      fs.mkdirs(failureDirPath);
+      HdfsZooInstance.getInstance().getConnector(user, new PasswordToken(pass)).tableOperations().importDirectory(tableName, dir, failureDir, false);
+    } else {
+      opts.parseArgs(BulkImportDirectory.class.getName(), args);
+      fs.delete(new Path(opts.failures), true);
+      fs.mkdirs(new Path(opts.failures));
+      opts.getConnector().tableOperations().importDirectory(opts.getTableName(), opts.source, opts.failures, false);
+    }
+  }
+}
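
The essential operation this new tool wraps is a single importDirectory call: clear the failures directory, then bulk-import the RFiles from the source directory. A hedged sketch of doing the same thing directly (the Connector, table name, and HDFS paths here are illustrative assumptions, not values from this commit):

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkImportSketch {
  // The caller supplies a live Connector (e.g. from ClientOnRequiredTable); paths are hypothetical.
  static void bulkImport(Connector connector, String table, String sourceDir, String failureDir) throws Exception {
    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
    fs.delete(new Path(failureDir), true);  // the failures dir is cleared first, as in main() above
    fs.mkdirs(new Path(failureDir));
    connector.tableOperations().importDirectory(table, sourceDir, failureDir, false);
  }
}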

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java
new file mode 100644
index 0000000..bb40f6f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.ingest;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.TabletServerBatchWriter;
+import org.apache.accumulo.core.client.security.SecurityErrorCode;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+public class TestIngest {
+  public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2");
+
+  public static class Opts extends ClientOnDefaultTable {
+
+    @Parameter(names = "--createTable")
+    public boolean createTable = false;
+
+    @Parameter(names = "--splits", description = "the number of splits to use when creating the table")
+    public int numsplits = 1;
+
+    @Parameter(names = "--start", description = "the starting row number")
+    public int startRow = 0;
+
+    @Parameter(names = "--rows", description = "the number of rows to ingest")
+    public int rows = 100000;
+
+    @Parameter(names = "--cols", description = "the number of columns to ingest per row")
+    public int cols = 1;
+
+    @Parameter(names = "--random", description = "insert random rows and use the given number to seed the pseudo-random number generator")
+    public Integer random = null;
+
+    @Parameter(names = "--size", description = "the size of the value to ingest")
+    public int dataSize = 1000;
+
+    @Parameter(names = "--delete", description = "delete values instead of inserting them")
+    public boolean delete = false;
+
+    @Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values")
+    public long timestamp = -1;
+
+    @Parameter(names = "--rfile", description = "generate data into a file that can be imported")
+    public String outputFile = null;
+
+    @Parameter(names = "--stride", description = "the difference between successive row ids")
+    public int stride;
+
+    @Parameter(names = {"-cf", "--columnFamily"}, description = "place columns in this column family")
+    public String columnFamily = "colf";
+
+    @Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns with this visibility", converter = VisibilityConverter.class)
+    public ColumnVisibility columnVisibility = new ColumnVisibility();
+
+    public Configuration conf = null;
+    public FileSystem fs = null;
+
+    public Opts() {
+      super("test_ingest");
+    }
+  }
+
+  public static void createTable(Connector conn, Opts args) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+    if (args.createTable) {
+      TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, args.numsplits);
+
+      if (!conn.tableOperations().exists(args.getTableName()))
+        conn.tableOperations().create(args.getTableName());
+      try {
+        conn.tableOperations().addSplits(args.getTableName(), splits);
+      } catch (TableNotFoundException ex) {
+        // unlikely
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
+    long splitSize = (end - start) / numsplits;
+
+    long pos = start + splitSize;
+
+    TreeSet<Text> splits = new TreeSet<>();
+
+    while (pos < end) {
+      splits.add(new Text(String.format("row_%010d", pos)));
+      pos += splitSize;
+    }
+    return splits;
+  }
+
+  public static byte[][] generateValues(int dataSize) {
+
+    byte[][] bytevals = new byte[10][];
+
+    byte[] letters = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'};
+
+    for (int i = 0; i < 10; i++) {
+      bytevals[i] = new byte[dataSize];
+      for (int j = 0; j < dataSize; j++)
+        bytevals[i][j] = letters[i];
+    }
+    return bytevals;
+  }
+
+  private static byte ROW_PREFIX[] = "row_".getBytes(UTF_8);
+  private static byte COL_PREFIX[] = "col_".getBytes(UTF_8);
+
+  public static Text generateRow(int rowid, int startRow) {
+    return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX));
+  }
+
+  public static byte[] genRandomValue(Random random, byte dest[], int seed, int row, int col) {
+    random.setSeed((row ^ seed) ^ col);
+    random.nextBytes(dest);
+    toPrintableChars(dest);
+
+    return dest;
+  }
+
+  public static void toPrintableChars(byte[] dest) {
+    // transform to printable chars
+    for (int i = 0; i < dest.length; i++) {
+      dest[i] = (byte) (((0xff & dest[i]) % 92) + ' ');
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(TestIngest.class.getName(), args, bwOpts);
+
+    String name = TestIngest.class.getSimpleName();
+    DistributedTrace.enable(name);
+
+    try {
+      opts.startTracing(name);
+
+      if (opts.debug)
+        Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE);
+
+      // test batch update
+
+      ingest(opts.getConnector(), opts, bwOpts);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      opts.stopTracing();
+      DistributedTrace.disable();
+    }
+  }
+
+  public static void ingest(Connector connector, FileSystem fs, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException,
+      AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, TableExistsException {
+    long stopTime;
+
+    byte[][] bytevals = generateValues(opts.dataSize);
+
+    byte randomValue[] = new byte[opts.dataSize];
+    Random random = new Random();
+
+    long bytesWritten = 0;
+
+    createTable(connector, opts);
+
+    BatchWriter bw = null;
+    FileSKVWriter writer = null;
+
+    if (opts.outputFile != null) {
+      Configuration conf = CachedConfiguration.getInstance();
+      writer = FileOperations.getInstance().newWriterBuilder().forFile(opts.outputFile + "." + RFile.EXTENSION, fs, conf)
+          .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
+      writer.startDefaultLocalityGroup();
+    } else {
+      bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+      connector.securityOperations().changeUserAuthorizations(opts.getPrincipal(), AUTHS);
+    }
+    Text labBA = new Text(opts.columnVisibility.getExpression());
+
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < opts.rows; i++) {
+      int rowid;
+      if (opts.stride > 0) {
+        rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
+      } else {
+        rowid = i;
+      }
+
+      Text row = generateRow(rowid, opts.startRow);
+      Mutation m = new Mutation(row);
+      for (int j = 0; j < opts.cols; j++) {
+        Text colf = new Text(opts.columnFamily);
+        Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
+
+        if (writer != null) {
+          Key key = new Key(row, colf, colq, labBA);
+          if (opts.timestamp >= 0) {
+            key.setTimestamp(opts.timestamp);
+          } else {
+            key.setTimestamp(startTime);
+          }
+
+          if (opts.delete) {
+            key.setDeleted(true);
+          } else {
+            key.setDeleted(false);
+          }
+
+          bytesWritten += key.getSize();
+
+          if (opts.delete) {
+            writer.append(key, new Value(new byte[0]));
+          } else {
+            byte value[];
+            if (opts.random != null) {
+              value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j);
+            } else {
+              value = bytevals[j % bytevals.length];
+            }
+
+            Value v = new Value(value);
+            writer.append(key, v);
+            bytesWritten += v.getSize();
+          }
+
+        } else {
+          Key key = new Key(row, colf, colq, labBA);
+          bytesWritten += key.getSize();
+
+          if (opts.delete) {
+            if (opts.timestamp >= 0)
+              m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
+            else
+              m.putDelete(colf, colq, opts.columnVisibility);
+          } else {
+            byte value[];
+            if (opts.random != null) {
+              value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j);
+            } else {
+              value = bytevals[j % bytevals.length];
+            }
+            bytesWritten += value.length;
+
+            if (opts.timestamp >= 0) {
+              m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
+            } else {
+              m.put(colf, colq, opts.columnVisibility, new Value(value, true));
+
+            }
+          }
+        }
+
+      }
+      if (bw != null)
+        bw.addMutation(m);
+
+    }
+
+    if (writer != null) {
+      writer.close();
+    } else if (bw != null) {
+      try {
+        bw.close();
+      } catch (MutationsRejectedException e) {
+        if (e.getSecurityErrorCodes().size() > 0) {
+          for (Entry<TabletId,Set<SecurityErrorCode>> entry : e.getSecurityErrorCodes().entrySet()) {
+            System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to " + entry.getValue());
+          }
+        }
+
+        if (e.getConstraintViolationSummaries().size() > 0) {
+          for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) {
+            System.err.println("ERROR : Constraint violates : " + cvs);
+          }
+        }
+
+        throw e;
+      }
+    }
+
+    stopTime = System.currentTimeMillis();
+
+    int totalValues = opts.rows * opts.cols;
+    double elapsed = (stopTime - startTime) / 1000.0;
+
+    System.out.printf("%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs   %n", totalValues,
+        (int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed), elapsed);
+  }
+
+  public static void ingest(Connector c, Opts opts, BatchWriterOpts batchWriterOpts) throws MutationsRejectedException, IOException, AccumuloException,
+      AccumuloSecurityException, TableNotFoundException, TableExistsException {
+    ingest(c, FileSystem.get(CachedConfiguration.getInstance()), opts, batchWriterOpts);
+  }
+}
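
A hedged sketch of driving this ingest programmatically rather than from the command line, using only the public fields and methods shown above (obtaining a live Connector is out of scope and the option values are illustrative):

import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.testing.core.ingest.TestIngest;

public class IngestSketch {
  static void smallIngest(Connector connector) throws Exception {
    TestIngest.Opts opts = new TestIngest.Opts(); // table name defaults to "test_ingest"
    opts.createTable = true;  // create and pre-split the table, see createTable(...) above
    opts.rows = 10_000;
    opts.cols = 1;
    opts.dataSize = 50;
    TestIngest.ingest(connector, opts, new BatchWriterOpts());
  }
}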

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java
new file mode 100644
index 0000000..4f8862a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.ingest;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+public class VerifyIngest {
+
+  private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
+
+  public static int getRow(Key k) {
+    return Integer.parseInt(k.getRow().toString().split("_")[1]);
+  }
+
+  public static int getCol(Key k) {
+    return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]);
+  }
+
+  public static class Opts extends TestIngest.Opts {
+    @Parameter(names = "-useGet", description = "fetches values one at a time, instead of scanning")
+    public boolean useGet = false;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    opts.parseArgs(VerifyIngest.class.getName(), args, scanOpts);
+    try {
+      if (opts.trace) {
+        String name = VerifyIngest.class.getSimpleName();
+        DistributedTrace.enable();
+        Trace.on(name);
+        Trace.data("cmdLine", Arrays.asList(args).toString());
+      }
+
+      verifyIngest(opts.getConnector(), opts, scanOpts);
+
+    } finally {
+      Trace.off();
+      DistributedTrace.disable();
+    }
+  }
+
+  public static void verifyIngest(Connector connector, Opts opts, ScannerOpts scanOpts) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    byte[][] bytevals = TestIngest.generateValues(opts.dataSize);
+
+    Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
+    connector.securityOperations().changeUserAuthorizations(opts.getPrincipal(), labelAuths);
+
+    int expectedRow = opts.startRow;
+    int expectedCol = 0;
+    int recsRead = 0;
+
+    long bytesRead = 0;
+    long t1 = System.currentTimeMillis();
+
+    byte randomValue[] = new byte[opts.dataSize];
+    Random random = new Random();
+
+    Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow)));
+
+    int errors = 0;
+
+    while (expectedRow < (opts.rows + opts.startRow)) {
+
+      if (opts.useGet) {
+        Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow));
+        Text colf = new Text(opts.columnFamily);
+        Text colq = new Text("col_" + String.format("%07d", expectedCol));
+
+        Scanner scanner = connector.createScanner("test_ingest", labelAuths);
+        scanner.setBatchSize(1);
+        Key startKey = new Key(rowKey, colf, colq);
+        Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL));
+        scanner.setRange(range);
+
+        byte[] val = null; // t.get(rowKey, column);
+
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+        if (iter.hasNext()) {
+          val = iter.next().getValue().get();
+        }
+
+        byte ev[];
+        if (opts.random != null) {
+          ev = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, expectedCol);
+        } else {
+          ev = bytevals[expectedCol % bytevals.length];
+        }
+
+        if (val == null) {
+          log.error("Did not find " + rowKey + " " + colf + " " + colq);
+          errors++;
+        } else {
+          recsRead++;
+          bytesRead += val.length;
+          Value value = new Value(val);
+          if (value.compareTo(ev) != 0) {
+            log.error("unexpected value  (" + rowKey + " " + colf + " " + colq + " : saw " + value + " expected " + new Value(ev));
+            errors++;
+          }
+        }
+
+        expectedCol++;
+        if (expectedCol >= opts.cols) {
+          expectedCol = 0;
+          expectedRow++;
+        }
+
+      } else {
+
+        Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));
+
+        Scanner scanner = connector.createScanner(opts.getTableName(), labelAuths);
+        scanner.setBatchSize(scanOpts.scanBatchSize);
+        scanner.setRange(new Range(startKey, endKey));
+        for (int j = 0; j < opts.cols; j++) {
+          scanner.fetchColumn(new Text(opts.columnFamily), new Text("col_" + String.format("%07d", j)));
+        }
+
+        int recsReadBefore = recsRead;
+
+        for (Entry<Key,Value> entry : scanner) {
+
+          recsRead++;
+
+          bytesRead += entry.getKey().getLength();
+          bytesRead += entry.getValue().getSize();
+
+          int rowNum = getRow(entry.getKey());
+          int colNum = getCol(entry.getKey());
+
+          if (rowNum != expectedRow) {
+            log.error("rowNum != expectedRow   " + rowNum + " != " + expectedRow);
+            errors++;
+            expectedRow = rowNum;
+          }
+
+          if (colNum != expectedCol) {
+            log.error("colNum != expectedCol  " + colNum + " != " + expectedCol + "  rowNum : " + rowNum);
+            errors++;
+          }
+
+          if (expectedRow >= (opts.rows + opts.startRow)) {
+            log.error("expectedRow (" + expectedRow + ") >= (ingestArgs.rows + ingestArgs.startRow)  (" + (opts.rows + opts.startRow)
+                + "), get batch returned data passed end key");
+            errors++;
+            break;
+          }
+
+          byte value[];
+          if (opts.random != null) {
+            value = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, colNum);
+          } else {
+            value = bytevals[colNum % bytevals.length];
+          }
+
+          if (entry.getValue().compareTo(value) != 0) {
+            log.error("unexpected value, rowNum : " + rowNum + " colNum : " + colNum);
+            log.error(" saw = " + new String(entry.getValue().get()) + " expected = " + new String(value));
+            errors++;
+          }
+
+          if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) {
+            log.error("unexpected timestamp " + entry.getKey().getTimestamp() + ", rowNum : " + rowNum + " colNum : " + colNum);
+            errors++;
+          }
+
+          expectedCol++;
+          if (expectedCol >= opts.cols) {
+            expectedCol = 0;
+            expectedRow++;
+          }
+
+        }
+
+        if (recsRead == recsReadBefore) {
+          log.warn("Scan returned nothing, breaking...");
+          break;
+        }
+
+      }
+    }
+
+    long t2 = System.currentTimeMillis();
+
+    if (errors > 0) {
+      throw new AccumuloException("saw " + errors + " errors ");
+    }
+
+    if (expectedRow != (opts.rows + opts.startRow)) {
+      throw new AccumuloException("Did not read expected number of rows. Saw " + (expectedRow - opts.startRow) + " expected " + opts.rows);
+    } else {
+      System.out.printf("%,12d records read | %,8d records/sec | %,12d bytes read | %,8d bytes/sec | %6.3f secs   %n", recsRead,
+          (int) ((recsRead) / ((t2 - t1) / 1000.0)), bytesRead, (int) (bytesRead / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0);
+    }
+  }
+
+}
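
Paired with TestIngest above, a hedged sketch of reading the data back (again the Connector is supplied by the caller, and the option values must match the ingest run):

import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.testing.core.ingest.VerifyIngest;

public class VerifySketch {
  // verifyIngest throws AccumuloException if any row, column, value, or timestamp is wrong.
  static void verify(Connector connector) throws Exception {
    VerifyIngest.Opts opts = new VerifyIngest.Opts(); // inherits the TestIngest.Opts defaults
    opts.rows = 10_000;
    opts.cols = 1;
    opts.dataSize = 50;
    VerifyIngest.verifyIngest(connector, opts, new ScannerOpts());
  }
}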

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java
new file mode 100644
index 0000000..cb91e19
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.mapreduce;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collections;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+public class RowHash extends Configured implements Tool {
+  /**
+   * The Mapper class that given a row number, will generate the appropriate output line.
+   */
+  public static class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+    @Override
+    public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
+      Mutation m = new Mutation(row.getRow());
+      m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"), new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest())));
+      context.write(null, m);
+      context.progress();
+    }
+
+    @Override
+    public void setup(Context job) {}
+  }
+
+  private static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--column", required = true)
+    String column;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    job.setJobName(this.getClass().getName());
+    job.setJarByClass(this.getClass());
+    Opts opts = new Opts();
+    opts.parseArgs(RowHash.class.getName(), args);
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    opts.setAccumuloConfigs(job);
+
+    String col = opts.column;
+    int idx = col.indexOf(":");
+    Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+    Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+    if (cf.getLength() > 0)
+      AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<>(cf, cq)));
+
+    job.setMapperClass(HashDataMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new RowHash(), args);
+  }
+}
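
The mapper above stores, for each input entry, the MD5 of the cell value (Base64-encoded) under the cf-HASHTYPE/cq-MD5BASE64 column. A standalone sketch of that transformation on a single hypothetical cell:

import java.util.Base64;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;

public class RowHashSketch {
  public static void main(String[] args) {
    Value data = new Value("some cell value".getBytes()); // hypothetical input value
    byte[] hash = Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest());
    Mutation m = new Mutation(new Text("row1")); // hypothetical row id
    m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"), new Value(hash));
    System.out.println(m.size() + " column update queued");
  }
}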

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java
new file mode 100644
index 0000000..6e18000
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.core.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a
+ * map/reduce program to generate the data. The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
+ * <li>The keys are random characters from the set ' ' .. '~'.
+ * <li>The rowid is the right justified row id as a int.
+ * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * </ul>
+ *
+ * This TeraSort is slightly modified to allow for variable length key sizes and value sizes. The row length isn't variable. To generate a terabyte of data in
+ * the same way TeraSort does use 10000000000 rows and 10/10 byte key length and 78/78 byte value length. Along with the 10 byte row id and \r\n this gives you
+ * 100 byte row * 10000000000 rows = 1tb. Min/Max ranges for key and value parameters are inclusive/inclusive respectively.
+ *
+ *
+ */
+public class TeraSortIngest extends Configured implements Tool {
+  /**
+   * An input format that assigns ranges of longs to each mapper.
+   */
+  static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> {
+    /**
+     * An input split consisting of a range of numbers.
+     */
+    static class RangeInputSplit extends InputSplit implements Writable {
+      long firstRow;
+      long rowCount;
+
+      public RangeInputSplit() {}
+
+      public RangeInputSplit(long offset, long length) {
+        firstRow = offset;
+        rowCount = length;
+      }
+
+      @Override
+      public long getLength() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public String[] getLocations() throws IOException {
+        return new String[] {};
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        firstRow = WritableUtils.readVLong(in);
+        rowCount = WritableUtils.readVLong(in);
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVLong(out, firstRow);
+        WritableUtils.writeVLong(out, rowCount);
+      }
+    }
+
+    /**
+     * A record reader that will generate a range of numbers.
+     */
+    static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> {
+      long startRow;
+      long finishedRows;
+      long totalRows;
+
+      public RangeRecordReader(RangeInputSplit split) {
+        startRow = split.firstRow;
+        finishedRows = 0;
+        totalRows = split.rowCount;
+      }
+
+      @Override
+      public void close() throws IOException {}
+
+      @Override
+      public float getProgress() throws IOException {
+        return finishedRows / (float) totalRows;
+      }
+
+      @Override
+      public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return new LongWritable(startRow + finishedRows);
+      }
+
+      @Override
+      public NullWritable getCurrentValue() throws IOException, InterruptedException {
+        return NullWritable.get();
+      }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (finishedRows < totalRows) {
+          ++finishedRows;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
+      // reporter.setStatus("Creating record reader");
+      return new RangeRecordReader((RangeInputSplit) split);
+    }
+
+    /**
+     * Create the desired number of splits, dividing the number of rows between the mappers.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext job) {
+      long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
+      int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
+      long rowsPerSplit = totalRows / numSplits;
+      System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
+      ArrayList<InputSplit> splits = new ArrayList<>(numSplits);
+      long currentRow = 0;
+      for (int split = 0; split < numSplits - 1; ++split) {
+        splits.add(new RangeInputSplit(currentRow, rowsPerSplit));
+        currentRow += rowsPerSplit;
+      }
+      splits.add(new RangeInputSplit(currentRow, totalRows - currentRow));
+      System.out.println("Done Generating.");
+      return splits;
+    }
+
+  }
+
+  private static String NUMSPLITS = "terasort.overridesplits";
+  private static String NUMROWS = "terasort.numrows";
+
+  static class RandomGenerator {
+    private long seed = 0;
+    private static final long mask32 = (1l << 32) - 1;
+    /**
+     * The number of iterations separating the precomputed seeds.
+     */
+    private static final int seedSkip = 128 * 1024 * 1024;
+    /**
+     * The precomputed seed values after every seedSkip iterations. There should be enough values so that a 2**32 iterations are covered.
+     */
+    private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L,
+        3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L,
+        1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
+
+    /**
+     * Start the random number generator on the given iteration.
+     *
+     * @param initalIteration
+     *          the iteration number to start on
+     */
+    RandomGenerator(long initalIteration) {
+      int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
+      seed = seeds[baseIndex];
+      for (int i = 0; i < initalIteration % seedSkip; ++i) {
+        next();
+      }
+    }
+
+    RandomGenerator() {
+      this(0);
+    }
+
+    long next() {
+      seed = (seed * 3141592621l + 663896637) & mask32;
+      return seed;
+    }
+  }
+
+  /**
+   * The Mapper class that given a row number, will generate the appropriate output line.
+   */
+  public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> {
+    private Text tableName = null;
+    private int minkeylength = 0;
+    private int maxkeylength = 0;
+    private int minvaluelength = 0;
+    private int maxvaluelength = 0;
+
+    private Text key = new Text();
+    private Text value = new Text();
+    private RandomGenerator rand;
+    private byte[] keyBytes; // = new byte[12];
+    private byte[] spaces = "          ".getBytes();
+    private byte[][] filler = new byte[26][];
+    {
+      for (int i = 0; i < 26; ++i) {
+        filler[i] = new byte[10];
+        for (int j = 0; j < 10; ++j) {
+          filler[i][j] = (byte) ('A' + i);
+        }
+      }
+    }
+
+    /**
+     * Add a random key to the text
+     */
+    private Random random = new Random();
+
+    private void addKey() {
+      int range = random.nextInt(maxkeylength - minkeylength + 1);
+      int keylen = range + minkeylength;
+      int keyceil = keylen + (4 - (keylen % 4));
+      keyBytes = new byte[keyceil];
+
+      long temp = 0;
+      for (int i = 0; i < keyceil / 4; i++) {
+        temp = rand.next() / 52;
+        keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[4 * i] = (byte) (' ' + (temp % 95));
+      }
+      key.set(keyBytes, 0, keylen);
+    }
+
+    /**
+     * Add the rowid to the row.
+     */
+    private Text getRowIdString(long rowId) {
+      Text paddedRowIdString = new Text();
+      byte[] rowid = Integer.toString((int) rowId).getBytes();
+      int padSpace = 10 - rowid.length;
+      if (padSpace > 0) {
+        paddedRowIdString.append(spaces, 0, 10 - rowid.length);
+      }
+      paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
+      return paddedRowIdString;
+    }
+
+    /**
+     * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
+     *
+     * @param rowId
+     *          the current row number
+     */
+    private void addFiller(long rowId) {
+      int base = (int) ((rowId * 8) % 26);
+
+      // Get Random var
+      Random random = new Random(rand.seed);
+
+      int range = random.nextInt(maxvaluelength - minvaluelength + 1);
+      int valuelen = range + minvaluelength;
+
+      while (valuelen > 10) {
+        value.append(filler[(base + valuelen) % 26], 0, 10);
+        valuelen -= 10;
+      }
+
+      if (valuelen > 0)
+        value.append(filler[(base + valuelen) % 26], 0, valuelen);
+    }
+
+    @Override
+    public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
+      context.setStatus("Entering");
+      long rowId = row.get();
+      if (rand == null) {
+        // we use 3 random numbers per a row
+        rand = new RandomGenerator(rowId * 3);
+      }
+      addKey();
+      value.clear();
+      // addRowId(rowId);
+      addFiller(rowId);
+
+      // New
+      Mutation m = new Mutation(key);
+      m.put(new Text("c"), // column family
+          getRowIdString(rowId), // column qual
+          new Value(value.toString().getBytes())); // data
+
+      context.setStatus("About to add to accumulo");
+      context.write(tableName, m);
+      context.setStatus("Added to accumulo " + key.toString());
+    }
+
+    @Override
+    public void setup(Context job) {
+      minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
+      maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
+      minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
+      maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
+      tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
+  }
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--count", description = "number of rows to ingest", required = true)
+    long numRows;
+    @Parameter(names = {"-nk", "--minKeySize"}, description = "minimum key size", required = true)
+    int minKeyLength;
+    @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
+    int maxKeyLength;
+    @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum value size", required = true)
+    int minValueLength;
+    @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum value size", required = true)
+    int maxValueLength;
+    @Parameter(names = "--splits", description = "number of splits to create in the table")
+    int splits = 0;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    job.setJobName("TeraSortCloud");
+    job.setJarByClass(this.getClass());
+    Opts opts = new Opts();
+    opts.parseArgs(TeraSortIngest.class.getName(), args);
+
+    job.setInputFormatClass(RangeInputFormat.class);
+    job.setMapperClass(SortGenMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    opts.setAccumuloConfigs(job);
+    BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+    Configuration conf = job.getConfiguration();
+    conf.setLong(NUMROWS, opts.numRows);
+    conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
+    conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
+    conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
+    conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
+    conf.set("cloudgen.tablename", opts.getTableName());
+
+    if (args.length > 10)
+      conf.setInt(NUMSPLITS, opts.splits);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+}
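
As a quick check of the sizing claim in the class javadoc (10-byte key + 10-byte rowid + 78-byte filler + \r\n per row, at 10000000000 rows), a small worked example:

public class TeraSortSizing {
  public static void main(String[] args) {
    long bytesPerRow = 10 /* key */ + 10 /* rowid */ + 78 /* filler */ + 2 /* \r\n */; // = 100
    long rows = 10_000_000_000L;            // row count suggested in the javadoc
    System.out.println(bytesPerRow * rows); // 1,000,000,000,000 bytes, i.e. one terabyte
  }
}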

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java
new file mode 100644
index 0000000..003cd5d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.accumulo.core.util.Pair;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Simple implementation of a Merkle tree
+ */
+public class MerkleTree {
+  protected List<MerkleTreeNode> leaves;
+  protected String digestAlgorithm;
+
+  public MerkleTree(List<MerkleTreeNode> leaves, String digestAlgorithm) {
+    this.leaves = leaves;
+    this.digestAlgorithm = digestAlgorithm;
+  }
+
+  public MerkleTreeNode getRootNode() throws NoSuchAlgorithmException {
+    ArrayList<MerkleTreeNode> buffer = new ArrayList<>(leaves.size());
+    buffer.addAll(leaves);
+
+    while (buffer.size() > 1) {
+      // Find two nodes that we want to roll up
+      Pair<Integer,Integer> pairToJoin = findNextPair(buffer);
+
+      // Make a parent node from them
+      MerkleTreeNode parent = new MerkleTreeNode(Arrays.asList(buffer.get(pairToJoin.getFirst()), buffer.get(pairToJoin.getSecond())), digestAlgorithm);
+
+      // Insert it back into the "tree" at the position of the first child
+      buffer.set(pairToJoin.getFirst(), parent);
+
+      // Remove the second child completely
+      buffer.remove(pairToJoin.getSecond().intValue());
+
+      // "recurse"
+    }
+
+    return Iterables.getOnlyElement(buffer);
+  }
+
+  protected Pair<Integer,Integer> findNextPair(List<MerkleTreeNode> nodes) {
+    int i = 0, j = 1;
+    while (i < nodes.size() && j < nodes.size()) {
+      MerkleTreeNode left = nodes.get(i), right = nodes.get(j);
+
+      // At the same level
+      if (left.getLevel() == right.getLevel()) {
+        return new Pair<>(i, j);
+      }
+
+      // Peek to see if we have another element
+      if (j + 1 < nodes.size()) {
+        // If we do, try to match those
+        i++;
+        j++;
+      } else {
+        // Otherwise, the last two elements must be paired
+        return new Pair<>(i, j);
+      }
+    }
+
+    if (2 < nodes.size()) {
+      throw new IllegalStateException("Should not have exited loop without 
pairing two elements when we have at least 3 nodes");
+    } else if (2 == nodes.size()) {
+      return new Pair<>(0, 1);
+    } else {
+      throw new IllegalStateException("Must have at least two nodes to pair");
+    }
+  }
+}
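As a usage sketch (not part of the diff), here is how the MerkleTree and MerkleTreeNode classes above fit together. The single-byte hashes and the "SHA-256" digest name are examples, and the leaf ranges mirror the shape produced by GenerateHashes.endRowsToRanges further down:

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.testing.core.merkle.MerkleTree;
    import org.apache.accumulo.testing.core.merkle.MerkleTreeNode;
    import org.apache.commons.codec.binary.Hex;
    import org.apache.hadoop.io.Text;

    public class MerkleTreeSketch {
      public static void main(String[] args) throws Exception {
        // Two contiguous level-0 leaves covering (-inf, "m"] and ("m", +inf);
        // the one-byte hashes stand in for real digests from GenerateHashes.
        MerkleTreeNode left = new MerkleTreeNode(new Range(null, false, new Text("m"), true), 0,
            Collections.<Range> emptyList(), new byte[] {1});
        MerkleTreeNode right = new MerkleTreeNode(new Range(new Text("m"), false, null, false), 0,
            Collections.<Range> emptyList(), new byte[] {2});

        MerkleTree tree = new MerkleTree(Arrays.asList(left, right), "SHA-256");

        // The root hash is the digest of the child hashes, rolled up pairwise.
        System.out.println(Hex.encodeHexString(tree.getRootNode().getHash()));
      }
    }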

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java
new file mode 100644
index 0000000..168b6e1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a node's level (height) within the tree, the ranges that it covers, and its hash
+ */
+public class MerkleTreeNode {
+  private static final Logger log = LoggerFactory.getLogger(MerkleTreeNode.class);
+
+  private Range range;
+  private int level;
+  private List<Range> children;
+  private byte[] hash;
+
+  public MerkleTreeNode(Range range, int level, List<Range> children, byte[] hash) {
+    this.range = range;
+    this.level = level;
+    this.children = children;
+    this.hash = hash;
+  }
+
+  public MerkleTreeNode(Key k, Value v) {
+    range = RangeSerialization.toRange(k);
+    level = 0;
+    children = Collections.emptyList();
+    hash = v.get();
+  }
+
+  public MerkleTreeNode(List<MerkleTreeNode> children, String digestAlgorithm) throws NoSuchAlgorithmException {
+    level = 0;
+    this.children = new ArrayList<>(children.size());
+    MessageDigest digest = MessageDigest.getInstance(digestAlgorithm);
+
+    Range childrenRange = null;
+    for (MerkleTreeNode child : children) {
+      this.children.add(child.getRange());
+      level = Math.max(child.getLevel(), level);
+      digest.update(child.getHash());
+
+      if (null == childrenRange) {
+        childrenRange = child.getRange();
+      } else {
+        List<Range> overlappingRanges = Range.mergeOverlapping(Arrays.asList(childrenRange, child.getRange()));
+        if (1 != overlappingRanges.size()) {
+          log.error("Tried to merge non-contiguous ranges: {} {}", childrenRange, child.getRange());
+          throw new IllegalArgumentException("Ranges must be contiguous: " + childrenRange + ", " + child.getRange());
+        }
+
+        childrenRange = overlappingRanges.get(0);
+      }
+    }
+
+    // Our actual level is one more than the highest level of our children
+    level++;
+
+    // Roll the hash up the tree
+    hash = digest.digest();
+
+    // Set the range to be the merged result of the children
+    range = childrenRange;
+  }
+
+  public Range getRange() {
+    return range;
+  }
+
+  public int getLevel() {
+    return level;
+  }
+
+  public List<Range> getChildren() {
+    return children;
+  }
+
+  public byte[] getHash() {
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append("range=").append(range).append(" level=").append(level).append(" 
hash=").append(Hex.encodeHexString(hash)).append(" children=").append(children);
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof MerkleTreeNode) {
+      MerkleTreeNode other = (MerkleTreeNode) o;
+      return range.equals(other.getRange()) && level == other.getLevel() && children.equals(other.getChildren()) && Arrays.equals(hash, other.getHash());
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder(1395, 39532);
+    return hcb.append(range).append(level).append(children).append(hash).toHashCode();
+  }
+}
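A small sketch (again not part of the patch) of the contiguity check in the children constructor above: child ranges that leave a gap cannot be rolled up into a parent node. The range bounds and the "SHA-256" digest name are examples only:

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.testing.core.merkle.MerkleTreeNode;
    import org.apache.hadoop.io.Text;

    public class ContiguityCheckSketch {
      public static void main(String[] args) throws Exception {
        MerkleTreeNode a = new MerkleTreeNode(new Range(null, false, new Text("c"), true), 0,
            Collections.<Range> emptyList(), new byte[] {1});
        MerkleTreeNode b = new MerkleTreeNode(new Range(new Text("f"), false, new Text("i"), true), 0,
            Collections.<Range> emptyList(), new byte[] {2});
        try {
          new MerkleTreeNode(Arrays.asList(a, b), "SHA-256");
        } catch (IllegalArgumentException expected) {
          // (-inf, "c"] and ("f", "i"] leave a gap, so no single parent range can cover both children
          System.out.println(expected.getMessage());
        }
      }
    }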

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java
new file mode 100644
index 0000000..c301d49
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Serializes a {@link Range} to and from the {@link Key}/{@link Mutation} layout used by the Merkle tree tables.
+ */
+public class RangeSerialization {
+  private static final Text EMPTY = new Text(new byte[0]);
+
+  public static Range toRange(Key key) {
+    Text holder = new Text();
+    key.getRow(holder);
+    Key startKey;
+    if (0 == holder.getLength()) {
+      startKey = null;
+    } else {
+      startKey = new Key(holder);
+    }
+
+    key.getColumnQualifier(holder);
+    Key endKey;
+    if (0 == holder.getLength()) {
+      endKey = null;
+    } else {
+      endKey = new Key(holder);
+    }
+
+    // Don't mark a bound as inclusive when that side of the Range is unbounded
+    return new Range(startKey, startKey != null, endKey, endKey != null);
+  }
+
+  public static Key toKey(Range range) {
+    Text row = getRow(range);
+    return new Key(row, EMPTY, getColumnQualifier(range));
+  }
+
+  public static Mutation toMutation(Range range, Value v) {
+    Text row = getRow(range);
+    Mutation m = new Mutation(row);
+    m.put(EMPTY, getColumnQualifier(range), v);
+    return m;
+  }
+
+  public static Text getRow(Range range) {
+    return range.isInfiniteStartKey() ? EMPTY : range.getStartKey().getRow();
+  }
+
+  public static Text getColumnQualifier(Range range) {
+    return range.isInfiniteStopKey() ? EMPTY : range.getEndKey().getRow();
+  }
+}
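A quick sketch (not in this commit) of the layout RangeSerialization uses: the lower bound goes into the key's row, the upper bound into the column qualifier, and an empty Text marks an unbounded side. The range bounds and the one-byte value below are examples:

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.testing.core.merkle.RangeSerialization;
    import org.apache.hadoop.io.Text;

    public class RangeSerializationSketch {
      public static void main(String[] args) {
        // Same shape as a middle leaf from GenerateHashes.endRowsToRanges: ("g", "m"]
        Range leafRange = new Range(new Text("g"), false, new Text("m"), true);

        Key storedKey = RangeSerialization.toKey(leafRange);
        Mutation storedMutation = RangeSerialization.toMutation(leafRange, new Value(new byte[] {1}));

        // The recovered Range covers the same rows as the original, though it is
        // expressed in terms of the serialized bounds rather than the original object.
        System.out.println("key:       " + storedKey);
        System.out.println("mutation:  " + storedMutation.size() + " column update(s)");
        System.out.println("recovered: " + RangeSerialization.toRange(storedKey));
      }
    }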

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java
new file mode 100644
index 0000000..fd6251f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle.cli;
+
+import java.io.FileNotFoundException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Accepts a set of tables, computes the hashes for each, and prints the top-level hash for each table.
+ * <p>
+ * Automatically creates the output table for each table's intermediate hashes rather than requiring it to exist, and raises an exception if that output table already exists.
+ */
+public class CompareTables {
+  private static final Logger log = LoggerFactory.getLogger(CompareTables.class);
+
+  public static class CompareTablesOpts extends ClientOpts {
+    @Parameter(names = {"--tables"}, description = "Tables to compare", 
variableArity = true)
+    public List<String> tables;
+
+    @Parameter(names = {"-nt", "--numThreads"}, required = false, description 
= "number of concurrent threads calculating digests")
+    private int numThreads = 4;
+
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = 
"type of hash to use")
+    private String hashName;
+
+    @Parameter(names = {"-iter", "--iterator"}, required = false, description 
= "Should pushdown digest to iterators")
+    private boolean iteratorPushdown = false;
+
+    @Parameter(names = {"-s", "--splits"}, required = false, description = 
"File of splits to use for merkle tree")
+    private String splitsFile = null;
+
+    public List<String> getTables() {
+      return this.tables;
+    }
+
+    public void setTables(List<String> tables) {
+      this.tables = tables;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public void setNumThreads(int numThreads) {
+      this.numThreads = numThreads;
+    }
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+
+    public boolean isIteratorPushdown() {
+      return iteratorPushdown;
+    }
+
+    public void setIteratorPushdown(boolean iteratorPushdown) {
+      this.iteratorPushdown = iteratorPushdown;
+    }
+
+    public String getSplitsFile() {
+      return splitsFile;
+    }
+
+    public void setSplitsFile(String splitsFile) {
+      this.splitsFile = splitsFile;
+    }
+  }
+
+  private CompareTablesOpts opts;
+
+  protected CompareTables() {}
+
+  public CompareTables(CompareTablesOpts opts) {
+    this.opts = opts;
+  }
+
+  public Map<String,String> computeAllHashes() throws AccumuloException, AccumuloSecurityException, TableExistsException, NoSuchAlgorithmException,
+      TableNotFoundException, FileNotFoundException {
+    final Connector conn = opts.getConnector();
+    final Map<String,String> hashesByTable = new HashMap<>();
+
+    for (String table : opts.getTables()) {
+      final String outputTableName = table + "_merkle";
+
+      if (conn.tableOperations().exists(outputTableName)) {
+        throw new IllegalArgumentException("Expected output table name to not 
yet exist: " + outputTableName);
+      }
+
+      conn.tableOperations().create(outputTableName);
+
+      GenerateHashes genHashes = new GenerateHashes();
+      Collection<Range> ranges = genHashes.getRanges(opts.getConnector(), table, opts.getSplitsFile());
+
+      try {
+        genHashes.run(opts.getConnector(), table, table + "_merkle", opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges);
+      } catch (Exception e) {
+        log.error("Error generating hashes for {}", table, e);
+        throw new RuntimeException(e);
+      }
+
+      ComputeRootHash computeRootHash = new ComputeRootHash();
+      String hash = Hex.encodeHexString(computeRootHash.getHash(conn, outputTableName, opts.getHashName()));
+
+      hashesByTable.put(table, hash);
+    }
+
+    return hashesByTable;
+  }
+
+  public static void main(String[] args) throws Exception {
+    CompareTablesOpts opts = new CompareTablesOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs("CompareTables", args, bwOpts);
+
+    if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) {
+      throw new IllegalArgumentException("Cannot use iterator pushdown with 
anything other than table split points");
+    }
+
+    CompareTables compareTables = new CompareTables(opts);
+    Map<String,String> tableToHashes = compareTables.computeAllHashes();
+
+    boolean hashesEqual = true;
+    String previousHash = null;
+    for (Entry<String,String> entry : tableToHashes.entrySet()) {
+      // Set the previous hash if we don't have one
+      if (null == previousHash) {
+        previousHash = entry.getValue();
+      } else if (hashesEqual) {
+        // If the hashes are still equal, check that the new hash is also equal
+        hashesEqual = previousHash.equals(entry.getValue());
+      }
+
+      System.out.println(entry.getKey() + " " + entry.getValue());
+    }
+
+    System.exit(hashesEqual ? 0 : 1);
+  }
+}
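Besides the main() entry point above, CompareTables can be driven programmatically through its setters. A minimal sketch (not part of this patch); connection flags are still parsed from the command line, while the table names, digest, and thread count below are examples:

    import java.util.Arrays;
    import java.util.Map;

    import org.apache.accumulo.core.cli.BatchWriterOpts;
    import org.apache.accumulo.testing.core.merkle.cli.CompareTables;
    import org.apache.accumulo.testing.core.merkle.cli.CompareTables.CompareTablesOpts;

    public class CompareTablesSketch {
      public static void main(String[] args) throws Exception {
        // Instance, user, and password come from the usual ClientOpts flags in args.
        CompareTablesOpts opts = new CompareTablesOpts();
        opts.parseArgs("CompareTablesSketch", args, new BatchWriterOpts());

        opts.setTables(Arrays.asList("source_table", "replica_table")); // example table names
        opts.setHashName("SHA-256");
        opts.setNumThreads(8);

        // One "<table>_merkle" output table is created per input table; root hashes come back keyed by table.
        Map<String,String> rootHashes = new CompareTables(opts).computeAllHashes();
        rootHashes.forEach((table, hash) -> System.out.println(table + " " + hash));
      }
    }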

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java
new file mode 100644
index 0000000..4511446
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle.cli;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.merkle.MerkleTree;
+import org.apache.accumulo.testing.core.merkle.MerkleTreeNode;
+import org.apache.accumulo.testing.core.merkle.RangeSerialization;
+import org.apache.commons.codec.binary.Hex;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Given a table created by {@link GenerateHashes} which contains the leaves of a Merkle tree, compute the root node of the Merkle tree which can be quickly
+ * compared to the root node of another Merkle tree to ascertain equality.
+ */
+public class ComputeRootHash {
+
+  public static class ComputeRootHashOpts extends ClientOnRequiredTable {
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = 
"type of hash to use")
+    private String hashName;
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+  }
+
+  public byte[] getHash(ComputeRootHashOpts opts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, NoSuchAlgorithmException {
+    Connector conn = opts.getConnector();
+    String table = opts.getTableName();
+
+    return getHash(conn, table, opts.getHashName());
+  }
+
+  public byte[] getHash(Connector conn, String table, String hashName) throws TableNotFoundException, NoSuchAlgorithmException {
+    List<MerkleTreeNode> leaves = getLeaves(conn, table);
+
+    MerkleTree tree = new MerkleTree(leaves, hashName);
+
+    return tree.getRootNode().getHash();
+  }
+
+  protected ArrayList<MerkleTreeNode> getLeaves(Connector conn, String tableName) throws TableNotFoundException {
+    // TODO make this a bit more resilient to very large merkle trees by lazily reading more data from the table when necessary
+    final Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    final ArrayList<MerkleTreeNode> leaves = new ArrayList<>();
+
+    for (Entry<Key,Value> entry : s) {
+      Range range = RangeSerialization.toRange(entry.getKey());
+      byte[] hash = entry.getValue().get();
+
+      leaves.add(new MerkleTreeNode(range, 0, Collections.<Range> emptyList(), hash));
+    }
+
+    return leaves;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ComputeRootHashOpts opts = new ComputeRootHashOpts();
+    opts.parseArgs("ComputeRootHash", args);
+
+    ComputeRootHash computeRootHash = new ComputeRootHash();
+    byte[] rootHash = computeRootHash.getHash(opts);
+
+    System.out.println(Hex.encodeHexString(rootHash));
+  }
+}
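In addition to the CLI above, getHash(Connector, String, String) makes it straightforward to compare two Merkle tables directly. A sketch (not part of the patch); the table names and digest are examples, and the Connector is assumed to come from elsewhere, e.g. ClientOpts:

    import java.util.Arrays;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.testing.core.merkle.cli.ComputeRootHash;
    import org.apache.commons.codec.binary.Hex;

    public class RootHashComparisonSketch {
      // Returns true when two Merkle tables previously filled by GenerateHashes agree.
      public static boolean sameContent(Connector conn) throws Exception {
        ComputeRootHash rootHash = new ComputeRootHash();
        byte[] a = rootHash.getHash(conn, "source_table_merkle", "SHA-256");
        byte[] b = rootHash.getHash(conn, "replica_table_merkle", "SHA-256");
        System.out.println(Hex.encodeHexString(a) + " vs " + Hex.encodeHexString(b));
        return Arrays.equals(a, b);
      }
    }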

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java
new file mode 100644
index 0000000..35bf684
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.merkle.cli;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.merkle.RangeSerialization;
+import org.apache.accumulo.testing.core.merkle.skvi.DigestIterator;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.collect.Iterables;
+
+/**
+ * Read from a table, compute a Merkle tree and output it to a table. Each key-value pair in the destination table is a leaf node of the Merkle tree.
+ */
+public class GenerateHashes {
+  private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class);
+
+  public static class GenerateHashesOpts extends ClientOnRequiredTable {
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = 
"type of hash to use")
+    private String hashName;
+
+    @Parameter(names = {"-o", "--output"}, required = true, description = 
"output table name, expected to exist and be writable")
+    private String outputTableName;
+
+    @Parameter(names = {"-nt", "--numThreads"}, required = false, description 
= "number of concurrent threads calculating digests")
+    private int numThreads = 4;
+
+    @Parameter(names = {"-iter", "--iterator"}, required = false, description 
= "Should we push down logic with an iterator")
+    private boolean iteratorPushdown = false;
+
+    @Parameter(names = {"-s", "--splits"}, required = false, description = 
"File of splits to use for merkle tree")
+    private String splitsFile = null;
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+
+    public String getOutputTableName() {
+      return outputTableName;
+    }
+
+    public void setOutputTableName(String outputTableName) {
+      this.outputTableName = outputTableName;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public void setNumThreads(int numThreads) {
+      this.numThreads = numThreads;
+    }
+
+    public boolean isIteratorPushdown() {
+      return iteratorPushdown;
+    }
+
+    public void setIteratorPushdown(boolean iteratorPushdown) {
+      this.iteratorPushdown = iteratorPushdown;
+    }
+
+    public String getSplitsFile() {
+      return splitsFile;
+    }
+
+    public void setSplitsFile(String splitsFile) {
+      this.splitsFile = splitsFile;
+    }
+  }
+
+  public Collection<Range> getRanges(Connector conn, String tableName, String splitsFile) throws TableNotFoundException, AccumuloSecurityException,
+      AccumuloException, FileNotFoundException {
+    if (null == splitsFile) {
+      log.info("Using table split points");
+      Collection<Text> endRows = conn.tableOperations().listSplits(tableName);
+      return endRowsToRanges(endRows);
+    } else {
+      log.info("Using provided split points");
+      ArrayList<Text> splits = new ArrayList<>();
+
+      String line;
+      java.util.Scanner file = new java.util.Scanner(new File(splitsFile), UTF_8.name());
+      try {
+        while (file.hasNextLine()) {
+          line = file.nextLine();
+          if (!line.isEmpty()) {
+            splits.add(new Text(line));
+          }
+        }
+      } finally {
+        file.close();
+      }
+
+      Collections.sort(splits);
+      return endRowsToRanges(splits);
+    }
+  }
+
+  public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, NoSuchAlgorithmException,
+      FileNotFoundException {
+    Collection<Range> ranges = getRanges(opts.getConnector(), opts.getTableName(), opts.getSplitsFile());
+
+    run(opts.getConnector(), opts.getTableName(), opts.getOutputTableName(), opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges);
+  }
+
+  public void run(final Connector conn, final String inputTableName, final String outputTableName, final String digestName, int numThreads,
+      final boolean iteratorPushdown, final Collection<Range> ranges) throws TableNotFoundException, AccumuloSecurityException, AccumuloException,
+      NoSuchAlgorithmException {
+    if (!conn.tableOperations().exists(outputTableName)) {
+      throw new IllegalArgumentException(outputTableName + " does not exist, please create it");
+    }
+
+    // Get some parallelism
+    ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+    final BatchWriter bw = conn.createBatchWriter(outputTableName, new BatchWriterConfig());
+
+    try {
+      for (final Range range : ranges) {
+        final MessageDigest digest = getDigestAlgorithm(digestName);
+
+        svc.execute(new Runnable() {
+
+          @Override
+          public void run() {
+            Scanner s;
+            try {
+              s = conn.createScanner(inputTableName, Authorizations.EMPTY);
+            } catch (Exception e) {
+              log.error("Could not get scanner for " + inputTableName, e);
+              throw new RuntimeException(e);
+            }
+
+            s.setRange(range);
+
+            Value v = null;
+            Mutation m = null;
+            if (iteratorPushdown) {
+              IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
+              cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName);
+              s.addScanIterator(cfg);
+
+              // The scanner should only ever return us one Key-Value, otherwise this approach won't work
+              Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+              v = entry.getValue();
+              m = RangeSerialization.toMutation(range, v);
+            } else {
+              ByteArrayOutputStream baos = new ByteArrayOutputStream();
+              for (Entry<Key,Value> entry : s) {
+                DataOutputStream out = new DataOutputStream(baos);
+                try {
+                  entry.getKey().write(out);
+                  entry.getValue().write(out);
+                } catch (Exception e) {
+                  log.error("Error writing {}", entry, e);
+                  throw new RuntimeException(e);
+                }
+
+                digest.update(baos.toByteArray());
+                baos.reset();
+              }
+
+              v = new Value(digest.digest());
+              m = RangeSerialization.toMutation(range, v);
+            }
+
+            // Log some progress
+            log.info("{} computed digest for {} of {}", 
Thread.currentThread().getName(), range, Hex.encodeHexString(v.get()));
+
+            try {
+              bw.addMutation(m);
+            } catch (MutationsRejectedException e) {
+              log.error("Could not write mutation", e);
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+
+      svc.shutdown();
+
+      // Wait indefinitely for the scans to complete
+      while (!svc.isTerminated()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          log.error("Interrupted while waiting for executor service to 
gracefully complete. Exiting now");
+          svc.shutdownNow();
+          return;
+        }
+      }
+    } finally {
+      // We can only safely close this when we're exiting or we've completed all tasks
+      bw.close();
+    }
+  }
+
+  public TreeSet<Range> endRowsToRanges(Collection<Text> endRows) {
+    ArrayList<Text> sortedEndRows = new ArrayList<>(endRows);
+    Collections.sort(sortedEndRows);
+
+    Text prevEndRow = null;
+    TreeSet<Range> ranges = new TreeSet<>();
+    for (Text endRow : sortedEndRows) {
+      if (null == prevEndRow) {
+        ranges.add(new Range(null, false, endRow, true));
+      } else {
+        ranges.add(new Range(prevEndRow, false, endRow, true));
+      }
+      prevEndRow = endRow;
+    }
+
+    ranges.add(new Range(prevEndRow, false, null, false));
+
+    return ranges;
+  }
+
+  protected MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException {
+    return MessageDigest.getInstance(digestName);
+  }
+
+  public static void main(String[] args) throws Exception {
+    GenerateHashesOpts opts = new GenerateHashesOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(GenerateHashes.class.getName(), args, bwOpts);
+
+    if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) {
+      throw new IllegalArgumentException("Cannot use iterator pushdown with 
anything other than table split points");
+    }
+
+    GenerateHashes generate = new GenerateHashes();
+    generate.run(opts);
+  }
+}
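Finally, a small sketch (not in this commit) of how endRowsToRanges() above carves a table into the leaf ranges that each digest task covers; the split points are examples:

    import java.util.Arrays;
    import java.util.TreeSet;

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.testing.core.merkle.cli.GenerateHashes;
    import org.apache.hadoop.io.Text;

    public class LeafRangesSketch {
      public static void main(String[] args) {
        // Split points "g" and "q" produce three leaf ranges: (-inf, g], (g, q], (q, +inf),
        // i.e. one Merkle leaf per slice of the table.
        TreeSet<Range> ranges = new GenerateHashes().endRowsToRanges(Arrays.asList(new Text("g"), new Text("q")));
        ranges.forEach(System.out::println);
      }
    }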
