Repository: accumulo Updated Branches: refs/heads/1.6 0ca44d7df -> 343b7359b
ACCUMULO-3533 Moving static Hadoop 1 & 2 compatibility method getConfiguration() to its own util class outside of the client API. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/343b7359 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/343b7359 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/343b7359 Branch: refs/heads/1.6 Commit: 343b7359ba86ef8038082c306e8d9deec20648f4 Parents: 0ca44d7 Author: Corey J. Nolet <cjno...@gmail.com> Authored: Mon Jan 26 13:44:38 2015 -0500 Committer: Corey J. Nolet <cjno...@gmail.com> Committed: Mon Jan 26 14:03:27 2015 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AbstractInputFormat.java | 19 +++----- .../mapreduce/AccumuloFileOutputFormat.java | 5 ++- .../AccumuloMultiTableInputFormat.java | 3 +- .../client/mapreduce/AccumuloOutputFormat.java | 19 ++++---- .../core/client/mapreduce/InputFormatBase.java | 3 +- .../accumulo/core/util/HadoopCompatUtil.java | 47 ++++++++++++++++++++ .../simple/mapreduce/TeraSortIngest.java | 7 +-- 7 files changed, 74 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index bcbfddc..95fed00 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -17,7 +17,6 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; -import java.lang.reflect.Method; 
import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; @@ -54,6 +53,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.HadoopCompatUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.conf.Configuration; @@ -71,7 +71,7 @@ import org.apache.log4j.Logger; * An abstract input format to provide shared methods common to all other input format classes. At the very least, any classes inheriting from this class will * need to define their own {@link RecordReader}. */ -public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { +public abstract class AbstractInputFormat<K, V> extends InputFormat<K,V> { protected static final Class<?> CLASS = AccumuloInputFormat.class; protected static final Logger log = Logger.getLogger(CLASS); @@ -329,6 +329,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { } // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. 
* @@ -354,7 +355,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * <li>int {@link #numKeysRead} (used for progress reporting)</li> * </ul> */ - protected abstract static class AbstractRecordReader<K,V> extends RecordReader<K,V> { + protected abstract static class AbstractRecordReader<K, V> extends RecordReader<K,V> { protected long numKeysRead; protected Iterator<Map.Entry<Key,Value>> scannerIterator; protected RangeInputSplit split; @@ -675,15 +676,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { return splits; } - // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility - public static Configuration getConfiguration(JobContext context) { - try { - Class<?> c = AbstractInputFormat.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); - Method m = c.getMethod("getConfiguration"); - Object o = m.invoke(context, new Object[0]); - return (Configuration) o; - } catch (Exception e) { - throw new RuntimeException(e); - } + protected static Configuration getConfiguration(JobContext context) { + return HadoopCompatUtil.getConfiguration(context); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index c68dd56..500f072 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; +import 
org.apache.accumulo.core.util.HadoopCompatUtil; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ArrayByteSequence; @@ -62,7 +63,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { * @since 1.5.0 */ protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { - return FileOutputConfigurator.getAccumuloConfiguration(CLASS, InputFormatBase.getConfiguration(context)); + return FileOutputConfigurator.getAccumuloConfiguration(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -138,7 +139,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { @Override public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file - final Configuration conf = InputFormatBase.getConfiguration(context); + final Configuration conf = HadoopCompatUtil.getConfiguration(context); final AccumuloConfiguration acuConf = getAccumuloConfiguration(context); final String extension = acuConf.get(Property.TABLE_FILE_TYPE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java index 010a94f..14950bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.IteratorSetting; import 
org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.util.HadoopCompatUtil; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -64,7 +65,7 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value */ public static void setInputTableConfigs(Job job, Map<String,InputTableConfig> configs) { checkNotNull(configs); - InputConfigurator.setInputTableConfigs(CLASS, getConfiguration(job), configs); + InputConfigurator.setInputTableConfigs(CLASS, HadoopCompatUtil.getConfiguration(job), configs); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index 0f495f0..a09b37f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.util.HadoopCompatUtil; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.SecurityErrorCode; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -120,7 +121,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see 
#setConnectorInfo(Job, String, AuthenticationToken) */ protected static Boolean isConnectorInfoSet(JobContext context) { - return OutputConfigurator.isConnectorInfoSet(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.isConnectorInfoSet(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -133,7 +134,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setConnectorInfo(Job, String, AuthenticationToken) */ protected static String getPrincipal(JobContext context) { - return OutputConfigurator.getPrincipal(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getPrincipal(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -169,7 +170,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setConnectorInfo(Job, String, String) */ protected static AuthenticationToken getAuthenticationToken(JobContext context) { - return OutputConfigurator.getAuthenticationToken(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getAuthenticationToken(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -226,7 +227,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setMockInstance(Job, String) */ protected static Instance getInstance(JobContext context) { - return OutputConfigurator.getInstance(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getInstance(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -252,7 +253,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setLogLevel(Job, Level) */ protected static Level getLogLevel(JobContext context) { - return OutputConfigurator.getLogLevel(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getLogLevel(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -279,7 +280,7 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> { * @see #setDefaultTableName(Job, String) */ protected static String getDefaultTableName(JobContext context) { - return OutputConfigurator.getDefaultTableName(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getDefaultTableName(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -306,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setBatchWriterOptions(Job, BatchWriterConfig) */ protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { - return OutputConfigurator.getBatchWriterOptions(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getBatchWriterOptions(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -335,7 +336,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setCreateTables(Job, boolean) */ protected static Boolean canCreateTables(JobContext context) { - return OutputConfigurator.canCreateTables(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.canCreateTables(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** @@ -364,7 +365,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setSimulationMode(Job, boolean) */ protected static Boolean getSimulationMode(JobContext context) { - return OutputConfigurator.getSimulationMode(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getSimulationMode(CLASS, HadoopCompatUtil.getConfiguration(context)); } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index a60cb80..de65f1d 100644 
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -308,7 +308,8 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { */ @Deprecated protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); + return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, + getConfiguration(context))); } protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/core/src/main/java/org/apache/accumulo/core/util/HadoopCompatUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/HadoopCompatUtil.java b/core/src/main/java/org/apache/accumulo/core/util/HadoopCompatUtil.java new file mode 100644 index 0000000..27e07e1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/HadoopCompatUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.lang.reflect.Method; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; + +/** + * Utility to help manage binary compatibility between Hadoop versions 1 and 2. + */ +public class HadoopCompatUtil { + + /** + * Uses reflection to pull Configuration out of the JobContext for Hadoop 1 and Hadoop2 compatibility + * @param context + * The job context for which to return the configuration + * @return + * The Hadoop Configuration- irrespective of the version of Hadoop on the classpath. + */ + public static Configuration getConfiguration(JobContext context) { + try { + Class<?> c = HadoopCompatUtil.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); + Method m = c.getMethod("getConfiguration"); + Object o = m.invoke(context, new Object[0]); + return (Configuration) o; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/343b7359/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java index 1b8cbaf..f591e2c 100644 --- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java @@ -28,7 +28,7 @@ import java.util.Random; import org.apache.accumulo.core.cli.ClientOnRequiredTable; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import 
org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.util.HadoopCompatUtil; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.conf.Configuration; @@ -50,6 +50,7 @@ import org.apache.hadoop.util.ToolRunner; import com.beust.jcommander.Parameter; + /** * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a * map/reduce program to generate the data. The format of the data is: @@ -166,8 +167,8 @@ public class TeraSortIngest extends Configured implements Tool { */ @Override public List<InputSplit> getSplits(JobContext job) { - long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0); - int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1); + long totalRows = HadoopCompatUtil.getConfiguration(job).getLong(NUMROWS, 0); + int numSplits = HadoopCompatUtil.getConfiguration(job).getInt(NUMSPLITS, 1); long rowsPerSplit = totalRows / numSplits; System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit); ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);