# IG-980: (1) refactoring, (2) added classloader assertions, (3) cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f57dc8a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f57dc8a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f57dc8a6 Branch: refs/heads/ignite-980 Commit: f57dc8a66c5a6b0c95f773411c4ff7a1e4ea022f Parents: ef4e09b Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Mon Jun 22 16:11:12 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Mon Jun 22 16:11:12 2015 +0300 ---------------------------------------------------------------------- .../fs/IgniteHadoopFileSystemCounterWriter.java | 3 +- .../processors/hadoop/HadoopClassLoader.java | 29 ++ .../processors/hadoop/HadoopDefaultJobInfo.java | 4 +- .../internal/processors/hadoop/HadoopUtils.java | 302 ----------------- .../hadoop/SecondaryFileSystemProvider.java | 3 +- .../hadoop/fs/HadoopFileSystemCache.java | 333 +++++++++++++++++++ .../hadoop/fs/HadoopFileSystemsUtils.java | 11 + .../processors/hadoop/v2/HadoopV2Job.java | 41 +-- .../hadoop/v2/HadoopV2JobResourceManager.java | 22 +- .../hadoop/v2/HadoopV2TaskContext.java | 8 +- 10 files changed, 412 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 597ff8a..0ba4da4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -25,6 +25,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; @@ -72,7 +73,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try { hadoopCfg.set(MRJobConfig.USER_NAME, user); - FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, jobId); + FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, jobId); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index eb98ff9..0988fe0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -67,6 +67,28 @@ public class HadoopClassLoader extends URLClassLoader { private final String name; /** + * Gets name for Job class loader. The name is specific for local node id. + * @param locNodeId The local node id. + * @return The class loader name. + */ + public static String nameForJob(UUID locNodeId) { + return "hadoop-job-node-" + locNodeId.toString(); + } + + /** + * Gets name for the task class loader. 
The name is specific for the job and, unless {@code prefix} is set, also for the task type and number. + * @param info The task info. + * @param prefix If {@code true}, return only the job-specific prefix (without task type and number). + * @return The class loader name. + */ + public static String nameForTask(HadoopTaskInfo info, boolean prefix) { + if (prefix) + return "hadoop-task-" + info.jobId() + "-"; + else + return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber(); + } + + /** * @param urls Urls. */ public HadoopClassLoader(URL[] urls, String name) { @@ -568,4 +590,11 @@ public class HadoopClassLoader extends URLClassLoader { @Override public String toString() { return S.toString(HadoopClassLoader.class, this); } + + /** + * Gets the name of this class loader. + */ + public String name() { + return name; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 9e685ea..a31ada5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -74,7 +74,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { private static final HadoopLazyConcurrentMap<UUID, CloseableClass> hadoopV2JobClasses = new HadoopLazyConcurrentMap<>(new HadoopLazyConcurrentMap.ValueFactory<UUID, CloseableClass>() { @Override public CloseableClass createValue(UUID key) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job-node-" + key); + HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(key)); try { Class<?> jobCls = ldr.loadClass(HadoopV2Job.class.getName()); @@
-121,8 +121,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { try { Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz(); - X.println("#### Creating job: HadoopJob: " + nodeId + ", class = " + jobCls0); - Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, UUID.class, HadoopDefaultJobInfo.class, IgniteLogger.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 2234549..f87e610 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -26,19 +26,11 @@ import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; -import org.apache.ignite.hadoop.fs.v1.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; import java.io.*; -import java.net.*; import java.util.*; -import java.util.concurrent.*; /** * Hadoop utility methods. @@ -65,47 +57,6 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. 
*/ - private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap - = createHadoopLazyConcurrentMap(); - - /** File system cache for jobs. */ - private static final ConcurrentMap<HadoopJobId, HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap - = new ConcurrentHashMap8<>(); - - /** - * Creates HadoopLazyConcurrentMap. - * @return a new HadoopLazyConcurrentMap. - */ - public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { - return new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { - @Override public FileSystem createValue(FsCacheKey key) { - try { - assert key != null; - - // Explicitly disable FileSystem caching: - URI uri = key.uri(); - - String scheme = uri.getScheme(); - - // Copy the configuration to avoid altering the external object. - Configuration cfg = new Configuration(key.configuration()); - - String prop = HadoopUtils.disableFsCachePropertyName(scheme); - - cfg.setBoolean(prop, true); - - return FileSystem.get(uri, cfg, key.user()); - } - catch (IOException | InterruptedException ioe) { - throw new IgniteException(ioe); - } - } - } - ); - } - /** * Constructor. */ @@ -393,257 +344,4 @@ public class HadoopUtils { } } - /** - * Gets non-null user name as per the Hadoop viewpoint. - * @param cfg the Hadoop job configuration, may be null. - * @return the user name, never null. - */ - private static String getMrHadoopUser(Configuration cfg) throws IOException { - String user = cfg.get(MRJobConfig.USER_NAME); - - if (user == null) - user = IgniteHadoopFileSystem.getFsHadoopUser(); - - return user; - } - - /** - * Common method to get the V1 file system in MapRed engine. - * It creates the filesystem for the user specified in the - * configuration with {@link MRJobConfig#USER_NAME} property. - * @param uri the file system uri. - * @param cfg the configuration. 
- * @return the file system - * @throws IOException - */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, @Nullable HadoopJobId jobId) - throws IOException { - final String usr = getMrHadoopUser(cfg); - - assert usr != null; - - if (uri == null) - uri = FileSystem.getDefaultUri(cfg); - - final FileSystem fs; - - try { - fs = getWithCaching(uri, cfg, usr, jobId); - } - catch (IgniteException ie) { - throw new IOException(ie); - } - - assert fs != null; - assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); - - return fs; - } - - /** - * Note that configuration is not a part of the key. - * It is used solely to initialize the first instance - * that is created for the key. - */ - public static final class FsCacheKey { - /** */ - private final URI uri; - - /** */ - private final String usr; - - /** */ - private final String equalityKey; - - /** */ - private final Configuration cfg; - - /** - * Constructor - */ - public FsCacheKey(URI uri, String usr, Configuration cfg) { - assert uri != null; - assert usr != null; - assert cfg != null; - - this.uri = fixUri(uri, cfg); - this.usr = usr; - this.cfg = cfg; - - this.equalityKey = createEqualityKey(); - } - - /** - * Creates String key used for equality and hashing. - */ - private String createEqualityKey() { - GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); - - if (uri.getScheme() != null) - sb.a(uri.getScheme().toLowerCase()); - - sb.a("://"); - - if (uri.getAuthority() != null) - sb.a(uri.getAuthority().toLowerCase()); - - return sb.toString(); - } - - /** - * The URI. - */ - public URI uri() { - return uri; - } - - /** - * The User. - */ - public String user() { - return usr; - } - - /** - * The Configuration. 
- */ - public Configuration configuration() { - return cfg; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - return equalityKey.equals(((FsCacheKey)obj).equalityKey); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return equalityKey.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return equalityKey; - } - } - - /** - * Gets FileSystem caching it in static Ignite cache. The cache is a singleton - * for each class loader. - * - * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. - * The Configuration is not a part of the key. This means that for the given key file system is - * initialized only once with the Configuration passed in upon the file system creation. - * - * @param uri The file system URI. - * @param cfg The configuration. - * @param usr The user to create file system for. - * @return The file system: either created, or taken from the cache. - */ - private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, @Nullable HadoopJobId jobId) { - final FsCacheKey key = new FsCacheKey(uri, usr, cfg); - - if (jobId == null) - return fileSysLazyMap.getOrCreate(key); - else { - HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId); - - return lm.getOrCreate(key); - } - } - - /** - * Gets Fs map for given job Id and localNodeId. If local node Id not null, registers this - * local node id to track subsequent removal. - * @param jobId The job Id. - * @return File system map. 
- */ - private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(final HadoopJobId jobId) { - assert jobId != null; - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.get(jobId); - - if (lazy != null) - return lazy; - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap(); - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = jobFsMap.putIfAbsent(jobId, newLM); - - if (pushedT2 == null) - lazy = newLM; - else { - lazy = pushedT2; - - try { - newLM.close(); - } catch (IgniteCheckedException ice) { - throw new IgniteException(ice); - } - } - - return lazy; - } - - /** - * Closes file system map for this job Id and local node id. - * @param jobId The job id. - * @throws IgniteCheckedException - */ - public static synchronized void close(final HadoopJobId jobId) throws IgniteCheckedException { - assert jobId != null; - - final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.remove(jobId); - - if (lazy != null) - lazy.close(); - } - - /** - * Gets the property name to disable file system cache. - * @param scheme The file system URI scheme. - * @return The property name. If scheme is null, - * returns "fs.null.impl.disable.cache". - */ - public static String disableFsCachePropertyName(@Nullable String scheme) { - return String.format("fs.%s.impl.disable.cache", scheme); - } - - /** - * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). - * @param uri0 The uri. - * @param cfg The cfg. - * @return Correct URI. 
- */ - public static URI fixUri(URI uri0, Configuration cfg) { - if (uri0 == null) - return FileSystem.getDefaultUri(cfg); - - String scheme = uri0.getScheme(); - String authority = uri0.getAuthority(); - - if (authority == null) { - URI dfltUri = FileSystem.getDefaultUri(cfg); - - if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) - return dfltUri; - } - - return uri0; - } - - /** - * This method is called with reflection upon Job finish. This will clean up all the Fs created for tasks. - * @throws IgniteCheckedException - */ - public static void close() throws IgniteCheckedException { - fileSysLazyMap.close(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index dd679de..ef04b0f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.security.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -76,7 +77,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); + String prop = 
HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java new file mode 100644 index 0000000..cac248b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.io.*; +import java.net.*; +import java.util.concurrent.*; + +/** + * Static caches of file systems used by Map-Reduce tasks and jobs. + * Task file systems and per-job file systems are cached separately. + */ +public class HadoopFileSystemCache { + /** Lazy per-user file system cache used by Hadoop tasks. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> taskFsMap + = createHadoopLazyConcurrentMap(); + + /** Per-job file system caches used by Hadoop jobs. */ + private static final ConcurrentMap<HadoopJobId, HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap + = new ConcurrentHashMap8<>(); + + /** + * Creates HadoopLazyConcurrentMap. + * @return a new HadoopLazyConcurrentMap. + */ + private static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + return new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + } + + /** + * Gets non-null user name as per the Hadoop viewpoint.
+ * @param cfg the Hadoop job configuration, must not be null. + * @return the user name, never null. + */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It gets the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * The file systems are created and cached upon first request. + * + * <p/> The behavior of this method relies upon the class loader structure of the map-reduce engine. + * In particular, the file system for a job must be requested by the Job {@link HadoopClassLoader} specific + * for the local node id (grid instance). The file system for a task must be requested by the Task {@link HadoopClassLoader} + * specific for that task or reused from another task of the same job. + * + * @param uri the file system uri. + * @param cfg the configuration. + * @param jobId The job id, if file system is requested for a job, or null if the file system is requested for + * a task. + * @return The file system. + * @throws IOException On error. + */ + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, @Nullable HadoopJobId jobId) + throws IOException { + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + try { + fs = getWithCaching(uri, cfg, usr, jobId); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Gets FileSystem caching it in static Ignite cache. The cache is a singleton + * for each class loader.
+ * + * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. + * The Configuration is not a part of the key. This means that for the given key the file system is + * initialized only once with the Configuration passed in upon the file system creation. + * + * @param uri The file system URI. + * @param cfg The configuration. + * @param usr The user to create file system for. + * @return The file system: either created, or taken from the cache. + */ + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, @Nullable HadoopJobId jobId) { + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + if (jobId == null) + return taskFsMap.getOrCreate(key); + else { + HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId); + + return lm.getOrCreate(key); + } + } + + /** + * Gets the file system map for the given job id, creating and registering it if absent. + * The registered map is removed and closed by {@link #close(HadoopJobId)}. + * @param jobId The job Id. + * @return File system map. + */ + private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(final HadoopJobId jobId) { + assert jobId != null; + + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = jobFsMap.get(jobId); + + if (map != null) + return map; + + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap(); + + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = jobFsMap.putIfAbsent(jobId, newLM); + + if (pushedT2 == null) + map = newLM; + else { + map = pushedT2; + + try { + newLM.close(); + } catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } + + return map; + } + + /** + * Removes the file system map of the given job id and closes it, if present. + * @param jobId The job id.
+ * @throws IgniteCheckedException + */ + public static synchronized void close(final HadoopJobId jobId) throws IgniteCheckedException { + assert jobId != null; + + final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = jobFsMap.remove(jobId); + + if (map != null) + map.close(); + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + private static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + /** + * This method is called with reflection upon Job finish. This will clean up all the Fs created for tasks. + * @throws IgniteCheckedException + */ + public static void close() throws IgniteCheckedException { + taskFsMap.close(); + } + + /** + * Note that configuration is not a part of the key. + * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. 
+ */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. + */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index d90bc28..382bbd0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.jetbrains.annotations.*; /** * Utilities for configuring file systems to support the separate working 
directory per each thread. @@ -37,4 +38,14 @@ public class HadoopFileSystemsUtils { cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 6e957df..7e70865 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -96,9 +96,7 @@ public class HadoopV2Job implements HadoopJob { this.jobInfo = jobInfo; this.locNodeId = locNodeId; - // TODO: debug: - assert getClass().getClassLoader() instanceof HadoopClassLoader; - assert getClass().getClassLoader().toString().contains(locNodeId.toString()); + assert ((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId)); hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); @@ -113,7 +111,7 @@ public class HadoopV2Job implements HadoopJob { jobCtx = new JobContextImpl(jobConf, hadoopJobID); - rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log); + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, locNodeId); } /** {@inheritDoc} */ @@ -144,7 +142,10 @@ 
public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); try { - FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, jobId); + assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(locNodeId)); + + FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf, jobId); JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -211,7 +212,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + HadoopClassLoader.nameForTask(info, false)); cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); @@ -277,8 +278,6 @@ public class HadoopV2Job implements HadoopJob { @Override public void dispose(boolean external) throws IgniteCheckedException { boolean dsp = disposed.compareAndSet(false, true); - X.println("###### Dispose: " + locNodeId + ", ldr = " + getClass().getClassLoader()); - if (!dsp) return; @@ -324,34 +323,14 @@ public class HadoopV2Job implements HadoopJob { assert fullCtxClsQueue.isEmpty(); - // Close all cached Fs for this Job: - //HadoopUtils.close(jobId.toString(), locNodeId.toString()); - //closeCachedFileSystems(getClass().getClassLoader()); - HadoopUtils.close(jobId); - - invokeGc(); + // Close all cached file systems for this Job: + HadoopFileSystemCache.close(jobId); if (err != null) throw U.cast(err); } } - // TODO: remove - private void invokeGc() { - int i = 0; - - while (i++ < 5) - System.gc(); - } - - /** {@inheritDoc} */ - @Override protected void finalize() throws Throwable { - super.finalize(); - - // TODO: remove - dispose(false); - } - /** * Stops Hadoop Fs 
daemon threads. * @param ldr The task ClassLoader to stop the daemons for. @@ -371,7 +350,7 @@ public class HadoopV2Job implements HadoopJob { * @throws Exception On error. */ private void closeCachedFileSystems(ClassLoader ldr) throws Exception { - Class<?> clazz = ldr.loadClass(HadoopUtils.class.getName()); + Class<?> clazz = ldr.loadClass(HadoopFileSystemCache.class.getName()); Method m = clazz.getMethod("close"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index b86f16d..05d61fc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -41,7 +41,7 @@ import java.util.*; */ public class HadoopV2JobResourceManager { /** File type Fs disable caching property name. */ - private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file"); + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopFileSystemsUtils.disableFsCachePropertyName("file"); /** Hadoop job context. */ private final JobContextImpl ctx; @@ -61,16 +61,19 @@ public class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; + private final UUID locNodeId; + /** * Creates new instance. * @param jobId Job ID. * @param ctx Hadoop job context. * @param log Logger. 
*/ - public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, UUID locNodeId) { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); + this.locNodeId = locNodeId; } /** @@ -115,7 +118,10 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId); + assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(locNodeId)); + + FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -210,7 +216,10 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId); + assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(locNodeId)); + + FileSystem srcFs = HadoopFileSystemCache.fileSystemForMrUser(srcPath.toUri(), cfg, jobId); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -293,7 +302,10 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) { - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), jobId); + assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(locNodeId)); + + FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), jobId); fs.delete(stagingDir, true); } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 7012566..24293b8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -33,6 +33,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; @@ -426,9 +427,14 @@ public class HadoopV2TaskContext extends HadoopTaskContext { FileSystem fs; try { + // This assertion uses .startsWith() instead of .equals() because task class loaders may + // be reused between tasks of the same job. + assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true)); + // Task class loader. // We also cache Fs there, all them will be cleared explicitly upon the Job end. - fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null); + fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), null); } catch (IOException e) { throw new IgniteCheckedException(e);