Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Conflicts:
	src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
	src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
	src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
	src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
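A note on the recurring change this merge carries forward: besides resolving the conflicts listed above, it hoists the private reflective counter increment out of ContinuousVerify into a new shared helper, org.apache.accumulo.server.util.reflection.CounterUtils, and points the counter call sites in CountRowKeys, ContinuousMoru, ContinuousVerify, and RunTests at it (ContinuousMoru no longer reaches into ContinuousVerify for the workaround). Per the CounterUtils javadoc and the comment removed from ContinuousVerify, the reason is Hadoop 1/2 compatibility: Counter is a class in Hadoop 1 but an interface in Hadoop 2, so a direct increment(1) call compiled against one line can fail against the other, while a reflective call binds at run time. A minimal sketch of the call-site pattern, with MyCounts standing in as a hypothetical counter enum for the jobs' real ones (Counts.SELF_READ, Counts.CORRUPT, Outcome, etc.):

import org.apache.accumulo.server.util.reflection.CounterUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

class CounterCallSiteSketch {
  // hypothetical enum; the real jobs define their own (Counts, Count, Outcome)
  enum MyCounts {
    RECORDS_SEEN
  }

  static void bump(TaskInputOutputContext<?,?,?,?> context) {
    Counter c = context.getCounter(MyCounts.RECORDS_SEEN);
    // Direct call: compiles fine, but hard-codes an invokevirtual (Hadoop 1)
    // or invokeinterface (Hadoop 2) instruction into this class's bytecode:
    //   c.increment(1);
    // Reflective call through the new helper: resolves increment(long) at
    // run time, so one compiled jar works against either Hadoop line.
    CounterUtils.increment(c);
  }
}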
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0d2cd1c0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0d2cd1c0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0d2cd1c0

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 0d2cd1c06cef923aa4026d9bee3df1966ee50d9c
Parents: 2cca3ee f0759dc
Author: Mike Drob <md...@cloudera.com>
Authored: Fri Mar 28 17:23:48 2014 -0400
Committer: Mike Drob <md...@cloudera.com>
Committed: Fri Mar 28 17:23:48 2014 -0400

----------------------------------------------------------------------
 .../simple/mapreduce/TeraSortIngest.java        |  4 +-
 .../accumulo/server/util/CountRowKeys.java      |  3 +-
 .../server/util/reflection/CounterUtils.java    | 43 ++++++++++++++++++++
 .../test/continuous/ContinuousMoru.java         |  3 +-
 .../test/continuous/ContinuousVerify.java       | 29 +++----------
 .../accumulo/test/functional/RunTests.java      |  3 +-
 6 files changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --cc examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
index f06aeec,0000000..f131e6c
mode 100644,000000..100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
@@@ -1,404 -1,0 +1,404 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.accumulo.examples.simple.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; + +/** + * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a + * map/reduce program to generate the data. The format of the data is: + * <ul> + * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n + * <li>The keys are random characters from the set ' ' .. '~'. + * <li>The rowid is the right justified row id as a int. + * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'. + * </ul> + * + * This TeraSort is slightly modified to allow for variable length key sizes and value sizes. The row length isn't variable. To generate a terabyte of data in + * the same way TeraSort does use 10000000000 rows and 10/10 byte key length and 78/78 byte value length. Along with the 10 byte row id and \r\n this gives you + * 100 byte row * 10000000000 rows = 1tb. Min/Max ranges for key and value parameters are inclusive/inclusive respectively. + * + * + */ +public class TeraSortIngest extends Configured implements Tool { + /** + * An input format that assigns ranges of longs to each mapper. + */ + static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> { + /** + * An input split consisting of a range on numbers. + */ + static class RangeInputSplit extends InputSplit implements Writable { + long firstRow; + long rowCount; + + public RangeInputSplit() {} + + public RangeInputSplit(long offset, long length) { + firstRow = offset; + rowCount = length; + } + + @Override + public long getLength() throws IOException { + return 0; + } + + @Override + public String[] getLocations() throws IOException { + return new String[] {}; + } + + @Override + public void readFields(DataInput in) throws IOException { + firstRow = WritableUtils.readVLong(in); + rowCount = WritableUtils.readVLong(in); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, firstRow); + WritableUtils.writeVLong(out, rowCount); + } + } + + /** + * A record reader that will generate a range of numbers. 
+ */ + static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> { + long startRow; + long finishedRows; + long totalRows; + + LongWritable currentKey; + + public RangeRecordReader(RangeInputSplit split) { + startRow = split.firstRow; + finishedRows = 0; + totalRows = split.rowCount; + } + + @Override + public void close() throws IOException {} + + @Override + public float getProgress() throws IOException { + return finishedRows / (float) totalRows; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return new LongWritable(startRow + finishedRows); + } + + @Override + public NullWritable getCurrentValue() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {} + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (finishedRows < totalRows) { + ++finishedRows; + return true; + } + return false; + } + } + + @Override + public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { + // reporter.setStatus("Creating record reader"); + return new RangeRecordReader((RangeInputSplit) split); + } + + /** + * Create the desired number of splits, dividing the number of rows between the mappers. + */ + @Override + public List<InputSplit> getSplits(JobContext job) { - long totalRows = job.getConfiguration().getLong(NUMROWS, 0); - int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1); ++ long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0); ++ int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1); + long rowsPerSplit = totalRows / numSplits; + System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit); + ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits); + long currentRow = 0; + for (int split = 0; split < numSplits - 1; ++split) { + splits.add(new RangeInputSplit(currentRow, rowsPerSplit)); + currentRow += rowsPerSplit; + } + splits.add(new RangeInputSplit(currentRow, totalRows - currentRow)); + System.out.println("Done Generating."); + return splits; + } + + } + + private static String NUMSPLITS = "terasort.overridesplits"; + private static String NUMROWS = "terasort.numrows"; + + static class RandomGenerator { + private long seed = 0; + private static final long mask32 = (1l << 32) - 1; + /** + * The number of iterations separating the precomputed seeds. + */ + private static final int seedSkip = 128 * 1024 * 1024; + /** + * The precomputed seed values after every seedSkip iterations. There should be enough values so that a 2**32 iterations are covered. + */ + private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L, + 3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L, + 1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,}; + + /** + * Start the random number generator on the given iteration. 
+ * + * @param initalIteration + * the iteration number to start on + */ + RandomGenerator(long initalIteration) { + int baseIndex = (int) ((initalIteration & mask32) / seedSkip); + seed = seeds[baseIndex]; + for (int i = 0; i < initalIteration % seedSkip; ++i) { + next(); + } + } + + RandomGenerator() { + this(0); + } + + long next() { + seed = (seed * 3141592621l + 663896637) & mask32; + return seed; + } + } + + /** + * The Mapper class that given a row number, will generate the appropriate output line. + */ + public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> { + private Text table = null; + private int minkeylength = 0; + private int maxkeylength = 0; + private int minvaluelength = 0; + private int maxvaluelength = 0; + + private Text key = new Text(); + private Text value = new Text(); + private RandomGenerator rand; + private byte[] keyBytes; // = new byte[12]; + private byte[] spaces = " ".getBytes(); + private byte[][] filler = new byte[26][]; + { + for (int i = 0; i < 26; ++i) { + filler[i] = new byte[10]; + for (int j = 0; j < 10; ++j) { + filler[i][j] = (byte) ('A' + i); + } + } + } + + /** + * Add a random key to the text + */ + private Random random = new Random(); + + private void addKey() { + int range = random.nextInt(maxkeylength - minkeylength + 1); + int keylen = range + minkeylength; + int keyceil = keylen + (4 - (keylen % 4)); + keyBytes = new byte[keyceil]; + + long temp = 0; + for (int i = 0; i < keyceil / 4; i++) { + temp = rand.next() / 52; + keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95)); + temp /= 95; + keyBytes[4 * i] = (byte) (' ' + (temp % 95)); + } + key.set(keyBytes, 0, keylen); + } + + /** + * Add the rowid to the row. + * + * @param rowId + */ + private Text getRowIdString(long rowId) { + Text paddedRowIdString = new Text(); + byte[] rowid = Integer.toString((int) rowId).getBytes(); + int padSpace = 10 - rowid.length; + if (padSpace > 0) { + paddedRowIdString.append(spaces, 0, 10 - rowid.length); + } + paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10)); + return paddedRowIdString; + } + + /** + * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters. 
+ * + * @param rowId + * the current row number + */ + private void addFiller(long rowId) { + int base = (int) ((rowId * 8) % 26); + + // Get Random var + Random random = new Random(rand.seed); + + int range = random.nextInt(maxvaluelength - minvaluelength + 1); + int valuelen = range + minvaluelength; + + while (valuelen > 10) { + value.append(filler[(base + valuelen) % 26], 0, 10); + valuelen -= 10; + } + + if (valuelen > 0) + value.append(filler[(base + valuelen) % 26], 0, valuelen); + } + + @Override + public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException { + context.setStatus("Entering"); + long rowId = row.get(); + if (rand == null) { + // we use 3 random numbers per a row + rand = new RandomGenerator(rowId * 3); + } + addKey(); + value.clear(); + // addRowId(rowId); + addFiller(rowId); + + // New + Mutation m = new Mutation(key); + m.put(new Text("c"), // column family + getRowIdString(rowId), // column qual + new Value(value.toString().getBytes())); // data + + context.setStatus("About to add to accumulo"); + context.write(table, m); + context.setStatus("Added to accumulo " + key.toString()); + } + + @Override + public void setup(Context job) { + minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0); + maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0); + minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0); + maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0); + table = new Text(job.getConfiguration().get("cloudgen.tablename")); + } + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), args); + System.exit(res); + } + + static class Opts extends ClientOnRequiredTable { + @Parameter(names = "--count", description = "number of rows to ingest", required = true) + long numRows; + @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true) + int minKeyLength; + @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true) + int maxKeyLength; + @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true) + int minValueLength; + @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true) + int maxValueLength; + @Parameter(names = "--splits", description = "number of splits to create in the table") + int splits = 0; + } + + @Override + public int run(String[] args) throws Exception { + Job job = new Job(getConf(), "TeraSortCloud"); + job.setJarByClass(this.getClass()); + Opts opts = new Opts(); + opts.parseArgs(TeraSortIngest.class.getName(), args); + + job.setInputFormatClass(RangeInputFormat.class); + job.setMapperClass(SortGenMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + opts.setAccumuloConfigs(job); + BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + + Configuration conf = job.getConfiguration(); + conf.setLong(NUMROWS, opts.numRows); + conf.setInt("cloudgen.minkeylength", opts.minKeyLength); + conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength); + conf.setInt("cloudgen.minvaluelength", opts.minValueLength); + conf.setInt("cloudgen.maxvaluelength", 
opts.maxValueLength); + conf.set("cloudgen.tablename", opts.tableName); + + if (args.length > 10) + conf.setInt(NUMSPLITS, opts.splits); + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java index 5676394,0000000..88b2dfb mode 100644,000000..100644 --- a/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java +++ b/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java @@@ -1,86 -1,0 +1,87 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.IOException; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.ServerConstants; ++import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class CountRowKeys extends Configured implements Tool { + private static class MyMapper extends Mapper<Key,Value,Text,NullWritable> { + Text k = new Text(); + + public void map(Key key, Value value, Context context) throws IOException, InterruptedException { + context.write(key.getRow(k), NullWritable.get()); + } + } + + private static class MyReducer extends Reducer<Text,NullWritable,Text,Text> { + public enum Count { + uniqueRows + } + + public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException { - context.getCounter(Count.uniqueRows).increment(1); ++ CounterUtils.increment(context.getCounter(Count.uniqueRows)); + } + } + + @Override + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException { + if (args.length != 2) { + System.out.println("Usage: CountRowKeys tableName outputPath"); + return 1; + } + + Job job = new Job(getConf(), this.getClass().getName()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(SequenceFileInputFormat.class); + SequenceFileInputFormat.addInputPath(job, new Path(ServerConstants.getTablesDir() + "/" + args[0] + 
"/*/*/data")); + + job.setMapperClass(MyMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(NullWritable.class); + + job.setReducerClass(MyReducer.class); + + TextOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + return job.isSuccessful() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new CountRowKeys(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java index 0000000,0000000..dbd5f60 new file mode 100644 --- /dev/null +++ b/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java @@@ -1,0 -1,0 +1,43 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.accumulo.server.util.reflection; ++ ++import java.lang.reflect.Method; ++ ++import org.apache.hadoop.mapreduce.Counter; ++ ++/** ++ * Utility class for incrementing counters in a compatible way between hadoop 1 and 2 ++ */ ++public class CounterUtils { ++ static private Method INCREMENT; ++ static { ++ try { ++ INCREMENT = Counter.class.getMethod("increment", Long.TYPE); ++ } catch (Exception ex) { ++ throw new RuntimeException(ex); ++ } ++ } ++ ++ public static void increment(Counter counter) { ++ try { ++ INCREMENT.invoke(counter, 1L); ++ } catch (Exception ex) { ++ throw new RuntimeException(ex); ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java index bbe7fa3,0000000..a35ca66 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java @@@ -1,180 -1,0 +1,181 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.continuous; + +import java.io.IOException; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.CachedConfiguration; ++import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts; +import org.apache.accumulo.test.continuous.ContinuousIngest.ShortConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to + * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes. 
+ * + */ +public class ContinuousMoru extends Configured implements Tool { + private static final String PREFIX = ContinuousMoru.class.getSimpleName() + "."; + private static final String MAX_CQ = PREFIX + "MAX_CQ"; + private static final String MAX_CF = PREFIX + "MAX_CF"; + private static final String MAX = PREFIX + "MAX"; + private static final String MIN = PREFIX + "MIN"; + private static final String CI_ID = PREFIX + "CI_ID"; + + static enum Counts { + SELF_READ; + } + + public static class CMapper extends Mapper<Key,Value,Text,Mutation> { + + private short max_cf; + private short max_cq; + private Random random; + private String ingestInstanceId; + private byte[] iiId; + private long count; + + private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility(); + + @Override + public void setup(Context context) throws IOException, InterruptedException { + int max_cf = context.getConfiguration().getInt(MAX_CF, -1); + int max_cq = context.getConfiguration().getInt(MAX_CQ, -1); + + if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE) + throw new IllegalArgumentException(); + + this.max_cf = (short) max_cf; + this.max_cq = (short) max_cq; + + random = new Random(); + ingestInstanceId = context.getConfiguration().get(CI_ID); + iiId = ingestInstanceId.getBytes(Constants.UTF8); + + count = 0; + } + + @Override + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + + ContinuousWalk.validate(key, data); + + if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) { + // only rewrite data not written by this M/R job + byte[] val = data.get(); + + int offset = ContinuousWalk.getPrevRowOffset(val); + if (offset > 0) { + long rowLong = Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16); + Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData() + .toArray(), random, true); + context.write(null, m); + } + + } else { - ContinuousVerify.increment(context.getCounter(Counts.SELF_READ)); ++ CounterUtils.increment(context.getCounter(Counts.SELF_READ)); + } + } + } + + static class Opts extends BaseOpts { + @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class) + short maxColF = Short.MAX_VALUE; + + @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class) + short maxColQ = Short.MAX_VALUE; + + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) + int maxMaps = 0; + } + + @Override + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException { + Opts opts = new Opts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts); + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloInputFormat.class); + opts.setAccumuloConfigs(job); + + // set up ranges + try { + Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + AccumuloInputFormat.setRanges(job, ranges); + AccumuloInputFormat.setAutoAdjustRanges(job, false); + } catch (Exception e) { + throw new IOException(e); + } + + 
job.setMapperClass(CMapper.class); + + job.setNumReduceTasks(0); + + job.setOutputFormatClass(AccumuloOutputFormat.class); + AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); + + Configuration conf = job.getConfiguration(); + conf.setLong(MIN, opts.min); + conf.setLong(MAX, opts.max); + conf.setInt(MAX_CF, opts.maxColF); + conf.setInt(MAX_CQ, opts.maxColQ); + conf.set(CI_ID, UUID.randomUUID().toString()); + + job.waitForCompletion(true); + opts.stopTracing(); + return job.isSuccessful() ? 0 : 1; + } + + /** + * + * @param args + * instanceName zookeepers username password table columns outputpath + * @throws Exception + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java index 07e0c92,0000000..8095b50 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java @@@ -1,243 -1,0 +1,224 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.continuous; + +import java.io.IOException; - import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; ++import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined. + */ + +public class ContinuousVerify extends Configured implements Tool { - - // work around hadoop-1/hadoop-2 runtime incompatibility - static private Method INCREMENT; - static { - try { - INCREMENT = Counter.class.getMethod("increment", Long.TYPE); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - static void increment(Object obj) { - try { - INCREMENT.invoke(obj, 1L); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - + public static final VLongWritable DEF = new VLongWritable(-1); + + public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> { + + private LongWritable row = new LongWritable(); + private LongWritable ref = new LongWritable(); + private VLongWritable vrow = new VLongWritable(); + + private long corrupt = 0; + + @Override + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + long r = Long.parseLong(key.getRow().toString(), 16); + if (r < 0) + throw new IllegalArgumentException(); + + try { + ContinuousWalk.validate(key, data); + } catch (BadChecksumException bce) { - increment(context.getCounter(Counts.CORRUPT)); ++ CounterUtils.increment(context.getCounter(Counts.CORRUPT)); + if (corrupt < 1000) { + System.out.println("ERROR Bad checksum : " + key); + } else if (corrupt == 1000) { + System.out.println("Too many bad checksums, not printing anymore!"); + } + corrupt++; + return; + } + + row.set(r); + + context.write(row, DEF); + byte[] val = data.get(); + + int offset = ContinuousWalk.getPrevRowOffset(val); + if (offset > 0) { + ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16)); + vrow.set(r); + context.write(ref, vrow); + } + } + } + + public static enum Counts { + UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT + } + + public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> { + private ArrayList<Long> refs = new ArrayList<Long>(); + + @Override + public void 
reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException { + + int defCount = 0; + + refs.clear(); + for (VLongWritable type : values) { + if (type.get() == -1) { + defCount++; + } else { + refs.add(type.get()); + } + } + + if (defCount == 0 && refs.size() > 0) { + StringBuilder sb = new StringBuilder(); + String comma = ""; + for (Long ref : refs) { + sb.append(comma); + comma = ","; + sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8)); + } + + context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); - increment(context.getCounter(Counts.UNDEFINED)); ++ CounterUtils.increment(context.getCounter(Counts.UNDEFINED)); + + } else if (defCount > 0 && refs.size() == 0) { - increment(context.getCounter(Counts.UNREFERENCED)); ++ CounterUtils.increment(context.getCounter(Counts.UNREFERENCED)); + } else { - increment(context.getCounter(Counts.REFERENCED)); ++ CounterUtils.increment(context.getCounter(Counts.REFERENCED)); + } + + } + } + + static class Opts extends ClientOnDefaultTable { + @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true) + String outputDir = "/tmp/continuousVerify"; + + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) + int maxMaps = 0; + + @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class) + int reducers = 0; + + @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline") + boolean scanOffline = false; + + public Opts() { + super("ci"); + } + } + + @Override + public int run(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(this.getClass().getName(), args); + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloInputFormat.class); + opts.setAccumuloConfigs(job); + + Set<Range> ranges = null; + String clone = opts.getTableName(); + Connector conn = null; + + if (opts.scanOffline) { + Random random = new Random(); + clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl)); + conn = opts.getConnector(); + conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>()); + ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + conn.tableOperations().offline(clone); + AccumuloInputFormat.setInputTableName(job, clone); + AccumuloInputFormat.setOfflineTableScan(job, true); + } else { + ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + } + + AccumuloInputFormat.setRanges(job, ranges); + AccumuloInputFormat.setAutoAdjustRanges(job, false); + + job.setMapperClass(CMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(VLongWritable.class); + + job.setReducerClass(CReducer.class); + job.setNumReduceTasks(opts.reducers); + + job.setOutputFormatClass(TextOutputFormat.class); + + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline); + + TextOutputFormat.setOutputPath(job, new Path(opts.outputDir)); + + job.waitForCompletion(true); + + if (opts.scanOffline) { 
+ conn.tableOperations().delete(clone); + } + opts.stopTracing(); + return job.isSuccessful() ? 0 : 1; + } + + /** + * + * @param args + * instanceName zookeepers username password table columns outputpath + * @throws Exception + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java index f6ebe87,0000000..2b775c5 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java @@@ -1,216 -1,0 +1,217 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; ++import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +/** + * Runs the functional tests via map-reduce. + * + * First, be sure everything is compiled. + * + * Second, get a list of the tests you want to run: + * + * <pre> + * $ python test/system/auto/run.py -l > tests + * </pre> + * + * Put the list of tests into HDFS: + * + * <pre> + * $ hadoop fs -put tests /user/hadoop/tests + * </pre> + * + * Run the map-reduce job: + * + * <pre> + * $ ./bin/accumulo accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results + * </pre> + * + * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo + * instance. 
+ * + */ +public class RunTests extends Configured implements Tool { + + static final public String JOB_NAME = "Functional Test Runner"; + private static final Logger log = Logger.getLogger(RunTests.class); + + private Job job = null; + + private static final int DEFAULT_TIMEOUT_FACTOR = 1; + + static class Opts extends Help { + @Parameter(names="--tests", description="newline separated list of tests to run", required=true) + String testFile; + @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true) + String outputPath; + @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false) + Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR; + } + + static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor"; + + static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> { + + private static final String REDUCER_RESULT_START = "::::: "; + private static final int RRS_LEN = REDUCER_RESULT_START.length(); + private Text result = new Text(); + String mapperTimeoutFactor = null; + + private static enum Outcome { + SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE + } + private static final Map<Character, Outcome> OUTCOME_COUNTERS; + static { + OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>(); + OUTCOME_COUNTERS.put('S', Outcome.SUCCESS); + OUTCOME_COUNTERS.put('F', Outcome.FAILURE); + OUTCOME_COUNTERS.put('E', Outcome.ERROR); + OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS); + OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE); + } + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString()); + log.info("Running test " + cmd); + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.directory(new File(context.getConfiguration().get("accumulo.home"))); + pb.redirectErrorStream(true); + Process p = pb.start(); + p.getOutputStream().close(); + InputStream out = p.getInputStream(); + InputStreamReader outr = new InputStreamReader(out, Constants.UTF8); + BufferedReader br = new BufferedReader(outr); + String line; + try { + while ((line = br.readLine()) != null) { + log.info("More: " + line); + if (line.startsWith(REDUCER_RESULT_START)) { + String resultLine = line.substring(RRS_LEN); + if (resultLine.length() > 0) { + Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0)); + if (outcome != null) { - context.getCounter(outcome).increment(1); ++ CounterUtils.increment(context.getCounter(outcome)); + } + } + String taskAttemptId = context.getTaskAttemptID().toString(); + result.set(taskAttemptId + " " + resultLine); + context.write(value, result); + } + } + } catch (Exception ex) { + log.error(ex); + context.progress(); + } + + p.waitFor(); + } + + @Override + protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException { + mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR)); + } + } + + @Override + public int run(String[] args) throws Exception { + job = new Job(getConf(), JOB_NAME); + job.setJarByClass(this.getClass()); + Opts opts = new Opts(); + opts.parseArgs(RunTests.class.getName(), args); + + // this is like 1-2 tests per mapper + Configuration conf = job.getConfiguration(); + 
conf.setInt("mapred.max.split.size", 40); + conf.set("accumulo.home", System.getenv("ACCUMULO_HOME")); + + // Taking third argument as scaling factor to setting mapred.task.timeout + // and TIMEOUT_FACTOR + conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000); + conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + + // set input + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, new Path(opts.testFile)); + + // set output + job.setOutputFormatClass(TextOutputFormat.class); + FileSystem fs = FileSystem.get(conf); + Path destination = new Path(opts.outputPath); + if (fs.exists(destination)) { + log.info("Deleting existing output directory " + opts.outputPath); + fs.delete(destination, true); + } + TextOutputFormat.setOutputPath(job, destination); + + // configure default reducer: put the results into one file + job.setNumReduceTasks(1); + + // set mapper + job.setMapperClass(TestMapper.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // don't do anything with the results (yet) a summary would be nice + job.setNumReduceTasks(0); + + // submit the job + log.info("Starting tests"); + return 0; + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + RunTests tests = new RunTests(); + ToolRunner.run(new Configuration(), tests, args); + tests.job.waitForCompletion(true); + if (!tests.job.isSuccessful()) + System.exit(1); + } + +}