http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ManualComparison.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ManualComparison.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ManualComparison.java new file mode 100644 index 0000000..87489f6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/cli/ManualComparison.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle.cli; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.ClientOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; + +import com.beust.jcommander.Parameter; + +/** + * Accepts two table names and enumerates all key-value pairs in both, checking for correctness. All differences between the two tables will be printed to the + * console. + */ +public class ManualComparison { + + public static class ManualComparisonOpts extends ClientOpts { + @Parameter(names = {"--table1"}, required = true, description = "First table") + public String table1; + + @Parameter(names = {"--table2"}, required = true, description = "Second table") + public String table2; + } + + public static void main(String[] args) throws Exception { + ManualComparisonOpts opts = new ManualComparisonOpts(); + opts.parseArgs("ManualComparison", args); + + Connector conn = opts.getConnector(); + + Scanner s1 = conn.createScanner(opts.table1, Authorizations.EMPTY), s2 = conn.createScanner(opts.table2, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> iter1 = s1.iterator(), iter2 = s2.iterator(); + boolean incrementFirst = true, incrementSecond = true; + + Entry<Key,Value> entry1 = iter1.next(), entry2 = iter2.next(); + while (iter1.hasNext() && iter2.hasNext()) { + if (incrementFirst) { + entry1 = iter1.next(); + } + if (incrementSecond) { + entry2 = iter2.next(); + } + incrementFirst = false; + incrementSecond = false; + + if (!entry1.equals(entry2)) { + + if (entry1.getKey().compareTo(entry2.getKey()) < 0) { + System.out.println("Exist in original " + entry1); + incrementFirst = true; + } else if (entry2.getKey().compareTo(entry1.getKey()) < 0) { + System.out.println("Exist in replica " + entry2); + incrementSecond = true; + } else { + System.out.println("Differ... 
" + entry1 + " " + entry2); + incrementFirst = true; + incrementSecond = true; + } + } else { + incrementFirst = true; + incrementSecond = true; + } + } + + System.out.println("\nExtra entries from " + opts.table1); + while (iter1.hasNext()) { + System.out.println(iter1.next()); + } + + System.out.println("\nExtra entries from " + opts.table2); + while (iter2.hasNext()) { + System.out.println(iter2.next()); + } + } +}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/ingest/RandomWorkload.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/ingest/RandomWorkload.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/ingest/RandomWorkload.java new file mode 100644 index 0000000..440b009 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/ingest/RandomWorkload.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle.ingest; + +import java.util.Random; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.Parameter; + +/** + * Generates some random data with a given percent of updates to be deletes. 
+ */ +public class RandomWorkload { + public static final String DEFAULT_TABLE_NAME = "randomWorkload"; + + public static class RandomWorkloadOpts extends ClientOnDefaultTable { + @Parameter(names = {"-n", "--num"}, required = true, description = "Num records to write") + public long numRecords; + + @Parameter(names = {"-r", "--rows"}, required = true, description = "Range of rows that can be generated") + public int rowMax; + + @Parameter(names = {"-cf", "--colfams"}, required = true, description = "Range of column families that can be generated") + public int cfMax; + + @Parameter(names = {"-cq", "--colquals"}, required = true, description = "Range of column qualifiers that can be generated") + public int cqMax; + + @Parameter(names = {"-d", "--deletes"}, required = false, description = "Percentage of updates that should be deletes") + public int deletePercent = 5; + + public RandomWorkloadOpts() { + super(DEFAULT_TABLE_NAME); + } + + public RandomWorkloadOpts(String tableName) { + super(tableName); + } + } + + public void run(RandomWorkloadOpts opts, BatchWriterConfig cfg) throws Exception { + run(opts.getConnector(), opts.getTableName(), cfg, opts.numRecords, opts.rowMax, opts.cfMax, opts.cqMax, opts.deletePercent); + } + + public void run(final Connector conn, final String tableName, final BatchWriterConfig cfg, final long numRecords, int rowMax, int cfMax, int cqMax, + int deletePercent) throws Exception { + + final Random rowRand = new Random(12345); + final Random cfRand = new Random(12346); + final Random cqRand = new Random(12347); + final Random deleteRand = new Random(12348); + long valueCounter = 0l; + + if (!conn.tableOperations().exists(tableName)) { + conn.tableOperations().create(tableName); + } + + BatchWriter bw = conn.createBatchWriter(tableName, cfg); + try { + final Text row = new Text(), cf = new Text(), cq = new Text(); + final Value value = new Value(); + for (long i = 0; i < numRecords; i++) { + row.set(Integer.toString(rowRand.nextInt(rowMax))); + cf.set(Integer.toString(cfRand.nextInt(cfMax))); + cq.set(Integer.toString(cqRand.nextInt(cqMax))); + + Mutation m = new Mutation(row); + + // Choose a random value between [0,100) + int deleteValue = deleteRand.nextInt(100); + + // putDelete if the value we chose is less than our delete percentage + if (deleteValue < deletePercent) { + m.putDelete(cf, cq); + } else { + value.set(Long.toString(valueCounter).getBytes()); + m.put(cf, cq, valueCounter, value); + } + + bw.addMutation(m); + + valueCounter++; + } + } finally { + bw.close(); + } + } + + public static void main(String[] args) throws Exception { + RandomWorkloadOpts opts = new RandomWorkloadOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(RandomWorkload.class.getSimpleName(), args, bwOpts); + + RandomWorkload rw = new RandomWorkload(); + + rw.run(opts, bwOpts.getBatchWriterConfig()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/package-info.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/package-info.java new file mode 100644 index 0000000..5b27e4b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/package-info.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree and can be used to evaluate equality over large + * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a + * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are + * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file. + * <p> + * The parent of any set of nodes (typically two, as in a binary tree; however this is not required) is the concatenation of the hashes of + * the children. We can construct a full tree by walking up the tree, creating parents from children, until we have a root + * node. To check equality of two files that each have a Merkle tree built, we can simply compare the value at the + * root of the Merkle tree to know whether or not the files are the same. + * <p> + * Additionally, in the situation where we have two files which we expect to be the same but are not, we can walk back down + * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of + * the files which are identical, while subtrees that are not equal correspond to discrepancies between the two files. + * <p> + * We can apply this concept to Accumulo, treating a table as a file, and ranges within a file as an Accumulo Range. We can + * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are + * equivalent. + * + * @since 1.7.0 + */ +package org.apache.accumulo.testing.core.merkle; + http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/merkle/skvi/DigestIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/merkle/skvi/DigestIterator.java b/core/src/main/java/org/apache/accumulo/testing/core/merkle/skvi/DigestIterator.java new file mode 100644 index 0000000..ab7ad43 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/merkle/skvi/DigestIterator.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.merkle.skvi; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs. + * <p> + * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner will compute a single digest over a + * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the Merkle tree will be invalid. + */ +public class DigestIterator implements SortedKeyValueIterator<Key,Value> { + public static final String HASH_NAME_KEY = "hash.name"; + + private MessageDigest digest; + private Key topKey; + private Value topValue; + private SortedKeyValueIterator<Key,Value> source; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + String hashName = options.get(HASH_NAME_KEY); + if (null == hashName) { + throw new IOException(HASH_NAME_KEY + " must be provided as option"); + } + + try { + this.digest = MessageDigest.getInstance(hashName); + } catch (NoSuchAlgorithmException e) { + throw new IOException(e); + } + + this.topKey = null; + this.topValue = null; + this.source = source; + } + + @Override + public boolean hasTop() { + return null != topKey; + } + + @Override + public void next() throws IOException { + // We can't call next() if we already consumed it all + if (!this.source.hasTop()) { + this.topKey = null; + this.topValue = null; + return; + } + + this.source.next(); + + consume(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + this.source.seek(range, columnFamilies, inclusive); + + consume(); + } + + protected void consume() throws IOException { + digest.reset(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + if (!this.source.hasTop()) { + this.topKey = null; + this.topValue = null; + + return; + } + + Key lastKeySeen = null; + while (this.source.hasTop()) { + baos.reset(); + + Key currentKey = this.source.getTopKey(); + lastKeySeen = currentKey; + + currentKey.write(dos); + this.source.getTopValue().write(dos); + + digest.update(baos.toByteArray()); + + this.source.next(); + } + + this.topKey = lastKeySeen; + this.topValue = new Value(digest.digest()); + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return topValue; + } + + 
@Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + DigestIterator copy = new DigestIterator(); + try { + copy.digest = MessageDigest.getInstance(digest.getAlgorithm()); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + + copy.topKey = this.topKey; + copy.topValue = this.topValue; + copy.source = this.source.deepCopy(env); + + return copy; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/scalability/Ingest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/scalability/Ingest.java b/core/src/main/java/org/apache/accumulo/testing/core/scalability/Ingest.java new file mode 100644 index 0000000..1a61eea --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/scalability/Ingest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.scalability; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.testing.core.continuous.ContinuousIngest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Ingest extends ScaleTest { + + private static final Logger log = LoggerFactory.getLogger(Ingest.class); + + @Override + public void setup() { + + Connector conn = getConnector(); + String tableName = getTestProperty("TABLE"); + + // delete existing table + if (conn.tableOperations().exists(tableName)) { + System.out.println("Deleting existing table: " + tableName); + try { + conn.tableOperations().delete(tableName); + } catch (Exception e) { + log.error("Failed to delete table '" + tableName + "'.", e); + } + } + + // create table + try { + conn.tableOperations().create(tableName); + conn.tableOperations().addSplits(tableName, calculateSplits()); + conn.tableOperations().setProperty(tableName, "table.split.threshold", "256M"); + } catch (Exception e) { + log.error("Failed to create table '" + tableName + "'.", e); + } + + } + + @Override + public void client() { + + Connector conn = getConnector(); + String tableName = getTestProperty("TABLE"); + + // get batch writer configuration + long maxMemory = 
Long.parseLong(getTestProperty("MAX_MEMORY")); + long maxLatency = Long.parseLong(getTestProperty("MAX_LATENCY")); + int maxWriteThreads = Integer.parseInt(getTestProperty("NUM_THREADS")); + + // create batch writer + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS) + .setMaxWriteThreads(maxWriteThreads)); + } catch (TableNotFoundException e) { + log.error("Table '" + tableName + "' not found.", e); + System.exit(-1); + } + + // configure writing + Random r = new Random(); + String ingestInstanceId = UUID.randomUUID().toString(); + long numIngestEntries = Long.parseLong(getTestProperty("NUM_ENTRIES")); + long minRow = 0L; + long maxRow = 9223372036854775807L; + int maxColF = 32767; + int maxColQ = 32767; + long count = 0; + long totalBytes = 0; + + ColumnVisibility cv = new ColumnVisibility(); + + // start timer + startTimer(); + + // write specified number of entries + while (count < numIngestEntries) { + count++; + long rowId = ContinuousIngest.genLong(minRow, maxRow, r); + Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId.getBytes(UTF_8), count, null, false); + totalBytes += m.numBytes(); + try { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + log.error("Mutations rejected.", e); + System.exit(-1); + } + } + + // close writer + try { + bw.close(); + } catch (MutationsRejectedException e) { + log.error("Could not close BatchWriter due to mutations being rejected.", e); + System.exit(-1); + } + + // stop timer + stopTimer(count, totalBytes); + } + + @Override + public void teardown() { + + Connector conn = getConnector(); + String tableName = getTestProperty("TABLE"); + + try { + conn.tableOperations().delete(tableName); + } catch (Exception e) { + log.error("Failed to delete table '" + tableName + "'", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/scalability/Run.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/scalability/Run.java b/core/src/main/java/org/apache/accumulo/testing/core/scalability/Run.java new file mode 100644 index 0000000..f7f7458 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/scalability/Run.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.scalability; + +import java.io.FileInputStream; +import java.net.InetAddress; +import java.util.Properties; + +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +public class Run { + + private static final Logger log = LoggerFactory.getLogger(Run.class); + + static class Opts extends Help { + @Parameter(names = "--testId", required = true) + String testId; + @Parameter(names = "--action", required = true, description = "one of 'setup', 'teardown' or 'client'") + String action; + @Parameter(names = "--count", description = "number of tablet servers", required = true) + int numTabletServers; + } + + public static void main(String[] args) throws Exception { + + final String sitePath = "/tmp/scale-site.conf"; + final String testPath = "/tmp/scale-test.conf"; + Opts opts = new Opts(); + opts.parseArgs(Run.class.getName(), args); + + Configuration conf = CachedConfiguration.getInstance(); + FileSystem fs; + fs = FileSystem.get(conf); + + fs.copyToLocalFile(new Path("/accumulo-scale/conf/site.conf"), new Path(sitePath)); + fs.copyToLocalFile(new Path(String.format("/accumulo-scale/conf/%s.conf", opts.testId)), new Path(testPath)); + + // load configuration file properties + Properties scaleProps = new Properties(); + Properties testProps = new Properties(); + try { + FileInputStream fis = new FileInputStream(sitePath); + try { + scaleProps.load(fis); + } finally { + fis.close(); + } + fis = new FileInputStream(testPath); + try { + testProps.load(fis); + } finally { + fis.close(); + } + } catch (Exception e) { + log.error("Error loading config file.", e); + } + + ScaleTest test = (ScaleTest) Class.forName(String.format("org.apache.accumulo.test.scalability.%s", opts.testId)).newInstance(); + + test.init(scaleProps, testProps, opts.numTabletServers); + + if (opts.action.equalsIgnoreCase("setup")) { + test.setup(); + } else if (opts.action.equalsIgnoreCase("client")) { + InetAddress addr = InetAddress.getLocalHost(); + String host = addr.getHostName(); + fs.createNewFile(new Path("/accumulo-scale/clients/" + host)); + test.client(); + fs.copyFromLocalFile(new Path("/tmp/scale.out"), new Path("/accumulo-scale/results/" + host)); + } else if (opts.action.equalsIgnoreCase("teardown")) { + test.teardown(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/scalability/ScaleTest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/scalability/ScaleTest.java b/core/src/main/java/org/apache/accumulo/testing/core/scalability/ScaleTest.java new file mode 100644 index 0000000..a78f39e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/scalability/ScaleTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.scalability; + +import java.util.Properties; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.io.Text; + +public abstract class ScaleTest { + + private Connector conn; + private Properties scaleProps; + private Properties testProps; + private int numTabletServers; + private long startTime; + + public void init(Properties scaleProps, Properties testProps, int numTabletServers) throws AccumuloException, AccumuloSecurityException { + + this.scaleProps = scaleProps; + this.testProps = testProps; + this.numTabletServers = numTabletServers; + + // get properties to create connector + String instanceName = this.scaleProps.getProperty("INSTANCE_NAME"); + String zookeepers = this.scaleProps.getProperty("ZOOKEEPERS"); + String user = this.scaleProps.getProperty("USER"); + String password = this.scaleProps.getProperty("PASSWORD"); + System.out.println(password); + + conn = new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(zookeepers)).getConnector(user, new PasswordToken(password)); + } + + protected void startTimer() { + startTime = System.currentTimeMillis(); + } + + protected void stopTimer(long numEntries, long numBytes) { + long endTime = System.currentTimeMillis(); + System.out.printf("ELAPSEDMS %d %d %d%n", endTime - startTime, numEntries, numBytes); + } + + public abstract void setup(); + + public abstract void client(); + + public abstract void teardown(); + + public TreeSet<Text> calculateSplits() { + int numSplits = numTabletServers - 1; + long distance = (Long.MAX_VALUE / numTabletServers) + 1; + long split = distance; + TreeSet<Text> keys = new TreeSet<>(); + for (int i = 0; i < numSplits; i++) { + keys.add(new Text(String.format("%016x", split))); + split += distance; + } + return keys; + } + + public Connector getConnector() { + return conn; + } + + public String getTestProperty(String key) { + return testProps.getProperty(key); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/DataWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/DataWriter.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/DataWriter.java new file mode 100644 index 0000000..e7158e2 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/DataWriter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; + +public class DataWriter extends Stream<Void> { + private final BatchWriter writer; + private final RandomMutations mutations; + + public DataWriter(BatchWriter writer, RandomMutations mutations) { + this.writer = writer; + this.mutations = mutations; + } + + @Override + public Void next() { + try { + writer.addMutation(mutations.next()); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + public void finalize() { + try { + this.writer.close(); + } catch (MutationsRejectedException e) { + System.err.println("Error closing batch writer."); + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/IntArgValidator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/IntArgValidator.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/IntArgValidator.java new file mode 100644 index 0000000..5a5ad3e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/IntArgValidator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.test.stress.random; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.ParameterException; + +public class IntArgValidator implements IValueValidator<Integer> { + + @Override + public void validate(String name, Integer value) throws ParameterException { + requireNonNull(value); + checkArgument(value > 0); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomByteArrays.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomByteArrays.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomByteArrays.java new file mode 100644 index 0000000..405fabb --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomByteArrays.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +/** + * A stream that will create random byte arrays as it is looped over. + */ +public class RandomByteArrays extends Stream<byte[]> { + private final RandomWithinRange random_arrays; + + public RandomByteArrays(RandomWithinRange random_arrays) { + this.random_arrays = random_arrays; + } + + @Override + public byte[] next() { + return random_arrays.next_bytes(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomMutations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomMutations.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomMutations.java new file mode 100644 index 0000000..db5da55 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomMutations.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import org.apache.accumulo.core.data.Mutation; + +public class RandomMutations extends Stream<Mutation> { + private final RandomByteArrays rows, column_families, column_qualifiers, values; + private final RandomWithinRange row_widths; + private final int max_cells_per_mutation; + private byte[] current_row; + private int cells_remaining_in_row; + + public RandomMutations(RandomByteArrays rows, RandomByteArrays column_families, RandomByteArrays column_qualifiers, RandomByteArrays values, + RandomWithinRange row_widths, int max_cells_per_mutation) { + this.rows = rows; + this.column_families = column_families; + this.column_qualifiers = column_qualifiers; + this.values = values; + this.row_widths = row_widths; + this.max_cells_per_mutation = (max_cells_per_mutation > 0 ? max_cells_per_mutation : Integer.MAX_VALUE); + + current_row = null; + cells_remaining_in_row = 0; + } + + // TODO should we care about timestamps? + @Override + public Mutation next() { + if (cells_remaining_in_row == 0) { + current_row = rows.next(); + cells_remaining_in_row = row_widths.next(); + } + Mutation m = new Mutation(current_row); + final int cells = Math.min(cells_remaining_in_row, max_cells_per_mutation); + for (int i = 1; i <= cells; i++) { + m.put(column_families.next(), column_qualifiers.next(), values.next()); + } + cells_remaining_in_row -= cells; + return m; + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomWithinRange.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomWithinRange.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomWithinRange.java new file mode 100644 index 0000000..06cea28 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/RandomWithinRange.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Random; + +/** + * Class that returns positive integers between some minimum and maximum. 
+ * + */ +public class RandomWithinRange { + private final Random random; + private final int min, max; + + public RandomWithinRange(int seed, int min, int max) { + this(new Random(seed), min, max); + } + + public RandomWithinRange(Random random, int min, int max) { + checkArgument(min > 0, "Min must be positive."); + checkArgument(max >= min, "Max must be greater than or equal to min."); + this.random = random; + this.min = min; + this.max = max; + } + + public int next() { + if (min == max) { + return min; + } else { + // we pick a random number that's between 0 and (max - min), then add + // min as an offset to get a random number that's [min, max) + return random.nextInt(max - min) + min; + } + } + + public byte[] next_bytes() { + byte[] b = new byte[next()]; + random.nextBytes(b); + return b; + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/Scan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/Scan.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/Scan.java new file mode 100644 index 0000000..3e8d5fd --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/Scan.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Random; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +import com.google.common.collect.Lists; + +public class Scan { + + public static void main(String[] args) throws Exception { + ScanOpts opts = new ScanOpts(); + opts.parseArgs(Scan.class.getName(), args); + + Connector connector = opts.getConnector(); + Scanner scanner = connector.createScanner(opts.getTableName(), new Authorizations()); + + if (opts.isolate) { + scanner.enableIsolation(); + } + + Random tablet_index_generator = new Random(opts.scan_seed); + + LoopControl scanning_condition = opts.continuous ? 
new ContinuousLoopControl() : new IterativeLoopControl(opts.scan_iterations); + + while (scanning_condition.keepScanning()) { + Range range = pickRange(connector.tableOperations(), opts.getTableName(), tablet_index_generator); + scanner.setRange(range); + if (opts.batch_size > 0) { + scanner.setBatchSize(opts.batch_size); + } + try { + consume(scanner); + } catch (Exception e) { + System.err.println(String.format("Exception while scanning range %s. Check the state of Accumulo for errors.", range)); + throw e; + } + } + } + + public static void consume(Iterable<?> iterable) { + Iterator<?> itr = iterable.iterator(); + while (itr.hasNext()) { + itr.next(); + } + } + + public static Range pickRange(TableOperations tops, String table, Random r) throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + ArrayList<Text> splits = Lists.newArrayList(tops.listSplits(table)); + if (splits.isEmpty()) { + return new Range(); + } else { + int index = r.nextInt(splits.size()); + Text endRow = splits.get(index); + Text startRow = index == 0 ? null : splits.get(index - 1); + return new Range(startRow, false, endRow, true); + } + } + + /* + * These interfaces + implementations are used to determine how many times the scanner should look up a random tablet and scan it. + */ + static interface LoopControl { + public boolean keepScanning(); + } + + // Does a finite number of iterations + static class IterativeLoopControl implements LoopControl { + private final int max; + private int current; + + public IterativeLoopControl(int max) { + this.max = max; + this.current = 0; + } + + @Override + public boolean keepScanning() { + if (current < max) { + ++current; + return true; + } else { + return false; + } + } + } + + // Does an infinite number of iterations + static class ContinuousLoopControl implements LoopControl { + @Override + public boolean keepScanning() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/ScanOpts.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/ScanOpts.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/ScanOpts.java new file mode 100644 index 0000000..e3f73f7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/ScanOpts.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.stress.random; + +import org.apache.accumulo.core.cli.ClientOnDefaultTable; + +import com.beust.jcommander.Parameter; + +class ScanOpts extends ClientOnDefaultTable { + @Parameter(names = "--isolate", description = "true to turn on scan isolation, false to turn off. default is false.") + boolean isolate = false; + + @Parameter(names = "--num-iterations", description = "number of scan iterations") + int scan_iterations = 1024; + + @Parameter(names = "--continuous", description = "continuously scan the table. note that this overrides --num-iterations") + boolean continuous; + + @Parameter(names = "--scan-seed", description = "seed for randomly choosing tablets to scan") + int scan_seed = 1337; + + @Parameter(names = "--scan-batch-size", description = "scanner batch size") + int batch_size = -1; + + public ScanOpts() { + this(WriteOptions.DEFAULT_TABLE); + } + + public ScanOpts(String table) { + super(table); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/Stream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/Stream.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/Stream.java new file mode 100644 index 0000000..72b31e5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/Stream.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import java.util.Iterator; + +/** + * Base class to model an infinite stream of data. A stream implements an iterator whose {{@link #hasNext()} method will always return true. + * + */ +public abstract class Stream<T> implements Iterator<T> { + + @Override + public final boolean hasNext() { + return true; + } + + @Override + public abstract T next(); + + @Override + public final void remove() { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/Write.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/Write.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/Write.java new file mode 100644 index 0000000..ea6f164 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/Write.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; + +public class Write { + + public static void main(String[] args) throws Exception { + WriteOptions opts = new WriteOptions(); + BatchWriterOpts batch_writer_opts = new BatchWriterOpts(); + opts.parseArgs(Write.class.getName(), args, batch_writer_opts); + + opts.check(); + + Connector c = opts.getConnector(); + + if (opts.clear_table && c.tableOperations().exists(opts.getTableName())) { + try { + c.tableOperations().delete(opts.getTableName()); + } catch (TableNotFoundException e) { + System.err.println("Couldn't delete the table because it doesn't exist any more."); + } + } + + if (!c.tableOperations().exists(opts.getTableName())) { + try { + c.tableOperations().create(opts.getTableName()); + } catch (TableExistsException e) { + System.err.println("Couldn't create table ourselves, but that's ok. Continuing."); + } + } + + long writeDelay = opts.write_delay; + if (writeDelay < 0) { + writeDelay = 0; + } + + DataWriter dw = new DataWriter(c.createBatchWriter(opts.getTableName(), batch_writer_opts.getBatchWriterConfig()), new RandomMutations( + // rows + new RandomByteArrays(new RandomWithinRange(opts.row_seed, opts.rowMin(), opts.rowMax())), + // cfs + new RandomByteArrays(new RandomWithinRange(opts.cf_seed, opts.cfMin(), opts.cfMax())), + // cqs + new RandomByteArrays(new RandomWithinRange(opts.cq_seed, opts.cqMin(), opts.cqMax())), + // vals + new RandomByteArrays(new RandomWithinRange(opts.value_seed, opts.valueMin(), opts.valueMax())), + // number of cells per row + new RandomWithinRange(opts.row_width_seed, opts.rowWidthMin(), opts.rowWidthMax()), + // max cells per mutation + opts.max_cells_per_mutation)); + + while (true) { + dw.next(); + if (writeDelay > 0) { + Thread.sleep(writeDelay); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/WriteOptions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/WriteOptions.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/WriteOptions.java new file mode 100644 index 0000000..f92a9eb --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/WriteOptions.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.stress.random; + +import org.apache.accumulo.core.cli.ClientOnDefaultTable; + +import com.beust.jcommander.Parameter; + +class WriteOptions extends ClientOnDefaultTable { + static final String DEFAULT_TABLE = "stress_test"; + static final int DEFAULT_MIN = 1, DEFAULT_MAX = 128, DEFAULT_SPREAD = DEFAULT_MAX - DEFAULT_MIN; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--min-row-size", description = "minimum row size") + Integer row_min; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--max-row-size", description = "maximum row size") + Integer row_max; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--min-cf-size", description = "minimum column family size") + Integer cf_min; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--max-cf-size", description = "maximum column family size") + Integer cf_max; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--min-cq-size", description = "minimum column qualifier size") + Integer cq_min; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--max-cq-size", description = "maximum column qualifier size") + Integer cq_max; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--min-value-size", description = "minimum value size") + Integer value_min; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--max-value-size", description = "maximum value size") + Integer value_max; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--min-row-width", description = "minimum row width") + Integer row_width_min; + + @Parameter(validateValueWith = IntArgValidator.class, names = "--max-row-width", description = "maximum row width") + Integer row_width_max; + + @Parameter(names = "--clear-table", description = "clears the table before ingesting") + boolean clear_table; + + @Parameter(names = "--row-seed", description = "seed for generating rows") + int row_seed = 87; + + @Parameter(names = "--cf-seed", description = "seed for generating column families") + int cf_seed = 7; + + @Parameter(names = "--cq-seed", description = "seed for generating column qualifiers") + int cq_seed = 43; + + @Parameter(names = "--value-seed", description = "seed for generating values") + int value_seed = 99; + + @Parameter(names = "--row-width-seed", description = "seed for generating the number of cells within a row (a row's \"width\")") + int row_width_seed = 444; + + @Parameter(names = "--max-cells-per-mutation", description = "maximum number of cells per mutation; non-positive value implies no limit") + int max_cells_per_mutation = -1; + + @Parameter(names = "--write-delay", description = "milliseconds to wait between writes") + long write_delay = 0L; + + public WriteOptions(String table) { + super(table); + } + + public WriteOptions() { + this(DEFAULT_TABLE); + } + + private static int minOrDefault(Integer ref) { + return ref == null ? 
DEFAULT_MIN : ref; + } + + private static int calculateMax(Integer min_ref, Integer max_ref) { + if (max_ref == null) { + if (min_ref == null) { + return DEFAULT_MAX; + } else { + return min_ref + DEFAULT_SPREAD; + } + } else { + return max_ref; + } + } + + public void check() { + checkPair("ROW", row_min, row_max); + checkPair("COLUMN FAMILY", cf_min, cf_max); + checkPair("COLUMN QUALIFIER", cq_min, cq_max); + checkPair("VALUE", value_min, value_max); + } + + public void checkPair(String label, Integer min_ref, Integer max_ref) { + // we've already asserted that the numbers will either be + // 1) null + // 2) positive + // need to verify that they're coherent here + + if (min_ref == null && max_ref != null) { + // we don't support just specifying a max yet + throw new IllegalArgumentException(String.format("[%s] Maximum value supplied, but no minimum. Must supply a minimum with a maximum value.", label)); + } else if (min_ref != null && max_ref != null) { + // if a user supplied lower and upper bounds, we need to verify + // that min <= max + if (min_ref.compareTo(max_ref) > 0) { + throw new IllegalArgumentException(String.format("[%s] Min value (%d) is greater than max value (%d)", label, min_ref, max_ref)); + } + } + } + + public int rowMin() { + return minOrDefault(row_min); + } + + public int rowMax() { + return calculateMax(row_min, row_max); + } + + public int cfMin() { + return minOrDefault(cf_min); + } + + public int cfMax() { + return calculateMax(cf_min, cf_max); + } + + public int cqMin() { + return minOrDefault(cq_min); + } + + public int cqMax() { + return calculateMax(cq_min, cq_max); + } + + public int valueMin() { + return minOrDefault(value_min); + } + + public int valueMax() { + return calculateMax(value_min, value_max); + } + + public int rowWidthMin() { + return minOrDefault(row_width_min); + } + + public int rowWidthMax() { + return calculateMax(row_width_min, row_width_max); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/core/src/main/java/org/apache/accumulo/testing/core/stress/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/stress/package-info.java b/core/src/main/java/org/apache/accumulo/testing/core/stress/package-info.java new file mode 100644 index 0000000..fdbf72e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/stress/package-info.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains utility classes designed to test Accumulo when large cells are being written. This is an attempt to observe the behavior Accumulo + * displays when compacting and reading these cells. 
+ * + * There are two components to this package: {@link org.apache.accumulo.test.stress.random.Write} and {@link org.apache.accumulo.test.stress.random.Scan}. + * + * The {@link org.apache.accumulo.test.stress.random.Write} provides facilities for writing randomly sized cells. Users can configure minimum and maximum + * sizes for each portion of a cell. The portions users can configure are the row, column family, column qualifier and value. Note that the sizes are uniformly + * distributed between the minimum and maximum values. See {@link org.apache.accumulo.test.stress.random.WriteOptions} for available options and default sizing + * information. + * + * The Scan provides users with the ability to query tables generated by the Write. It will pick a tablet at random and scan the entire range. The + * number of times this process is repeated is user configurable. By default, it happens 1,024 times. Users can also specify whether or not the scan should be + * isolated. + * + * There is no shared state intended by either of these services. This allows multiple clients to be run in parallel, either on the same host or distributed + * across hosts. + */ +package org.apache.accumulo.test.stress.random; + http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/agitator/.gitignore ---------------------------------------------------------------------- diff --git a/test/agitator/.gitignore b/test/agitator/.gitignore new file mode 100644 index 0000000..3429b01 --- /dev/null +++ b/test/agitator/.gitignore @@ -0,0 +1,3 @@ +*~ +*.ini +*.pyc http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/agitator/README.md ---------------------------------------------------------------------- diff --git a/test/agitator/README.md b/test/agitator/README.md new file mode 100644 index 0000000..8abb74c --- /dev/null +++ b/test/agitator/README.md @@ -0,0 +1,39 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +Agitator: randomly kill processes +=========================== + +The agitator is used to randomly select processes for termination during +system testing. + +Configure the agitator using the example agitator.ini file provided. + +Create a list of hosts to be agitated: + + $ cp ../../../conf/tservers hosts + $ echo master >> hosts + $ echo namenode >> hosts + +The agitator can be used to kill and restart any part of the Accumulo +ecosystem: zookeepers, namenode, datanodes, tablet servers and master. +You can choose to agitate them all with "--all": + + $ ./agitator.py --all --hosts=hosts --config=agitator.ini --log DEBUG + +You will need to be able to ssh, without passwords, to all of your hosts as +a user that can kill and start the services. 
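Before kicking off an agitation run, it can save time to confirm that passwordless ssh really does work for every host in the hosts file, since the agitator assumes it. The sketch below is a hypothetical pre-flight check, not part of the agitator itself; it assumes a newline-delimited hosts file like the one created above and a standard ssh client on the PATH.

#!/usr/bin/env python
# check_ssh.py -- hypothetical pre-flight helper, not shipped with the agitator.
# Verifies passwordless ssh to every host listed in a hosts file (one host per
# line, '#' comments ignored), mirroring how agitator.py parses that file.
import subprocess
import sys

def check_hosts(hosts_file='hosts'):
    failures = []
    with open(hosts_file) as f:
        for line in f:
            host = line.strip()
            if not host or host.startswith('#'):
                continue
            # BatchMode=yes makes ssh fail fast instead of prompting for a password
            code = subprocess.call(['ssh', '-o', 'BatchMode=yes',
                                    '-o', 'StrictHostKeyChecking=no', host, 'true'])
            if code != 0:
                failures.append(host)
    return failures

if __name__ == '__main__':
    bad = check_hosts()
    if bad:
        sys.exit('passwordless ssh failed for: ' + ', '.join(bad))
    print('all hosts reachable without a password')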
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/agitator/agitator.ini.example ---------------------------------------------------------------------- diff --git a/test/agitator/agitator.ini.example b/test/agitator/agitator.ini.example new file mode 100644 index 0000000..3512561 --- /dev/null +++ b/test/agitator/agitator.ini.example @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[DEFAULT] +install=%(env.pwd)s/../../../.. +user=%(env.user)s + +[agitator] +kill=kill -9 +ssh=ssh -q -A -o StrictHostKeyChecking=no +sleep=300 +sleep.restart=30 +sleep.jitter=30 + +[accumulo] +home=%(install)s/accumulo +tserver.kill.min=1 +tserver.kill.max=1 +tserver.frequency=0.8 + +master.kill.min=1 +master.kill.max=1 +master.frequency=0.1 + +gc.kill.min=1 +gc.kill.max=1 +gc.frequency=0.1 + +[hadoop] +home=%(install)s/hadoop +bin=%(home)s/bin +datanode.frequency=0.8 +datanode.kill.min=1 +datanode.kill.max=1 +namenode.frequency=0.05 +namenode.kill.min=1 +namenode.kill.max=1 +secondarynamenode.frequency=0.05 +secondarynamenode.kill.min=1 +secondarynamenode.kill.max=1 + +[zookeeper] +home=%(install)s/zookeeper +frequency=0.05 http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/agitator/agitator.py ---------------------------------------------------------------------- diff --git a/test/agitator/agitator.py b/test/agitator/agitator.py new file mode 100755 index 0000000..db94546 --- /dev/null +++ b/test/agitator/agitator.py @@ -0,0 +1,241 @@ +#! /usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import logging +import ConfigParser + +# add the environment variables as default settings +import os +defaults=dict([('env.' 
+ k, v) for k, v in os.environ.iteritems()]) +config = ConfigParser.ConfigParser(defaults) + +# things you can do to a particular kind of process +class Proc: + program = 'Unknown' + _frequencyToKill = 1.0 + + def start(self, host): + pass + + def find(self, host): + pass + + def numberToKill(self): + return (1, 1) + + def frequencyToKill(self): + return self._frequencyToKill + + def user(self): + return config.get(self.program, 'user') + + def kill(self, host, pid): + kill = config.get('agitator', 'kill').split() + code, stdout, stderr = self.runOn(host, kill + [pid]) + if code != 0: + logging.warn("Unable to kill %d on %s (%s)", pid, host, stderr) + + def runOn(self, host, cmd): + ssh = config.get('agitator', 'ssh').split() + return self.run(ssh + ["%s@%s" % (self.user(), host)] + cmd) + + def run(self, cmd): + import subprocess + cmd = map(str, cmd) + logging.debug('Running %s', cmd) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + if stdout.strip(): + logging.debug("%s", stdout.strip()) + if stderr.strip(): + logging.error("%s", stderr.strip()) + if p.returncode != 0: + logging.error("Problem running %s", ' '.join(cmd)) + return p.returncode, stdout, stderr + + def __repr__(self): + return self.program + +class Zookeeper(Proc): + program = 'zookeeper' + def __init__(self): + self._frequencyToKill = config.getfloat(self.program, 'frequency') + + def start(self, host): + self.runOn(host, [config.get(self.program, 'home') + '/bin/zkServer.sh start']) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ['pgrep -f [Q]uorumPeerMain || true']) + return map(int, [line for line in stdout.split("\n") if line]) + +class Hadoop(Proc): + section = 'hadoop' + def __init__(self, program): + self.program = program + self._frequencyToKill = config.getfloat(self.section, program + '.frequency') + self.minimumToKill = config.getint(self.section, program + '.kill.min') + self.maximumToKill = config.getint(self.section, program + '.kill.max') + + def start(self, host): + binDir = config.get(self.section, 'bin') + self.runOn(host, ['nohup %s/hdfs %s < /dev/null >/dev/null 2>&1 &' %(binDir, self.program)]) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ["pgrep -f 'proc[_]%s' || true" % (self.program,)]) + return map(int, [line for line in stdout.split("\n") if line]) + + def numberToKill(self): + return (self.minimumToKill, self.maximumToKill) + + def user(self): + return config.get(self.section, 'user') + +class Accumulo(Hadoop): + section = 'accumulo' + def start(self, host): + home = config.get(self.section, 'home') + self.runOn(host, ['nohup %s/bin/accumulo %s </dev/null >/dev/null 2>&1 & ' %(home, self.program)]) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ["pgrep -f 'app[=]%s' || true" % self.program]) + return map(int, [line for line in stdout.split("\n") if line]) + +def fail(msg): + import sys + logging.critical(msg) + sys.exit(1) + +def jitter(n): + return random.random() * n - n / 2 + +def sleep(n): + if n > 0: + logging.info("Sleeping %.2f", n) + import time + time.sleep(n) + +def agitate(hosts, procs): + starters = [] + + logging.info("Agitating %s on %d hosts" % (procs, len(hosts))) + + section = 'agitator' + + # repeatedly... 
+ while True: + if starters: + # start up services that were previously killed + t = max(0, config.getfloat(section, 'sleep.restart') + jitter(config.getfloat(section, 'sleep.jitter'))) + sleep(t) + for host, proc in starters: + logging.info('Starting %s on %s', proc, host) + proc.start(host) + starters = [] + + # wait some time + t = max(0, config.getfloat(section, 'sleep') + jitter(config.getfloat(section, 'sleep.jitter'))) + sleep(t) + + # for some processes + for p in procs: + + # roll dice: should it be killed? + if random.random() < p.frequencyToKill(): + + # find them; use a thread pool, since a locally defined function cannot be + # sent to multiprocessing workers and Pool is not a context manager on Python 2 + from multiprocessing.pool import ThreadPool + def finder(host): + return host, p.find(host) + pool = ThreadPool(5) + try: + result = pool.map(finder, hosts) + finally: + pool.close() + pool.join() + candidates = {} + for host, pids in result: + if pids: + candidates[host] = pids + + # how many? + minKill, maxKill = p.numberToKill() + count = min(random.randrange(minKill, maxKill + 1), len(candidates)) + + # pick the victims + doomedHosts = random.sample(candidates.keys(), count) + + # kill them + logging.info("Killing %s on %s", p, doomedHosts) + for doomedHost in doomedHosts: + pids = candidates[doomedHost] + if not pids: + logging.error("Unable to kill any %s on %s: no processes of that type are running", p, doomedHost) + else: + pid = random.choice(pids) + logging.debug("Killing %s (%d) on %s", p, pid, doomedHost) + p.kill(doomedHost, pid) + # remember to restart them later + starters.append((doomedHost, p)) + +def main(): + import argparse + parser = argparse.ArgumentParser(description='Kill random processes') + parser.add_argument('--log', help='set the log level', default='INFO') + parser.add_argument('--namenodes', help='randomly kill namenodes', action="store_true") + parser.add_argument('--secondary', help='randomly kill secondary namenode', action="store_true") + parser.add_argument('--datanodes', help='randomly kill datanodes', action="store_true") + parser.add_argument('--tservers', help='randomly kill tservers', action="store_true") + parser.add_argument('--masters', help='randomly kill masters', action="store_true") + parser.add_argument('--zookeepers', help='randomly kill zookeepers', action="store_true") + parser.add_argument('--gc', help='randomly kill the file garbage collector', action="store_true") + parser.add_argument('--all', + help='kill any of the tservers, masters, gc, datanodes, namenodes, secondary namenode or zookeepers', + action='store_true') + parser.add_argument('--hosts', type=argparse.FileType('r'), required=True) + parser.add_argument('--config', type=argparse.FileType('r'), required=True) + args = parser.parse_args() + + config.readfp(args.config) + + level = getattr(logging, args.log.upper(), None) + if isinstance(level, int): + logging.basicConfig(level=level) + + procs = [] + def addIf(flag, proc): + if flag or args.all: + procs.append(proc) + + addIf(args.namenodes, Hadoop('namenode')) + addIf(args.datanodes, Hadoop('datanode')) + addIf(args.secondary, Hadoop('secondarynamenode')) + addIf(args.tservers, Accumulo('tserver')) + addIf(args.masters, Accumulo('master')) + addIf(args.gc, Accumulo('gc')) + addIf(args.zookeepers, Zookeeper()) + if len(procs) == 0: + fail("No processes to agitate!\n") + + hosts = [] + for line in args.hosts.readlines(): + line = line.strip() + if line and line[0] != '#': + hosts.append(line) + if not hosts: + fail('No hosts to agitate!\n') + + agitate(hosts, procs) + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/agitator/hosts.example 
---------------------------------------------------------------------- diff --git a/test/agitator/hosts.example b/test/agitator/hosts.example new file mode 100644 index 0000000..63fb8bb --- /dev/null +++ b/test/agitator/hosts.example @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +localhost http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/README.md ---------------------------------------------------------------------- diff --git a/test/bench/README.md b/test/bench/README.md new file mode 100644 index 0000000..0929bc3 --- /dev/null +++ b/test/bench/README.md @@ -0,0 +1,61 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +Benchmark Tests +=============== + +Running the Benchmarks +---------------------- + +Syntax for running run.py: + +> `$ ./run.py [-l -v <run_speed> -s <log_level> -u <user> -p <password> -z <zookeepers> -i <instance>] [Benchmark1 ... BenchmarkN]` + +Specifying one or more benchmarks runs only those; specifying none runs all of the benchmarks. + +`-l` lists the benchmarks that will be run +`-v <run_speed>` can either be slow, medium or fast +`-s <log_level>` is a number representing the verbosity of the debugging output: 10 is debug, 20 is info, 30 is warning, etc. +`-u <user>` user to use when connecting to Accumulo. If not set you will be prompted to input it. +`-p <password>` password to use when connecting to Accumulo. If not set you will be prompted to input it. +`-z <zookeepers>` comma-delimited list of zookeeper host:port pairs to use when connecting to Accumulo. If not set you will be prompted to input it. +`-i <instance>` instance to use when connecting to Accumulo. If not set you will be prompted to input it. + +The Benchmarks +-------------- + +Values given as a 3-tuple are the slow, medium and fast speeds at which you can run the benchmarks. + +* CloudStone1: Tests the speed at which we can check that Accumulo is up and that we can reach all the tservers. Lower is better. +* CloudStone2: Ingests 10000,100000,1000000 rows of 50-byte values on every tserver. Higher is better. 
+* CloudStone3: Ingests 1000,5000,10000 rows of 1024,8192,65535-byte values on every tserver. Higher is better. +* CloudStone4 (TeraSort): Ingests 10000,10000000,10000000000 rows. Lower is better. +* CloudStone5: Creates 100,500,1000 tables named TestTableX and then deletes them. Lower is better. +* CloudStone6: Creates a table with 400, 800, 1000 splits. Lower is better. + +TeraSort +-------- + +The fourth benchmark is TeraSort. Run the benchmarks with speed 'slow' to do a full TeraSort. + +Misc +---- + +These benchmarks create tables in Accumulo named 'test_ingest' and 'CloudIngestTest'. These tables are deleted +at the end of the benchmarks. The benchmarks will also alter user auths while they run, so it is recommended that +a dedicated benchmark user be created. + http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/cloudstone1/__init__.py ---------------------------------------------------------------------- diff --git a/test/bench/cloudstone1/__init__.py b/test/bench/cloudstone1/__init__.py new file mode 100755 index 0000000..09697dc --- /dev/null +++ b/test/bench/cloudstone1/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/cloudstone1/cloudstone1.py ---------------------------------------------------------------------- diff --git a/test/bench/cloudstone1/cloudstone1.py b/test/bench/cloudstone1/cloudstone1.py new file mode 100755 index 0000000..309ef9c --- /dev/null +++ b/test/bench/cloudstone1/cloudstone1.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import time + +from lib import cloudshell +from lib.Benchmark import Benchmark +from lib.tservers import runAll +from lib.path import accumulo + +class CloudStone1(Benchmark): + + def shortDescription(self): + return 'Test the speed at which we can check that accumulo is up '\ + 'and we can reach all the tservers. Lower is better.' 
+ + def runTest(self): + code, out, err = cloudshell.run(self.username, self.password, 'table accumulo.metadata\nscan\n') + self.assertEqual(code, 0, "Could not scan the metadata table. %s %s" % (out, err)) + results = runAll('echo help | %s shell -u %s -p %s' % + (accumulo('bin', 'accumulo'), self.username, self.password)) + + def setSpeed(self, speed): + "We want to override this method but no speed can be set" + +def suite(): + result = unittest.TestSuite([ + CloudStone1(), + ]) + return result http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/cloudstone2/__init__.py ---------------------------------------------------------------------- diff --git a/test/bench/cloudstone2/__init__.py b/test/bench/cloudstone2/__init__.py new file mode 100755 index 0000000..09697dc --- /dev/null +++ b/test/bench/cloudstone2/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/cloudstone2/cloudstone2.py ---------------------------------------------------------------------- diff --git a/test/bench/cloudstone2/cloudstone2.py b/test/bench/cloudstone2/cloudstone2.py new file mode 100755 index 0000000..996e5ef --- /dev/null +++ b/test/bench/cloudstone2/cloudstone2.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import unittest + +from lib import cloudshell +from lib.IngestBenchmark import IngestBenchmark + +class CloudStone2(IngestBenchmark): + "TestIngest one million small records on each tserver" + + _size = 50 + _count = 1000000 + + def size(self): + return self._size + + def count(self): + return self._count + + def setSpeed(self, speed): + if speed == "fast": + self._size = 50 + self._count = 10000 + elif speed == "medium": + self._size = 50 + self._count = 100000 + elif speed == "slow": + self._size = 50 + self._count = 1000000 + +def suite(): + result = unittest.TestSuite([ + CloudStone2(), + ]) + return result http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/0d97273c/test/bench/cloudstone3/__init__.py ---------------------------------------------------------------------- diff --git a/test/bench/cloudstone3/__init__.py b/test/bench/cloudstone3/__init__.py new file mode 100755 index 0000000..09697dc --- /dev/null +++ b/test/bench/cloudstone3/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +
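As the CloudStone2 source above illustrates, each cloudstone module exposes a suite() function plus a setSpeed() hook, and the chosen speed simply selects one entry of the slow/medium/fast workload tuple. The following is a minimal sketch of that interaction, assuming the test/bench directory is on PYTHONPATH and that the lib helpers imported by the benchmarks resolve outside of run.py; it is illustrative only, not how run.py itself is implemented.

# Minimal sketch (assumes test/bench is on PYTHONPATH and the benchmarks' lib
# imports resolve outside of run.py); shows how a driver could apply a run speed
# to a benchmark suite before handing it to unittest.
import unittest
from cloudstone2.cloudstone2 import CloudStone2, suite

def print_workloads():
    # setSpeed() only swaps the record count/size; nothing is ingested here
    bench = CloudStone2()
    for speed in ('slow', 'medium', 'fast'):
        bench.setSpeed(speed)
        print('%s: %d records of %d bytes per tserver' % (speed, bench.count(), bench.size()))

def run_at_speed(speed):
    # a TestSuite is iterable, so every contained benchmark gets the same speed
    tests = suite()
    for test in tests:
        test.setSpeed(speed)
    unittest.TextTestRunner(verbosity=2).run(tests)

if __name__ == '__main__':
    print_workloads()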