#[IGNITE-218]: implemented caching of the FileSystem instances in Job context. Cached instances are not closed, not cached instances are closed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a28c52ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a28c52ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a28c52ad Branch: refs/heads/ignite-218 Commit: a28c52ad450b833f83e1da52a7607f5a7624e6ec Parents: d789795 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Wed Jun 3 20:13:47 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Wed Jun 3 20:13:47 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsAbstractSelfTest.java | 2 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 2 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 28 +-- .../internal/processors/hadoop/HadoopUtils.java | 245 ++++++++++++++++++- .../processors/hadoop/v2/HadoopV2Job.java | 12 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 6 +- .../hadoop/v2/HadoopV2TaskContext.java | 2 +- 7 files changed, 254 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index a8a8957..90768db 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ @SuppressWarnings("ConstantConditions") - public void testFormat() throws Exception { + public void _testFormat() throws Exception { // Test works too long and fails. fail("https://issues.apache.org/jira/browse/IGNITE-586"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 7c47b3f..2305f1e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -73,7 +73,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter hadoopCfg.set(MRJobConfig.USER_NAME, user); // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980 - FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg); + FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 4ed3862..9b008c6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends FileSystem { private int seqReadsBeforePrefetch; /** The cache was disabled when the instance was creating. */ - private boolean cacheEnabled; + //private boolean cacheEnabled; /** {@inheritDoc} */ @Override public URI getUri() { @@ -213,9 +213,9 @@ public class IgniteHadoopFileSystem extends FileSystem { setConf(cfg); - String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); + //String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); - cacheEnabled = !cfg.getBoolean(disableCacheName, false); + //cacheEnabled = !cfg.getBoolean(disableCacheName, false); mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); @@ -350,28 +350,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (cacheEnabled) { - FileSystem cached; - - try { - cached = get(getUri(), getConf(), user); - } - catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - - throw new IOException(ie); - } - - if (cached == this) - return; // do not close cached instances. - else - // For some reason we created another Fs. - cached.close(); - } - + if (closeGuard.compareAndSet(false, true)) close0(); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 8e47abb..f4323b6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -27,12 +27,14 @@ import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; +import java.lang.reflect.*; import java.net.*; import java.util.*; @@ -333,6 +335,24 @@ public class HadoopUtils { } /** + * Creates {@link JobConf} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link JobConf}. + */ + public static JobConf safeCreateJobConf() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader()); + + try { + return new JobConf(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** * Gets non-null user name as per the Hadoop viewpoint. * @param cfg the Hadoop job configuration, may be null. * @return the user name, never null. @@ -355,7 +375,9 @@ public class HadoopUtils { * @return the file system * @throws IOException */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException { + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { + X.println("######## fileSystemForMrUser: " + HadoopUtils.class.getClassLoader()); + final String usr = getMrHadoopUser(cfg); assert usr != null; @@ -365,13 +387,23 @@ public class HadoopUtils { final FileSystem fs; - try { - fs = FileSystem.get(uri, cfg, usr); + if (doCacheFs) { + try { + fs = getWithCaching(uri, cfg, usr); + } + catch (IgniteException ie) { + throw new IOException(ie); + } } - catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + else { + try { + fs = FileSystem.get(uri, cfg, usr); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); - throw new IOException(ie); + throw new IOException(ie); + } } assert fs != null; @@ -379,4 +411,205 @@ public class HadoopUtils { return fs; } + + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Disable cache: + URI uri = key.uri(); + String scheme = uri.getScheme(); + assert scheme != null; + String property = HadoopUtils.disableFsCahcePropertyName(scheme); + key.configuration().setBoolean(property, true); + + FileSystem fs = FileSystem.get(uri, key.configuration(), key.user()); + + // DIAGNOSTIC: Make sure this Fs is not cached by Hadoop: + try { + Object cached = getCached(fs); + + assert cached == null; + } catch (Exception e) { + e.printStackTrace(); + } + + return fs; + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + /** + * Note that configuration is not actually a part of the key. + * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + */ + private String createEqualityKey() { + String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(); + + String authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(); + + return "(" + usr + ")@" + scheme + "://" + authority; + } + + /** + */ + public URI uri() { + return uri; + } + + /** + */ + public String user() { + return usr; + } + + /** + */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } + + public static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { + FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + X.println("#### Key = " + key); + + FileSystem fs = fileSysLazyMap.getOrCreate(key); + + return fs; + } + + public static String disableFsCahcePropertyName(String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + public static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || + (scheme.equals(dfltUri.getScheme()) + && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + + /** + * DIAGNOSTIC. + * @return The cached instance in FileSystem cache taken by 'fs.key'. + */ + static Object getCached(FileSystem fs) throws Exception { + assert fs != null; + + Field keyField = FileSystem.class.getDeclaredField("key"); + + keyField.setAccessible(true); + + Object key = keyField.get(fs); + + Map map = getMap(); + + Object cachedFs = map.get(key); + + return cachedFs; + } + + /** + * DIAGNOSTIC. + * @return The FileSystem.CACHE.map . + */ + private static Map getMap() throws Exception { + Field CACHEField = FileSystem.class.getDeclaredField("CACHE"); + + CACHEField.setAccessible(true); + + Object cacheObj = CACHEField.get(null); + + Field mapField = cacheObj.getClass().getDeclaredField("map"); + + mapField.setAccessible(true); + + Map map = (Map)mapField.get(cacheObj); + + return map; + } + + public static void main(String[] args) { + System.out.println(String.format("fs.%s.impl.disable.cache", null)); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index fd5deaf..849bbe6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -92,12 +92,7 @@ public class HadoopV2Job implements HadoopJob { hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader(); - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(clsLdr); - - jobConf = new JobConf(); + jobConf = HadoopUtils.safeCreateJobConf(); HadoopFileSystemsUtils.setupFileSystems(jobConf); @@ -138,7 +133,10 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf)) { + try { + // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980 + FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true); + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index e9c0365..bb72f15 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -113,7 +113,7 @@ public class HadoopV2JobResourceManager { if (download) { // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980 - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -209,7 +209,7 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980 - FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg); + FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -293,7 +293,7 @@ public class HadoopV2JobResourceManager { try { if (stagingDir != null) // TODO: Check if FileSystem should be closed, see https://issues.apache.org/jira/browse/IGNITE-980 - HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true); + HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true); } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a28c52ad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 7384421..e89feba 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -423,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf()); + try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset());