http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index a9d7153,0000000..c82d03b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,265 -1,0 +1,244 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Method;
 +import java.net.URI;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
 +import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.VLongWritable;
 +import org.apache.hadoop.mapred.Counters.Counter;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
- import org.apache.log4j.Logger;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
 + */
 +
 +public class ContinuousVerify extends Configured implements Tool {
-   private static final Logger log = Logger.getLogger(ContinuousVerify.class);
 +
 +  // work around hadoop-1/hadoop-2 runtime incompatibility
 +  static private Method INCREMENT;
 +  static {
 +    try {
 +      INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +
 +  static void increment(Object obj) {
 +    try {
 +      INCREMENT.invoke(obj, 1L);
 +    } catch (Exception ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +
 +  public static final VLongWritable DEF = new VLongWritable(-1);
 +
 +  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
 +
 +    private LongWritable row = new LongWritable();
 +    private LongWritable ref = new LongWritable();
 +    private VLongWritable vrow = new VLongWritable();
 +
 +    private long corrupt = 0;
 +
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      long r = Long.parseLong(key.getRow().toString(), 16);
 +      if (r < 0)
 +        throw new IllegalArgumentException();
 +
 +      try {
 +        ContinuousWalk.validate(key, data);
 +      } catch (BadChecksumException bce) {
 +        increment(context.getCounter(Counts.CORRUPT));
 +        if (corrupt < 1000) {
 +          System.out.println("ERROR Bad checksum : " + key);
 +        } else if (corrupt == 1000) {
 +          System.out.println("Too many bad checksums, not printing anymore!");
 +        }
 +        corrupt++;
 +        return;
 +      }
 +
 +      row.set(r);
 +
 +      context.write(row, DEF);
 +      byte[] val = data.get();
 +
 +      int offset = ContinuousWalk.getPrevRowOffset(val);
 +      if (offset > 0) {
 +        ref.set(Long.parseLong(new String(val, offset, 16), 16));
 +        vrow.set(r);
 +        context.write(ref, vrow);
 +      }
 +    }
 +  }
 +
 +  public static enum Counts {
 +    UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
 +  }
 +
 +  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
 +    private ArrayList<Long> refs = new ArrayList<Long>();
 +
 +    @Override
 +    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
 +
 +      int defCount = 0;
 +
 +      refs.clear();
 +      for (VLongWritable type : values) {
 +        if (type.get() == -1) {
 +          defCount++;
 +        } else {
 +          refs.add(type.get());
 +        }
 +      }
 +
 +      if (defCount == 0 && refs.size() > 0) {
 +        StringBuilder sb = new StringBuilder();
 +        String comma = "";
 +        for (Long ref : refs) {
 +          sb.append(comma);
 +          comma = ",";
 +          sb.append(new String(ContinuousIngest.genRow(ref)));
 +        }
 +
 +        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
 +        increment(context.getCounter(Counts.UNDEFINED));
 +
 +      } else if (defCount > 0 && refs.size() == 0) {
 +        increment(context.getCounter(Counts.UNREFERENCED));
 +      } else {
 +        increment(context.getCounter(Counts.REFERENCED));
 +      }
 +
 +    }
 +  }
 +
 +  static class Opts extends ClientOnDefaultTable {
 +    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
 +    String outputDir = "/tmp/continuousVerify";
 +
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +
 +    @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
 +    int reducers = 0;
 +
 +    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
 +    boolean scanOffline = false;
 +
-     @Parameter(names = "--sitefile", description = "location of accumulo-site.xml in HDFS", required = true)
-     String siteFile;
- 
 +    public Opts() {
 +      super("ci");
 +    }
 +  }
 +
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(this.getClass().getName(), args);
 +
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +
 +    String clone = opts.getTableName();
 +    Connector conn = null;
 +    if (opts.scanOffline) {
 +      Random random = new Random();
 +      clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
 +      conn = opts.getConnector();
 +      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
 +      conn.tableOperations().offline(clone);
 +      AccumuloInputFormat.setInputTableName(job, clone);
 +      AccumuloInputFormat.setOfflineTableScan(job, true);
 +    }
 +
 +    // set up ranges
 +    try {
 +      Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      AccumuloInputFormat.setRanges(job, ranges);
 +      AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +
 +    job.setMapperClass(CMapper.class);
 +    job.setMapOutputKeyClass(LongWritable.class);
 +    job.setMapOutputValueClass(VLongWritable.class);
 +
 +    job.setReducerClass(CReducer.class);
 +    job.setNumReduceTasks(opts.reducers);
 +
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +
 +    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
 +
 +    TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
 +
-     Path sitePath = new Path(opts.siteFile);
-     Path siteParentPath = sitePath.getParent();
-     if (null == siteParentPath) {
-       siteParentPath = new Path("/");
-     }
- 
-     URI siteUri = new URI("hdfs://" + opts.siteFile);
- 
-     log.info("Adding " + siteUri + " to DistributedCache");
- 
-     // Make sure that accumulo-site.xml is available for mappers running offline scans
-     // as they need to correctly choose instance.dfs.dir for the installation
-     DistributedCache.addFileToClassPath(siteParentPath, job.getConfiguration(), FileSystem.get(siteUri, job.getConfiguration()));
- 
 +    job.waitForCompletion(true);
 +
 +    if (opts.scanOffline) {
 +      conn.tableOperations().delete(clone);
 +    }
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +
 +  /**
 +   *
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
 +   * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/23bb4321/test/system/continuous/run-verify.sh
----------------------------------------------------------------------
diff --cc test/system/continuous/run-verify.sh
index 07f797f,edf58b7..a0c64d1
--- a/test/system/continuous/run-verify.sh
+++ b/test/system/continuous/run-verify.sh
@@@ -23,31 -23,6 +23,9 @@@ AUTH_OPT=""
  if [ -n "$VERIFY_AUTHS" ] ; then
      AUTH_OPT="--auths $VERIFY_AUTHS";
  fi
 +
 +SCAN_OPT=--offline
 +if [ "$SCAN_OFFLINE" == "false" ] ; then
 +    SCAN_OPT=
 +fi
- if [ ! -r $ACCUMULO_CONF_DIR/accumulo-site.xml ]; then
-     echo "Could not find accumulo-site.xml in $ACCUMULO_CONF_DIR"
-     exit 1
- fi
- 
- TARGET_DIR="ci-conf-`date '+%s'`"
- hadoop fs -mkdir $TARGET_DIR
- 
- if [ $? -ne 0 ]; then
-     echo "Could not create $TAGET_DIR in HDFS"
-     exit 1
- fi
- 
- hadoop fs -put $ACCUMULO_CONF_DIR/accumulo-site.xml ${TARGET_DIR}/
- 
- if [ $? -ne 0 ]; then
-     echo "Could not upload accumulo-site.xml to HDFS"
-     exit 1
- fi
- 
- ABS_DIR="/user/`whoami`/${TARGET_DIR}/accumulo-site.xml"
- 
- $ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $AUTH_OPT -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE --output $VERIFY_OUT --maxMappers $VERIFY_MAX_MAPS --reducers $VERIFY_REDUCERS --sitefile $ABS_DIR $SCAN_OPT
 -$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $AUTH_OPT $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERIFY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS $SCAN_OFFLINE
- 
++$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $AUTH_OPT -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE --output $VERIFY_OUT --maxMappers $VERIFY_MAX_MAPS --reducers $VERIFY_REDUCERS $SCAN_OPT
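
Whether the verifier is launched through run-verify.sh or directly, its result is reported through the Counts counters (UNDEFINED, UNREFERENCED, REFERENCED, CORRUPT) rather than the exit code alone. A rough post-run check, assuming the Job handle from run() above; Counters.findCounter() and Counter.getValue() are the standard Hadoop MapReduce API, and the pass/fail logic here is illustrative, not part of this commit:

    // Illustrative helper: a healthy continuous-ingest table should report
    // zero UNDEFINED references and zero CORRUPT checksums after the verify job.
    static boolean verificationClean(org.apache.hadoop.mapreduce.Job job) throws java.io.IOException {
      org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
      long undefined = counters.findCounter(ContinuousVerify.Counts.UNDEFINED).getValue();
      long corrupt = counters.findCounter(ContinuousVerify.Counts.CORRUPT).getValue();
      if (undefined > 0 || corrupt > 0) {
        System.err.println("Verification failed: undefined=" + undefined + ", corrupt=" + corrupt);
        return false;
      }
      return true;
    }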