http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java deleted file mode 100644 index 72da0ae..0000000 --- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.replication.merkle.cli; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map.Entry; -import java.util.TreeSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ClientOnRequiredTable; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.test.replication.merkle.RangeSerialization; -import org.apache.accumulo.test.replication.merkle.skvi.DigestIterator; -import org.apache.commons.codec.binary.Hex; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.beust.jcommander.Parameter; -import com.google.common.collect.Iterables; - -/** - * Read from a table, compute a Merkle tree and output it to a table. Each key-value pair in the destination table is a leaf node of the Merkle tree. 
- */ -public class GenerateHashes { - private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class); - - public static class GenerateHashesOpts extends ClientOnRequiredTable { - @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") - private String hashName; - - @Parameter(names = {"-o", "--output"}, required = true, description = "output table name, expected to exist and be writable") - private String outputTableName; - - @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests") - private int numThreads = 4; - - @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should we push down logic with an iterator") - private boolean iteratorPushdown = false; - - @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree") - private String splitsFile = null; - - public String getHashName() { - return hashName; - } - - public void setHashName(String hashName) { - this.hashName = hashName; - } - - public String getOutputTableName() { - return outputTableName; - } - - public void setOutputTableName(String outputTableName) { - this.outputTableName = outputTableName; - } - - public int getNumThreads() { - return numThreads; - } - - public void setNumThreads(int numThreads) { - this.numThreads = numThreads; - } - - public boolean isIteratorPushdown() { - return iteratorPushdown; - } - - public void setIteratorPushdown(boolean iteratorPushdown) { - this.iteratorPushdown = iteratorPushdown; - } - - public String getSplitsFile() { - return splitsFile; - } - - public void setSplitsFile(String splitsFile) { - this.splitsFile = splitsFile; - } - } - - public Collection<Range> getRanges(Connector conn, String tableName, String splitsFile) throws TableNotFoundException, AccumuloSecurityException, - AccumuloException, FileNotFoundException { - if (null == splitsFile) { - log.info("Using 
table split points"); - Collection<Text> endRows = conn.tableOperations().listSplits(tableName); - return endRowsToRanges(endRows); - } else { - log.info("Using provided split points"); - ArrayList<Text> splits = new ArrayList<>(); - - String line; - java.util.Scanner file = new java.util.Scanner(new File(splitsFile), UTF_8.name()); - try { - while (file.hasNextLine()) { - line = file.nextLine(); - if (!line.isEmpty()) { - splits.add(new Text(line)); - } - } - } finally { - file.close(); - } - - Collections.sort(splits); - return endRowsToRanges(splits); - } - } - - public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, NoSuchAlgorithmException, - FileNotFoundException { - Collection<Range> ranges = getRanges(opts.getConnector(), opts.getTableName(), opts.getSplitsFile()); - - run(opts.getConnector(), opts.getTableName(), opts.getOutputTableName(), opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges); - } - - public void run(final Connector conn, final String inputTableName, final String outputTableName, final String digestName, int numThreads, - final boolean iteratorPushdown, final Collection<Range> ranges) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, - NoSuchAlgorithmException { - if (!conn.tableOperations().exists(outputTableName)) { - throw new IllegalArgumentException(outputTableName + " does not exist, please create it"); - } - - // Get some parallelism - ExecutorService svc = Executors.newFixedThreadPool(numThreads); - final BatchWriter bw = conn.createBatchWriter(outputTableName, new BatchWriterConfig()); - - try { - for (final Range range : ranges) { - final MessageDigest digest = getDigestAlgorithm(digestName); - - svc.execute(new Runnable() { - - @Override - public void run() { - Scanner s; - try { - s = conn.createScanner(inputTableName, Authorizations.EMPTY); - } catch (Exception e) { - log.error("Could not get scanner for " + 
inputTableName, e); - throw new RuntimeException(e); - } - - s.setRange(range); - - Value v = null; - Mutation m = null; - if (iteratorPushdown) { - IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class); - cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName); - s.addScanIterator(cfg); - - // The scanner should only ever return us one Key-Value, otherwise this approach won't work - Entry<Key,Value> entry = Iterables.getOnlyElement(s); - - v = entry.getValue(); - m = RangeSerialization.toMutation(range, v); - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - for (Entry<Key,Value> entry : s) { - DataOutputStream out = new DataOutputStream(baos); - try { - entry.getKey().write(out); - entry.getValue().write(out); - } catch (Exception e) { - log.error("Error writing {}", entry, e); - throw new RuntimeException(e); - } - - digest.update(baos.toByteArray()); - baos.reset(); - } - - v = new Value(digest.digest()); - m = RangeSerialization.toMutation(range, v); - } - - // Log some progress - log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range, Hex.encodeHexString(v.get())); - - try { - bw.addMutation(m); - } catch (MutationsRejectedException e) { - log.error("Could not write mutation", e); - throw new RuntimeException(e); - } - } - }); - } - - svc.shutdown(); - - // Wait indefinitely for the scans to complete - while (!svc.isTerminated()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - log.error("Interrupted while waiting for executor service to gracefully complete. 
Exiting now"); - svc.shutdownNow(); - return; - } - } - } finally { - // We can only safely close this when we're exiting or we've completely all tasks - bw.close(); - } - } - - public TreeSet<Range> endRowsToRanges(Collection<Text> endRows) { - ArrayList<Text> sortedEndRows = new ArrayList<>(endRows); - Collections.sort(sortedEndRows); - - Text prevEndRow = null; - TreeSet<Range> ranges = new TreeSet<>(); - for (Text endRow : sortedEndRows) { - if (null == prevEndRow) { - ranges.add(new Range(null, false, endRow, true)); - } else { - ranges.add(new Range(prevEndRow, false, endRow, true)); - } - prevEndRow = endRow; - } - - ranges.add(new Range(prevEndRow, false, null, false)); - - return ranges; - } - - protected MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException { - return MessageDigest.getInstance(digestName); - } - - public static void main(String[] args) throws Exception { - GenerateHashesOpts opts = new GenerateHashesOpts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - opts.parseArgs(GenerateHashes.class.getName(), args, bwOpts); - - if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) { - throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points"); - } - - GenerateHashes generate = new GenerateHashes(); - generate.run(opts); - } -}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java deleted file mode 100644 index c1d6337..0000000 --- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication.merkle.cli; - -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.accumulo.core.cli.ClientOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; - -import com.beust.jcommander.Parameter; - -/** - * Accepts two table names and enumerates all key-values pairs in both checking for correctness. 
All differences between the two tables will be printed to the - * console. - */ -public class ManualComparison { - - public static class ManualComparisonOpts extends ClientOpts { - @Parameter(names = {"--table1"}, required = true, description = "First table") - public String table1; - - @Parameter(names = {"--table2"}, required = true, description = "First table") - public String table2; - } - - public static void main(String[] args) throws Exception { - ManualComparisonOpts opts = new ManualComparisonOpts(); - opts.parseArgs("ManualComparison", args); - - Connector conn = opts.getConnector(); - - Scanner s1 = conn.createScanner(opts.table1, Authorizations.EMPTY), s2 = conn.createScanner(opts.table2, Authorizations.EMPTY); - Iterator<Entry<Key,Value>> iter1 = s1.iterator(), iter2 = s2.iterator(); - boolean incrementFirst = true, incrementSecond = true; - - Entry<Key,Value> entry1 = iter1.next(), entry2 = iter2.next(); - while (iter1.hasNext() && iter2.hasNext()) { - if (incrementFirst) { - entry1 = iter1.next(); - } - if (incrementSecond) { - entry2 = iter2.next(); - } - incrementFirst = false; - incrementSecond = false; - - if (!entry1.equals(entry2)) { - - if (entry1.getKey().compareTo(entry2.getKey()) < 0) { - System.out.println("Exist in original " + entry1); - incrementFirst = true; - } else if (entry2.getKey().compareTo(entry1.getKey()) < 0) { - System.out.println("Exist in replica " + entry2); - incrementSecond = true; - } else { - System.out.println("Differ... 
" + entry1 + " " + entry2); - incrementFirst = true; - incrementSecond = true; - } - } else { - incrementFirst = true; - incrementSecond = true; - } - } - - System.out.println("\nExtra entries from " + opts.table1); - while (iter1.hasNext()) { - System.out.println(iter1.next()); - } - - System.out.println("\nExtra entries from " + opts.table2); - while (iter2.hasNext()) { - System.out.println(iter2.next()); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java deleted file mode 100644 index 5558350..0000000 --- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.replication.merkle.ingest; - -import java.util.Random; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.cli.ClientOnDefaultTable; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; - -/** - * Generates some random data with a given percent of updates to be deletes. - */ -public class RandomWorkload { - public static final String DEFAULT_TABLE_NAME = "randomWorkload"; - - public static class RandomWorkloadOpts extends ClientOnDefaultTable { - @Parameter(names = {"-n", "--num"}, required = true, description = "Num records to write") - public long numRecords; - - @Parameter(names = {"-r", "--rows"}, required = true, description = "Range of rows that can be generated") - public int rowMax; - - @Parameter(names = {"-cf", "--colfams"}, required = true, description = "Range of column families that can be generated") - public int cfMax; - - @Parameter(names = {"-cq", "--colquals"}, required = true, description = "Range of column qualifiers that can be generated") - public int cqMax; - - @Parameter(names = {"-d", "--deletes"}, required = false, description = "Percentage of updates that should be deletes") - public int deletePercent = 5; - - public RandomWorkloadOpts() { - super(DEFAULT_TABLE_NAME); - } - - public RandomWorkloadOpts(String tableName) { - super(tableName); - } - } - - public void run(RandomWorkloadOpts opts, BatchWriterConfig cfg) throws Exception { - run(opts.getConnector(), opts.getTableName(), cfg, opts.numRecords, opts.rowMax, opts.cfMax, opts.cqMax, opts.deletePercent); - } - - public void run(final Connector conn, final String tableName, final BatchWriterConfig cfg, final long numRecords, int 
rowMax, int cfMax, int cqMax, - int deletePercent) throws Exception { - - final Random rowRand = new Random(12345); - final Random cfRand = new Random(12346); - final Random cqRand = new Random(12347); - final Random deleteRand = new Random(12348); - long valueCounter = 0l; - - if (!conn.tableOperations().exists(tableName)) { - conn.tableOperations().create(tableName); - } - - BatchWriter bw = conn.createBatchWriter(tableName, cfg); - try { - final Text row = new Text(), cf = new Text(), cq = new Text(); - final Value value = new Value(); - for (long i = 0; i < numRecords; i++) { - row.set(Integer.toString(rowRand.nextInt(rowMax))); - cf.set(Integer.toString(cfRand.nextInt(cfMax))); - cq.set(Integer.toString(cqRand.nextInt(cqMax))); - - Mutation m = new Mutation(row); - - // Choose a random value between [0,100) - int deleteValue = deleteRand.nextInt(100); - - // putDelete if the value we chose is less than our delete percentage - if (deleteValue < deletePercent) { - m.putDelete(cf, cq); - } else { - value.set(Long.toString(valueCounter).getBytes()); - m.put(cf, cq, valueCounter, value); - } - - bw.addMutation(m); - - valueCounter++; - } - } finally { - bw.close(); - } - } - - public static void main(String[] args) throws Exception { - RandomWorkloadOpts opts = new RandomWorkloadOpts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - opts.parseArgs(RandomWorkload.class.getSimpleName(), args, bwOpts); - - RandomWorkload rw = new RandomWorkload(); - - rw.run(opts, bwOpts.getBatchWriterConfig()); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java deleted file mode 100644 index fd19658..0000000 --- 
a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree and can be used to evaluate equality over large - * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a - * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are - * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file. - * <p> - * The parent of any nodes (typically, a binary tree; however this is not required) is the concatenation of the hashes of - * the children. We can construct a full tree by walking up the tree, creating parents from children, until we have a root - * node. To check equality of two files that each have a merkle tree built, we can very easily compare the value at the - * root of the Merkle tree to know whether or not the files are the same. 
- * <p> - * Additionally, in the situation where we have two files which we expect to be the same but are not, we can walk back down - * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of - * the files which are identical, where subtrees that are not equal correspond to discrepancies between the two files. - * <p> - * We can apply this concept to Accumulo, treating a table as a file, and ranges within a file as an Accumulo Range. We can - * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are - * equivalent. - * - * @since 1.7.0 - */ -package org.apache.accumulo.test.replication.merkle; - http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java deleted file mode 100644 index 769241e..0000000 --- a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication.merkle.skvi; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Collection; -import java.util.Map; - -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; - -/** - * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs. - * <p> - * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner will compute a single digest over a - * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the merkle tree will be invalid. 
- */ -public class DigestIterator implements SortedKeyValueIterator<Key,Value> { - public static final String HASH_NAME_KEY = "hash.name"; - - private MessageDigest digest; - private Key topKey; - private Value topValue; - private SortedKeyValueIterator<Key,Value> source; - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - String hashName = options.get(HASH_NAME_KEY); - if (null == hashName) { - throw new IOException(HASH_NAME_KEY + " must be provided as option"); - } - - try { - this.digest = MessageDigest.getInstance(hashName); - } catch (NoSuchAlgorithmException e) { - throw new IOException(e); - } - - this.topKey = null; - this.topValue = null; - this.source = source; - } - - @Override - public boolean hasTop() { - return null != topKey; - } - - @Override - public void next() throws IOException { - // We can't call next() if we already consumed it all - if (!this.source.hasTop()) { - this.topKey = null; - this.topValue = null; - return; - } - - this.source.next(); - - consume(); - } - - @Override - public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - this.source.seek(range, columnFamilies, inclusive); - - consume(); - } - - protected void consume() throws IOException { - digest.reset(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - - if (!this.source.hasTop()) { - this.topKey = null; - this.topValue = null; - - return; - } - - Key lastKeySeen = null; - while (this.source.hasTop()) { - baos.reset(); - - Key currentKey = this.source.getTopKey(); - lastKeySeen = currentKey; - - currentKey.write(dos); - this.source.getTopValue().write(dos); - - digest.update(baos.toByteArray()); - - this.source.next(); - } - - this.topKey = lastKeySeen; - this.topValue = new Value(digest.digest()); - } - - @Override - public Key getTopKey() { - return 
topKey; - } - - @Override - public Value getTopValue() { - return topValue; - } - - @Override - public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - DigestIterator copy = new DigestIterator(); - try { - copy.digest = MessageDigest.getInstance(digest.getAlgorithm()); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - - copy.topKey = this.topKey; - copy.topValue = this.topValue; - copy.source = this.source.deepCopy(env); - - return copy; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/scalability/Ingest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/scalability/Ingest.java b/test/src/main/java/org/apache/accumulo/test/scalability/Ingest.java deleted file mode 100644 index 1c99cce..0000000 --- a/test/src/main/java/org/apache/accumulo/test/scalability/Ingest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.scalability; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.test.performance.ContinuousIngest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Ingest extends ScaleTest { - - private static final Logger log = LoggerFactory.getLogger(Ingest.class); - - @Override - public void setup() { - - Connector conn = getConnector(); - String tableName = getTestProperty("TABLE"); - - // delete existing table - if (conn.tableOperations().exists(tableName)) { - System.out.println("Deleting existing table: " + tableName); - try { - conn.tableOperations().delete(tableName); - } catch (Exception e) { - log.error("Failed to delete table '" + tableName + "'.", e); - } - } - - // create table - try { - conn.tableOperations().create(tableName); - conn.tableOperations().addSplits(tableName, calculateSplits()); - conn.tableOperations().setProperty(tableName, "table.split.threshold", "256M"); - } catch (Exception e) { - log.error("Failed to create table '" + tableName + "'.", e); - } - - } - - @Override - public void client() { - - Connector conn = getConnector(); - String tableName = getTestProperty("TABLE"); - - // get batch writer configuration - long maxMemory = Long.parseLong(getTestProperty("MAX_MEMORY")); - long maxLatency = Long.parseLong(getTestProperty("MAX_LATENCY")); - int maxWriteThreads = Integer.parseInt(getTestProperty("NUM_THREADS")); - - // create batch writer - BatchWriter bw = 
null; - try { - bw = conn.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS) - .setMaxWriteThreads(maxWriteThreads)); - } catch (TableNotFoundException e) { - log.error("Table '" + tableName + "' not found.", e); - System.exit(-1); - } - - // configure writing - Random r = new Random(); - String ingestInstanceId = UUID.randomUUID().toString(); - long numIngestEntries = Long.parseLong(getTestProperty("NUM_ENTRIES")); - long minRow = 0L; - long maxRow = 9223372036854775807L; - int maxColF = 32767; - int maxColQ = 32767; - long count = 0; - long totalBytes = 0; - - ColumnVisibility cv = new ColumnVisibility(); - - // start timer - startTimer(); - - // write specified number of entries - while (count < numIngestEntries) { - count++; - long rowId = ContinuousIngest.genLong(minRow, maxRow, r); - Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId.getBytes(UTF_8), count, null, r, false); - totalBytes += m.numBytes(); - try { - bw.addMutation(m); - } catch (MutationsRejectedException e) { - log.error("Mutations rejected.", e); - System.exit(-1); - } - } - - // close writer - try { - bw.close(); - } catch (MutationsRejectedException e) { - log.error("Could not close BatchWriter due to mutations being rejected.", e); - System.exit(-1); - } - - // stop timer - stopTimer(count, totalBytes); - } - - @Override - public void teardown() { - - Connector conn = getConnector(); - String tableName = getTestProperty("TABLE"); - - try { - conn.tableOperations().delete(tableName); - } catch (Exception e) { - log.error("Failed to delete table '" + tableName + "'", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/scalability/Run.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/scalability/Run.java 
b/test/src/main/java/org/apache/accumulo/test/scalability/Run.java deleted file mode 100644 index ab16574..0000000 --- a/test/src/main/java/org/apache/accumulo/test/scalability/Run.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.scalability; - -import java.io.FileInputStream; -import java.net.InetAddress; -import java.util.Properties; - -import org.apache.accumulo.core.cli.Help; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.beust.jcommander.Parameter; - -public class Run { - - private static final Logger log = LoggerFactory.getLogger(Run.class); - - static class Opts extends Help { - @Parameter(names = "--testId", required = true) - String testId; - @Parameter(names = "--action", required = true, description = "one of 'setup', 'teardown' or 'client'") - String action; - @Parameter(names = "--count", description = "number of tablet servers", required = true) - int numTabletServers; - } - - public static void main(String[] args) throws Exception { - - final String sitePath = "/tmp/scale-site.conf"; - final String testPath = "/tmp/scale-test.conf"; - Opts opts = new Opts(); - opts.parseArgs(Run.class.getName(), args); - - Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs; - fs = FileSystem.get(conf); - - fs.copyToLocalFile(new Path("/accumulo-scale/conf/site.conf"), new Path(sitePath)); - fs.copyToLocalFile(new Path(String.format("/accumulo-scale/conf/%s.conf", opts.testId)), new Path(testPath)); - - // load configuration file properties - Properties scaleProps = new Properties(); - Properties testProps = new Properties(); - try { - FileInputStream fis = new FileInputStream(sitePath); - try { - scaleProps.load(fis); - } finally { - fis.close(); - } - fis = new FileInputStream(testPath); - try { - testProps.load(fis); - } finally { - fis.close(); - } - } catch (Exception e) { - log.error("Error loading config file.", e); - } - - ScaleTest test = (ScaleTest) Class.forName(String.format("org.apache.accumulo.test.scalability.%s", 
opts.testId)).newInstance(); - - test.init(scaleProps, testProps, opts.numTabletServers); - - if (opts.action.equalsIgnoreCase("setup")) { - test.setup(); - } else if (opts.action.equalsIgnoreCase("client")) { - InetAddress addr = InetAddress.getLocalHost(); - String host = addr.getHostName(); - fs.createNewFile(new Path("/accumulo-scale/clients/" + host)); - test.client(); - fs.copyFromLocalFile(new Path("/tmp/scale.out"), new Path("/accumulo-scale/results/" + host)); - } else if (opts.action.equalsIgnoreCase("teardown")) { - test.teardown(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/scalability/ScaleTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/scalability/ScaleTest.java b/test/src/main/java/org/apache/accumulo/test/scalability/ScaleTest.java deleted file mode 100644 index 2f82bfa..0000000 --- a/test/src/main/java/org/apache/accumulo/test/scalability/ScaleTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.scalability; - -import java.util.Properties; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.hadoop.io.Text; - -public abstract class ScaleTest { - - private Connector conn; - private Properties scaleProps; - private Properties testProps; - private int numTabletServers; - private long startTime; - - public void init(Properties scaleProps, Properties testProps, int numTabletServers) throws AccumuloException, AccumuloSecurityException { - - this.scaleProps = scaleProps; - this.testProps = testProps; - this.numTabletServers = numTabletServers; - - // get properties to create connector - String instanceName = this.scaleProps.getProperty("INSTANCE_NAME"); - String zookeepers = this.scaleProps.getProperty("ZOOKEEPERS"); - String user = this.scaleProps.getProperty("USER"); - String password = this.scaleProps.getProperty("PASSWORD"); - System.out.println(password); - - conn = new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(zookeepers)).getConnector(user, new PasswordToken(password)); - } - - protected void startTimer() { - startTime = System.currentTimeMillis(); - } - - protected void stopTimer(long numEntries, long numBytes) { - long endTime = System.currentTimeMillis(); - System.out.printf("ELAPSEDMS %d %d %d%n", endTime - startTime, numEntries, numBytes); - } - - public abstract void setup(); - - public abstract void client(); - - public abstract void teardown(); - - public TreeSet<Text> calculateSplits() { - int numSplits = numTabletServers - 1; - long distance = (Long.MAX_VALUE / numTabletServers) + 1; - long split = distance; - 
TreeSet<Text> keys = new TreeSet<>(); - for (int i = 0; i < numSplits; i++) { - keys.add(new Text(String.format("%016x", split))); - split += distance; - } - return keys; - } - - public Connector getConnector() { - return conn; - } - - public String getTestProperty(String key) { - return testProps.getProperty(key); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/DataWriter.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/DataWriter.java b/test/src/main/java/org/apache/accumulo/test/stress/random/DataWriter.java deleted file mode 100644 index e7158e2..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/DataWriter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.stress.random; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; - -public class DataWriter extends Stream<Void> { - private final BatchWriter writer; - private final RandomMutations mutations; - - public DataWriter(BatchWriter writer, RandomMutations mutations) { - this.writer = writer; - this.mutations = mutations; - } - - @Override - public Void next() { - try { - writer.addMutation(mutations.next()); - } catch (MutationsRejectedException e) { - throw new RuntimeException(e); - } - return null; - } - - @Override - public void finalize() { - try { - this.writer.close(); - } catch (MutationsRejectedException e) { - System.err.println("Error closing batch writer."); - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/IntArgValidator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/IntArgValidator.java b/test/src/main/java/org/apache/accumulo/test/stress/random/IntArgValidator.java deleted file mode 100644 index 5a5ad3e..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/IntArgValidator.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.accumulo.test.stress.random; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -import com.beust.jcommander.IValueValidator; -import com.beust.jcommander.ParameterException; - -public class IntArgValidator implements IValueValidator<Integer> { - - @Override - public void validate(String name, Integer value) throws ParameterException { - requireNonNull(value); - checkArgument(value > 0); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/RandomByteArrays.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomByteArrays.java b/test/src/main/java/org/apache/accumulo/test/stress/random/RandomByteArrays.java deleted file mode 100644 index 405fabb..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomByteArrays.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -/** - * A stream that will create random byte arrays as it is looped over. - */ -public class RandomByteArrays extends Stream<byte[]> { - private final RandomWithinRange random_arrays; - - public RandomByteArrays(RandomWithinRange random_arrays) { - this.random_arrays = random_arrays; - } - - @Override - public byte[] next() { - return random_arrays.next_bytes(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/RandomMutations.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomMutations.java b/test/src/main/java/org/apache/accumulo/test/stress/random/RandomMutations.java deleted file mode 100644 index db5da55..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomMutations.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -import org.apache.accumulo.core.data.Mutation; - -public class RandomMutations extends Stream<Mutation> { - private final RandomByteArrays rows, column_families, column_qualifiers, values; - private final RandomWithinRange row_widths; - private final int max_cells_per_mutation; - private byte[] current_row; - private int cells_remaining_in_row; - - public RandomMutations(RandomByteArrays rows, RandomByteArrays column_families, RandomByteArrays column_qualifiers, RandomByteArrays values, - RandomWithinRange row_widths, int max_cells_per_mutation) { - this.rows = rows; - this.column_families = column_families; - this.column_qualifiers = column_qualifiers; - this.values = values; - this.row_widths = row_widths; - this.max_cells_per_mutation = (max_cells_per_mutation > 0 ? max_cells_per_mutation : Integer.MAX_VALUE); - - current_row = null; - cells_remaining_in_row = 0; - } - - // TODO should we care about timestamps? 
- @Override - public Mutation next() { - if (cells_remaining_in_row == 0) { - current_row = rows.next(); - cells_remaining_in_row = row_widths.next(); - } - Mutation m = new Mutation(current_row); - final int cells = Math.min(cells_remaining_in_row, max_cells_per_mutation); - for (int i = 1; i <= cells; i++) { - m.put(column_families.next(), column_qualifiers.next(), values.next()); - } - cells_remaining_in_row -= cells; - return m; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/RandomWithinRange.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomWithinRange.java b/test/src/main/java/org/apache/accumulo/test/stress/random/RandomWithinRange.java deleted file mode 100644 index 06cea28..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/RandomWithinRange.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.stress.random; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.Random; - -/** - * Class that returns positive integers between some minimum and maximum. - * - */ -public class RandomWithinRange { - private final Random random; - private final int min, max; - - public RandomWithinRange(int seed, int min, int max) { - this(new Random(seed), min, max); - } - - public RandomWithinRange(Random random, int min, int max) { - checkArgument(min > 0, "Min must be positive."); - checkArgument(max >= min, "Max must be greater than or equal to min."); - this.random = random; - this.min = min; - this.max = max; - } - - public int next() { - if (min == max) { - return min; - } else { - // we pick a random number that's between 0 and (max - min), then add - // min as an offset to get a random number that's [min, max) - return random.nextInt(max - min) + min; - } - } - - public byte[] next_bytes() { - byte[] b = new byte[next()]; - random.nextBytes(b); - return b; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/Scan.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/Scan.java b/test/src/main/java/org/apache/accumulo/test/stress/random/Scan.java deleted file mode 100644 index 3e8d5fd..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/Scan.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Random; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; - -import com.google.common.collect.Lists; - -public class Scan { - - public static void main(String[] args) throws Exception { - ScanOpts opts = new ScanOpts(); - opts.parseArgs(Scan.class.getName(), args); - - Connector connector = opts.getConnector(); - Scanner scanner = connector.createScanner(opts.getTableName(), new Authorizations()); - - if (opts.isolate) { - scanner.enableIsolation(); - } - - Random tablet_index_generator = new Random(opts.scan_seed); - - LoopControl scanning_condition = opts.continuous ? 
new ContinuousLoopControl() : new IterativeLoopControl(opts.scan_iterations); - - while (scanning_condition.keepScanning()) { - Range range = pickRange(connector.tableOperations(), opts.getTableName(), tablet_index_generator); - scanner.setRange(range); - if (opts.batch_size > 0) { - scanner.setBatchSize(opts.batch_size); - } - try { - consume(scanner); - } catch (Exception e) { - System.err.println(String.format("Exception while scanning range %s. Check the state of Accumulo for errors.", range)); - throw e; - } - } - } - - public static void consume(Iterable<?> iterable) { - Iterator<?> itr = iterable.iterator(); - while (itr.hasNext()) { - itr.next(); - } - } - - public static Range pickRange(TableOperations tops, String table, Random r) throws TableNotFoundException, AccumuloSecurityException, AccumuloException { - ArrayList<Text> splits = Lists.newArrayList(tops.listSplits(table)); - if (splits.isEmpty()) { - return new Range(); - } else { - int index = r.nextInt(splits.size()); - Text endRow = splits.get(index); - Text startRow = index == 0 ? null : splits.get(index - 1); - return new Range(startRow, false, endRow, true); - } - } - - /* - * These interfaces + implementations are used to determine how many times the scanner should look up a random tablet and scan it. 
- */ - static interface LoopControl { - public boolean keepScanning(); - } - - // Does a finite number of iterations - static class IterativeLoopControl implements LoopControl { - private final int max; - private int current; - - public IterativeLoopControl(int max) { - this.max = max; - this.current = 0; - } - - @Override - public boolean keepScanning() { - if (current < max) { - ++current; - return true; - } else { - return false; - } - } - } - - // Does an infinite number of iterations - static class ContinuousLoopControl implements LoopControl { - @Override - public boolean keepScanning() { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/ScanOpts.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/ScanOpts.java b/test/src/main/java/org/apache/accumulo/test/stress/random/ScanOpts.java deleted file mode 100644 index e3f73f7..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/ScanOpts.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.stress.random; - -import org.apache.accumulo.core.cli.ClientOnDefaultTable; - -import com.beust.jcommander.Parameter; - -class ScanOpts extends ClientOnDefaultTable { - @Parameter(names = "--isolate", description = "true to turn on scan isolation, false to turn off. default is false.") - boolean isolate = false; - - @Parameter(names = "--num-iterations", description = "number of scan iterations") - int scan_iterations = 1024; - - @Parameter(names = "--continuous", description = "continuously scan the table. note that this overrides --num-iterations") - boolean continuous; - - @Parameter(names = "--scan-seed", description = "seed for randomly choosing tablets to scan") - int scan_seed = 1337; - - @Parameter(names = "--scan-batch-size", description = "scanner batch size") - int batch_size = -1; - - public ScanOpts() { - this(WriteOptions.DEFAULT_TABLE); - } - - public ScanOpts(String table) { - super(table); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/Stream.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/Stream.java b/test/src/main/java/org/apache/accumulo/test/stress/random/Stream.java deleted file mode 100644 index 72b31e5..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/Stream.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -import java.util.Iterator; - -/** - * Base class to model an infinite stream of data. A stream implements an iterator whose {{@link #hasNext()} method will always return true. - * - */ -public abstract class Stream<T> implements Iterator<T> { - - @Override - public final boolean hasNext() { - return true; - } - - @Override - public abstract T next(); - - @Override - public final void remove() { - throw new UnsupportedOperationException(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/Write.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/Write.java b/test/src/main/java/org/apache/accumulo/test/stress/random/Write.java deleted file mode 100644 index ea6f164..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/Write.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -import org.apache.accumulo.core.cli.BatchWriterOpts; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; - -public class Write { - - public static void main(String[] args) throws Exception { - WriteOptions opts = new WriteOptions(); - BatchWriterOpts batch_writer_opts = new BatchWriterOpts(); - opts.parseArgs(Write.class.getName(), args, batch_writer_opts); - - opts.check(); - - Connector c = opts.getConnector(); - - if (opts.clear_table && c.tableOperations().exists(opts.getTableName())) { - try { - c.tableOperations().delete(opts.getTableName()); - } catch (TableNotFoundException e) { - System.err.println("Couldn't delete the table because it doesn't exist any more."); - } - } - - if (!c.tableOperations().exists(opts.getTableName())) { - try { - c.tableOperations().create(opts.getTableName()); - } catch (TableExistsException e) { - System.err.println("Couldn't create table ourselves, but that's ok. 
Continuing."); - } - } - - long writeDelay = opts.write_delay; - if (writeDelay < 0) { - writeDelay = 0; - } - - DataWriter dw = new DataWriter(c.createBatchWriter(opts.getTableName(), batch_writer_opts.getBatchWriterConfig()), new RandomMutations( - // rows - new RandomByteArrays(new RandomWithinRange(opts.row_seed, opts.rowMin(), opts.rowMax())), - // cfs - new RandomByteArrays(new RandomWithinRange(opts.cf_seed, opts.cfMin(), opts.cfMax())), - // cqs - new RandomByteArrays(new RandomWithinRange(opts.cq_seed, opts.cqMin(), opts.cqMax())), - // vals - new RandomByteArrays(new RandomWithinRange(opts.value_seed, opts.valueMin(), opts.valueMax())), - // number of cells per row - new RandomWithinRange(opts.row_width_seed, opts.rowWidthMin(), opts.rowWidthMax()), - // max cells per mutation - opts.max_cells_per_mutation)); - - while (true) { - dw.next(); - if (writeDelay > 0) { - Thread.sleep(writeDelay); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/WriteOptions.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/WriteOptions.java b/test/src/main/java/org/apache/accumulo/test/stress/random/WriteOptions.java deleted file mode 100644 index f92a9eb..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/WriteOptions.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.stress.random; - -import org.apache.accumulo.core.cli.ClientOnDefaultTable; - -import com.beust.jcommander.Parameter; - -class WriteOptions extends ClientOnDefaultTable { - static final String DEFAULT_TABLE = "stress_test"; - static final int DEFAULT_MIN = 1, DEFAULT_MAX = 128, DEFAULT_SPREAD = DEFAULT_MAX - DEFAULT_MIN; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--min-row-size", description = "minimum row size") - Integer row_min; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--max-row-size", description = "maximum row size") - Integer row_max; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--min-cf-size", description = "minimum column family size") - Integer cf_min; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--max-cf-size", description = "maximum column family size") - Integer cf_max; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--min-cq-size", description = "minimum column qualifier size") - Integer cq_min; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--max-cq-size", description = "maximum column qualifier size") - Integer cq_max; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--min-value-size", description = "minimum value size") - Integer value_min; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--max-value-size", description = "maximum value size") - Integer value_max; - - @Parameter(validateValueWith = IntArgValidator.class, names = 
"--min-row-width", description = "minimum row width") - Integer row_width_min; - - @Parameter(validateValueWith = IntArgValidator.class, names = "--max-row-width", description = "maximum row width") - Integer row_width_max; - - @Parameter(names = "--clear-table", description = "clears the table before ingesting") - boolean clear_table; - - @Parameter(names = "--row-seed", description = "seed for generating rows") - int row_seed = 87; - - @Parameter(names = "--cf-seed", description = "seed for generating column families") - int cf_seed = 7; - - @Parameter(names = "--cq-seed", description = "seed for generating column qualifiers") - int cq_seed = 43; - - @Parameter(names = "--value-seed", description = "seed for generating values") - int value_seed = 99; - - @Parameter(names = "--row-width-seed", description = "seed for generating the number of cells within a row (a row's \"width\")") - int row_width_seed = 444; - - @Parameter(names = "--max-cells-per-mutation", description = "maximum number of cells per mutation; non-positive value implies no limit") - int max_cells_per_mutation = -1; - - @Parameter(names = "--write-delay", description = "milliseconds to wait between writes") - long write_delay = 0L; - - public WriteOptions(String table) { - super(table); - } - - public WriteOptions() { - this(DEFAULT_TABLE); - } - - private static int minOrDefault(Integer ref) { - return ref == null ? 
DEFAULT_MIN : ref; - } - - private static int calculateMax(Integer min_ref, Integer max_ref) { - if (max_ref == null) { - if (min_ref == null) { - return DEFAULT_MAX; - } else { - return min_ref + DEFAULT_SPREAD; - } - } else { - return max_ref; - } - } - - public void check() { - checkPair("ROW", row_min, row_max); - checkPair("COLUMN FAMILY", cf_min, cf_max); - checkPair("COLUMN QUALIFIER", cq_min, cq_max); - checkPair("VALUE", value_min, value_max); - } - - public void checkPair(String label, Integer min_ref, Integer max_ref) { - // we've already asserted that the numbers will either be - // 1) null - // 2) positive - // need to verify that they're coherent here - - if (min_ref == null && max_ref != null) { - // we don't support just specifying a max yet - throw new IllegalArgumentException(String.format("[%s] Maximum value supplied, but no minimum. Must supply a minimum with a maximum value.", label)); - } else if (min_ref != null && max_ref != null) { - // if a user supplied lower and upper bounds, we need to verify - // that min <= max - if (min_ref.compareTo(max_ref) > 0) { - throw new IllegalArgumentException(String.format("[%s] Min value (%d) is greater than max value (%d)", label, min_ref, max_ref)); - } - } - } - - public int rowMin() { - return minOrDefault(row_min); - } - - public int rowMax() { - return calculateMax(row_min, row_max); - } - - public int cfMin() { - return minOrDefault(cf_min); - } - - public int cfMax() { - return calculateMax(cf_min, cf_max); - } - - public int cqMin() { - return minOrDefault(cq_min); - } - - public int cqMax() { - return calculateMax(cq_min, cq_max); - } - - public int valueMin() { - return minOrDefault(value_min); - } - - public int valueMax() { - return calculateMax(value_min, value_max); - } - - public int rowWidthMin() { - return minOrDefault(row_width_min); - } - - public int rowWidthMax() { - return calculateMax(row_width_min, row_width_max); - } -} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/src/main/java/org/apache/accumulo/test/stress/random/package-info.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/stress/random/package-info.java b/test/src/main/java/org/apache/accumulo/test/stress/random/package-info.java deleted file mode 100644 index fdbf72e..0000000 --- a/test/src/main/java/org/apache/accumulo/test/stress/random/package-info.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package contains utility classes designed to test Accumulo when large cells are being written. This is an attempt to observe the behavior Accumulo - * displays when compacting and reading these cells. - * - * There are two components to this package: {@link org.apache.accumulo.test.stress.random.Write} and {@link org.apache.accumulo.test.stress.random.Scan}. - * - * The {@link org.apache.accumulo.test.stress.random.Write} provides facilities for writing random sized cells. Users can configure minimum and maximum - * sized portions of a cell. The portions users can configure are the row, column family, column qualifier and value. 
Note that the sizes are uniformly - * distributed between the minimum and maximum values. See {@link org.apache.accumulo.test.stress.random.WriteOptions} for available options and default sizing - * information. - * - * The Scan provides users with the ability to query tables generated by the Write. It will pick a tablet at random and scan the entire range. The - * amount of times this process is done is user configurable. By default, it happens 1,024 times. Users can also specify whether or not the scan should be - * isolated or not. - * - * There is no shared state intended by either of these services. This allows multiple clients to be run in parallel, either on the same host or distributed - * across hosts. - */ -package org.apache.accumulo.test.stress.random; - http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/system/agitator/.gitignore ---------------------------------------------------------------------- diff --git a/test/system/agitator/.gitignore b/test/system/agitator/.gitignore deleted file mode 100644 index 3429b01..0000000 --- a/test/system/agitator/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*~ -*.ini -*.pyc http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/system/agitator/README.md ---------------------------------------------------------------------- diff --git a/test/system/agitator/README.md b/test/system/agitator/README.md deleted file mode 100644 index 8abb74c..0000000 --- a/test/system/agitator/README.md +++ /dev/null @@ -1,39 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> - -Agitator: randomly kill processes -=========================== - -The agitator is used to randomly select processes for termination during -system test. - -Configure the agitator using the example agitator.ini file provided. - -Create a list of hosts to be agitated: - - $ cp ../../../conf/tservers hosts - $ echo master >> hosts - $ echo namenode >> hosts - -The agitator can be used to kill and restart any part of the accumulo -ecosystem: zookeepers, namenode, datanodes, tablet servers and master. -You can choose to agitate them all with "--all" - - $ ./agitator.py --all --hosts=hosts --config=agitator.ini --log DEBUG - -You will need to be able to ssh, without passwords, to all your hosts as -the user that can kill and start the services. http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/system/agitator/agitator.ini.example ---------------------------------------------------------------------- diff --git a/test/system/agitator/agitator.ini.example b/test/system/agitator/agitator.ini.example deleted file mode 100644 index 3512561..0000000 --- a/test/system/agitator/agitator.ini.example +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -[DEFAULT] -install=%(env.pwd)s/../../../.. -user=%(env.user)s - -[agitator] -kill=kill -9 -ssh=ssh -q -A -o StrictHostKeyChecking=no -sleep=300 -sleep.restart=30 -sleep.jitter=30 - -[accumulo] -home=%(install)s/accumulo -tserver.kill.min=1 -tserver.kill.max=1 -tserver.frequency=0.8 - -master.kill.min=1 -master.kill.max=1 -master.frequency=0.1 - -gc.kill.min=1 -gc.kill.max=1 -gc.frequency=0.1 - -[hadoop] -home=%(install)s/hadoop -bin=%(home)s/bin -datanode.frequency=0.8 -datanode.kill.min=1 -datanode.kill.max=1 -namenode.frequency=0.05 -namenode.kill.min=1 -namenode.kill.max=1 -secondarynamenode.frequency=0.05 -secondarynamenode.kill.min=1 -secondarynamenode.kill.max=1 - -[zookeeper] -home=%(install)s/zookeeper -frequency=0.05 http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/system/agitator/agitator.py ---------------------------------------------------------------------- diff --git a/test/system/agitator/agitator.py b/test/system/agitator/agitator.py deleted file mode 100755 index db94546..0000000 --- a/test/system/agitator/agitator.py +++ /dev/null @@ -1,241 +0,0 @@ -#! /usr/bin/python - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import random -import logging -import ConfigParser - -# add the environment variables as default settings -import os -defaults=dict([('env.' + k, v) for k, v in os.environ.iteritems()]) -config = ConfigParser.ConfigParser(defaults) - -# things you can do to a particular kind of process -class Proc: - program = 'Unknown' - _frequencyToKill = 1.0 - - def start(self, host): - pass - - def find(self, host): - pass - - def numberToKill(self): - return (1, 1) - - def frequencyToKill(self): - return self._frequencyToKill - - def user(self): - return config.get(self.program, 'user') - - def kill(self, host, pid): - kill = config.get('agitator', 'kill').split() - code, stdout, stderr = self.runOn(host, kill + [pid]) - if code != 0: - raise logging.warn("Unable to kill %d on %s (%s)", pid, host, stderr) - - def runOn(self, host, cmd): - ssh = config.get('agitator', 'ssh').split() - return self.run(ssh + ["%s@%s" % (self.user(), host)] + cmd) - - def run(self, cmd): - import subprocess - cmd = map(str, cmd) - logging.debug('Running %s', cmd) - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - if stdout.strip(): - logging.debug("%s", stdout.strip()) - if stderr.strip(): - logging.error("%s", stderr.strip()) - if p.returncode != 0: - logging.error("Problem running %s", ' '.join(cmd)) - return p.returncode, stdout, stderr - - def __repr__(self): - return self.program - -class Zookeeper(Proc): - program = 'zookeeper' - def __init__(self): - self._frequencyToKill = config.getfloat(self.program, 'frequency') - - 
def start(self, host): - self.runOn(host, [config.get(self.program, 'home') + '/bin/zkServer.sh start']) - - def find(self, host): - code, stdout, stderr = self.runOn(host, ['pgrep -f [Q]uorumPeerMain || true']) - return map(int, [line for line in stdout.split("\n") if line]) - -class Hadoop(Proc): - section = 'hadoop' - def __init__(self, program): - self.program = program - self._frequencyToKill = config.getfloat(self.section, program + '.frequency') - self.minimumToKill = config.getint(self.section, program + '.kill.min') - self.maximumToKill = config.getint(self.section, program + '.kill.max') - - def start(self, host): - binDir = config.get(self.section, 'bin') - self.runOn(host, ['nohup %s/hdfs %s < /dev/null >/dev/null 2>&1 &' %(binDir, self.program)]) - - def find(self, host): - code, stdout, stderr = self.runOn(host, ["pgrep -f 'proc[_]%s' || true" % (self.program,)]) - return map(int, [line for line in stdout.split("\n") if line]) - - def numberToKill(self): - return (self.minimumToKill, self.maximumToKill) - - def user(self): - return config.get(self.section, 'user') - -class Accumulo(Hadoop): - section = 'accumulo' - def start(self, host): - home = config.get(self.section, 'home') - self.runOn(host, ['nohup %s/bin/accumulo %s </dev/null >/dev/null 2>&1 & ' %(home, self.program)]) - - def find(self, host): - code, stdout, stderr = self.runOn(host, ["pgrep -f 'app[=]%s' || true" % self.program]) - return map(int, [line for line in stdout.split("\n") if line]) - -def fail(msg): - import sys - logging.critical(msg) - sys.exit(1) - -def jitter(n): - return random.random() * n - n / 2 - -def sleep(n): - if n > 0: - logging.info("Sleeping %.2f", n) - import time - time.sleep(n) - -def agitate(hosts, procs): - starters = [] - - logging.info("Agitating %s on %d hosts" % (procs, len(hosts))) - - section = 'agitator' - - # repeatedly... 
- while True: - if starters: - # start up services that were previously killed - t = max(0, config.getfloat(section, 'sleep.restart') + jitter(config.getfloat(section, 'sleep.jitter'))) - sleep(t) - for host, proc in starters: - logging.info('Starting %s on %s', proc, host) - proc.start(host) - starters = [] - - # wait some time - t = max(0, config.getfloat(section, 'sleep') + jitter(config.getfloat(section, 'sleep.jitter'))) - sleep(t) - - # for some processes - for p in procs: - - # roll dice: should it be killed? - if random.random() < p.frequencyToKill(): - - # find them - from multiprocessing import Pool - def finder(host): - return host, p.find(host) - with Pool(5) as pool: - result = pool.map(finder, hosts) - candidates = {} - for host, pids in result: - if pids: - candidates[host] = pids - - # how many? - minKill, maxKill = p.numberToKill() - count = min(random.randrange(minKill, maxKill + 1), len(candidates)) - - # pick the victims - doomedHosts = random.sample(candidates.keys(), count) - - # kill them - logging.info("Killing %s on %s", p, doomedHosts) - for doomedHost in doomedHosts: - pids = candidates[doomedHost] - if not pids: - logging.error("Unable to kill any %s on %s: no processes of that type are running", p, doomedHost) - else: - pid = random.choice(pids) - logging.debug("Killing %s (%d) on %s", p, pid, doomedHost) - p.kill(doomedHost, pid) - # remember to restart them later - starters.append((doomedHost, p)) - -def main(): - import argparse - parser = argparse.ArgumentParser(description='Kill random processes') - parser.add_argument('--log', help='set the log level', default='INFO') - parser.add_argument('--namenodes', help='randomly kill namenodes', action="store_true") - parser.add_argument('--secondary', help='randomly kill secondary namenode', action="store_true") - parser.add_argument('--datanodes', help='randomly kill datanodes', action="store_true") - parser.add_argument('--tservers', help='randomly kill tservers', action="store_true") - 
parser.add_argument('--masters', help='randomly kill masters', action="store_true") - parser.add_argument('--zookeepers', help='randomly kill zookeepers', action="store_true") - parser.add_argument('--gc', help='randomly kill the file garbage collector', action="store_true") - parser.add_argument('--all', - help='kill any of the tservers, masters, datanodes, namenodes or zookeepers', - action='store_true') - parser.add_argument('--hosts', type=argparse.FileType('r'), required=True) - parser.add_argument('--config', type=argparse.FileType('r'), required=True) - args = parser.parse_args() - - config.readfp(args.config) - - level = getattr(logging, args.log.upper(), None) - if isinstance(level, int): - logging.basicConfig(level=level) - - procs = [] - def addIf(flag, proc): - if flag or args.all: - procs.append(proc) - - addIf(args.namenodes, Hadoop('namenode')) - addIf(args.datanodes, Hadoop('datanode')) - addIf(args.secondary, Hadoop('secondarynamenode')) - addIf(args.tservers, Accumulo('tserver')) - addIf(args.masters, Accumulo('master')) - addIf(args.gc, Accumulo('gc')) - addIf(args.zookeepers, Zookeeper()) - if len(procs) == 0: - fail("No processes to agitate!\n") - - hosts = [] - for line in args.hosts.readlines(): - line = line.strip() - if line and line[0] != '#': - hosts.append(line) - if not hosts: - fail('No hosts to agitate!\n') - - agitate(hosts, procs) - -if __name__ == '__main__': - main() http://git-wip-us.apache.org/repos/asf/accumulo/blob/81f215c0/test/system/agitator/hosts.example ---------------------------------------------------------------------- diff --git a/test/system/agitator/hosts.example b/test/system/agitator/hosts.example deleted file mode 100644 index 63fb8bb..0000000 --- a/test/system/agitator/hosts.example +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -localhost