Repository: accumulo
Updated Branches:
  refs/heads/master 6c433364f -> 2643a8c05
ACCUMULO-3479 Drops Hadoop 1 support.

Removes the following uses of reflection:

* Use of MapReduce Counters
* Getter for Configuration from JobContext
* ViewFS.resolvePath(Path)
* FileSystem sync and append configs
* FileSystem.getDefaultReplication
* Getter for FileSystem SafeMode state
* Removes bare "sync" support from WAL code

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2643a8c0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2643a8c0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2643a8c0

Branch: refs/heads/master
Commit: 2643a8c051ce86b256209106488b5f580e31b718
Parents: 6c43336
Author: Josh Elser <els...@apache.org>
Authored: Wed Jan 14 10:47:22 2015 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Wed Jan 14 13:40:14 2015 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AbstractInputFormat.java | 34 ++---
 .../mapreduce/AccumuloFileOutputFormat.java | 4 +-
 .../AccumuloMultiTableInputFormat.java | 2 +-
 .../client/mapreduce/AccumuloOutputFormat.java | 18 +--
 .../core/client/mapreduce/InputFormatBase.java | 18 +--
 .../simple/mapreduce/TeraSortIngest.java | 5 +-
 pom.xml | 20 +--
 .../apache/accumulo/server/fs/ViewFSUtils.java | 20 ---
 .../accumulo/server/fs/VolumeManagerImpl.java | 123 +++----------------
 .../server/master/recovery/HadoopLogCloser.java | 2 +-
 .../apache/accumulo/tserver/log/DfsLogger.java | 13 +-
 .../tserver/TabletServerSyncCheckTest.java | 25 +---
 .../test/continuous/ContinuousMoru.java | 2 +-
 .../test/continuous/ContinuousVerify.java | 28 +----
 14 files changed, 64 insertions(+), 250 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff 
