ACCUMULO-4510 Removed continuous ingest code

* Code was moved to accumulo-testing repo
* Kept a copy of the ContinuousIngest class as it's used by other tests
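For context, a minimal sketch (not part of the commit; the class name is hypothetical) of the row-linking pattern the retained ContinuousIngest class implements: each entry written can point back to a previously flushed row, so the verify job can later check that every referenced row is defined. The genLong/genRow logic mirrors the methods visible in the diff below.

import java.util.Random;

// Sketch only: mirrors ContinuousIngest.genLong/genRow from the diff below.
// A real ingester writes a Mutation whose value embeds the previous row;
// here the link is simply printed.
public class LinkedIngestSketch {

  // Pick a random row id in [min, max), as ContinuousIngest.genLong does.
  static long genLong(long min, long max, Random r) {
    return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
  }

  // Render a row id as 16-digit zero-padded hex, matching genRow's format.
  static String genRow(long rowLong) {
    return String.format("%016x", rowLong);
  }

  public static void main(String[] args) {
    Random r = new Random();
    String prevRow = null;
    for (int i = 0; i < 5; i++) {
      String row = genRow(genLong(0, Long.MAX_VALUE, r));
      System.out.println(row + " -> " + (prevRow == null ? "(unlinked)" : prevRow));
      prevRow = row;
    }
  }
}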
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/39830635
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/39830635
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/39830635

Branch: refs/heads/master
Commit: 39830635f3711c30d61744cc0804cace03d3e028
Parents: 6d8f411
Author: Mike Walch <mwa...@apache.org>
Authored: Wed Jan 11 13:18:18 2017 -0500
Committer: Mike Walch <mwa...@apache.org>
Committed: Mon Jan 23 17:10:21 2017 -0500

----------------------------------------------------------------------
 TESTING.md                                      |   5 +-
 assemble/src/main/assemblies/component.xml      |   3 -
 test/pom.xml                                    |   1 -
 test/src/main/findbugs/exclude-filter.xml       |   2 -
 .../test/continuous/ContinuousBatchWalker.java  | 175 ----------
 .../test/continuous/ContinuousIngest.java       | 252 -------------
 .../test/continuous/ContinuousMoru.java         | 180 ----------
 .../test/continuous/ContinuousOpts.java         |  80 -----
 .../test/continuous/ContinuousQuery.java        |  74 ----
 .../test/continuous/ContinuousScanner.java      | 108 ------
 .../continuous/ContinuousStatsCollector.java    | 206 -----------
 .../test/continuous/ContinuousUtil.java         |  49 ---
 .../test/continuous/ContinuousVerify.java       | 226 ------------
 .../test/continuous/ContinuousWalk.java         | 240 -------------
 .../accumulo/test/continuous/GenSplits.java     |  87 -----
 .../accumulo/test/continuous/HistData.java      |  49 ---
 .../accumulo/test/continuous/Histogram.java     | 153 --------
 .../test/continuous/PrintScanTimeHistogram.java |  95 -----
 .../accumulo/test/continuous/TimeBinner.java    | 196 -----------
 .../test/continuous/UndefinedAnalyzer.java      | 350 -------------------
 .../test/performance/ContinuousIngest.java      | 252 +++++++++++++
 .../test/performance/ContinuousOpts.java        |  80 +++++
 .../test/performance/RollWALPerformanceIT.java  |   1 -
 .../accumulo/test/scalability/Ingest.java       |   2 +-
 test/system/continuous/README.md                | 103 ------
 test/system/continuous/analyze-missing.pl       | 127 -------
 test/system/continuous/analyze-missing.sh       |  23 --
 .../system/continuous/batch_walkers.txt.example |  16 -
 .../system/continuous/continuous-env.sh.example | 131 -------
 test/system/continuous/datanode-agitator.pl     | 140 --------
 test/system/continuous/hdfs-agitator.pl         | 217 ------------
 test/system/continuous/ingesters.txt.example    |  17 -
 test/system/continuous/master-agitator.pl       |  92 -----
 test/system/continuous/report.pl                | 120 -------
 test/system/continuous/run-moru.sh              |  37 --
 test/system/continuous/run-verify.sh            |  42 ---
 test/system/continuous/scanners.txt.example     |  16 -
 test/system/continuous/start-agitator.sh        |  72 ----
 test/system/continuous/start-batchwalkers.sh    |  42 ---
 test/system/continuous/start-ingest.sh          |  45 ---
 test/system/continuous/start-scanners.sh        |  41 ---
 test/system/continuous/start-stats.sh           |  49 ---
 test/system/continuous/start-walkers.sh         |  41 ---
 test/system/continuous/stop-agitator.sh         |  51 ---
 test/system/continuous/stop-batchwalkers.sh     |  33 --
 test/system/continuous/stop-ingest.sh           |  33 --
 test/system/continuous/stop-scanners.sh         |  33 --
 test/system/continuous/stop-stats.sh            |  33 --
 test/system/continuous/stop-walkers.sh          |  33 --
 test/system/continuous/tserver-agitator.pl      | 134 -------
 test/system/continuous/walkers.txt.example      |  17 -
 51 files changed, 335 insertions(+), 4269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/TESTING.md
----------------------------------------------------------------------
diff --git a/TESTING.md b/TESTING.md
index 98790e0..f5c94fa 100644
--- a/TESTING.md
+++ b/TESTING.md
@@ -172,10 +172,9 @@ These files do exist in the build tree, but at different locations)
 # Manual Distributed Testing
 
 Apache Accumulo has a number of tests which are suitable for running against large clusters for hours to days at a time.
-Some of these test suites (like the [Continuous Ingest][1] test) exist in the repository under `test/system` and contain their
-own README files for configuration. Others (like the Random Walk test) are in the [accumulo-testing repo][2].
+Some of these test suites exist in the repository under `test/system` and contain their own README files for configuration.
+Others (like the Continuous Ingest and Random Walk tests) are in the [accumulo-testing repo][2].
 
-[1]: test/system/continuous/README.md
 [2]: https://github.com/apache/accumulo-testing
 [3]: https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html
 [4]: http://maven.apache.org/surefire/maven-surefire-plugin/


http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/assemble/src/main/assemblies/component.xml
----------------------------------------------------------------------
diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml
index 8151528..6d76d60 100644
--- a/assemble/src/main/assemblies/component.xml
+++ b/assemble/src/main/assemblies/component.xml
@@ -161,7 +161,6 @@
       <excludes>
         <exclude>src/**</exclude>
         <exclude>target/**</exclude>
-        <exclude>**/continuous-env.sh</exclude>
       </excludes>
     </fileSet>
     <fileSet>
@@ -179,8 +178,6 @@
         <exclude>**/*.pl</exclude>
         <exclude>**/*.pyc</exclude>
         <exclude>**/*.pyo</exclude>
-        <exclude>**/walkers.txt</exclude>
-        <exclude>**/ingesters.txt</exclude>
       </excludes>
     </fileSet>
     <!-- Lift generated thrift proxy code into its own directory -->


http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index f5fb354..500d088 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -194,7 +194,6 @@
           <excludes>
             <exclude>compat/japi-compliance/exclude_classes.txt</exclude>
             <exclude>system/bench/lib/*splits</exclude>
-            <exclude>system/continuous/*.txt</exclude>
           </excludes>
         </configuration>
       </plugin>


http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/test/src/main/findbugs/exclude-filter.xml b/test/src/main/findbugs/exclude-filter.xml
index 3c3e596..e9acd09 100644
--- a/test/src/main/findbugs/exclude-filter.xml
+++ b/test/src/main/findbugs/exclude-filter.xml
@@ -18,8 +18,6 @@
   <Match>
     <!-- ignore intentional infinite loop in test main methods -->
     <Or>
-      <Class name="org.apache.accumulo.test.continuous.ContinuousQuery" />
-      <Class name="org.apache.accumulo.test.continuous.ContinuousScanner" />
       <Class name="org.apache.accumulo.test.stress.random.Write" />
     </Or>
     <Method name="main" params="java.lang.String[]" returns="void" />


http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
deleted file mode 100644
index e08be10..0000000
---
a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.cli.BatchScannerOpts; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - -public class ContinuousBatchWalker { - - static class Opts extends ContinuousWalk.Opts { - @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) - long numToScan = 0; - } - - public static void main(String[] args) throws Exception { - - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - BatchScannerOpts bsOpts = new BatchScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts, opts); - - Random r = new Random(); - Authorizations auths = opts.randomAuths.getAuths(r); - - Connector conn = clientOpts.getConnector(); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths); - scanner.setBatchSize(scanOpts.scanBatchSize); - - while (true) { - BatchScanner bs = conn.createBatchScanner(clientOpts.getTableName(), auths, bsOpts.scanThreads); - bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS); - - Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r); - List<Range> ranges = new ArrayList<>(batch.size()); - - for (Text row : batch) { - ranges.add(new Range(row)); - } - - runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges); - - sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS); - } - - } - - private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) { - bs.setRanges(ranges); - - Set<Text> rowsSeen = new HashSet<>(); - - int count = 0; - - long t1 = 
System.currentTimeMillis(); - - for (Entry<Key,Value> entry : bs) { - ContinuousWalk.validate(entry.getKey(), entry.getValue()); - - rowsSeen.add(entry.getKey().getRow()); - - addRow(batchSize, entry.getValue()); - - count++; - } - bs.close(); - - long t2 = System.currentTimeMillis(); - - if (!rowsSeen.equals(batch)) { - HashSet<Text> copy1 = new HashSet<>(rowsSeen); - HashSet<Text> copy2 = new HashSet<>(batch); - - copy1.removeAll(batch); - copy2.removeAll(rowsSeen); - - System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size()); - System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size()); - System.err.println("Extra seen : " + copy1); - System.err.println("Not seen : " + copy2); - } else { - System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0))); - } - - } - - private static void addRow(int batchSize, Value v) { - byte[] val = v.get(); - - int offset = ContinuousWalk.getPrevRowOffset(val); - if (offset > 1) { - Text prevRow = new Text(); - prevRow.set(val, offset, 16); - if (rowsToQuery.size() < 3 * batchSize) { - rowsToQuery.add(prevRow); - } - } - } - - private static HashSet<Text> rowsToQuery = new HashSet<>(); - - private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) { - - while (rowsToQuery.size() < batchSize) { - byte[] scanStart = ContinuousIngest.genRow(min, max, r); - scanner.setRange(new Range(new Text(scanStart), null)); - - int count = 0; - - long t1 = System.currentTimeMillis(); - - Iterator<Entry<Key,Value>> iter = scanner.iterator(); - while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) { - Entry<Key,Value> entry = iter.next(); - ContinuousWalk.validate(entry.getKey(), entry.getValue()); - addRow(batchSize, entry.getValue()); - count++; - } - - long t2 = System.currentTimeMillis(); - - System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count); - - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } - - HashSet<Text> ret = new HashSet<>(); - - Iterator<Text> iter = rowsToQuery.iterator(); - - for (int i = 0; i < batchSize; i++) { - ret.add(iter.next()); - iter.remove(); - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java deleted file mode 100644 index b59cf04..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.continuous; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.trace.CountSampler; -import org.apache.accumulo.core.trace.Trace; -import org.apache.accumulo.core.util.FastFormat; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; - -public class ContinuousIngest { - - private static final byte[] EMPTY_BYTES = new byte[0]; - - private static List<ColumnVisibility> visibilities; - - private static void initVisibilities(ContinuousOpts opts) throws Exception { - if (opts.visFile == null) { - visibilities = Collections.singletonList(new ColumnVisibility()); - return; - } - - visibilities = new ArrayList<>(); - - FileSystem fs = FileSystem.get(new Configuration()); - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8)); - - String line; - - while ((line = in.readLine()) != null) { - visibilities.add(new ColumnVisibility(line)); - } - - in.close(); - } - - private static ColumnVisibility getVisibility(Random rand) { - return visibilities.get(rand.nextInt(visibilities.size())); - } - - public static void main(String[] args) throws Exception { - - ContinuousOpts opts = new ContinuousOpts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts, opts); - - initVisibilities(opts); - - if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) { - throw new IllegalArgumentException("bad min and max"); - } - Connector conn = clientOpts.getConnector(); - - if (!conn.tableOperations().exists(clientOpts.getTableName())) { - throw new TableNotFoundException(null, clientOpts.getTableName(), "Consult the README and create the table before starting ingest."); - } - - BatchWriter bw = conn.createBatchWriter(clientOpts.getTableName(), bwOpts.getBatchWriterConfig()); - bw = Trace.wrapAll(bw, new CountSampler(1024)); - - Random r = new Random(); - - byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8); - - System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, UTF_8)); - - long count = 0; - final int flushInterval = 1000000; - final int maxDepth = 25; - - // always want to point back to flushed data. This way the previous item should - // always exist in accumulo when verifying data. To do this make insert N point - // back to the row from insert (N - flushInterval). The array below is used to keep - // track of this. 
- long prevRows[] = new long[flushInterval]; - long firstRows[] = new long[flushInterval]; - int firstColFams[] = new int[flushInterval]; - int firstColQuals[] = new int[flushInterval]; - - long lastFlushTime = System.currentTimeMillis(); - - out: while (true) { - // generate first set of nodes - ColumnVisibility cv = getVisibility(r); - - for (int index = 0; index < flushInterval; index++) { - long rowLong = genLong(opts.min, opts.max, r); - prevRows[index] = rowLong; - firstRows[index] = rowLong; - - int cf = r.nextInt(opts.maxColF); - int cq = r.nextInt(opts.maxColQ); - - firstColFams[index] = cf; - firstColQuals[index] = cq; - - Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum); - count++; - bw.addMutation(m); - } - - lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) - break out; - - // generate subsequent sets of nodes that link to previous set of nodes - for (int depth = 1; depth < maxDepth; depth++) { - for (int index = 0; index < flushInterval; index++) { - long rowLong = genLong(opts.min, opts.max, r); - byte[] prevRow = genRow(prevRows[index]); - prevRows[index] = rowLong; - Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum); - count++; - bw.addMutation(m); - } - - lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) - break out; - } - - // create one big linked list, this makes all of the first inserts - // point to something - for (int index = 0; index < flushInterval - 1; index++) { - Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r, - opts.checksum); - count++; - bw.addMutation(m); - } - lastFlushTime = flush(bw, count, flushInterval, lastFlushTime); - if (count >= opts.num) - break out; - } - - bw.close(); - clientOpts.stopTracing(); - } - - private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException { - long t1 = System.currentTimeMillis(); - bw.flush(); - long t2 = System.currentTimeMillis(); - System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count, flushInterval); - lastFlushTime = t2; - return lastFlushTime; - } - - public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r, - boolean checksum) { - // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... 
so used CRC32 instead - CRC32 cksum = null; - - byte[] rowString = genRow(rowLong); - - byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES); - byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES); - - if (checksum) { - cksum = new CRC32(); - cksum.update(rowString); - cksum.update(cfString); - cksum.update(cqString); - cksum.update(cv.getExpression()); - } - - Mutation m = new Mutation(new Text(rowString)); - - m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum)); - return m; - } - - public static final long genLong(long min, long max, Random r) { - return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min; - } - - static final byte[] genRow(long min, long max, Random r) { - return genRow(genLong(min, max, r)); - } - - static final byte[] genRow(long rowLong) { - return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES); - } - - private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) { - int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3; - if (cksum != null) - dataLen += 8; - byte val[] = new byte[dataLen]; - System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length); - int index = ingestInstanceId.length; - val[index++] = ':'; - int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES); - if (added != 16) - throw new RuntimeException(" " + added); - index += 16; - val[index++] = ':'; - if (prevRow != null) { - System.arraycopy(prevRow, 0, val, index, prevRow.length); - index += prevRow.length; - } - - val[index++] = ':'; - - if (cksum != null) { - cksum.update(val, 0, index); - cksum.getValue(); - FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES); - } - - // System.out.println("val "+new String(val)); - - return new Value(val); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java deleted file mode 100644 index 48154a6..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.continuous; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.IOException; -import java.util.Random; -import java.util.Set; -import java.util.UUID; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; - -/** - * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to - * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes. - * - */ -public class ContinuousMoru extends Configured implements Tool { - private static final String PREFIX = ContinuousMoru.class.getSimpleName() + "."; - private static final String MAX_CQ = PREFIX + "MAX_CQ"; - private static final String MAX_CF = PREFIX + "MAX_CF"; - private static final String MAX = PREFIX + "MAX"; - private static final String MIN = PREFIX + "MIN"; - private static final String CI_ID = PREFIX + "CI_ID"; - - static enum Counts { - SELF_READ; - } - - public static class CMapper extends Mapper<Key,Value,Text,Mutation> { - - private short max_cf; - private short max_cq; - private Random random; - private String ingestInstanceId; - private byte[] iiId; - private long count; - - private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility(); - - @Override - public void setup(Context context) throws IOException, InterruptedException { - int max_cf = context.getConfiguration().getInt(MAX_CF, -1); - int max_cq = context.getConfiguration().getInt(MAX_CQ, -1); - - if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE) - throw new IllegalArgumentException(); - - this.max_cf = (short) max_cf; - this.max_cq = (short) max_cq; - - random = new Random(); - ingestInstanceId = context.getConfiguration().get(CI_ID); - iiId = ingestInstanceId.getBytes(UTF_8); - - count = 0; - } - - @Override - public void map(Key key, Value data, Context context) throws IOException, InterruptedException { - - ContinuousWalk.validate(key, data); - - if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) { - // only rewrite data not written by this M/R job - byte[] val = data.get(); - - int offset = ContinuousWalk.getPrevRowOffset(val); - if (offset > 0) { - long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16); - Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData() - .toArray(), 
random, true); - context.write(null, m); - } - - } else { - context.getCounter(Counts.SELF_READ).increment(1l); - } - } - } - - static class Opts extends ContinuousOpts { - @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) - short maxColF = Short.MAX_VALUE; - - @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) - short maxColQ = Short.MAX_VALUE; - - @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) - int maxMaps = 0; - } - - @Override - public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException { - Opts opts = new Opts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts, opts); - - Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); - job.setJarByClass(this.getClass()); - - job.setInputFormatClass(AccumuloInputFormat.class); - clientOpts.setAccumuloConfigs(job); - - // set up ranges - try { - Set<Range> ranges = clientOpts.getConnector().tableOperations().splitRangeByTablets(clientOpts.getTableName(), new Range(), opts.maxMaps); - AccumuloInputFormat.setRanges(job, ranges); - AccumuloInputFormat.setAutoAdjustRanges(job, false); - } catch (Exception e) { - throw new IOException(e); - } - - job.setMapperClass(CMapper.class); - - job.setNumReduceTasks(0); - - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); - - Configuration conf = job.getConfiguration(); - conf.setLong(MIN, opts.min); - conf.setLong(MAX, opts.max); - conf.setInt(MAX_CF, opts.maxColF); - conf.setInt(MAX_CQ, opts.maxColQ); - conf.set(CI_ID, UUID.randomUUID().toString()); - - job.waitForCompletion(true); - clientOpts.stopTracing(); - return job.isSuccessful() ? 0 : 1; - } - - /** - * - * @param args - * instanceName zookeepers username password table columns outputpath - */ - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args); - if (res != 0) - System.exit(res); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousOpts.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousOpts.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousOpts.java deleted file mode 100644 index 48a77e7..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousOpts.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import java.io.IOException; - -import org.apache.accumulo.core.Constants; -import org.apache.log4j.FileAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; - -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.Parameter; - -/** - * Common CLI arguments for the Continuous Ingest suite. - */ -public class ContinuousOpts { - - public static class DebugConverter implements IStringConverter<String> { - @Override - public String convert(String debugLog) { - Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME); - logger.setLevel(Level.TRACE); - logger.setAdditivity(false); - try { - logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true)); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - return debugLog; - } - } - - public static class ShortConverter implements IStringConverter<Short> { - @Override - public Short convert(String value) { - return Short.valueOf(value); - } - } - - @Parameter(names = "--min", description = "lowest random row number to use") - long min = 0; - - @Parameter(names = "--max", description = "maximum random row number to use") - long max = Long.MAX_VALUE; - - @Parameter(names = "--debugLog", description = "file to write debugging output", converter = DebugConverter.class) - String debugLog = null; - - @Parameter(names = "--num", description = "the number of entries to ingest") - long num = Long.MAX_VALUE; - - @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class) - short maxColF = Short.MAX_VALUE; - - @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class) - short maxColQ = Short.MAX_VALUE; - - @Parameter(names = "--addCheckSum", description = "turn on checksums") - boolean checksum = false; - - @Parameter(names = "--visibilities", description = "read the visibilities to ingest with from a file") - String visFile = null; -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java deleted file mode 100644 index 7f89a94..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Map.Entry; -import java.util.Random; - -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ClientOpts.TimeConverter; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; - -public class ContinuousQuery { - - public static class Opts extends ContinuousOpts { - @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class) - long sleepTime = 100; - } - - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts, opts); - - Connector conn = clientOpts.getConnector(); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), clientOpts.auths); - scanner.setBatchSize(scanOpts.scanBatchSize); - - Random r = new Random(); - - while (true) { - byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r); - - int count = 0; - - long t1 = System.currentTimeMillis(); - scanner.setRange(new Range(new Text(row))); - for (Entry<Key,Value> entry : scanner) { - ContinuousWalk.validate(entry.getKey(), entry.getValue()); - count++; - } - long t2 = System.currentTimeMillis(); - - System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count); - - if (opts.sleepTime > 0) - Thread.sleep(opts.sleepTime); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java deleted file mode 100644 index 63709df..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.validators.PositiveInteger; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - -public class ContinuousScanner { - - static class Opts extends ContinuousWalk.Opts { - @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class) - long numToScan = 0; - } - - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci"); - clientOpts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts, opts); - - Random r = new Random(); - - long distance = 1000000000000l; - - Connector conn = clientOpts.getConnector(); - Authorizations auths = opts.randomAuths.getAuths(r); - Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths); - scanner.setBatchSize(scanOpts.scanBatchSize); - - double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0)); - - while (true) { - long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r); - byte[] scanStart = ContinuousIngest.genRow(startRow); - byte[] scanStop = ContinuousIngest.genRow(startRow + distance); - - scanner.setRange(new Range(new Text(scanStart), new Text(scanStop))); - - int count = 0; - Iterator<Entry<Key,Value>> iter = scanner.iterator(); - - long t1 = System.currentTimeMillis(); - - while (iter.hasNext()) { - Entry<Key,Value> entry = iter.next(); - ContinuousWalk.validate(entry.getKey(), entry.getValue()); - count++; - } - - long t2 = System.currentTimeMillis(); - - // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan); - - if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) { - if (count == 0) { - distance = distance * 10; - if (distance < 0) - distance = 1000000000000l; - } else { - double ratio = (double) opts.numToScan / count; - // move ratio closer to 1 to make change slower - ratio = ratio - (ratio - 1.0) * (2.0 / 3.0); - distance = (long) (ratio * distance); - } - - // System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count )); - } - - System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count); - - if (opts.sleepTime > 0) - sleepUninterruptibly(opts.sleepTime, 
TimeUnit.MILLISECONDS); - } - - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java deleted file mode 100644 index 8f8c791..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.accumulo.core.cli.ScannerOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.Credentials; -import org.apache.accumulo.core.client.impl.MasterClient; -import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.iterators.ColumnFamilyCounter; -import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Stat; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.cli.ClientOnRequiredTable; -import org.apache.accumulo.server.conf.ServerConfigurationFactory; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.server.util.TableInfoUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClusterStatus; 
-import org.apache.hadoop.mapred.JobClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ContinuousStatsCollector { - - private static final Logger log = LoggerFactory.getLogger(ContinuousStatsCollector.class); - - static class StatsCollectionTask extends TimerTask { - - private final String tableId; - private final Opts opts; - private final int scanBatchSize; - - public StatsCollectionTask(Opts opts, int scanBatchSize) { - this.opts = opts; - this.scanBatchSize = scanBatchSize; - this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.getTableName()); - System.out - .println("TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE" - + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES" - + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET"); - } - - @Override - public void run() { - try { - String acuStats = getACUStats(); - String fsStats = getFSStats(); - String mrStats = getMRStats(); - String tabletStats = getTabletStats(); - - System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats); - } catch (Exception e) { - log.error(System.currentTimeMillis() + " - Failed to collect stats", e); - } - } - - private String getTabletStats() throws Exception { - - Connector conn = opts.getConnector(); - Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths); - scanner.setBatchSize(scanBatchSize); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName())); - scanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - - Stat s = new Stat(); - - int count = 0; - for (Entry<Key,Value> entry : scanner) { - count++; - s.addStat(Long.parseLong(entry.getValue().toString())); - } - - if (count > 0) - return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev()); - else - return "0 0 0 0"; - - } - - private String getFSStats() throws Exception { - VolumeManager fs = VolumeManagerImpl.get(); - long length1 = 0, dcount1 = 0, fcount1 = 0; - long length2 = 0, dcount2 = 0, fcount2 = 0; - for (String dir : ServerConstants.getTablesDirs()) { - ContentSummary contentSummary = fs.getContentSummary(new Path(dir)); - length1 += contentSummary.getLength(); - dcount1 += contentSummary.getDirectoryCount(); - fcount1 += contentSummary.getFileCount(); - contentSummary = fs.getContentSummary(new Path(dir, tableId)); - length2 += contentSummary.getLength(); - dcount2 += contentSummary.getDirectoryCount(); - fcount2 += contentSummary.getFileCount(); - } - - return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2; - } - - private String getACUStats() throws Exception { - - MasterClientService.Iface client = null; - while (true) { - try { - ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(), opts.getToken()), new ServerConfigurationFactory( - opts.getInstance()).getConfiguration()); - client = MasterClient.getConnectionWithRetry(context); - MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); - - TableInfo all = new TableInfo(); - Map<String,TableInfo> tableSummaries = new HashMap<>(); - - for (TabletServerStatus server : stats.tServerInfo) { 
- for (Entry<String,TableInfo> info : server.tableMap.entrySet()) { - TableInfo tableSummary = tableSummaries.get(info.getKey()); - if (tableSummary == null) { - tableSummary = new TableInfo(); - tableSummaries.put(info.getKey(), tableSummary); - } - TableInfoUtil.add(tableSummary, info.getValue()); - TableInfoUtil.add(all, info.getValue()); - } - } - - TableInfo ti = tableSummaries.get(tableId); - - return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " " - + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets; - - } catch (ThriftNotActiveServiceException e) { - // Let it loop, fetching a new location - log.debug("Contacted a Master which is no longer active, retrying"); - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } finally { - if (client != null) - MasterClient.close(client); - } - } - } - - } - - private static String getMRStats() throws Exception { - Configuration conf = CachedConfiguration.getInstance(); - // No alternatives for hadoop 20 - JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf)); - - ClusterStatus cs = jc.getClusterStatus(false); - - return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " " - + cs.getBlacklistedTrackers(); - - } - - static class Opts extends ClientOnRequiredTable {} - - public static void main(String[] args) { - Opts opts = new Opts(); - ScannerOpts scanOpts = new ScannerOpts(); - opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts); - Timer jtimer = new Timer(); - - jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java deleted file mode 100644 index a8b2930..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.continuous; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.security.Authorizations; - -/** - * Useful utility methods common to the Continuous test suite. 
- */ -final class ContinuousUtil { - private ContinuousUtil() {} - - /** - * Attempt to create a table scanner, or fail if the table does not exist. - * - * @param connector - * A populated connector object - * @param table - * The table name to scan over - * @param auths - * The authorizations to use for the scanner - * @return a scanner for the requested table - * @throws TableNotFoundException - * If the table does not exist - */ - static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException { - if (!connector.tableOperations().exists(table)) { - throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes."); - } - return connector.createScanner(table, auths); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java deleted file mode 100644 index 4222005..0000000 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.accumulo.test.continuous;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.validators.PositiveInteger;
-
-/**
- * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
- */
-
-public class ContinuousVerify extends Configured implements Tool {
-
-  public static final VLongWritable DEF = new VLongWritable(-1);
-
-  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
-
-    private static final Logger log = LoggerFactory.getLogger(CMapper.class);
-    private LongWritable row = new LongWritable();
-    private LongWritable ref = new LongWritable();
-    private VLongWritable vrow = new VLongWritable();
-
-    private long corrupt = 0;
-
-    @Override
-    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
-      long r = Long.parseLong(key.getRow().toString(), 16);
-      if (r < 0)
-        throw new IllegalArgumentException();
-
-      try {
-        ContinuousWalk.validate(key, data);
-      } catch (BadChecksumException bce) {
-        context.getCounter(Counts.CORRUPT).increment(1l);
-        if (corrupt < 1000) {
-          log.error("Bad checksum : " + key);
-        } else if (corrupt == 1000) {
-          System.out.println("Too many bad checksums, not printing anymore!");
-        }
-        corrupt++;
-        return;
-      }
-
-      row.set(r);
-
-      context.write(row, DEF);
-      byte[] val = data.get();
-
-      int offset = ContinuousWalk.getPrevRowOffset(val);
-      if (offset > 0) {
-        ref.set(Long.parseLong(new String(val, offset, 16, UTF_8), 16));
-        vrow.set(r);
-        context.write(ref, vrow);
-      }
-    }
-  }
-
-  public static enum Counts {
-    UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
-  }
-
-  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
-    private ArrayList<Long> refs = new ArrayList<>();
-
-    @Override
-    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
-
-      int defCount = 0;
-
-      refs.clear();
-      for (VLongWritable type : values) {
-        if (type.get() == -1) {
-          defCount++;
-        } else {
-          refs.add(type.get());
-        }
-      }
-
-      if (defCount == 0 && refs.size() > 0) {
-        StringBuilder sb = new StringBuilder();
-        String comma = "";
-        for (Long ref : refs) {
-          sb.append(comma);
-          comma = ",";
-          sb.append(new String(ContinuousIngest.genRow(ref), UTF_8));
-        }
-
-        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
-        context.getCounter(Counts.UNDEFINED).increment(1l);
-
-      } else if (defCount > 0 && refs.size() == 0) {
-        context.getCounter(Counts.UNREFERENCED).increment(1l);
-      } else {
-        context.getCounter(Counts.REFERENCED).increment(1l);
-      }
-
-    }
-  }
-
-  static class Opts extends MapReduceClientOnDefaultTable {
-    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist")
-    String outputDir = "/tmp/continuousVerify";
-
-    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", validateWith = PositiveInteger.class)
-    int maxMaps = 1;
-
-    @Parameter(names = "--reducers", description = "the number of reducers to use", validateWith = PositiveInteger.class)
-    int reducers = 1;
-
-    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
-    boolean scanOffline = false;
-
-    public Opts() {
-      super("ci");
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Opts opts = new Opts();
-    opts.parseArgs(this.getClass().getName(), args);
-
-    Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
-    job.setJarByClass(this.getClass());
-
-    job.setInputFormatClass(AccumuloInputFormat.class);
-    opts.setAccumuloConfigs(job);
-
-    Set<Range> ranges = null;
-    String clone = opts.getTableName();
-    Connector conn = null;
-
-    if (opts.scanOffline) {
-      Random random = new Random();
-      clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
-      conn = opts.getConnector();
-      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
-      ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
-      conn.tableOperations().offline(clone);
-      AccumuloInputFormat.setInputTableName(job, clone);
-      AccumuloInputFormat.setOfflineTableScan(job, true);
-    } else {
-      ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
-    }
-
-    AccumuloInputFormat.setRanges(job, ranges);
-    AccumuloInputFormat.setAutoAdjustRanges(job, false);
-
-    job.setMapperClass(CMapper.class);
-    job.setMapOutputKeyClass(LongWritable.class);
-    job.setMapOutputValueClass(VLongWritable.class);
-
-    job.setReducerClass(CReducer.class);
-    job.setNumReduceTasks(opts.reducers);
-
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
-
-    TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
-
-    job.waitForCompletion(true);
-
-    if (opts.scanOffline) {
-      conn.tableOperations().delete(clone);
-    }
-    opts.stopTracing();
-    return job.isSuccessful() ? 0 : 1;
-  }
-
-  /**
-   *
-   * @param args
-   *          instanceName zookeepers username password table columns outputpath
-   */
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
-    if (res != 0)
-      System.exit(res);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
deleted file mode 100644
index 3f75d5a..0000000
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.continuous;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.zip.CRC32;
-
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.trace.Span;
-import org.apache.accumulo.core.trace.Trace;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-
-public class ContinuousWalk {
-
-  static public class Opts extends ContinuousQuery.Opts {
-    class RandomAuthsConverter implements IStringConverter<RandomAuths> {
-      @Override
-      public RandomAuths convert(String value) {
-        try {
-          return new RandomAuths(value);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    @Parameter(names = "--authsFile", description = "read the authorities to use from a file")
-    RandomAuths randomAuths = new RandomAuths();
-  }
-
-  static class BadChecksumException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
-
-    public BadChecksumException(String msg) {
-      super(msg);
-    }
-
-  }
-
-  static class RandomAuths {
-    private List<Authorizations> auths;
-
-    RandomAuths() {
-      auths = Collections.singletonList(Authorizations.EMPTY);
-    }
-
-    RandomAuths(String file) throws IOException {
-      if (file == null) {
-        auths = Collections.singletonList(Authorizations.EMPTY);
-        return;
-      }
-
-      auths = new ArrayList<>();
-
-      FileSystem fs = FileSystem.get(new Configuration());
-      BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8));
-      try {
-        String line;
-        while ((line = in.readLine()) != null) {
-          auths.add(new Authorizations(line.split(",")));
-        }
-      } finally {
-        in.close();
-      }
-    }
-
-    Authorizations getAuths(Random r) {
-      return auths.get(r.nextInt(auths.size()));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    Opts opts = new Opts();
-    ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
-    clientOpts.parseArgs(ContinuousWalk.class.getName(), args, opts);
-
-    Connector conn = clientOpts.getConnector();
-
-    Random r = new Random();
-
-    ArrayList<Value> values = new ArrayList<>();
-
-    while (true) {
-      Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), opts.randomAuths.getAuths(r));
-      String row = findAStartRow(opts.min, opts.max, scanner, r);
-
-      while (row != null) {
-
-        values.clear();
-
-        long t1 = System.currentTimeMillis();
-        Span span = Trace.on("walk");
-        try {
-          scanner.setRange(new Range(new Text(row)));
-          for (Entry<Key,Value> entry : scanner) {
-            validate(entry.getKey(), entry.getValue());
-            values.add(entry.getValue());
-          }
-        } finally {
-          span.stop();
-        }
-        long t2 = System.currentTimeMillis();
-
-        System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
-
-        if (values.size() > 0) {
-          row = getPrevRow(values.get(r.nextInt(values.size())));
-        } else {
-          System.out.printf("MIS %d %s%n", t1, row);
-          System.err.printf("MIS %d %s%n", t1, row);
-          row = null;
-        }
-
-        if (opts.sleepTime > 0)
-          Thread.sleep(opts.sleepTime);
-      }
-
-      if (opts.sleepTime > 0)
-        Thread.sleep(opts.sleepTime);
-    }
-  }
-
-  private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
-
-    byte[] scanStart = ContinuousIngest.genRow(min, max, r);
-    scanner.setRange(new Range(new Text(scanStart), null));
-    scanner.setBatchSize(100);
-
-    int count = 0;
-    String pr = null;
-
-    long t1 = System.currentTimeMillis();
-
-    for (Entry<Key,Value> entry : scanner) {
-      validate(entry.getKey(), entry.getValue());
-      pr = getPrevRow(entry.getValue());
-      count++;
-      if (pr != null)
-        break;
-    }
-
-    long t2 = System.currentTimeMillis();
-
-    System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
-
-    return pr;
-  }
-
-  static int getPrevRowOffset(byte val[]) {
-    if (val.length == 0)
-      throw new IllegalArgumentException();
-    if (val[53] != ':')
-      throw new IllegalArgumentException(new String(val, UTF_8));
-
-    // prev row starts at 54
-    if (val[54] != ':') {
-      if (val[54 + 16] != ':')
-        throw new IllegalArgumentException(new String(val, UTF_8));
-      return 54;
-    }
-
-    return -1;
-  }
-
-  static String getPrevRow(Value value) {
-
-    byte[] val = value.get();
-    int offset = getPrevRowOffset(val);
-    if (offset > 0) {
-      return new String(val, offset, 16, UTF_8);
-    }
-
-    return null;
-  }
-
-  static int getChecksumOffset(byte val[]) {
-    if (val[val.length - 1] != ':') {
-      if (val[val.length - 9] != ':')
-        throw new IllegalArgumentException(new String(val, UTF_8));
-      return val.length - 8;
-    }
-
-    return -1;
-  }
-
-  static void validate(Key key, Value value) throws BadChecksumException {
-    int ckOff = getChecksumOffset(value.get());
-    if (ckOff < 0)
-      return;
-
-    long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
-
-    CRC32 cksum = new CRC32();
-
-    cksum.update(key.getRowData().toArray());
-    cksum.update(key.getColumnFamilyData().toArray());
-    cksum.update(key.getColumnQualifierData().toArray());
-    cksum.update(key.getColumnVisibilityData().toArray());
-    cksum.update(value.get(), 0, ckOff);
-
-    if (cksum.getValue() != storedCksum) {
-      throw new BadChecksumException("Checksum invalid " + key + " " + value);
    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java b/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
deleted file mode 100644
index ba39f1c..0000000
--- a/test/src/main/java/org/apache/accumulo/test/continuous/GenSplits.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.continuous;
-
-import java.util.List;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-
-/**
- *
- */
-public class GenSplits {
-
-  static class Opts {
-    @Parameter(names = "--min", description = "minimum row")
-    long minRow = 0;
-
-    @Parameter(names = "--max", description = "maximum row")
-    long maxRow = Long.MAX_VALUE;
-
-    @Parameter(description = "<num tablets>")
-    List<String> args = null;
-  }
-
-  public static void main(String[] args) {
-
-    Opts opts = new Opts();
-    JCommander jcommander = new JCommander(opts);
-    jcommander.setProgramName(GenSplits.class.getSimpleName());
-
-    try {
-      jcommander.parse(args);
-    } catch (ParameterException pe) {
-      System.err.println(pe.getMessage());
-      jcommander.usage();
-      System.exit(-1);
-    }
-
-    if (opts.args == null || opts.args.size() != 1) {
-      jcommander.usage();
-      System.exit(-1);
-    }
-
-    int numTablets = Integer.parseInt(opts.args.get(0));
-
-    if (numTablets < 1) {
-      System.err.println("ERROR: numTablets < 1");
-      System.exit(-1);
-    }
-
-    if (opts.minRow >= opts.maxRow) {
-      System.err.println("ERROR: min >= max");
-      System.exit(-1);
-    }
-
-    int numSplits = numTablets - 1;
-    long distance = ((opts.maxRow - opts.minRow) / numTablets) + 1;
-    long split = distance;
-    for (int i = 0; i < numSplits; i++) {
-
-      String s = String.format("%016x", split + opts.minRow);
-
-      while (s.charAt(s.length() - 1) == '0') {
-        s = s.substring(0, s.length() - 1);
-      }
-
-      System.out.println(s);
-      split += distance;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java
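
For anyone porting these tools, the split-point math in GenSplits above is easy to check in isolation: rows are zero-padded 16-digit hex, so evenly spaced splits are fixed strides through the [min, max) range with trailing zeros trimmed. A minimal standalone sketch follows; the class name SplitPointsSketch and the hard-coded numTablets value are illustrative, not part of this commit:

    public class SplitPointsSketch {
      public static void main(String[] args) {
        // defaults match GenSplits.Opts above
        long minRow = 0;
        long maxRow = Long.MAX_VALUE;
        int numTablets = 4; // produces numTablets - 1 split points
        long distance = ((maxRow - minRow) / numTablets) + 1;
        long split = distance;
        for (int i = 0; i < numTablets - 1; i++) {
          // rows are zero-padded 16-digit hex, so splits are formatted the same way
          String s = String.format("%016x", split + minRow);
          // trimming trailing zeros shortens the split without changing sort order
          while (s.charAt(s.length() - 1) == '0') {
            s = s.substring(0, s.length() - 1);
          }
          System.out.println(s); // prints 2, 4, 6 for numTablets = 4
          split += distance;
        }
      }
    }
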
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java b/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java
deleted file mode 100644
index cf23482..0000000
--- a/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.continuous;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-class HistData<T> implements Comparable<HistData<T>>, Serializable {
-  private static final long serialVersionUID = 1L;
-
-  T bin;
-  long count;
-
-  HistData(T bin) {
-    this.bin = bin;
-    count = 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(bin) + Objects.hashCode(count);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public boolean equals(Object obj) {
-    return obj == this || (obj != null && obj instanceof HistData && 0 == compareTo((HistData<T>) obj));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public int compareTo(HistData<T> o) {
-    return ((Comparable<T>) bin).compareTo(o.bin);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
deleted file mode 100644
index f4b21df..0000000
--- a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.continuous;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-public class Histogram<T> implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  protected long sum;
-  protected HashMap<T,HistData<T>> counts;
-
-  public Histogram() {
-    sum = 0;
-    counts = new HashMap<>();
-  }
-
-  public void addPoint(T x) {
-    addPoint(x, 1);
-  }
-
-  public void addPoint(T x, long y) {
-
-    HistData<T> hd = counts.get(x);
-    if (hd == null) {
-      hd = new HistData<>(x);
-      counts.put(x, hd);
-    }
-
-    hd.count += y;
-    sum += y;
-  }
-
-  public long getCount(T x) {
-    HistData<T> hd = counts.get(x);
-    if (hd == null)
-      return 0;
-    return hd.count;
-  }
-
-  public double getPercentage(T x) {
-    if (getSum() == 0) {
-      return 0;
-    }
-    return (double) getCount(x) / (double) getSum() * 100.0;
-  }
-
-  public long getSum() {
-    return sum;
-  }
-
-  public List<T> getKeysInCountSortedOrder() {
-
-    ArrayList<HistData<T>> sortedCounts = new ArrayList<>(counts.values());
-
-    Collections.sort(sortedCounts, new Comparator<HistData<T>>() {
-      @Override
-      public int compare(HistData<T> o1, HistData<T> o2) {
-        if (o1.count < o2.count)
-          return -1;
-        if (o1.count > o2.count)
-          return 1;
-        return 0;
-      }
-    });
-
-    ArrayList<T> sortedKeys = new ArrayList<>();
-
-    for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
-      HistData<T> hd = iter.next();
-      sortedKeys.add(hd.bin);
-    }
-
-    return sortedKeys;
-  }
-
-  public void print(StringBuilder out) {
-    TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values());
-
-    int maxValueLen = 0;
-
-    for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
-      HistData<T> hd = iter.next();
-      if (("" + hd.bin).length() > maxValueLen) {
-        maxValueLen = ("" + hd.bin).length();
-      }
-    }
-
-    double psum = 0;
-
-    for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
-      HistData<T> hd = iter.next();
-
-      psum += getPercentage(hd.bin);
-
-      out.append(String.format(" %" + (maxValueLen + 1) + "s %,16d %6.2f%s %6.2f%s%n", hd.bin + "", hd.count, getPercentage(hd.bin), "%", psum, "%"));
-    }
-    out.append(String.format("%n %" + (maxValueLen + 1) + "s %,16d %n", "TOTAL", sum));
-  }
-
-  public void save(String file) throws IOException {
-
-    FileOutputStream fos = new FileOutputStream(file);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    PrintStream ps = new PrintStream(bos, false, UTF_8.name());
-
-    TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values());
-    for (Iterator<HistData<T>> iter = sortedCounts.iterator(); iter.hasNext();) {
-      HistData<T> hd = iter.next();
-      ps.println(" " + hd.bin + " " + hd.count);
-    }
-
-    ps.close();
-  }
-
-  public Set<T> getKeys() {
-    return counts.keySet();
-  }
-
-  public void clear() {
-    counts.clear();
-    sum = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/39830635/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
deleted file mode 100644
index 1a25bab..0000000
--- a/test/src/main/java/org/apache/accumulo/test/continuous/PrintScanTimeHistogram.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.continuous;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PrintScanTimeHistogram {
-
-  private static final Logger log = LoggerFactory.getLogger(PrintScanTimeHistogram.class);
-
-  public static void main(String[] args) throws Exception {
-    Histogram<String> srqHist = new Histogram<>();
-    Histogram<String> fsrHist = new Histogram<>();
-
-    processFile(System.in, srqHist, fsrHist);
-
-    StringBuilder report = new StringBuilder();
-    report.append(String.format("%n *** Single row queries histogram *** %n"));
-    srqHist.print(report);
-    log.info("{}", report);
-
-    report = new StringBuilder();
-    report.append(String.format("%n *** Find start rows histogram *** %n"));
-    fsrHist.print(report);
-    log.info("{}", report);
-  }
-
-  private static void processFile(InputStream ins, Histogram<String> srqHist, Histogram<String> fsrHist) throws FileNotFoundException, IOException {
-    String line;
-    BufferedReader in = new BufferedReader(new InputStreamReader(ins, UTF_8));
-
-    while ((line = in.readLine()) != null) {
-
-      try {
-        String[] tokens = line.split(" ");
-
-        String type = tokens[0];
-        if (type.equals("SRQ")) {
-          long delta = Long.parseLong(tokens[3]);
-          String point = generateHistPoint(delta);
-          srqHist.addPoint(point);
-        } else if (type.equals("FSR")) {
-          long delta = Long.parseLong(tokens[3]);
-          String point = generateHistPoint(delta);
-          fsrHist.addPoint(point);
-        }
-      } catch (Exception e) {
-        log.error("Failed to process line '" + line + "'.", e);
-      }
-    }
-
-    in.close();
-  }
-
-  private static String generateHistPoint(long delta) {
-    String point;
-
-    if (delta / 1000.0 < .1) {
-      point = String.format("%07.2f", delta / 1000.0);
-      if (point.equals("0000.10"))
-        point = "0000.1x";
-    } else if (delta / 1000.0 < 1.0) {
-      point = String.format("%06.1fx", delta / 1000.0);
-      if (point.equals("0001.0x"))
-        point = "0001.xx";
-    } else {
-      point = String.format("%04.0f.xx", delta / 1000.0);
-    }
-    return point;
-  }
-
-}
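
The SRQ (single row query) and FSR (find start row) lines this tool consumes carry a millisecond delta in the fourth token, and generateHistPoint above folds those deltas into coarse string bins so the histogram groups scan times by magnitude. A minimal sketch of that bucketing, runnable on its own; the class name BucketSketch and the sample deltas are hypothetical:

    public class BucketSketch {
      static String bucket(long deltaMillis) {
        double secs = deltaMillis / 1000.0;
        if (secs < .1) {
          // sub-100ms scans keep two decimal places
          String point = String.format("%07.2f", secs);
          return point.equals("0000.10") ? "0000.1x" : point;
        } else if (secs < 1.0) {
          // 100ms-1s scans are binned to one decimal place
          String point = String.format("%06.1fx", secs);
          return point.equals("0001.0x") ? "0001.xx" : point;
        }
        // one second and up is binned to whole seconds
        return String.format("%04.0f.xx", secs);
      }

      public static void main(String[] args) {
        for (long d : new long[] {42, 420, 4200}) {
          System.out.println(d + " ms -> " + bucket(d)); // 0000.04, 0000.4x, 0004.xx
        }
      }
    }
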