#[IGNITE-218]: cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cbb7ddd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cbb7ddd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cbb7ddd Branch: refs/heads/ignite-218 Commit: 5cbb7ddd8c985e07670e9281c36c1ba46a968a8f Parents: a28c52a Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu Jun 4 13:42:00 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu Jun 4 13:42:00 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsAbstractSelfTest.java | 2 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 9 +- .../internal/processors/hadoop/HadoopUtils.java | 146 +++++++------------ .../hadoop/SecondaryFileSystemProvider.java | 2 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 4 +- 5 files changed, 61 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 90768db..a8a8957 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ @SuppressWarnings("ConstantConditions") - public void _testFormat() throws Exception { + public void testFormat() throws Exception { // Test works too long and fails. fail("https://issues.apache.org/jira/browse/IGNITE-586"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 9b008c6..9d94e5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -143,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Custom-provided sequential reads before prefetch. */ private int seqReadsBeforePrefetch; - /** The cache was disabled when the instance was creating. */ - //private boolean cacheEnabled; - /** {@inheritDoc} */ @Override public URI getUri() { if (uri == null) @@ -213,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem { setConf(cfg); - //String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); - - //cacheEnabled = !cfg.getBoolean(disableCacheName, false); - mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); if (!IGFS_SCHEME.equals(name.getScheme())) @@ -345,7 +338,7 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override protected void finalize() throws Throwable { super.finalize(); - close0(); + close(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index f4323b6..94f1647 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; -import java.lang.reflect.*; import java.net.*; import java.util.*; @@ -63,6 +62,32 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + // TODO: cleam up this cache upon Ignite node stop, see https://issues.apache.org/jira/browse/IGNITE-980 . + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + String prop = HadoopUtils.disableFsCachePropertyName(scheme); + + key.configuration().setBoolean(prop, true); + + return FileSystem.get(uri, key.configuration(), key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + /** * Constructor. */ @@ -376,8 +401,6 @@ public class HadoopUtils { * @throws IOException */ public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { - X.println("######## fileSystemForMrUser: " + HadoopUtils.class.getClassLoader()); - final String usr = getMrHadoopUser(cfg); assert usr != null; @@ -412,42 +435,8 @@ public class HadoopUtils { return fs; } - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { - @Override public FileSystem createValue(FsCacheKey key) { - try { - assert key != null; - - // Disable cache: - URI uri = key.uri(); - String scheme = uri.getScheme(); - assert scheme != null; - String property = HadoopUtils.disableFsCahcePropertyName(scheme); - key.configuration().setBoolean(property, true); - - FileSystem fs = FileSystem.get(uri, key.configuration(), key.user()); - - // DIAGNOSTIC: Make sure this Fs is not cached by Hadoop: - try { - Object cached = getCached(fs); - - assert cached == null; - } catch (Exception e) { - e.printStackTrace(); - } - - return fs; - } - catch (IOException | InterruptedException ioe) { - throw new IgniteException(ioe); - } - } - } - ); - /** - * Note that configuration is not actually a part of the key. + * Note that configuration is not a part of the key. * It is used solely to initialize the first instance * that is created for the key. */ @@ -465,6 +454,7 @@ public class HadoopUtils { private final Configuration cfg; /** + * Constructor */ public FsCacheKey(URI uri, String usr, Configuration cfg) { assert uri != null; @@ -479,6 +469,7 @@ public class HadoopUtils { } /** + * Creates String key used for equality and hashing. */ private String createEqualityKey() { String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(); @@ -489,24 +480,28 @@ public class HadoopUtils { } /** + * The URI. */ public URI uri() { return uri; } /** + * The User. */ public String user() { return usr; } /** + * The Configuration. */ public Configuration configuration() { return cfg; } /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") @Override public boolean equals(Object obj) { if (obj == this) return true; @@ -528,17 +523,32 @@ public class HadoopUtils { } } - public static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { + /** + * Gets FileSystem caching it in static Ignite cache. The cache is a singleton + * for each class loader. + * + * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. + * The Configuration is not a part of the key. This means that for the given key file system is + * initialized only once with the Configuration passed in upon the file system creation. + * + * @param uri The file system URI. + * @param cfg The configuration. + * @param usr The user to create file system for. + * @return The file system: either created, or taken from the cache. + */ + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { FsCacheKey key = new FsCacheKey(uri, usr, cfg); - X.println("#### Key = " + key); - - FileSystem fs = fileSysLazyMap.getOrCreate(key); - - return fs; + return fileSysLazyMap.getOrCreate(key); } - public static String disableFsCahcePropertyName(String scheme) { + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { return String.format("fs.%s.impl.disable.cache", scheme); } @@ -566,50 +576,4 @@ public class HadoopUtils { return uri0; } - - - /** - * DIAGNOSTIC. - * @return The cached instance in FileSystem cache taken by 'fs.key'. - */ - static Object getCached(FileSystem fs) throws Exception { - assert fs != null; - - Field keyField = FileSystem.class.getDeclaredField("key"); - - keyField.setAccessible(true); - - Object key = keyField.get(fs); - - Map map = getMap(); - - Object cachedFs = map.get(key); - - return cachedFs; - } - - /** - * DIAGNOSTIC. - * @return The FileSystem.CACHE.map . - */ - private static Map getMap() throws Exception { - Field CACHEField = FileSystem.class.getDeclaredField("CACHE"); - - CACHEField.setAccessible(true); - - Object cacheObj = CACHEField.get(null); - - Field mapField = cacheObj.getClass().getDeclaredField("map"); - - mapField.setAccessible(true); - - Map map = (Map)mapField.get(cacheObj); - - return map; - } - - public static void main(String[] args) { - System.out.println(String.format("fs.%s.impl.disable.cache", null)); - } - } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index f9a68f1..dd679de 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme()); + String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cbb7ddd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index bb72f15..6e2764b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -84,7 +84,9 @@ public class HadoopV2JobResourceManager { try { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); - if(!cfg.getBoolean("fs.file.impl.disable.cache", false)) + String prop = HadoopUtils.disableFsCachePropertyName("file"); + + if (!cfg.getBoolean(prop, false)) FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); } finally {