ACCUMULO-2566 More reflection for hadoop 1/2 Pull counter reflection out of ContinuousVerify and apply it to server utils that are using it. Add reflection to TeraSortIngest.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f0759dcb Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f0759dcb Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f0759dcb Branch: refs/heads/1.5.2-SNAPSHOT Commit: f0759dcb8b983250340655979c9163cb7297aeea Parents: 9e5854f Author: Mike Drob <md...@cloudera.com> Authored: Thu Mar 27 13:26:38 2014 -0400 Committer: Mike Drob <md...@cloudera.com> Committed: Fri Mar 28 17:10:18 2014 -0400 ---------------------------------------------------------------------- .../simple/mapreduce/TeraSortIngest.java | 5 ++- .../server/test/continuous/ContinuousMoru.java | 5 ++- .../test/continuous/ContinuousVerify.java | 31 +++----------- .../server/test/functional/RunTests.java | 3 +- .../accumulo/server/util/CountRowKeys.java | 3 +- .../server/util/reflection/CounterUtils.java | 43 ++++++++++++++++++++ 6 files changed, 58 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java ---------------------------------------------------------------------- diff --git a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java index 0ff2c19..b2d3f4c 100644 --- a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java +++ b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Random; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.CachedConfiguration; @@ -157,8 +158,8 @@ public class TeraSortIngest extends Configured implements Tool { * Create the desired number of splits, dividing the number of rows between the mappers. */ public List<InputSplit> getSplits(JobContext job) { - long totalRows = job.getConfiguration().getLong(NUMROWS, 0); - int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1); + long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0); + int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1); long rowsPerSplit = totalRows / numSplits; System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit); ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java index 88fbb25..443b79d 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.util.reflection.CounterUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; @@ -102,7 +103,7 @@ public class ContinuousMoru extends Configured implements Tool { } } else { - ContinuousVerify.increment(context.getCounter(Counts.SELF_READ)); + CounterUtils.increment(context.getCounter(Counts.SELF_READ)); } } } @@ -177,4 +178,4 @@ public class ContinuousMoru extends Configured implements Tool { if (res != 0) System.exit(res); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java index 4b465a8..c67310a 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java @@ -17,7 +17,6 @@ package org.apache.accumulo.server.test.continuous; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,12 +33,12 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.test.continuous.ContinuousWalk.BadChecksumException; +import org.apache.accumulo.server.util.reflection.CounterUtils; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VLongWritable; -import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -52,26 +51,6 @@ import org.apache.hadoop.util.ToolRunner; */ public class ContinuousVerify extends Configured implements Tool { - // work around hadoop-1/hadoop-2 runtime incompatibility - static private Method INCREMENT; - static { - try { - Class<Counter> counter = Counter.class; - - INCREMENT = counter.getMethod("increment", Long.TYPE); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - static void increment(Object obj) { - try { - INCREMENT.invoke(obj, 1L); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - public static final VLongWritable DEF = new VLongWritable(-1); public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> { @@ -90,7 +69,7 @@ public class ContinuousVerify extends Configured implements Tool { try { ContinuousWalk.validate(key, data); } catch (BadChecksumException bce) { - increment(context.getCounter(Counts.CORRUPT)); + CounterUtils.increment(context.getCounter(Counts.CORRUPT)); if (corrupt < 1000) { System.out.println("ERROR Bad checksum : " + key); } else if (corrupt == 1000) { @@ -144,12 +123,12 @@ public class ContinuousVerify extends Configured implements Tool { } context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); - increment(context.getCounter(Counts.UNDEFINED)); + CounterUtils.increment(context.getCounter(Counts.UNDEFINED)); } else if (defCount > 0 && refs.size() == 0) { - increment(context.getCounter(Counts.UNREFERENCED)); + CounterUtils.increment(context.getCounter(Counts.UNREFERENCED)); } else { - increment(context.getCounter(Counts.REFERENCED)); + CounterUtils.increment(context.getCounter(Counts.REFERENCED)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java b/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java index 7a7e7d3..c21d3a1 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java +++ b/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.apache.accumulo.server.logger.IdentityReducer; +import org.apache.accumulo.server.util.reflection.CounterUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -117,7 +118,7 @@ public class RunTests extends Configured implements Tool { if (resultLine.length() > 0) { Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0)); if (outcome != null) { - context.getCounter(outcome).increment(1); + CounterUtils.increment(context.getCounter(outcome)); } } String taskAttemptId = context.getTaskAttemptID().toString(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java b/src/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java index 5676394..88b2dfb 100644 --- a/src/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java +++ b/src/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java @@ -22,6 +22,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.util.reflection.CounterUtils; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -49,7 +50,7 @@ public class CountRowKeys extends Configured implements Tool { } public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException { - context.getCounter(Count.uniqueRows).increment(1); + CounterUtils.increment(context.getCounter(Count.uniqueRows)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f0759dcb/src/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java ---------------------------------------------------------------------- diff --git a/src/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java b/src/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java new file mode 100644 index 0000000..dbd5f60 --- /dev/null +++ b/src/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util.reflection; + +import java.lang.reflect.Method; + +import org.apache.hadoop.mapreduce.Counter; + +/** + * Utility class for incrementing counters in a compatible way between hadoop 1 and 2 + */ +public class CounterUtils { + static private Method INCREMENT; + static { + try { + INCREMENT = Counter.class.getMethod("increment", Long.TYPE); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public static void increment(Counter counter) { + try { + INCREMENT.invoke(counter, 1L); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } +}