# IG-980: (1) refactoring, (2) added classloader assertions, (3) cleanup.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f57dc8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f57dc8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f57dc8a6

Branch: refs/heads/ignite-980
Commit: f57dc8a66c5a6b0c95f773411c4ff7a1e4ea022f
Parents: ef4e09b
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Mon Jun 22 16:11:12 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Mon Jun 22 16:11:12 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   3 +-
 .../processors/hadoop/HadoopClassLoader.java    |  29 ++
 .../processors/hadoop/HadoopDefaultJobInfo.java |   4 +-
 .../internal/processors/hadoop/HadoopUtils.java | 302 -----------------
 .../hadoop/SecondaryFileSystemProvider.java     |   3 +-
 .../hadoop/fs/HadoopFileSystemCache.java        | 333 +++++++++++++++++++
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  11 +
 .../processors/hadoop/v2/HadoopV2Job.java       |  41 +--
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  22 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |   8 +-
 10 files changed, 412 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 597ff8a..0ba4da4 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
@@ -72,7 +73,7 @@ public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            FileSystem fs = 
HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, jobId);
+            FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, 
jobId);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index eb98ff9..0988fe0 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -67,6 +67,28 @@ public class HadoopClassLoader extends URLClassLoader {
     private final String name;
 
     /**
+     * Gets name for Job class loader. The name is specific to the local node id.
+     * @param locNodeId The local node id.
+     * @return The class loader name.
+     */
+    public static String nameForJob(UUID locNodeId) {
+        return "hadoop-job-node-" + locNodeId.toString();
+    }
+
+    /**
+     * Gets name for the task class loader.
+     * @param info The task info.
+     * @param prefix Get only prefix (without task type and number)
+     * @return The class loader name.
+     */
+    public static String nameForTask(HadoopTaskInfo info, boolean prefix) {
+        if (prefix)
+            return "hadoop-task-" + info.jobId() + "-";
+        else
+            return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber();
+    }
+
+    /**
      * @param urls Urls.
      */
     public HadoopClassLoader(URL[] urls, String name) {
@@ -568,4 +590,11 @@ public class HadoopClassLoader extends URLClassLoader {
     @Override public String toString() {
         return S.toString(HadoopClassLoader.class, this);
     }
+
+    /**
+     * Getter for name field.
+     */
+    public String name() {
+        return name;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 9e685ea..a31ada5 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -74,7 +74,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, 
Externalizable {
     private static final HadoopLazyConcurrentMap<UUID, CloseableClass> 
hadoopV2JobClasses =
         new HadoopLazyConcurrentMap<>(new 
HadoopLazyConcurrentMap.ValueFactory<UUID, CloseableClass>() {
             @Override public CloseableClass createValue(UUID key) {
-                HadoopClassLoader ldr = new HadoopClassLoader(null, 
"hadoop-job-node-" + key);
+                HadoopClassLoader ldr = new HadoopClassLoader(null, 
HadoopClassLoader.nameForJob(key));
 
                 try {
                     Class<?> jobCls = 
ldr.loadClass(HadoopV2Job.class.getName());
@@ -121,8 +121,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, 
Externalizable {
         try {
             Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz();
 
-            X.println("#### Creating job: HadoopJob: " + nodeId + ", class = " 
+ jobCls0);
-
             Constructor<?> constructor = 
jobCls0.getConstructor(HadoopJobId.class, UUID.class,
                 HadoopDefaultJobInfo.class, IgniteLogger.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 2234549..f87e610 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,19 +26,11 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.io.*;
-import java.net.*;
 import java.util.*;
-import java.util.concurrent.*;
 
 /**
  * Hadoop utility methods.
@@ -65,47 +57,6 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
-    /** Lazy per-user cache for the file systems. It is cleared and nulled in 
#close() method. */
-    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
fileSysLazyMap
-        = createHadoopLazyConcurrentMap();
-
-    /** File system cache for jobs. */
-    private static final ConcurrentMap<HadoopJobId, 
HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap
-        = new ConcurrentHashMap8<>();
-
-    /**
-     * Creates HadoopLazyConcurrentMap.
-     * @return a new HadoopLazyConcurrentMap.
-     */
-    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
createHadoopLazyConcurrentMap() {
-        return new HadoopLazyConcurrentMap<>(
-            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() 
{
-                @Override public FileSystem createValue(FsCacheKey key) {
-                    try {
-                        assert key != null;
-
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
-
-                        String scheme = uri.getScheme();
-
-                        // Copy the configuration to avoid altering the 
external object.
-                        Configuration cfg = new 
Configuration(key.configuration());
-
-                        String prop = 
HadoopUtils.disableFsCachePropertyName(scheme);
-
-                        cfg.setBoolean(prop, true);
-
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (IOException | InterruptedException ioe) {
-                        throw new IgniteException(ioe);
-                    }
-                }
-            }
-        );
-    }
-
     /**
      * Constructor.
      */
@@ -393,257 +344,4 @@ public class HadoopUtils {
         }
     }
 
-    /**
-     * Gets non-null user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
-     * @return the user name, never null.
-     */
-    private static String getMrHadoopUser(Configuration cfg) throws 
IOException {
-        String user = cfg.get(MRJobConfig.USER_NAME);
-
-        if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser();
-
-        return user;
-    }
-
-    /**
-     * Common method to get the V1 file system in MapRed engine.
-     * It creates the filesystem for the user specified in the
-     * configuration with {@link MRJobConfig#USER_NAME} property.
-     * @param uri the file system uri.
-     * @param cfg the configuration.
-     * @return the file system
-     * @throws IOException
-     */
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, 
Configuration cfg, @Nullable HadoopJobId jobId)
-            throws IOException {
-        final String usr = getMrHadoopUser(cfg);
-
-        assert usr != null;
-
-        if (uri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-
-        final FileSystem fs;
-
-        try {
-            fs = getWithCaching(uri, cfg, usr, jobId);
-        }
-        catch (IgniteException ie) {
-            throw new IOException(ie);
-        }
-
-        assert fs != null;
-        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, 
((IgniteHadoopFileSystem)fs).user());
-
-        return fs;
-    }
-
-    /**
-     * Note that configuration is not a part of the key.
-     * It is used solely to initialize the first instance
-     * that is created for the key.
-     */
-    public static final class FsCacheKey {
-        /** */
-        private final URI uri;
-
-        /** */
-        private final String usr;
-
-        /** */
-        private final String equalityKey;
-
-        /** */
-        private final Configuration cfg;
-
-        /**
-         * Constructor
-         */
-        public FsCacheKey(URI uri, String usr, Configuration cfg) {
-            assert uri != null;
-            assert usr != null;
-            assert cfg != null;
-
-            this.uri = fixUri(uri, cfg);
-            this.usr = usr;
-            this.cfg = cfg;
-
-            this.equalityKey = createEqualityKey();
-        }
-
-        /**
-         * Creates String key used for equality and hashing.
-         */
-        private String createEqualityKey() {
-            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
-            if (uri.getScheme() != null)
-                sb.a(uri.getScheme().toLowerCase());
-
-            sb.a("://");
-
-            if (uri.getAuthority() != null)
-                sb.a(uri.getAuthority().toLowerCase());
-
-            return sb.toString();
-        }
-
-        /**
-         * The URI.
-         */
-        public URI uri() {
-            return uri;
-        }
-
-        /**
-         * The User.
-         */
-        public String user() {
-            return usr;
-        }
-
-        /**
-         * The Configuration.
-         */
-        public Configuration configuration() {
-            return cfg;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean equals(Object obj) {
-            if (obj == this)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return equalityKey.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return equalityKey;
-        }
-    }
-
-    /**
-     * Gets FileSystem caching it in static Ignite cache. The cache is a 
singleton
-     * for each class loader.
-     *
-     * <p/>Note that the file systems in the cache are keyed by a triplet 
{scheme, authority, user}.
-     * The Configuration is not a part of the key. This means that for the 
given key file system is
-     * initialized only once with the Configuration passed in upon the file 
system creation.
-     *
-     * @param uri The file system URI.
-     * @param cfg The configuration.
-     * @param usr The user to create file system for.
-     * @return The file system: either created, or taken from the cache.
-     */
-    private static FileSystem getWithCaching(URI uri, Configuration cfg, 
String usr, @Nullable HadoopJobId jobId) {
-        final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
-        if (jobId == null)
-            return fileSysLazyMap.getOrCreate(key);
-        else {
-            HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = 
getFsMapForJob(jobId);
-
-            return lm.getOrCreate(key);
-        }
-    }
-
-    /**
-     * Gets Fs map for given job Id and localNodeId. If local node Id not 
null, registers this
-     * local node id to track subsequent removal.
-     * @param jobId The job Id.
-     * @return File system map.
-     */
-    private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> 
getFsMapForJob(final HadoopJobId jobId) {
-        assert jobId != null;
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = 
jobFsMap.get(jobId);
-
-        if (lazy != null)
-            return lazy;
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = 
createHadoopLazyConcurrentMap();
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = 
jobFsMap.putIfAbsent(jobId, newLM);
-
-        if (pushedT2 == null)
-            lazy = newLM;
-        else {
-            lazy = pushedT2;
-
-            try {
-                newLM.close();
-            } catch (IgniteCheckedException ice) {
-                throw new IgniteException(ice);
-            }
-        }
-
-        return lazy;
-    }
-
-    /**
-     * Closes file system map for this job Id and local node id.
-     * @param jobId The job id.
-     * @throws IgniteCheckedException
-     */
-    public static synchronized void close(final HadoopJobId jobId) throws 
IgniteCheckedException {
-        assert jobId != null;
-
-        final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = 
jobFsMap.remove(jobId);
-
-        if (lazy != null)
-            lazy.close();
-    }
-
-    /**
-     * Gets the property name to disable file system cache.
-     * @param scheme The file system URI scheme.
-     * @return The property name. If scheme is null,
-     * returns "fs.null.impl.disable.cache".
-     */
-    public static String disableFsCachePropertyName(@Nullable String scheme) {
-        return String.format("fs.%s.impl.disable.cache", scheme);
-    }
-
-    /**
-     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
-     * @param uri0 The uri.
-     * @param cfg The cfg.
-     * @return Correct URI.
-     */
-    public static URI fixUri(URI uri0, Configuration cfg) {
-        if (uri0 == null)
-            return FileSystem.getDefaultUri(cfg);
-
-        String scheme = uri0.getScheme();
-        String authority = uri0.getAuthority();
-
-        if (authority == null) {
-            URI dfltUri = FileSystem.getDefaultUri(cfg);
-
-            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && 
dfltUri.getAuthority() != null))
-                return dfltUri;
-        }
-
-        return uri0;
-    }
-
-    /**
-     * This method is called with reflection upon Job finish. This will clean 
up all the Fs created for tasks.
-     * @throws IgniteCheckedException
-     */
-    public static void close() throws IgniteCheckedException {
-        fileSysLazyMap.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index dd679de..ef04b0f 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -76,7 +77,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
+        String prop = 
HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
new file mode 100644
index 0000000..cac248b
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+
+/**
+ * Static caches of file systems used by Map-Reduce tasks and jobs.
+ * This class provides a per-task file system cache and a per-job file system cache.
+ */
+public class HadoopFileSystemCache {
+    /** Lazy per-user file system cache used by Hadoop tasks. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
taskFsMap
+        = createHadoopLazyConcurrentMap();
+
+    /** File system cache used by Hadoop jobs. */
+    private static final ConcurrentMap<HadoopJobId, 
HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap
+        = new ConcurrentHashMap8<>();
+
+    /**
+     * Creates HadoopLazyConcurrentMap.
+     * @return a new HadoopLazyConcurrentMap.
+     */
+    private static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
createHadoopLazyConcurrentMap() {
+        return new HadoopLazyConcurrentMap<>(
+            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() 
{
+                @Override public FileSystem createValue(FsCacheKey key) {
+                    try {
+                        assert key != null;
+
+                        // Explicitly disable FileSystem caching:
+                        URI uri = key.uri();
+
+                        String scheme = uri.getScheme();
+
+                        // Copy the configuration to avoid altering the 
external object.
+                        Configuration cfg = new 
Configuration(key.configuration());
+
+                        String prop = 
HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
+
+                        cfg.setBoolean(prop, true);
+
+                        return FileSystem.get(uri, cfg, key.user());
+                    }
+                    catch (IOException | InterruptedException ioe) {
+                        throw new IgniteException(ioe);
+                    }
+                }
+            }
+        );
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws 
IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It gets the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * The file systems are created and cached upon first request.
+     *
+     * <p/> The behavior of this method relies upon class loader structure of 
map-red engine.
+     * In particular, file system for a job must be requested by Job {@link 
HadoopClassLoader} specific
+     * for local node id (grid instance). The file system for a task must be 
requested by Task {@link HadoopClassLoader}
+     * specific for that task or reused from another task of the same job.
+     *
+     * @param uri the file system uri.
+     * @param cfg the configuration.
+     * @param jobId The job id, if file system is requested for a job, or null 
if the file system is requested for
+     * a task.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, 
Configuration cfg, @Nullable HadoopJobId jobId)
+            throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        try {
+            fs = getWithCaching(uri, cfg, usr, jobId);
+        }
+        catch (IgniteException ie) {
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, 
((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Gets FileSystem caching it in static Ignite cache. The cache is a 
singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet 
{scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the 
given key file system is
+     * initialized only once with the Configuration passed in upon the file 
system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, 
String usr, @Nullable HadoopJobId jobId) {
+        final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        if (jobId == null)
+            return taskFsMap.getOrCreate(key);
+        else {
+            HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = 
getFsMapForJob(jobId);
+
+            return lm.getOrCreate(key);
+        }
+    }
+
+    /**
+     * Gets the file system map for the given job Id, creating and registering
+     * it if absent.
+     * @param jobId The job Id.
+     * @return File system map.
+     */
+    private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> 
getFsMapForJob(final HadoopJobId jobId) {
+        assert jobId != null;
+
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = 
jobFsMap.get(jobId);
+
+        if (map != null)
+            return map;
+
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = 
createHadoopLazyConcurrentMap();
+
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = 
jobFsMap.putIfAbsent(jobId, newLM);
+
+        if (pushedT2 == null)
+            map = newLM;
+        else {
+            map = pushedT2;
+
+            try {
+                newLM.close();
+            } catch (IgniteCheckedException ice) {
+                throw new IgniteException(ice);
+            }
+        }
+
+        return map;
+    }
+
+    /**
+     * Closes file system map for this job Id and local node id.
+     * @param jobId The job id.
+     * @throws IgniteCheckedException
+     */
+    public static synchronized void close(final HadoopJobId jobId) throws 
IgniteCheckedException {
+        assert jobId != null;
+
+        final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = 
jobFsMap.remove(jobId);
+
+        if (map != null)
+            map.close();
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    private static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && 
dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
+    }
+
+    /**
+     * This method is called with reflection upon Job finish. This will clean 
up all the Fs created for tasks.
+     * @throws IgniteCheckedException
+     */
+    public static void close() throws IgniteCheckedException {
+        taskFsMap.close();
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index d90bc28..382bbd0 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.jetbrains.annotations.*;
 
 /**
  * Utilities for configuring file systems to support the separate working 
directory per each thread.
@@ -37,4 +38,14 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs.AbstractFileSystem." + 
FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
     }
+
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 6e957df..7e70865 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -96,9 +96,7 @@ public class HadoopV2Job implements HadoopJob {
         this.jobInfo = jobInfo;
         this.locNodeId = locNodeId;
 
-        // TODO: debug:
-        assert getClass().getClassLoader() instanceof HadoopClassLoader;
-        assert 
getClass().getClassLoader().toString().contains(locNodeId.toString());
+        assert 
((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId));
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
@@ -113,7 +111,7 @@ public class HadoopV2Job implements HadoopJob {
 
         jobCtx = new JobContextImpl(jobConf, hadoopJobID);
 
-        rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log);
+        rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, 
locNodeId);
     }
 
     /** {@inheritDoc} */
@@ -144,7 +142,10 @@ public class HadoopV2Job implements HadoopJob {
             Path jobDir = new Path(jobDirPath);
 
             try {
-                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, 
jobId);
+                assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+                    .equals(HadoopClassLoader.nameForJob(locNodeId));
+
+                FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf, jobId);
 
                 JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
@@ -211,7 +212,7 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader identified by the task it was 
initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
+                    HadoopClassLoader.nameForTask(info, false));
 
                 cls = (Class<? extends 
HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
@@ -277,8 +278,6 @@ public class HadoopV2Job implements HadoopJob {
     @Override public void dispose(boolean external) throws 
IgniteCheckedException {
         boolean dsp = disposed.compareAndSet(false, true);
 
-        X.println("###### Dispose: " + locNodeId + ", ldr = " + 
getClass().getClassLoader());
-
         if (!dsp)
             return;
 
@@ -324,34 +323,14 @@ public class HadoopV2Job implements HadoopJob {
 
             assert fullCtxClsQueue.isEmpty();
 
-            // Close all cached Fs for this Job:
-            //HadoopUtils.close(jobId.toString(), locNodeId.toString());
-            //closeCachedFileSystems(getClass().getClassLoader());
-            HadoopUtils.close(jobId);
-
-            invokeGc();
+            // Close all cached file systems for this Job:
+            HadoopFileSystemCache.close(jobId);
 
             if (err != null)
                 throw U.cast(err);
         }
     }
 
-    // TODO: remove
-    private void invokeGc() {
-        int i = 0;
-
-        while (i++ < 5)
-            System.gc();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void finalize() throws Throwable {
-        super.finalize();
-
-        // TODO: remove
-        dispose(false);
-    }
-
     /**
      * Stops Hadoop Fs daemon threads.
      * @param ldr The task ClassLoader to stop the daemons for.
@@ -371,7 +350,7 @@ public class HadoopV2Job implements HadoopJob {
      * @throws Exception On error.
      */
     private void closeCachedFileSystems(ClassLoader ldr) throws Exception {
-        Class<?> clazz = ldr.loadClass(HadoopUtils.class.getName());
+        Class<?> clazz = ldr.loadClass(HadoopFileSystemCache.class.getName());
 
         Method m = clazz.getMethod("close");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index b86f16d..05d61fc 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -41,7 +41,7 @@ import java.util.*;
  */
 public class HadoopV2JobResourceManager {
     /** File type Fs disable caching property name. */
-    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = 
HadoopUtils.disableFsCachePropertyName("file");
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = 
HadoopFileSystemsUtils.disableFsCachePropertyName("file");
 
     /** Hadoop job context. */
     private final JobContextImpl ctx;
@@ -61,16 +61,19 @@ public class HadoopV2JobResourceManager {
     /** Staging directory to delivery job jar and config to the work nodes. */
     private Path stagingDir;
 
+    private final UUID locNodeId;
+
     /**
      * Creates new instance.
      * @param jobId Job ID.
      * @param ctx Hadoop job context.
      * @param log Logger.
      */
-    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, 
IgniteLogger log) {
+    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, 
IgniteLogger log, UUID locNodeId) {
         this.jobId = jobId;
         this.ctx = ctx;
         this.log = log.getLogger(HadoopV2JobResourceManager.class);
+        this.locNodeId = locNodeId;
     }
 
     /**
@@ -115,7 +118,10 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = 
HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId);
+                    assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+                        .equals(HadoopClassLoader.nameForJob(locNodeId));
+
+                    FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId);
 
                     if (!fs.exists(stagingDir))
                         throw new IgniteCheckedException("Failed to find 
map-reduce submission " +
@@ -210,7 +216,10 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = 
HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId);
+            assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+                .equals(HadoopClassLoader.nameForJob(locNodeId));
+
+            FileSystem srcFs = 
HadoopFileSystemCache.fileSystemForMrUser(srcPath.toUri(), cfg, jobId);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
@@ -293,7 +302,10 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null) {
-                FileSystem fs = 
HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), jobId);
+                assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+                    .equals(HadoopClassLoader.nameForJob(locNodeId));
+
+                FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), 
jobId);
 
                 fs.delete(stagingDir, true);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f57dc8a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 7012566..24293b8 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -33,6 +33,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -426,9 +427,14 @@ public class HadoopV2TaskContext extends HadoopTaskContext 
{
         FileSystem fs;
 
         try {
+            // This assertion uses .startsWith() instead of .equals() because 
task class loaders may
+            // be reused between tasks of the same job.
+            assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
             // Task class loader.
             // We also cache Fs there, all them will be cleared explicitly 
upon the Job end.
-            fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null);
+            fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), 
jobConf(), null);
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);


Reply via email to