ACCUMULO-4510 Moved remaining external test code from Accumulo
Project: http://git-wip-us.apache.org/repos/asf/accumulo-testing/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-testing/commit/0d97273c Tree: http://git-wip-us.apache.org/repos/asf/accumulo-testing/tree/0d97273c Diff: http://git-wip-us.apache.org/repos/asf/accumulo-testing/diff/0d97273c Branch: refs/heads/master Commit: 0d97273cc18be723cb5ce84c91c329dc1885274b Parents: fc3ddfc Author: Mike Walch <mwa...@apache.org> Authored: Tue Jan 24 17:27:50 2017 -0500 Committer: Mike Walch <mwa...@apache.org> Committed: Wed Jan 25 13:15:43 2017 -0500 ---------------------------------------------------------------------- .../core/continuous/ContinuousIngest.java | 4 +- .../core/ingest/BulkImportDirectory.java | 68 ++ .../testing/core/ingest/TestIngest.java | 344 ++++++ .../testing/core/ingest/VerifyIngest.java | 237 +++++ .../testing/core/mapreduce/RowHash.java | 95 ++ .../testing/core/mapreduce/TeraSortIngest.java | 399 +++++++ .../testing/core/merkle/MerkleTree.java | 92 ++ .../testing/core/merkle/MerkleTreeNode.java | 131 +++ .../testing/core/merkle/RangeSerialization.java | 72 ++ .../testing/core/merkle/cli/CompareTables.java | 176 +++ .../core/merkle/cli/ComputeRootHash.java | 100 ++ .../testing/core/merkle/cli/GenerateHashes.java | 287 +++++ .../core/merkle/cli/ManualComparison.java | 95 ++ .../core/merkle/ingest/RandomWorkload.java | 120 +++ .../testing/core/merkle/package-info.java | 39 + .../core/merkle/skvi/DigestIterator.java | 149 +++ .../testing/core/scalability/Ingest.java | 143 +++ .../accumulo/testing/core/scalability/Run.java | 97 ++ .../testing/core/scalability/ScaleTest.java | 88 ++ .../testing/core/stress/DataWriter.java | 50 + .../testing/core/stress/IntArgValidator.java | 34 + .../testing/core/stress/RandomByteArrays.java | 33 + .../testing/core/stress/RandomMutations.java | 56 + .../testing/core/stress/RandomWithinRange.java | 58 + .../accumulo/testing/core/stress/Scan.java | 121 +++ .../accumulo/testing/core/stress/ScanOpts.java | 46 + .../accumulo/testing/core/stress/Stream.java | 40 + .../accumulo/testing/core/stress/Write.java | 77 ++ .../testing/core/stress/WriteOptions.java | 169 +++ .../testing/core/stress/package-info.java | 36 + test/agitator/.gitignore | 3 + test/agitator/README.md | 39 + test/agitator/agitator.ini.example | 56 + test/agitator/agitator.py | 241 +++++ test/agitator/hosts.example | 16 + test/bench/README.md | 61 ++ test/bench/cloudstone1/__init__.py | 15 + test/bench/cloudstone1/cloudstone1.py | 44 + test/bench/cloudstone2/__init__.py | 15 + test/bench/cloudstone2/cloudstone2.py | 49 + test/bench/cloudstone3/__init__.py | 15 + test/bench/cloudstone3/cloudstone3.py | 50 + test/bench/cloudstone4/__init__.py | 15 + test/bench/cloudstone4/cloudstone4.py | 29 + test/bench/cloudstone5/__init__.py | 15 + test/bench/cloudstone5/cloudstone5.py | 29 + test/bench/cloudstone6/__init__.py | 15 + test/bench/cloudstone6/cloudstone6.py | 29 + test/bench/cloudstone7/__init__.py | 15 + test/bench/cloudstone7/cloudstone7.py | 29 + test/bench/cloudstone8/__init__.py | 15 + test/bench/cloudstone8/cloudstone8.py | 64 ++ test/bench/lib/Benchmark.py | 115 ++ test/bench/lib/CreateTablesBenchmark.py | 78 ++ test/bench/lib/IngestBenchmark.py | 94 ++ test/bench/lib/RowHashBenchmark.py | 136 +++ test/bench/lib/TableSplitsBenchmark.py | 76 ++ test/bench/lib/TeraSortBenchmark.py | 110 ++ test/bench/lib/__init__.py | 15 + test/bench/lib/cloudshell.py | 33 + test/bench/lib/fastsplits | 300 ++++++ test/bench/lib/mediumsplits | 650 ++++++++++++ test/bench/lib/options.py | 39 + 
test/bench/lib/path.py | 38 + test/bench/lib/runner.py | 28 + test/bench/lib/slowsplits | 1000 ++++++++++++++++++ test/bench/lib/splits | 190 ++++ test/bench/lib/tservers.py | 89 ++ test/bench/lib/util.py | 20 + test/bench/run.py | 116 ++ test/compat/diffAPI.pl | 104 ++ test/compat/japi-compliance/README | 53 + test/compat/japi-compliance/exclude_classes.txt | 1 + .../japi-compliance/japi-accumulo-1.5.0.xml | 36 + .../japi-compliance/japi-accumulo-1.5.1.xml | 36 + .../japi-compliance/japi-accumulo-1.5.2.xml | 36 + .../japi-compliance/japi-accumulo-1.6.0.xml | 38 + .../japi-compliance/japi-accumulo-1.6.1.xml | 38 + .../japi-compliance/japi-accumulo-1.6.2.xml | 38 + .../japi-compliance/japi-accumulo-1.7.0.xml | 38 + .../japi-compliance/japi-accumulo-master.xml | 38 + test/merkle-replication/README | 65 ++ .../merkle-replication/configure-replication.sh | 99 ++ test/merkle-replication/ingest-data.sh | 39 + test/merkle-replication/merkle-env.sh | 48 + test/merkle-replication/verify-data.sh | 91 ++ test/scalability/README.md | 57 + test/scalability/conf/Ingest.conf.example | 27 + test/scalability/conf/site.conf.example | 27 + test/scalability/run.py | 228 ++++ test/scale/agitator.txt | 27 + test/scale/catastrophic.txt | 24 + test/scale/deleteLargeTable.txt | 16 + test/scale/restart.txt | 19 + test/stress/README.md | 105 ++ test/stress/reader.sh | 39 + test/stress/readers | 17 + test/stress/start-readers.sh | 40 + test/stress/start-writers.sh | 40 + test/stress/stop-readers.sh | 36 + test/stress/stop-writers.sh | 36 + test/stress/stress-env.sh.example | 60 ++ test/stress/writer.sh | 44 + test/stress/writers | 17 + test/test1/README.md | 46 + test/test1/ingest_test.sh | 22 + test/test1/ingest_test_2.sh | 22 + test/test1/ingest_test_3.sh | 22 + test/test1/verify_test.sh | 22 + test/test1/verify_test_2.sh | 22 + test/test2/README.md | 27 + test/test2/concurrent.sh | 99 ++ test/test3/README.md | 22 + test/test3/bigrow.sh | 27 + test/test4/README.md | 26 + test/test4/bulk_import_test.sh | 72 ++ test/upgrade/upgrade_test.sh | 77 ++ 117 files changed, 9603 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java index f260e78..d583e32 100644 --- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java +++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java @@ -177,7 +177,7 @@ public class ContinuousIngest { return lastFlushTime; } - static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, + public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, boolean checksum) { // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... 
so used CRC32 instead @@ -202,7 +202,7 @@ public class ContinuousIngest { return m; } - static long genLong(long min, long max, Random r) { + public static long genLong(long min, long max, Random r) { return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min; } http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java new file mode 100644 index 0000000..074bd8b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/BulkImportDirectory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.ingest; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.cli.ClientOnRequiredTable; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.beust.jcommander.Parameter; + +public class BulkImportDirectory { + static class Opts extends ClientOnRequiredTable { + @Parameter(names = {"-s", "--source"}, description = "directory to import from") + String source = null; + @Parameter(names = {"-f", "--failures"}, description = "directory to copy failures into: will be deleted before the bulk import") + String failures = null; + @Parameter(description = "<username> <password> <tablename> <sourcedir> <failuredir>") + List<String> args = new ArrayList<>(); + } + + public static void main(String[] args) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + final FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); + Opts opts = new Opts(); + if (args.length == 5) { + System.err.println("Deprecated syntax for BulkImportDirectory, please use the new style (see --help)"); + final String user = args[0]; + final byte[] pass = args[1].getBytes(UTF_8); + final String tableName = args[2]; + final String dir = args[3]; + final String failureDir = args[4]; + final Path failureDirPath = new Path(failureDir); + fs.delete(failureDirPath, true); + 
fs.mkdirs(failureDirPath); + HdfsZooInstance.getInstance().getConnector(user, new PasswordToken(pass)).tableOperations().importDirectory(tableName, dir, failureDir, false); + } else { + opts.parseArgs(BulkImportDirectory.class.getName(), args); + fs.delete(new Path(opts.failures), true); + fs.mkdirs(new Path(opts.failures)); + opts.getConnector().tableOperations().importDirectory(opts.getTableName(), opts.source, opts.failures, false); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java new file mode 100644 index 0000000..bb40f6f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/TestIngest.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.ingest; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.TabletServerBatchWriter; +import org.apache.accumulo.core.client.security.SecurityErrorCode; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.trace.DistributedTrace; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +public class TestIngest { + public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2"); + + public static class Opts extends ClientOnDefaultTable { + + @Parameter(names = "--createTable") + public boolean createTable = false; + + @Parameter(names = "--splits", description = "the number of splits to use when creating the table") + public int numsplits = 1; + + @Parameter(names = "--start", description = "the starting row number") + public int startRow = 0; + + @Parameter(names = "--rows", description = "the number of rows to ingest") + public int rows = 100000; + + @Parameter(names = "--cols", description = "the number of columns to ingest per row") + public int cols = 1; + + @Parameter(names = "--random", description = "insert random rows and use the given number to seed the psuedo-random number generator") + public Integer random = null; + + @Parameter(names = "--size", description = "the size of the value to ingest") + public int dataSize = 1000; + + @Parameter(names = "--delete", description = "delete values instead of inserting them") + public boolean delete = false; + + @Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values") + public long timestamp = -1; + + @Parameter(names = "--rfile", description = "generate data into a file that can be imported") + public String outputFile = null; + + @Parameter(names = "--stride", description = "the difference between successive row ids") + public int stride; + + @Parameter(names = {"-cf", "--columnFamily"}, description = "place columns in this column family") + public String columnFamily = "colf"; + + 
@Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns in this column family", converter = VisibilityConverter.class) + public ColumnVisibility columnVisibility = new ColumnVisibility(); + + public Configuration conf = null; + public FileSystem fs = null; + + public Opts() { + super("test_ingest"); + } + } + + public static void createTable(Connector conn, Opts args) throws AccumuloException, AccumuloSecurityException, TableExistsException { + if (args.createTable) { + TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, args.numsplits); + + if (!conn.tableOperations().exists(args.getTableName())) + conn.tableOperations().create(args.getTableName()); + try { + conn.tableOperations().addSplits(args.getTableName(), splits); + } catch (TableNotFoundException ex) { + // unlikely + throw new RuntimeException(ex); + } + } + } + + public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) { + long splitSize = (end - start) / numsplits; + + long pos = start + splitSize; + + TreeSet<Text> splits = new TreeSet<>(); + + while (pos < end) { + splits.add(new Text(String.format("row_%010d", pos))); + pos += splitSize; + } + return splits; + } + + public static byte[][] generateValues(int dataSize) { + + byte[][] bytevals = new byte[10][]; + + byte[] letters = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'}; + + for (int i = 0; i < 10; i++) { + bytevals[i] = new byte[dataSize]; + for (int j = 0; j < dataSize; j++) + bytevals[i][j] = letters[i]; + } + return bytevals; + } + + private static byte ROW_PREFIX[] = "row_".getBytes(UTF_8); + private static byte COL_PREFIX[] = "col_".getBytes(UTF_8); + + public static Text generateRow(int rowid, int startRow) { + return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX)); + } + + public static byte[] genRandomValue(Random random, byte dest[], int seed, int row, int col) { + random.setSeed((row ^ seed) ^ col); + random.nextBytes(dest); + toPrintableChars(dest); + + return dest; + } + + public static void toPrintableChars(byte[] dest) { + // transform to printable chars + for (int i = 0; i < dest.length; i++) { + dest[i] = (byte) (((0xff & dest[i]) % 92) + ' '); + } + } + + public static void main(String[] args) throws Exception { + + Opts opts = new Opts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(TestIngest.class.getName(), args, bwOpts); + + String name = TestIngest.class.getSimpleName(); + DistributedTrace.enable(name); + + try { + opts.startTracing(name); + + if (opts.debug) + Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE); + + // test batch update + + ingest(opts.getConnector(), opts, bwOpts); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + opts.stopTracing(); + DistributedTrace.disable(); + } + } + + public static void ingest(Connector connector, FileSystem fs, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException, + AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, TableExistsException { + long stopTime; + + byte[][] bytevals = generateValues(opts.dataSize); + + byte randomValue[] = new byte[opts.dataSize]; + Random random = new Random(); + + long bytesWritten = 0; + + createTable(connector, opts); + + BatchWriter bw = null; + FileSKVWriter writer = null; + + if (opts.outputFile != null) { + Configuration conf = CachedConfiguration.getInstance(); + writer = 
FileOperations.getInstance().newWriterBuilder().forFile(opts.outputFile + "." + RFile.EXTENSION, fs, conf) + .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build(); + writer.startDefaultLocalityGroup(); + } else { + bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig()); + connector.securityOperations().changeUserAuthorizations(opts.getPrincipal(), AUTHS); + } + Text labBA = new Text(opts.columnVisibility.getExpression()); + + long startTime = System.currentTimeMillis(); + for (int i = 0; i < opts.rows; i++) { + int rowid; + if (opts.stride > 0) { + rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride); + } else { + rowid = i; + } + + Text row = generateRow(rowid, opts.startRow); + Mutation m = new Mutation(row); + for (int j = 0; j < opts.cols; j++) { + Text colf = new Text(opts.columnFamily); + Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); + + if (writer != null) { + Key key = new Key(row, colf, colq, labBA); + if (opts.timestamp >= 0) { + key.setTimestamp(opts.timestamp); + } else { + key.setTimestamp(startTime); + } + + if (opts.delete) { + key.setDeleted(true); + } else { + key.setDeleted(false); + } + + bytesWritten += key.getSize(); + + if (opts.delete) { + writer.append(key, new Value(new byte[0])); + } else { + byte value[]; + if (opts.random != null) { + value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j); + } else { + value = bytevals[j % bytevals.length]; + } + + Value v = new Value(value); + writer.append(key, v); + bytesWritten += v.getSize(); + } + + } else { + Key key = new Key(row, colf, colq, labBA); + bytesWritten += key.getSize(); + + if (opts.delete) { + if (opts.timestamp >= 0) + m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp); + else + m.putDelete(colf, colq, opts.columnVisibility); + } else { + byte value[]; + if (opts.random != null) { + value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j); + } else { + value = bytevals[j % bytevals.length]; + } + bytesWritten += value.length; + + if (opts.timestamp >= 0) { + m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true)); + } else { + m.put(colf, colq, opts.columnVisibility, new Value(value, true)); + + } + } + } + + } + if (bw != null) + bw.addMutation(m); + + } + + if (writer != null) { + writer.close(); + } else if (bw != null) { + try { + bw.close(); + } catch (MutationsRejectedException e) { + if (e.getSecurityErrorCodes().size() > 0) { + for (Entry<TabletId,Set<SecurityErrorCode>> entry : e.getSecurityErrorCodes().entrySet()) { + System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to " + entry.getValue()); + } + } + + if (e.getConstraintViolationSummaries().size() > 0) { + for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) { + System.err.println("ERROR : Constraint violates : " + cvs); + } + } + + throw e; + } + } + + stopTime = System.currentTimeMillis(); + + int totalValues = opts.rows * opts.cols; + double elapsed = (stopTime - startTime) / 1000.0; + + System.out.printf("%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs %n", totalValues, + (int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed), elapsed); + } + + public static void ingest(Connector c, Opts opts, BatchWriterOpts batchWriterOpts) throws MutationsRejectedException, IOException, AccumuloException, + 
AccumuloSecurityException, TableNotFoundException, TableExistsException { + ingest(c, FileSystem.get(CachedConfiguration.getInstance()), opts, batchWriterOpts); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java new file mode 100644 index 0000000..4f8862a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/ingest/VerifyIngest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.ingest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.trace.DistributedTrace; +import org.apache.accumulo.core.trace.Trace; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +public class VerifyIngest { + + private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class); + + public static int getRow(Key k) { + return Integer.parseInt(k.getRow().toString().split("_")[1]); + } + + public static int getCol(Key k) { + return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]); + } + + public static class Opts extends TestIngest.Opts { + @Parameter(names = "-useGet", description = "fetches values one at a time, instead of scanning") + public boolean useGet = false; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + ScannerOpts scanOpts = new ScannerOpts(); + opts.parseArgs(VerifyIngest.class.getName(), args, scanOpts); + try { + if (opts.trace) { + String name = VerifyIngest.class.getSimpleName(); + DistributedTrace.enable(); + Trace.on(name); + Trace.data("cmdLine", Arrays.asList(args).toString()); + } + + verifyIngest(opts.getConnector(), opts, scanOpts); + + } finally { + Trace.off(); + DistributedTrace.disable(); + } + } + + 
public static void verifyIngest(Connector connector, Opts opts, ScannerOpts scanOpts) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + byte[][] bytevals = TestIngest.generateValues(opts.dataSize); + + Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2"); + connector.securityOperations().changeUserAuthorizations(opts.getPrincipal(), labelAuths); + + int expectedRow = opts.startRow; + int expectedCol = 0; + int recsRead = 0; + + long bytesRead = 0; + long t1 = System.currentTimeMillis(); + + byte randomValue[] = new byte[opts.dataSize]; + Random random = new Random(); + + Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow))); + + int errors = 0; + + while (expectedRow < (opts.rows + opts.startRow)) { + + if (opts.useGet) { + Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow)); + Text colf = new Text(opts.columnFamily); + Text colq = new Text("col_" + String.format("%07d", expectedCol)); + + Scanner scanner = connector.createScanner("test_ingest", labelAuths); + scanner.setBatchSize(1); + Key startKey = new Key(rowKey, colf, colq); + Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL)); + scanner.setRange(range); + + byte[] val = null; // t.get(rowKey, column); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + if (iter.hasNext()) { + val = iter.next().getValue().get(); + } + + byte ev[]; + if (opts.random != null) { + ev = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, expectedCol); + } else { + ev = bytevals[expectedCol % bytevals.length]; + } + + if (val == null) { + log.error("Did not find " + rowKey + " " + colf + " " + colq); + errors++; + } else { + recsRead++; + bytesRead += val.length; + Value value = new Value(val); + if (value.compareTo(ev) != 0) { + log.error("unexpected value (" + rowKey + " " + colf + " " + colq + " : saw " + value + " expected " + new Value(ev)); + errors++; + } + } + + expectedCol++; + if (expectedCol >= opts.cols) { + expectedCol = 0; + expectedRow++; + } + + } else { + + Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow))); + + Scanner scanner = connector.createScanner(opts.getTableName(), labelAuths); + scanner.setBatchSize(scanOpts.scanBatchSize); + scanner.setRange(new Range(startKey, endKey)); + for (int j = 0; j < opts.cols; j++) { + scanner.fetchColumn(new Text(opts.columnFamily), new Text("col_" + String.format("%07d", j))); + } + + int recsReadBefore = recsRead; + + for (Entry<Key,Value> entry : scanner) { + + recsRead++; + + bytesRead += entry.getKey().getLength(); + bytesRead += entry.getValue().getSize(); + + int rowNum = getRow(entry.getKey()); + int colNum = getCol(entry.getKey()); + + if (rowNum != expectedRow) { + log.error("rowNum != expectedRow " + rowNum + " != " + expectedRow); + errors++; + expectedRow = rowNum; + } + + if (colNum != expectedCol) { + log.error("colNum != expectedCol " + colNum + " != " + expectedCol + " rowNum : " + rowNum); + errors++; + } + + if (expectedRow >= (opts.rows + opts.startRow)) { + log.error("expectedRow (" + expectedRow + ") >= (ingestArgs.rows + ingestArgs.startRow) (" + (opts.rows + opts.startRow) + + "), get batch returned data passed end key"); + errors++; + break; + } + + byte value[]; + if (opts.random != null) { + value = TestIngest.genRandomValue(random, randomValue, opts.random.intValue(), expectedRow, colNum); + } else { + value = bytevals[colNum % bytevals.length]; + 
} + + if (entry.getValue().compareTo(value) != 0) { + log.error("unexpected value, rowNum : " + rowNum + " colNum : " + colNum); + log.error(" saw = " + new String(entry.getValue().get()) + " expected = " + new String(value)); + errors++; + } + + if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) { + log.error("unexpected timestamp " + entry.getKey().getTimestamp() + ", rowNum : " + rowNum + " colNum : " + colNum); + errors++; + } + + expectedCol++; + if (expectedCol >= opts.cols) { + expectedCol = 0; + expectedRow++; + } + + } + + if (recsRead == recsReadBefore) { + log.warn("Scan returned nothing, breaking..."); + break; + } + + } + } + + long t2 = System.currentTimeMillis(); + + if (errors > 0) { + throw new AccumuloException("saw " + errors + " errors "); + } + + if (expectedRow != (opts.rows + opts.startRow)) { + throw new AccumuloException("Did not read expected number of rows. Saw " + (expectedRow - opts.startRow) + " expected " + opts.rows); + } else { + System.out.printf("%,12d records read | %,8d records/sec | %,12d bytes read | %,8d bytes/sec | %6.3f secs %n", recsRead, + (int) ((recsRead) / ((t2 - t1) / 1000.0)), bytesRead, (int) (bytesRead / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java new file mode 100644 index 0000000..cb91e19 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/RowHash.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.mapreduce; + +import java.io.IOException; +import java.util.Base64; +import java.util.Collections; + +import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; + +public class RowHash extends Configured implements Tool { + /** + * The Mapper class that given a row number, will generate the appropriate output line. + */ + public static class HashDataMapper extends Mapper<Key,Value,Text,Mutation> { + @Override + public void map(Key row, Value data, Context context) throws IOException, InterruptedException { + Mutation m = new Mutation(row.getRow()); + m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"), new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest()))); + context.write(null, m); + context.progress(); + } + + @Override + public void setup(Context job) {} + } + + private static class Opts extends MapReduceClientOnRequiredTable { + @Parameter(names = "--column", required = true) + String column; + } + + @Override + public int run(String[] args) throws Exception { + Job job = Job.getInstance(getConf()); + job.setJobName(this.getClass().getName()); + job.setJarByClass(this.getClass()); + Opts opts = new Opts(); + opts.parseArgs(RowHash.class.getName(), args); + job.setInputFormatClass(AccumuloInputFormat.class); + opts.setAccumuloConfigs(job); + + String col = opts.column; + int idx = col.indexOf(":"); + Text cf = new Text(idx < 0 ? col : col.substring(0, idx)); + Text cq = idx < 0 ? null : new Text(col.substring(idx + 1)); + if (cf.getLength() > 0) + AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<>(cf, cq))); + + job.setMapperClass(HashDataMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new RowHash(), args); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java new file mode 100644 index 0000000..6e18000 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/mapreduce/TeraSortIngest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.testing.core.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; + +/** + * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a + * map/reduce program to generate the data. The format of the data is: + * <ul> + * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n + * <li>The keys are random characters from the set ' ' .. '~'. + * <li>The rowid is the right justified row id as a int. + * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'. + * </ul> + * + * This TeraSort is slightly modified to allow for variable length key sizes and value sizes. The row length isn't variable. To generate a terabyte of data in + * the same way TeraSort does use 10000000000 rows and 10/10 byte key length and 78/78 byte value length. Along with the 10 byte row id and \r\n this gives you + * 100 byte row * 10000000000 rows = 1tb. Min/Max ranges for key and value parameters are inclusive/inclusive respectively. + * + * + */ +public class TeraSortIngest extends Configured implements Tool { + /** + * An input format that assigns ranges of longs to each mapper. + */ + static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> { + /** + * An input split consisting of a range on numbers. 
+ */ + static class RangeInputSplit extends InputSplit implements Writable { + long firstRow; + long rowCount; + + public RangeInputSplit() {} + + public RangeInputSplit(long offset, long length) { + firstRow = offset; + rowCount = length; + } + + @Override + public long getLength() throws IOException { + return 0; + } + + @Override + public String[] getLocations() throws IOException { + return new String[] {}; + } + + @Override + public void readFields(DataInput in) throws IOException { + firstRow = WritableUtils.readVLong(in); + rowCount = WritableUtils.readVLong(in); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, firstRow); + WritableUtils.writeVLong(out, rowCount); + } + } + + /** + * A record reader that will generate a range of numbers. + */ + static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> { + long startRow; + long finishedRows; + long totalRows; + + public RangeRecordReader(RangeInputSplit split) { + startRow = split.firstRow; + finishedRows = 0; + totalRows = split.rowCount; + } + + @Override + public void close() throws IOException {} + + @Override + public float getProgress() throws IOException { + return finishedRows / (float) totalRows; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return new LongWritable(startRow + finishedRows); + } + + @Override + public NullWritable getCurrentValue() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {} + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (finishedRows < totalRows) { + ++finishedRows; + return true; + } + return false; + } + } + + @Override + public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { + // reporter.setStatus("Creating record reader"); + return new RangeRecordReader((RangeInputSplit) split); + } + + /** + * Create the desired number of splits, dividing the number of rows between the mappers. + */ + @Override + public List<InputSplit> getSplits(JobContext job) { + long totalRows = job.getConfiguration().getLong(NUMROWS, 0); + int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1); + long rowsPerSplit = totalRows / numSplits; + System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit); + ArrayList<InputSplit> splits = new ArrayList<>(numSplits); + long currentRow = 0; + for (int split = 0; split < numSplits - 1; ++split) { + splits.add(new RangeInputSplit(currentRow, rowsPerSplit)); + currentRow += rowsPerSplit; + } + splits.add(new RangeInputSplit(currentRow, totalRows - currentRow)); + System.out.println("Done Generating."); + return splits; + } + + } + + private static String NUMSPLITS = "terasort.overridesplits"; + private static String NUMROWS = "terasort.numrows"; + + static class RandomGenerator { + private long seed = 0; + private static final long mask32 = (1l << 32) - 1; + /** + * The number of iterations separating the precomputed seeds. + */ + private static final int seedSkip = 128 * 1024 * 1024; + /** + * The precomputed seed values after every seedSkip iterations. There should be enough values so that a 2**32 iterations are covered. 
+ */ + private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L, + 3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L, + 1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,}; + + /** + * Start the random number generator on the given iteration. + * + * @param initalIteration + * the iteration number to start on + */ + RandomGenerator(long initalIteration) { + int baseIndex = (int) ((initalIteration & mask32) / seedSkip); + seed = seeds[baseIndex]; + for (int i = 0; i < initalIteration % seedSkip; ++i) { + next(); + } + } + + RandomGenerator() { + this(0); + } + + long next() { + seed = (seed * 3141592621l + 663896637) & mask32; + return seed; + } + } + + /** + * The Mapper class that given a row number, will generate the appropriate output line. + */ + public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> { + private Text tableName = null; + private int minkeylength = 0; + private int maxkeylength = 0; + private int minvaluelength = 0; + private int maxvaluelength = 0; + + private Text key = new Text(); + private Text value = new Text(); + private RandomGenerator rand; + private byte[] keyBytes; // = new byte[12]; + private byte[] spaces = " ".getBytes(); + private byte[][] filler = new byte[26][]; + { + for (int i = 0; i < 26; ++i) { + filler[i] = new byte[10]; + for (int j = 0; j < 10; ++j) { + filler[i][j] = (byte) ('A' + i); + } + } + } + + /** + * Add a random key to the text + */ + private Random random = new Random(); + + private void addKey() { + int range = random.nextInt(maxkeylength - minkeylength + 1); + int keylen = range + minkeylength; + int keyceil = keylen + (4 - (keylen % 4)); + keyBytes = new byte[keyceil]; + + long temp = 0; + for (int i = 0; i < keyceil / 4; i++) { + temp = rand.next() / 52; + keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[4 * i] = (byte) (' ' + (temp % 95)); + } + key.set(keyBytes, 0, keylen); + } + + /** + * Add the rowid to the row. + */ + private Text getRowIdString(long rowId) { + Text paddedRowIdString = new Text(); + byte[] rowid = Integer.toString((int) rowId).getBytes(); + int padSpace = 10 - rowid.length; + if (padSpace > 0) { + paddedRowIdString.append(spaces, 0, 10 - rowid.length); + } + paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10)); + return paddedRowIdString; + } + + /** + * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters. 
+ * + * @param rowId + * the current row number + */ + private void addFiller(long rowId) { + int base = (int) ((rowId * 8) % 26); + + // Get Random var + Random random = new Random(rand.seed); + + int range = random.nextInt(maxvaluelength - minvaluelength + 1); + int valuelen = range + minvaluelength; + + while (valuelen > 10) { + value.append(filler[(base + valuelen) % 26], 0, 10); + valuelen -= 10; + } + + if (valuelen > 0) + value.append(filler[(base + valuelen) % 26], 0, valuelen); + } + + @Override + public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException { + context.setStatus("Entering"); + long rowId = row.get(); + if (rand == null) { + // we use 3 random numbers per a row + rand = new RandomGenerator(rowId * 3); + } + addKey(); + value.clear(); + // addRowId(rowId); + addFiller(rowId); + + // New + Mutation m = new Mutation(key); + m.put(new Text("c"), // column family + getRowIdString(rowId), // column qual + new Value(value.toString().getBytes())); // data + + context.setStatus("About to add to accumulo"); + context.write(tableName, m); + context.setStatus("Added to accumulo " + key.toString()); + } + + @Override + public void setup(Context job) { + minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0); + maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0); + minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0); + maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0); + tableName = new Text(job.getConfiguration().get("cloudgen.tablename")); + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new TeraSortIngest(), args); + } + + static class Opts extends MapReduceClientOnRequiredTable { + @Parameter(names = "--count", description = "number of rows to ingest", required = true) + long numRows; + @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true) + int minKeyLength; + @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true) + int maxKeyLength; + @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true) + int minValueLength; + @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true) + int maxValueLength; + @Parameter(names = "--splits", description = "number of splits to create in the table") + int splits = 0; + } + + @Override + public int run(String[] args) throws Exception { + Job job = Job.getInstance(getConf()); + job.setJobName("TeraSortCloud"); + job.setJarByClass(this.getClass()); + Opts opts = new Opts(); + opts.parseArgs(TeraSortIngest.class.getName(), args); + + job.setInputFormatClass(RangeInputFormat.class); + job.setMapperClass(SortGenMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + opts.setAccumuloConfigs(job); + BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + + Configuration conf = job.getConfiguration(); + conf.setLong(NUMROWS, opts.numRows); + conf.setInt("cloudgen.minkeylength", opts.minKeyLength); + conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength); + conf.setInt("cloudgen.minvaluelength", opts.minValueLength); + conf.setInt("cloudgen.maxvaluelength", 
opts.maxValueLength); + conf.set("cloudgen.tablename", opts.getTableName()); + + if (args.length > 10) + conf.setInt(NUMSPLITS, opts.splits); + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java new file mode 100644 index 0000000..003cd5d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTree.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.accumulo.core.util.Pair; + +import com.google.common.collect.Iterables; + +/** + * Simple implementation of a Merkle tree + */ +public class MerkleTree { + protected List<MerkleTreeNode> leaves; + protected String digestAlgorithm; + + public MerkleTree(List<MerkleTreeNode> leaves, String digestAlgorithm) { + this.leaves = leaves; + this.digestAlgorithm = digestAlgorithm; + } + + public MerkleTreeNode getRootNode() throws NoSuchAlgorithmException { + ArrayList<MerkleTreeNode> buffer = new ArrayList<>(leaves.size()); + buffer.addAll(leaves); + + while (buffer.size() > 1) { + // Find two nodes that we want to roll up + Pair<Integer,Integer> pairToJoin = findNextPair(buffer); + + // Make a parent node from them + MerkleTreeNode parent = new MerkleTreeNode(Arrays.asList(buffer.get(pairToJoin.getFirst()), buffer.get(pairToJoin.getSecond())), digestAlgorithm); + + // Insert it back into the "tree" at the position of the first child + buffer.set(pairToJoin.getFirst(), parent); + + // Remove the second child completely + buffer.remove(pairToJoin.getSecond().intValue()); + + // "recurse" + } + + return Iterables.getOnlyElement(buffer); + } + + protected Pair<Integer,Integer> findNextPair(List<MerkleTreeNode> nodes) { + int i = 0, j = 1; + while (i < nodes.size() && j < nodes.size()) { + MerkleTreeNode left = nodes.get(i), right = nodes.get(j); + + // At the same level + if (left.getLevel() == right.getLevel()) { + return new Pair<>(i, j); + } + + // Peek to see if we have another element + if (j + 1 < nodes.size()) { + // If we do, try to match those + i++; + j++; + } else { + // Otherwise, the last two elements must be paired + return new Pair<>(i, j); + } + } + + if (2 < nodes.size()) { + throw new IllegalStateException("Should not have exited loop without pairing 
two elements when we have at least 3 nodes"); + } else if (2 == nodes.size()) { + return new Pair<>(0, 1); + } else { + throw new IllegalStateException("Must have at least two nodes to pair"); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java new file mode 100644 index 0000000..168b6e1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/MerkleTreeNode.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the level (height) within the tree, the ranges that it covers, and the new hash + */ +public class MerkleTreeNode { + private static final Logger log = LoggerFactory.getLogger(MerkleTreeNode.class); + + private Range range; + private int level; + private List<Range> children; + private byte[] hash; + + public MerkleTreeNode(Range range, int level, List<Range> children, byte[] hash) { + this.range = range; + this.level = level; + this.children = children; + this.hash = hash; + } + + public MerkleTreeNode(Key k, Value v) { + range = RangeSerialization.toRange(k); + level = 0; + children = Collections.emptyList(); + hash = v.get(); + } + + public MerkleTreeNode(List<MerkleTreeNode> children, String digestAlgorithm) throws NoSuchAlgorithmException { + level = 0; + this.children = new ArrayList<>(children.size()); + MessageDigest digest = MessageDigest.getInstance(digestAlgorithm); + + Range childrenRange = null; + for (MerkleTreeNode child : children) { + this.children.add(child.getRange()); + level = Math.max(child.getLevel(), level); + digest.update(child.getHash()); + + if (null == childrenRange) { + childrenRange = child.getRange(); + } else { + List<Range> overlappingRanges = Range.mergeOverlapping(Arrays.asList(childrenRange, child.getRange())); + if (1 != overlappingRanges.size()) { + log.error("Tried to merge non-contiguous ranges: {} {}", childrenRange, child.getRange()); + throw new 
IllegalArgumentException("Ranges must be contiguous: " + childrenRange + ", " + child.getRange()); + } + + childrenRange = overlappingRanges.get(0); + } + } + + // Our actual level is one more than the highest level of our children + level++; + + // Roll the hash up the tree + hash = digest.digest(); + + // Set the range to be the merged result of the children + range = childrenRange; + } + + public Range getRange() { + return range; + } + + public int getLevel() { + return level; + } + + public List<Range> getChildren() { + return children; + } + + public byte[] getHash() { + return hash; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(32); + sb.append("range=").append(range).append(" level=").append(level).append(" hash=").append(Hex.encodeHexString(hash)).append(" children=").append(children); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof MerkleTreeNode) { + MerkleTreeNode other = (MerkleTreeNode) o; + return range.equals(other.getRange()) && level == other.getLevel() && children.equals(other.getChildren()) && Arrays.equals(hash, other.getHash()); + } + + return false; + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(1395, 39532); + return hcb.append(range).append(level).append(children).append(hash).toHashCode(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java new file mode 100644 index 0000000..c301d49 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/RangeSerialization.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.merkle; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class RangeSerialization { + private static final Text EMPTY = new Text(new byte[0]); + + public static Range toRange(Key key) { + Text holder = new Text(); + key.getRow(holder); + Key startKey; + if (0 == holder.getLength()) { + startKey = null; + } else { + startKey = new Key(holder); + } + + key.getColumnQualifier(holder); + Key endKey; + if (0 == holder.getLength()) { + endKey = null; + } else { + endKey = new Key(holder); + } + + // Don't be inclusive for no bounds on a Range + return new Range(startKey, startKey != null, endKey, endKey != null); + } + + public static Key toKey(Range range) { + Text row = getRow(range); + return new Key(row, EMPTY, getColumnQualifier(range)); + } + + public static Mutation toMutation(Range range, Value v) { + Text row = getRow(range); + Mutation m = new Mutation(row); + m.put(EMPTY, getColumnQualifier(range), v); + return m; + } + + public static Text getRow(Range range) { + return range.isInfiniteStartKey() ? EMPTY : range.getStartKey().getRow(); + } + + public static Text getColumnQualifier(Range range) { + return range.isInfiniteStopKey() ? EMPTY : range.getEndKey().getRow(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java new file mode 100644 index 0000000..fd6251f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/CompareTables.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.merkle.cli; + +import java.io.FileNotFoundException; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOpts; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Range; +import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +/** + * Accepts a set of tables, computes the hashes for each, and prints the top-level hash for each table. + * <p> + * Will automatically create output tables for intermediate hashes instead of requiring their existence. This will raise an exception when the table we want to + * use already exists. + */ +public class CompareTables { + private static final Logger log = LoggerFactory.getLogger(CompareTables.class); + + public static class CompareTablesOpts extends ClientOpts { + @Parameter(names = {"--tables"}, description = "Tables to compare", variableArity = true) + public List<String> tables; + + @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests") + private int numThreads = 4; + + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should pushdown digest to iterators") + private boolean iteratorPushdown = false; + + @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree") + private String splitsFile = null; + + public List<String> getTables() { + return this.tables; + } + + public void setTables(List<String> tables) { + this.tables = tables; + } + + public int getNumThreads() { + return numThreads; + } + + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + + public boolean isIteratorPushdown() { + return iteratorPushdown; + } + + public void setIteratorPushdown(boolean iteratorPushdown) { + this.iteratorPushdown = iteratorPushdown; + } + + public String getSplitsFile() { + return splitsFile; + } + + public void setSplitsFile(String splitsFile) { + this.splitsFile = splitsFile; + } + } + + private CompareTablesOpts opts; + + protected CompareTables() {} + + public CompareTables(CompareTablesOpts opts) { + this.opts = opts; + } + + public Map<String,String> computeAllHashes() throws AccumuloException, AccumuloSecurityException, TableExistsException, NoSuchAlgorithmException, + TableNotFoundException, FileNotFoundException { + final Connector conn = opts.getConnector(); + final Map<String,String> hashesByTable = new HashMap<>(); + + for (String table : opts.getTables()) { + final String outputTableName = table + "_merkle"; + + if (conn.tableOperations().exists(outputTableName)) { + throw new IllegalArgumentException("Expected output table name to not yet exist: " + outputTableName); 
+ } + + conn.tableOperations().create(outputTableName); + + GenerateHashes genHashes = new GenerateHashes(); + Collection<Range> ranges = genHashes.getRanges(opts.getConnector(), table, opts.getSplitsFile()); + + try { + genHashes.run(opts.getConnector(), table, table + "_merkle", opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges); + } catch (Exception e) { + log.error("Error generating hashes for {}", table, e); + throw new RuntimeException(e); + } + + ComputeRootHash computeRootHash = new ComputeRootHash(); + String hash = Hex.encodeHexString(computeRootHash.getHash(conn, outputTableName, opts.getHashName())); + + hashesByTable.put(table, hash); + } + + return hashesByTable; + } + + public static void main(String[] args) throws Exception { + CompareTablesOpts opts = new CompareTablesOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs("CompareTables", args, bwOpts); + + if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) { + throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points"); + } + + CompareTables compareTables = new CompareTables(opts); + Map<String,String> tableToHashes = compareTables.computeAllHashes(); + + boolean hashesEqual = true; + String previousHash = null; + for (Entry<String,String> entry : tableToHashes.entrySet()) { + // Set the previous hash if we don't have one + if (null == previousHash) { + previousHash = entry.getValue(); + } else if (hashesEqual) { + // If the hashes are still equal, check that the new hash is also equal + hashesEqual = previousHash.equals(entry.getValue()); + } + + System.out.println(entry.getKey() + " " + entry.getValue()); + } + + System.exit(hashesEqual ? 0 : 1); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java new file mode 100644 index 0000000..4511446 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ComputeRootHash.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.merkle.cli; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.merkle.MerkleTree; +import org.apache.accumulo.testing.core.merkle.MerkleTreeNode; +import org.apache.accumulo.testing.core.merkle.RangeSerialization; +import org.apache.commons.codec.binary.Hex; + +import com.beust.jcommander.Parameter; + +/** + * Given a table created by {@link GenerateHashes} which contains the leaves of a Merkle tree, compute the root node of the Merkle tree which can be quickly + * compared to the root node of another Merkle tree to ascertain equality. + */ +public class ComputeRootHash { + + public static class ComputeRootHashOpts extends ClientOnRequiredTable { + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + } + + public byte[] getHash(ComputeRootHashOpts opts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, NoSuchAlgorithmException { + Connector conn = opts.getConnector(); + String table = opts.getTableName(); + + return getHash(conn, table, opts.getHashName()); + } + + public byte[] getHash(Connector conn, String table, String hashName) throws TableNotFoundException, NoSuchAlgorithmException { + List<MerkleTreeNode> leaves = getLeaves(conn, table); + + MerkleTree tree = new MerkleTree(leaves, hashName); + + return tree.getRootNode().getHash(); + } + + protected ArrayList<MerkleTreeNode> getLeaves(Connector conn, String tableName) throws TableNotFoundException { + // TODO make this a bit more resilient to very large merkle trees by lazily reading more data from the table when necessary + final Scanner s = conn.createScanner(tableName, Authorizations.EMPTY); + final ArrayList<MerkleTreeNode> leaves = new ArrayList<>(); + + for (Entry<Key,Value> entry : s) { + Range range = RangeSerialization.toRange(entry.getKey()); + byte[] hash = entry.getValue().get(); + + leaves.add(new MerkleTreeNode(range, 0, Collections.<Range> emptyList(), hash)); + } + + return leaves; + } + + public static void main(String[] args) throws Exception { + ComputeRootHashOpts opts = new ComputeRootHashOpts(); + opts.parseArgs("ComputeRootHash", args); + + ComputeRootHash computeRootHash = new ComputeRootHash(); + byte[] rootHash = computeRootHash.getHash(opts); + + System.out.println(Hex.encodeHexString(rootHash)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java 
b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java new file mode 100644 index 0000000..35bf684 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/GenerateHashes.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle.cli; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map.Entry; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.merkle.RangeSerialization; +import org.apache.accumulo.testing.core.merkle.skvi.DigestIterator; +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.common.collect.Iterables; + +/** + * Read from a table, compute a Merkle tree and output it to a table. Each key-value pair in the destination table is a leaf node of the Merkle tree. 
+ */ +public class GenerateHashes { + private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class); + + public static class GenerateHashesOpts extends ClientOnRequiredTable { + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + @Parameter(names = {"-o", "--output"}, required = true, description = "output table name, expected to exist and be writable") + private String outputTableName; + + @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests") + private int numThreads = 4; + + @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should we push down logic with an iterator") + private boolean iteratorPushdown = false; + + @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree") + private String splitsFile = null; + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + + public String getOutputTableName() { + return outputTableName; + } + + public void setOutputTableName(String outputTableName) { + this.outputTableName = outputTableName; + } + + public int getNumThreads() { + return numThreads; + } + + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public boolean isIteratorPushdown() { + return iteratorPushdown; + } + + public void setIteratorPushdown(boolean iteratorPushdown) { + this.iteratorPushdown = iteratorPushdown; + } + + public String getSplitsFile() { + return splitsFile; + } + + public void setSplitsFile(String splitsFile) { + this.splitsFile = splitsFile; + } + } + + public Collection<Range> getRanges(Connector conn, String tableName, String splitsFile) throws TableNotFoundException, AccumuloSecurityException, + AccumuloException, FileNotFoundException { + if (null == splitsFile) { + log.info("Using table split points"); + Collection<Text> endRows = conn.tableOperations().listSplits(tableName); + return endRowsToRanges(endRows); + } else { + log.info("Using provided split points"); + ArrayList<Text> splits = new ArrayList<>(); + + String line; + java.util.Scanner file = new java.util.Scanner(new File(splitsFile), UTF_8.name()); + try { + while (file.hasNextLine()) { + line = file.nextLine(); + if (!line.isEmpty()) { + splits.add(new Text(line)); + } + } + } finally { + file.close(); + } + + Collections.sort(splits); + return endRowsToRanges(splits); + } + } + + public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, NoSuchAlgorithmException, + FileNotFoundException { + Collection<Range> ranges = getRanges(opts.getConnector(), opts.getTableName(), opts.getSplitsFile()); + + run(opts.getConnector(), opts.getTableName(), opts.getOutputTableName(), opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges); + } + + public void run(final Connector conn, final String inputTableName, final String outputTableName, final String digestName, int numThreads, + final boolean iteratorPushdown, final Collection<Range> ranges) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, + NoSuchAlgorithmException { + if (!conn.tableOperations().exists(outputTableName)) { + throw new IllegalArgumentException(outputTableName + " does not exist, please create it"); + } + + // Get some parallelism + ExecutorService svc = 
Executors.newFixedThreadPool(numThreads); + final BatchWriter bw = conn.createBatchWriter(outputTableName, new BatchWriterConfig()); + + try { + for (final Range range : ranges) { + final MessageDigest digest = getDigestAlgorithm(digestName); + + svc.execute(new Runnable() { + + @Override + public void run() { + Scanner s; + try { + s = conn.createScanner(inputTableName, Authorizations.EMPTY); + } catch (Exception e) { + log.error("Could not get scanner for " + inputTableName, e); + throw new RuntimeException(e); + } + + s.setRange(range); + + Value v = null; + Mutation m = null; + if (iteratorPushdown) { + IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class); + cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName); + s.addScanIterator(cfg); + + // The scanner should only ever return us one Key-Value, otherwise this approach won't work + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + + v = entry.getValue(); + m = RangeSerialization.toMutation(range, v); + } else { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (Entry<Key,Value> entry : s) { + DataOutputStream out = new DataOutputStream(baos); + try { + entry.getKey().write(out); + entry.getValue().write(out); + } catch (Exception e) { + log.error("Error writing {}", entry, e); + throw new RuntimeException(e); + } + + digest.update(baos.toByteArray()); + baos.reset(); + } + + v = new Value(digest.digest()); + m = RangeSerialization.toMutation(range, v); + } + + // Log some progress + log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range, Hex.encodeHexString(v.get())); + + try { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + log.error("Could not write mutation", e); + throw new RuntimeException(e); + } + } + }); + } + + svc.shutdown(); + + // Wait indefinitely for the scans to complete + while (!svc.isTerminated()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.error("Interrupted while waiting for executor service to gracefully complete. Exiting now"); + svc.shutdownNow(); + return; + } + } + } finally { + // We can only safely close this when we're exiting or we've completed all tasks + bw.close(); + } + } + + public TreeSet<Range> endRowsToRanges(Collection<Text> endRows) { + ArrayList<Text> sortedEndRows = new ArrayList<>(endRows); + Collections.sort(sortedEndRows); + + Text prevEndRow = null; + TreeSet<Range> ranges = new TreeSet<>(); + for (Text endRow : sortedEndRows) { + if (null == prevEndRow) { + ranges.add(new Range(null, false, endRow, true)); + } else { + ranges.add(new Range(prevEndRow, false, endRow, true)); + } + prevEndRow = endRow; + } + + ranges.add(new Range(prevEndRow, false, null, false)); + + return ranges; + } + + protected MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException { + return MessageDigest.getInstance(digestName); + } + + public static void main(String[] args) throws Exception { + GenerateHashesOpts opts = new GenerateHashesOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(GenerateHashes.class.getName(), args, bwOpts); + + if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) { + throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points"); + } + + GenerateHashes generate = new GenerateHashes(); + generate.run(opts); + } +}
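
For reference, the merkle classes moved above fit together as follows: GenerateHashes digests every key-value pair within each Range of a source table and writes one leaf hash per Range into an output table; ComputeRootHash reads those leaves back and uses MerkleTree/MerkleTreeNode to roll child hashes up into a single root hash; CompareTables runs that pipeline for several tables and exits non-zero when any root differs. The sketch below illustrates only the roll-up step, using plain byte arrays instead of Accumulo Ranges. It is not part of this commit: the class name MerkleRollupSketch, the SHA-256 choice, the toy leaf values, and the two-at-a-time pairing (with an odd trailing node promoted unchanged) are assumptions for the example and may differ from the pairing MerkleTree actually performs.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not part of commit 0d97273c.
public class MerkleRollupSketch {

  // Combine a group of child hashes into one parent hash, in the spirit of the
  // MerkleTreeNode(List<MerkleTreeNode>, String) constructor shown above.
  static byte[] parentHash(List<byte[]> childHashes, String algorithm) throws NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance(algorithm);
    for (byte[] child : childHashes) {
      digest.update(child);
    }
    return digest.digest();
  }

  // Repeatedly pair nodes until a single root hash remains. The pairing strategy
  // here (two at a time, odd trailing node promoted) is an assumption for the example.
  static byte[] rootHash(List<byte[]> leaves, String algorithm) throws NoSuchAlgorithmException {
    List<byte[]> level = leaves;
    while (level.size() > 1) {
      List<byte[]> next = new ArrayList<>();
      for (int i = 0; i < level.size(); i += 2) {
        if (i + 1 == level.size()) {
          next.add(level.get(i));
        } else {
          next.add(parentHash(level.subList(i, i + 2), algorithm));
        }
      }
      level = next;
    }
    return level.get(0);
  }

  public static void main(String[] args) throws Exception {
    MessageDigest md = MessageDigest.getInstance("SHA-256");
    List<byte[]> leaves = new ArrayList<>();
    // In GenerateHashes each leaf is the digest of all key-value pairs in one Range;
    // here a few toy strings stand in for that data.
    for (String row : new String[] {"a", "h", "p", "z"}) {
      leaves.add(md.digest(row.getBytes(StandardCharsets.UTF_8)));
    }
    byte[] root = rootHash(leaves, "SHA-256");
    // Two tables are considered identical when their root hashes match, which is
    // what CompareTables checks after running ComputeRootHash on each output table.
    System.out.println(String.format("%064x", new BigInteger(1, root)));
  }
}

Rolling hashes up this way means two large tables can be compared by exchanging a single digest per table, at the cost of rescanning a Range (and recomputing the hashes above it) whenever its contents change.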