--git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index 384c221..5c7b780 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -17,7 +17,6 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; -import java.lang.reflect.Method; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; @@ -57,7 +56,6 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -124,7 +122,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setConnectorInfo(Job, String, AuthenticationToken) */ protected static Boolean isConnectorInfoSet(JobContext context) { - return InputConfigurator.isConnectorInfoSet(CLASS, getConfiguration(context)); + return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); } /** @@ -137,7 +135,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setConnectorInfo(Job, String, AuthenticationToken) */ protected static String getPrincipal(JobContext context) { - return InputConfigurator.getPrincipal(CLASS, getConfiguration(context)); + return InputConfigurator.getPrincipal(CLASS, context.getConfiguration()); } /** @@ -173,7 +171,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setConnectorInfo(Job, String, String) */ protected static AuthenticationToken getAuthenticationToken(JobContext context) 
{ - return InputConfigurator.getAuthenticationToken(CLASS, getConfiguration(context)); + return InputConfigurator.getAuthenticationToken(CLASS, context.getConfiguration()); } /** @@ -231,7 +229,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setMockInstance(Job, String) */ protected static Instance getInstance(JobContext context) { - return InputConfigurator.getInstance(CLASS, getConfiguration(context)); + return InputConfigurator.getInstance(CLASS, context.getConfiguration()); } /** @@ -257,7 +255,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setLogLevel(Job, Level) */ protected static Level getLogLevel(JobContext context) { - return InputConfigurator.getLogLevel(CLASS, getConfiguration(context)); + return InputConfigurator.getLogLevel(CLASS, context.getConfiguration()); } /** @@ -282,7 +280,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @see #setScanAuthorizations(Job, Authorizations) */ protected static Authorizations getScanAuthorizations(JobContext context) { - return InputConfigurator.getScanAuthorizations(CLASS, getConfiguration(context)); + return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration()); } /** @@ -294,7 +292,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @since 1.6.0 */ protected static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) { - return InputConfigurator.getInputTableConfigs(CLASS, getConfiguration(context)); + return InputConfigurator.getInputTableConfigs(CLASS, context.getConfiguration()); } /** @@ -311,7 +309,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @since 1.6.0 */ protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) { - return InputConfigurator.getInputTableConfig(CLASS, getConfiguration(context), tableName); + return InputConfigurator.getInputTableConfig(CLASS, 
context.getConfiguration(), tableName); } /** @@ -327,7 +325,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @since 1.6.0 */ protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table); + return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration(), table); } // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) @@ -341,7 +339,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @since 1.5.0 */ protected static void validateOptions(JobContext context) throws IOException { - InputConfigurator.validateOptions(CLASS, getConfiguration(context)); + InputConfigurator.validateOptions(CLASS, context.getConfiguration()); } /** @@ -678,16 +676,4 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { } return splits; } - - // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility - public static Configuration getConfiguration(JobContext context) { - try { - Class<?> c = AbstractInputFormat.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); - Method m = c.getMethod("getConfiguration"); - Object o = m.invoke(context, new Object[0]); - return (Configuration) o; - } catch (Exception e) { - throw new RuntimeException(e); - } - } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index c68dd56..db7b689 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -62,7 +62,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { * @since 1.5.0 */ protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { - return FileOutputConfigurator.getAccumuloConfiguration(CLASS, InputFormatBase.getConfiguration(context)); + return FileOutputConfigurator.getAccumuloConfiguration(CLASS, context.getConfiguration()); } /** @@ -138,7 +138,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { @Override public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file - final Configuration conf = InputFormatBase.getConfiguration(context); + final Configuration conf = context.getConfiguration(); final AccumuloConfiguration acuConf = getAccumuloConfiguration(context); final String extension = acuConf.get(Property.TABLE_FILE_TYPE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java index 010a94f..bed0def 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java @@ -64,7 +64,7 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value */ public static void setInputTableConfigs(Job job, Map<String,InputTableConfig> configs) { checkNotNull(configs); - 
InputConfigurator.setInputTableConfigs(CLASS, getConfiguration(job), configs); + InputConfigurator.setInputTableConfigs(CLASS, job.getConfiguration(), configs); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index 9a8ab58..5e0aa73 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -120,7 +120,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setConnectorInfo(Job, String, AuthenticationToken) */ protected static Boolean isConnectorInfoSet(JobContext context) { - return OutputConfigurator.isConnectorInfoSet(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); } /** @@ -133,7 +133,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setConnectorInfo(Job, String, AuthenticationToken) */ protected static String getPrincipal(JobContext context) { - return OutputConfigurator.getPrincipal(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getPrincipal(CLASS, context.getConfiguration()); } /** @@ -169,7 +169,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setConnectorInfo(Job, String, String) */ protected static AuthenticationToken getAuthenticationToken(JobContext context) { - return OutputConfigurator.getAuthenticationToken(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getAuthenticationToken(CLASS, 
context.getConfiguration()); } /** @@ -227,7 +227,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setMockInstance(Job, String) */ protected static Instance getInstance(JobContext context) { - return OutputConfigurator.getInstance(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getInstance(CLASS, context.getConfiguration()); } /** @@ -253,7 +253,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setLogLevel(Job, Level) */ protected static Level getLogLevel(JobContext context) { - return OutputConfigurator.getLogLevel(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration()); } /** @@ -280,7 +280,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setDefaultTableName(Job, String) */ protected static String getDefaultTableName(JobContext context) { - return OutputConfigurator.getDefaultTableName(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration()); } /** @@ -307,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setBatchWriterOptions(Job, BatchWriterConfig) */ protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { - return OutputConfigurator.getBatchWriterOptions(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration()); } /** @@ -336,7 +336,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @see #setCreateTables(Job, boolean) */ protected static Boolean canCreateTables(JobContext context) { - return OutputConfigurator.canCreateTables(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration()); } /** @@ -365,7 +365,7 @@ public class AccumuloOutputFormat 
extends OutputFormat<Text,Mutation> { * @see #setSimulationMode(Job, boolean) */ protected static Boolean getSimulationMode(JobContext context) { - return OutputConfigurator.getSimulationMode(CLASS, InputFormatBase.getConfiguration(context)); + return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration()); } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index a60cb80..d81030d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -62,7 +62,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setInputTableName(Job, String) */ protected static String getInputTableName(JobContext context) { - return InputConfigurator.getInputTableName(CLASS, getConfiguration(context)); + return InputConfigurator.getInputTableName(CLASS, context.getConfiguration()); } /** @@ -101,7 +101,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setRanges(Job, Collection) */ protected static List<Range> getRanges(JobContext context) throws IOException { - return InputConfigurator.getRanges(CLASS, getConfiguration(context)); + return InputConfigurator.getRanges(CLASS, context.getConfiguration()); } /** @@ -128,7 +128,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #fetchColumns(Job, Collection) */ protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) { - return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context)); + return 
InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration()); } /** @@ -154,7 +154,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #addIterator(Job, IteratorSetting) */ protected static List<IteratorSetting> getIterators(JobContext context) { - return InputConfigurator.getIterators(CLASS, getConfiguration(context)); + return InputConfigurator.getIterators(CLASS, context.getConfiguration()); } /** @@ -185,7 +185,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setAutoAdjustRanges(Job, boolean) */ protected static boolean getAutoAdjustRanges(JobContext context) { - return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context)); + return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration()); } /** @@ -214,7 +214,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setScanIsolation(Job, boolean) */ protected static boolean isIsolated(JobContext context) { - return InputConfigurator.isIsolated(CLASS, getConfiguration(context)); + return InputConfigurator.isIsolated(CLASS, context.getConfiguration()); } /** @@ -244,7 +244,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setLocalIterators(Job, boolean) */ protected static boolean usesLocalIterators(JobContext context) { - return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context)); + return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration()); } /** @@ -292,7 +292,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * @see #setOfflineTableScan(Job, boolean) */ protected static boolean isOfflineScan(JobContext context) { - return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context)); + return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration()); } /** @@ -308,7 +308,7 @@ public abstract class InputFormatBase<K,V> extends 
AbstractInputFormat<K,V> { */ @Deprecated protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); + return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration(), InputConfigurator.getInputTableName(CLASS, context.getConfiguration())); } protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java index 8c48877..7870688 100644 --- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java @@ -28,7 +28,6 @@ import java.util.Random; import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.conf.Configuration; @@ -166,8 +165,8 @@ public class TeraSortIngest extends Configured implements Tool { */ @Override public List<InputSplit> getSplits(JobContext job) { - long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0); - int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1); + long totalRows = 
job.getConfiguration().getLong(NUMROWS, 0); + int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1); long rowsPerSplit = totalRows / numSplits; System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit); ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b016a33..6b9ed4a 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,7 @@ <bouncycastle.version>1.50</bouncycastle.version> <!-- surefire/failsafe plugin option --> <forkCount>1</forkCount> - <!-- overwritten in profiles hadoop-1 or hadoop-2 --> + <!-- overwritten in profiles hadoop-2 --> <hadoop.version>2.2.0</hadoop.version> <htrace.version>3.0.4</htrace.version> <httpclient.version>3.1</httpclient.version> @@ -135,7 +135,7 @@ <!-- surefire/failsafe plugin option --> <reuseForks>false</reuseForks> <sealJars>false</sealJars> - <!-- overwritten in profiles hadoop-1 or hadoop-2 --> + <!-- overwritten in profiles hadoop-2 --> <slf4j.version>1.7.5</slf4j.version> <!-- Thrift version --> <thrift.version>0.9.1</thrift.version> @@ -1308,22 +1308,6 @@ <slf4j.version>1.7.5</slf4j.version> </properties> </profile> - <!-- profile for building against Hadoop 1.x - Activate using: mvn -Dhadoop.profile=1 --> - <profile> - <id>hadoop-1</id> - <activation> - <property> - <name>hadoop.profile</name> - <value>1</value> - </property> - </activation> - <properties> - <hadoop.version>1.2.1</hadoop.version> - <httpclient.version>3.0.1</httpclient.version> - <slf4j.version>1.7.5</slf4j.version> - </properties> - </profile> <!-- profile for building against Hadoop 2.x XXX Since this is the default, make sure to sync hadoop-default when changing. 
Activate using: mvn -Dhadoop.profile=2 --> http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java index 73535d9..9c15e16 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java @@ -17,8 +17,6 @@ package org.apache.accumulo.server.fs; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,24 +46,6 @@ public class ViewFSUtils { return fs.getClass().getName().equals(VIEWFS_CLASSNAME); } - public static Path resolvePath(FileSystem fs, Path path) throws IOException { - // resolve path is new hadoop 2 so call it via reflection - try { - Method method = fs.getClass().getMethod("resolvePath", Path.class); - return (Path) method.invoke(fs, path); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } catch (InvocationTargetException e) { - throw new IOException(e); - } catch (SecurityException e) { - throw new IOException(e); - } catch (NoSuchMethodException e) { - throw new IOException(e); - } - } - public static Path matchingFileSystem(Path source, String[] options, Configuration conf) throws IOException { if (!isViewFS(source, conf)) http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 7a1221c..7d40b9a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -19,8 +19,6 @@ package org.apache.accumulo.server.fs; import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URI; import java.util.ArrayList; @@ -56,8 +54,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -222,66 +220,30 @@ public class VolumeManagerImpl implements VolumeManager { protected void ensureSyncIsEnabled() { for (Entry<String,Volume> entry : getFileSystems().entrySet()) { - final String volumeName = entry.getKey(); FileSystem fs = entry.getValue().getFileSystem(); if (fs instanceof DistributedFileSystem) { - final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append"; + // Avoid use of DFSConfigKeys since it's private + final String DFS_SUPPORT_APPEND = "dfs.support.append", DFS_DATANODE_SYNCONCLOSE = "dfs.datanode.synconclose"; final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details."; - // Check to make sure that we have proper defaults configured - try { - // If the default is off (0.20.205.x or 1.0.x) - DFSConfigKeys configKeys = new DFSConfigKeys(); - - // Can't use the final constant itself as Java will 
inline it at compile time - Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT"); - boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys); - - if (!dfsSupportAppendDefaultValue) { - // See if the user did the correct override - if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) { - String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage; - log.fatal(msg); - throw new RuntimeException(msg); - } - } - } catch (NoSuchFieldException e) { - // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running - // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled. - } catch (Exception e) { - log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName - + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e); - } // If either of these parameters are configured to be false, fail. // This is a sign that someone is writing bad configuration. - if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) { - String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage; + if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true)) { + String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " not be configured as false. 
" + ticketMessage; log.fatal(msg); throw new RuntimeException(msg); } - try { - // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line) - Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys"); - dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY"); - - // Everything else - if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) { - // Only warn once per process per volume URI - synchronized (WARNED_ABOUT_SYNCONCLOSE) { - if (!WARNED_ABOUT_SYNCONCLOSE.contains(entry.getKey())) { - WARNED_ABOUT_SYNCONCLOSE.add(entry.getKey()); - log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on hard system reset or power loss"); - } + // Warn if synconclose isn't set + if (!fs.getConf().getBoolean(DFS_DATANODE_SYNCONCLOSE, false)) { + // Only warn once per process per volume URI + synchronized (WARNED_ABOUT_SYNCONCLOSE) { + if (!WARNED_ABOUT_SYNCONCLOSE.contains(entry.getKey())) { + WARNED_ABOUT_SYNCONCLOSE.add(entry.getKey()); + log.warn(DFS_DATANODE_SYNCONCLOSE + " set to false in hdfs-site.xml: data loss is possible on hard system reset or power loss"); } } - } catch (ClassNotFoundException ex) { - // hadoop 1.0.X or hadoop 1.1.0 - } catch (SecurityException e) { - // hadoop 1.0.X or hadoop 1.1.0 - } catch (NoSuchFieldException e) { - // hadoop 1.0.X or hadoop 1.1.0 } } } @@ -370,24 +332,7 @@ public class VolumeManagerImpl implements VolumeManager { @Override public short getDefaultReplication(Path path) { Volume v = getVolumeByPath(path); - FileSystem fs = v.getFileSystem(); - try { - // try calling hadoop 2 method - Method method = fs.getClass().getMethod("getDefaultReplication", Path.class); - return ((Short) method.invoke(fs, path)).shortValue(); - } catch (NoSuchMethodException e) { - // ignore - } catch (IllegalArgumentException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new 
RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } - - @SuppressWarnings("deprecation") - short rep = fs.getDefaultReplication(); - return rep; + return v.getFileSystem().getDefaultReplication(path); } @Override @@ -431,44 +376,16 @@ public class VolumeManagerImpl implements VolumeManager { @Override public boolean isReady() throws IOException { for (Volume volume : getFileSystems().values()) { - FileSystem fs = volume.getFileSystem(); + final FileSystem fs = volume.getFileSystem(); if (!(fs instanceof DistributedFileSystem)) continue; - DistributedFileSystem dfs = (DistributedFileSystem) fs; - // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) - // Becomes this: - Class<?> safeModeAction; - try { - // hadoop 2.0 - safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"); - } catch (ClassNotFoundException ex) { - // hadoop 1.0 - try { - safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot figure out the right class for Constants"); - } - } - Object get = null; - for (Object obj : safeModeAction.getEnumConstants()) { - if (obj.toString().equals("SAFEMODE_GET")) - get = obj; - } - if (get == null) { - throw new RuntimeException("cannot find SAFEMODE_GET"); - } - try { - Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction); - boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get); - if (inSafeMode) { - return false; - } - } catch (IllegalArgumentException exception) { - /* Send IAEs back as-is, so that those that wrap UnknownHostException can be handled in the same place as similar sources of failure. 
*/ - throw exception; - } catch (Exception ex) { - throw new RuntimeException("cannot find method setSafeMode"); + + final DistributedFileSystem dfs = (DistributedFileSystem) fs; + + // Returns true when safemode is on + if (dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)) { + return false; } } return true; http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java index 64ab011..e7ef77f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java @@ -41,7 +41,7 @@ public class HadoopLogCloser implements LogCloser { // if path points to a viewfs path, then resolve to underlying filesystem if (ViewFSUtils.isViewFS(ns)) { - Path newSource = ViewFSUtils.resolvePath(ns, source); + Path newSource = ns.resolvePath(source); if (!newSource.equals(source) && newSource.toUri().getScheme() != null) { ns = newSource.getFileSystem(CachedConfiguration.getInstance()); source = newSource; http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 6f9be7d..5acf5eb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -404,17 +404,8 @@ public class DfsLogger { 
else logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); - try { - sync = logFile.getClass().getMethod("hsync"); - flush = logFile.getClass().getMethod("hflush"); - } catch (Exception ex) { - try { - // fall back to sync: send data to datanodes - flush = sync = logFile.getClass().getMethod("sync"); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + sync = logFile.getClass().getMethod("hsync"); + flush = logFile.getClass().getMethod("hflush"); // Initialize the crypto operations. org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java index 1a3f9fc..65282bb 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java @@ -39,30 +39,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; public class TabletServerSyncCheckTest { - private static final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append"; - - @Test(expected = RuntimeException.class) - public void testFailureOnExplicitSyncFalseConf() { - Configuration conf = new Configuration(); - conf.set(DFS_DURABLE_SYNC, "false"); - - FileSystem fs = new TestFileSystem(conf); - TestVolumeManagerImpl vm = new TestVolumeManagerImpl(ImmutableMap.<String,Volume> of("foo", new VolumeImpl(fs, "/"))); - - vm.ensureSyncIsEnabled(); - } - - @Test(expected = RuntimeException.class) - public 
void testFailureOnSingleExplicitSyncFalseConf() { - Configuration conf1 = new Configuration(), conf2 = new Configuration(); - conf1.set(DFS_DURABLE_SYNC, "false"); - - FileSystem fs1 = new TestFileSystem(conf1); - FileSystem fs2 = new TestFileSystem(conf2); - TestVolumeManagerImpl vm = new TestVolumeManagerImpl(ImmutableMap.<String,Volume> of("bar", new VolumeImpl(fs2, "/"), "foo", new VolumeImpl(fs1, "/"))); - - vm.ensureSyncIsEnabled(); - } + private static final String DFS_SUPPORT_APPEND = "dfs.support.append"; @Test(expected = RuntimeException.class) public void testFailureOnExplicitAppendFalseConf() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java index 89ff515..0cee5ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java @@ -111,7 +111,7 @@ public class ContinuousMoru extends Configured implements Tool { } } else { - ContinuousVerify.increment(context.getCounter(Counts.SELF_READ)); + context.getCounter(Counts.SELF_READ).increment(1l); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2643a8c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java index 461d226..1171ea3 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java @@ -19,7 +19,6 @@ 
package org.apache.accumulo.test.continuous; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -39,7 +38,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VLongWritable; -import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -57,24 +55,6 @@ import com.beust.jcommander.validators.PositiveInteger; public class ContinuousVerify extends Configured implements Tool { - // work around hadoop-1/hadoop-2 runtime incompatibility - static private Method INCREMENT; - static { - try { - INCREMENT = Counter.class.getMethod("increment", Long.TYPE); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - static void increment(Object obj) { - try { - INCREMENT.invoke(obj, 1L); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - public static final VLongWritable DEF = new VLongWritable(-1); public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> { @@ -95,7 +75,7 @@ public class ContinuousVerify extends Configured implements Tool { try { ContinuousWalk.validate(key, data); } catch (BadChecksumException bce) { - increment(context.getCounter(Counts.CORRUPT)); + context.getCounter(Counts.CORRUPT).increment(1l); if (corrupt < 1000) { log.error("Bad checksum : " + key); } else if (corrupt == 1000) { @@ -150,12 +130,12 @@ public class ContinuousVerify extends Configured implements Tool { } context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); - increment(context.getCounter(Counts.UNDEFINED)); + context.getCounter(Counts.UNDEFINED).increment(1l); } else if (defCount > 0 && refs.size() == 0) { - 
increment(context.getCounter(Counts.UNREFERENCED)); + context.getCounter(Counts.UNREFERENCED).increment(1l); } else { - increment(context.getCounter(Counts.REFERENCED)); + context.getCounter(Counts.REFERENCED).increment(1l); } }