Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-980 776b6c8c5 -> f27987549


# 980: avoided static class map.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2798754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2798754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2798754

Branch: refs/heads/ignite-980
Commit: f2798754985ead00ffae90db6df2d237260bd975
Parents: 776b6c8
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Mon Jun 22 22:09:33 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Mon Jun 22 22:09:33 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopJobInfo.java        |   3 +-
 .../hadoop/counter/HadoopCounterWriter.java     |   5 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   9 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |  48 +-------
 .../hadoop/fs/HadoopFileSystemCache.java        | 117 +++----------------
 .../hadoop/jobtracker/HadoopJobTracker.java     |  26 ++++-
 .../child/HadoopChildProcessRunner.java         |   6 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  63 ++++++----
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  27 ++---
 .../hadoop/v2/HadoopV2TaskContext.java          |  19 ++-
 .../processors/hadoop/HadoopTasksV1Test.java    |   3 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   3 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   2 +-
 .../collections/HadoopAbstractMapTest.java      |   3 +-
 14 files changed, 126 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index e676cbd..5876262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -61,7 +61,8 @@ public interface HadoopJobInfo extends Serializable {
      * @return Job.
      * @throws IgniteCheckedException If failed.
      */
-    public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, 
IgniteLogger log) throws IgniteCheckedException;
+    public HadoopJob createJob(Class<? extends HadoopJob> jobCls0,
+        HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException;
 
     /**
      * @return Number of reducers configured for job.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
index ce67c57..f21a1e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
@@ -28,10 +28,9 @@ public interface HadoopCounterWriter {
     /**
      * Writes counters of given job to some statistics storage.
      *
-     * @param jobInfo Job info.
-     * @param jobId Job id.
+     * @param job The job.
      * @param cntrs Counters.
      * @throws IgniteCheckedException If failed.
      */
-    public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters 
cntrs) throws IgniteCheckedException;
+    public void write(HadoopJob job, HadoopCounters cntrs) throws 
IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 0ba4da4..b9232c9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -25,7 +25,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
@@ -49,11 +49,14 @@ public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter
     private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + 
USER_MACRO;
 
     /** {@inheritDoc} */
-    @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, 
HadoopCounters cntrs)
+    @Override public void write(HadoopJob job, HadoopCounters cntrs)
         throws IgniteCheckedException {
 
         Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
+        final HadoopJobInfo jobInfo = job.info();
+        final HadoopJobId jobId = job.id();
+
         for (Map.Entry<String, String> e : 
((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());
 
@@ -73,7 +76,7 @@ public class IgniteHadoopFileSystemCounterWriter implements 
HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, 
jobId);
+            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), 
hadoopCfg);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index a31ada5..a5f9913 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -18,17 +18,12 @@
 package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.io.*;
 import java.lang.reflect.*;
 import java.util.*;
-import java.util.concurrent.*;
 
 /**
  * Hadoop job info based on default Hadoop configuration.
@@ -52,40 +47,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, 
Externalizable {
     /** User name. */
     private String user;
 
-    /** */
-    private static class CloseableClass implements Closeable {
-        private final Class<?> clazz;
-
-        CloseableClass(Class<?> c) {
-            clazz = c;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            // Noop
-        }
-
-        public Class<?> clazz() {
-            return clazz;
-        }
-    }
-
-    /** */
-    private static final HadoopLazyConcurrentMap<UUID, CloseableClass> 
hadoopV2JobClasses =
-        new HadoopLazyConcurrentMap<>(new 
HadoopLazyConcurrentMap.ValueFactory<UUID, CloseableClass>() {
-            @Override public CloseableClass createValue(UUID key) {
-                HadoopClassLoader ldr = new HadoopClassLoader(null, 
HadoopClassLoader.nameForJob(key));
-
-                try {
-                    Class<?> jobCls = 
ldr.loadClass(HadoopV2Job.class.getName());
-
-                    return new CloseableClass(jobCls);
-                } catch (Exception ioe) {
-                    throw new IgniteException(ioe);
-                }
-            }
-        });
-
     /**
      * Default constructor required by {@link Externalizable}.
      */
@@ -117,14 +78,13 @@ public class HadoopDefaultJobInfo implements 
HadoopJobInfo, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopJob createJob(UUID nodeId, HadoopJobId jobId, 
IgniteLogger log) throws IgniteCheckedException {
+    @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls0,
+            HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException 
{
         try {
-            Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz();
-
-            Constructor<?> constructor = 
jobCls0.getConstructor(HadoopJobId.class, UUID.class,
+            Constructor<? extends HadoopJob> constructor = 
jobCls0.getConstructor(HadoopJobId.class,
                 HadoopDefaultJobInfo.class, IgniteLogger.class);
 
-            return (HadoopJob)constructor.newInstance(jobId, nodeId, this, 
log);
+            return constructor.newInstance(jobId, this, log);
         }
         // NB: java.lang.NoClassDefFoundError may be thrown from 
Class#getConstructor() call.
         catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
index cac248b..96b32db 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java
@@ -22,34 +22,23 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.hadoop.fs.v1.*;
-import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
-import java.util.concurrent.*;
 
 /**
  * Static caches of file systems used by Map-Reduce tasks and jobs.
  * This class
  */
 public class HadoopFileSystemCache {
-    /** Lazy per-user file system cache used by Hadoop tasks. */
-    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
taskFsMap
-        = createHadoopLazyConcurrentMap();
-
-    /** File system cache for used by Hadoop jobs. */
-    private static final ConcurrentMap<HadoopJobId, 
HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap
-        = new ConcurrentHashMap8<>();
-
     /**
      * Creates HadoopLazyConcurrentMap.
      * @return a new HadoopLazyConcurrentMap.
      */
-    private static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
createHadoopLazyConcurrentMap() {
+    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> 
createHadoopLazyConcurrentMap() {
         return new HadoopLazyConcurrentMap<>(
             new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() 
{
                 @Override public FileSystem createValue(FsCacheKey key) {
@@ -96,22 +85,20 @@ public class HadoopFileSystemCache {
      * Common method to get the V1 file system in MapRed engine.
      * It gets the filesystem for the user specified in the
      * configuration with {@link MRJobConfig#USER_NAME} property.
-     * The file systems are created and cached upon first request.
-     *
-     * <p/> The behavior of this method relies upon class loader structure of 
map-red engine.
-     * In particular, file system for a job must be requested by Job {@link 
HadoopClassLoader} specific
-     * for local node id (grid instance). The file system for a task must be 
requested by Task {@link HadoopClassLoader}
-     * specific for that task or reused from another task of the same job.
+     * The file systems are created and cached in the given map upon first 
request.
      *
-     * @param uri the file system uri.
-     * @param cfg the configuration.
-     * @param jobId The job id, if file system is requested for a job, or null 
if the file system is requested for
-     * a task.
+     * @param uri The file system uri.
+     * @param cfg The configuration.
+     * @param map The caching map.
      * @return The file system.
      * @throws IOException On error.
      */
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, 
Configuration cfg, @Nullable HadoopJobId jobId)
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, 
Configuration cfg,
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
             throws IOException {
+        assert map != null;
+        assert cfg != null;
+
         final String usr = getMrHadoopUser(cfg);
 
         assert usr != null;
@@ -122,7 +109,9 @@ public class HadoopFileSystemCache {
         final FileSystem fs;
 
         try {
-            fs = getWithCaching(uri, cfg, usr, jobId);
+            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+            fs = map.getOrCreate(key);
         }
         catch (IgniteException ie) {
             throw new IOException(ie);
@@ -135,78 +124,6 @@ public class HadoopFileSystemCache {
     }
 
     /**
-     * Gets FileSystem caching it in static Ignite cache. The cache is a 
singleton
-     * for each class loader.
-     *
-     * <p/>Note that the file systems in the cache are keyed by a triplet 
{scheme, authority, user}.
-     * The Configuration is not a part of the key. This means that for the 
given key file system is
-     * initialized only once with the Configuration passed in upon the file 
system creation.
-     *
-     * @param uri The file system URI.
-     * @param cfg The configuration.
-     * @param usr The user to create file system for.
-     * @return The file system: either created, or taken from the cache.
-     */
-    private static FileSystem getWithCaching(URI uri, Configuration cfg, 
String usr, @Nullable HadoopJobId jobId) {
-        final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
-        if (jobId == null)
-            return taskFsMap.getOrCreate(key);
-        else {
-            HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = 
getFsMapForJob(jobId);
-
-            return lm.getOrCreate(key);
-        }
-    }
-
-    /**
-     * Gets Fs map for given job Id and localNodeId. If local node Id not 
null, registers this
-     * local node id to track subsequent removal.
-     * @param jobId The job Id.
-     * @return File system map.
-     */
-    private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> 
getFsMapForJob(final HadoopJobId jobId) {
-        assert jobId != null;
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = 
jobFsMap.get(jobId);
-
-        if (map != null)
-            return map;
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = 
createHadoopLazyConcurrentMap();
-
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = 
jobFsMap.putIfAbsent(jobId, newLM);
-
-        if (pushedT2 == null)
-            map = newLM;
-        else {
-            map = pushedT2;
-
-            try {
-                newLM.close();
-            } catch (IgniteCheckedException ice) {
-                throw new IgniteException(ice);
-            }
-        }
-
-        return map;
-    }
-
-    /**
-     * Closes file system map for this job Id and local node id.
-     * @param jobId The job id.
-     * @throws IgniteCheckedException
-     */
-    public static synchronized void close(final HadoopJobId jobId) throws 
IgniteCheckedException {
-        assert jobId != null;
-
-        final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = 
jobFsMap.remove(jobId);
-
-        if (map != null)
-            map.close();
-    }
-
-    /**
      * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
      * @param uri0 The uri.
      * @param cfg The cfg.
@@ -230,14 +147,6 @@ public class HadoopFileSystemCache {
     }
 
     /**
-     * This method is called with reflection upon Job finish. This will clean 
up all the Fs created for tasks.
-     * @throws IgniteCheckedException
-     */
-    public static void close() throws IgniteCheckedException {
-        taskFsMap.close();
-    }
-
-    /**
      * Note that configuration is not a part of the key.
      * It is used solely to initialize the first instance
      * that is created for the key.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index b7377d4..ae8e107 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -82,6 +83,9 @@ public class HadoopJobTracker extends HadoopComponent {
     /** Component busy lock. */
     private GridSpinReadWriteLock busyLock;
 
+    /** Class to create HadoopJob instances from. */
+    private volatile Class<? extends HadoopJob> jobCls;
+
     /** Closure to check result of async transform of system cache. */
     private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new 
CI1<IgniteInternalFuture<?>>() {
         @Override public void apply(IgniteInternalFuture<?> gridFut) {
@@ -95,12 +99,25 @@ public class HadoopJobTracker extends HadoopComponent {
     };
 
     /** {@inheritDoc} */
-    @Override public void start(HadoopContext ctx) throws 
IgniteCheckedException {
+    @SuppressWarnings("unchecked")
+    @Override public void start(final HadoopContext ctx) throws 
IgniteCheckedException {
         super.start(ctx);
 
         busyLock = new GridSpinReadWriteLock();
 
         evtProcSvc = Executors.newFixedThreadPool(1);
+
+        UUID nodeId = ctx.localNodeId();
+
+        assert jobCls == null;
+
+        HadoopClassLoader ldr = new HadoopClassLoader(null, 
HadoopClassLoader.nameForJob(nodeId));
+
+        try {
+            jobCls = 
(Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName());
+        } catch (Exception ioe) {
+            throw new IgniteException(ioe);
+        }
     }
 
     /**
@@ -224,7 +241,6 @@ public class HadoopJobTracker extends HadoopComponent {
         // Fail all pending futures.
         for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values())
             fut.onDone(new IgniteCheckedException("Failed to execute Hadoop 
map-reduce job (grid is stopping)."));
-
     }
 
     /**
@@ -839,7 +855,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
                             HadoopCounters cntrs = meta.counters();
 
-                            writer.write(job.info(), jobId, cntrs);
+                            writer.write(job, cntrs);
                         }
                     }
                     catch (Exception e) {
@@ -987,9 +1003,7 @@ public class HadoopJobTracker extends HadoopComponent {
                 jobInfo = meta.jobInfo();
             }
 
-            UUID nodeId = ctx.localNodeId();
-
-            job = jobInfo.createJob(nodeId, jobId, log);
+            job = jobInfo.createJob(jobCls, jobId, log);
 
             job.initialize(false, ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index cfa393e..b0b0b8c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -25,13 +25,13 @@ import 
org.apache.ignite.internal.processors.hadoop.shuffle.*;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
 import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
-import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -116,9 +116,7 @@ public class HadoopChildProcessRunner {
 
                 assert job == null;
 
-                UUID nodeId = nodeDesc.parentNodeId();
-
-                job = req.jobInfo().createJob(nodeId, req.jobId(), log);
+                job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), 
log);
 
                 job.initialize(true, nodeDesc.processId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 7e70865..319640d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
@@ -30,16 +31,19 @@ import org.apache.ignite.internal.processors.hadoop.v1.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.io.*;
 import java.lang.reflect.*;
+import java.net.*;
 import java.util.*;
 import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
 
 /**
  * Hadoop job implementation for v2 API.
@@ -71,14 +75,18 @@ public class HadoopV2Job implements HadoopJob {
     private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = 
new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
-    private final Queue<Class<?>> fullCtxClsQueue = new 
ConcurrentLinkedDeque<>();
+    private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = 
new ConcurrentLinkedDeque<>();
 
     /** Local node ID */
-    private final UUID locNodeId;
+    private volatile UUID locNodeId;
 
     /** Serialized JobConf. */
     private volatile byte[] jobConfData;
 
+    /** File system cache map. */
+    private final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, 
FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
     /** Disposal guard. */
     private final AtomicBoolean disposed = new AtomicBoolean();
 
@@ -87,16 +95,16 @@ public class HadoopV2Job implements HadoopJob {
      * @param jobInfo Job info.
      * @param log Logger.
      */
-    public HadoopV2Job(HadoopJobId jobId, UUID locNodeId, final 
HadoopDefaultJobInfo jobInfo, IgniteLogger log) {
+    public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, 
IgniteLogger log) {
         assert jobId != null;
-        assert locNodeId != null;
+        //assert locNodeId != null;
         assert jobInfo != null;
 
         this.jobId = jobId;
         this.jobInfo = jobInfo;
-        this.locNodeId = locNodeId;
+        //this.locNodeId = locNodeId;
 
-        assert 
((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId));
+        //assert 
((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId));
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
@@ -111,7 +119,7 @@ public class HadoopV2Job implements HadoopJob {
 
         jobCtx = new JobContextImpl(jobConf, hadoopJobID);
 
-        rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, 
locNodeId);
+        rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this);
     }
 
     /** {@inheritDoc} */
@@ -142,10 +150,7 @@ public class HadoopV2Job implements HadoopJob {
             Path jobDir = new Path(jobDirPath);
 
             try {
-                assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-                    .equals(HadoopClassLoader.nameForJob(locNodeId));
-
-                FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf, jobId);
+                FileSystem fs = fileSystem(jobDir.toUri(), jobConf);
 
                 JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
@@ -191,6 +196,7 @@ public class HadoopV2Job implements HadoopJob {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) 
throws IgniteCheckedException {
         T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(),  
info.taskNumber());
 
@@ -255,15 +261,13 @@ public class HadoopV2Job implements HadoopJob {
     /** {@inheritDoc} */
     @Override public void initialize(boolean external, UUID locNodeId) throws 
IgniteCheckedException {
         assert locNodeId != null;
-        assert this.locNodeId.equals(locNodeId);
 
-        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+        this.locNodeId = locNodeId;
 
-//        try {
-//            HadoopUtils.fileSystemForMrUser(null, jobConf);
-//        } catch (IOException ioe) {
-//            throw new IgniteCheckedException(ioe);
-//        }
+        assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+            .equals(HadoopClassLoader.nameForJob(this.locNodeId));
+
+        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
 
         try {
             rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, 
jobId));
@@ -297,7 +301,7 @@ public class HadoopV2Job implements HadoopJob {
             // Stop the daemon threads that have been created
             // with the task class loaders:
             while (true) {
-                Class<?> cls = fullCtxClsQueue.poll();
+                Class<? extends HadoopTaskContext> cls = 
fullCtxClsQueue.poll();
 
                 if (cls == null)
                     break;
@@ -310,7 +314,7 @@ public class HadoopV2Job implements HadoopJob {
 
                     // Also close all the FileSystems cached in
                     // HadoopLazyConcurrentMap for this *task* class loader:
-                    closeCachedFileSystems(ldr);
+                    closeCachedTaskFileSystems(ldr);
                 }
                 catch (Throwable e) {
                     if (err == null)
@@ -323,8 +327,8 @@ public class HadoopV2Job implements HadoopJob {
 
             assert fullCtxClsQueue.isEmpty();
 
-            // Close all cached file systems for this Job:
-            HadoopFileSystemCache.close(jobId);
+            // Close all cached file systems for this *Job*:
+            fsMap.close();
 
             if (err != null)
                 throw U.cast(err);
@@ -349,8 +353,8 @@ public class HadoopV2Job implements HadoopJob {
      * @param ldr The task class loader.
      * @throws Exception On error.
      */
-    private void closeCachedFileSystems(ClassLoader ldr) throws Exception {
-        Class<?> clazz = ldr.loadClass(HadoopFileSystemCache.class.getName());
+    private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception {
+        Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName());
 
         Method m = clazz.getMethod("close");
 
@@ -386,4 +390,15 @@ public class HadoopV2Job implements HadoopJob {
     public JobConf jobConf() {
         return jobConf;
     }
+
+    /**
+     * Gets file system for this job.
+     * @param uri The uri.
+     * @param cfg The configuration.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws 
IOException {
+        return HadoopFileSystemCache.fileSystemForMrUser(uri, cfg, fsMap);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 05d61fc..55a31c6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -39,7 +39,7 @@ import java.util.*;
  * Provides all resources are needed to the job execution. Downloads the main 
jar, the configuration and additional
  * files are needed to be placed on local files system.
  */
-public class HadoopV2JobResourceManager {
+class HadoopV2JobResourceManager {
     /** File type Fs disable caching property name. */
     private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = 
HadoopFileSystemsUtils.disableFsCachePropertyName("file");
 
@@ -61,7 +61,8 @@ public class HadoopV2JobResourceManager {
     /** Staging directory to delivery job jar and config to the work nodes. */
     private Path stagingDir;
 
-    private final UUID locNodeId;
+    /** Job that supplies file systems to this resource manager. */
+    private final HadoopV2Job fsProvider;
 
     /**
      * Creates new instance.
@@ -69,11 +70,11 @@ public class HadoopV2JobResourceManager {
      * @param ctx Hadoop job context.
      * @param log Logger.
      */
-    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, 
IgniteLogger log, UUID locNodeId) {
+    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, 
IgniteLogger log, HadoopV2Job fsProvider) {
         this.jobId = jobId;
         this.ctx = ctx;
         this.log = log.getLogger(HadoopV2JobResourceManager.class);
-        this.locNodeId = locNodeId;
+        this.fsProvider = fsProvider;
     }
 
     /**
@@ -118,10 +119,10 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-                        .equals(HadoopClassLoader.nameForJob(locNodeId));
+//                    assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+//                        .equals(HadoopClassLoader.nameForJob(locNodeId));
 
-                    FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId);
+                    FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), 
cfg);
 
                     if (!fs.exists(stagingDir))
                         throw new IgniteCheckedException("Failed to find 
map-reduce submission " +
@@ -216,10 +217,10 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-                .equals(HadoopClassLoader.nameForJob(locNodeId));
+//            assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+//                .equals(HadoopClassLoader.nameForJob(locNodeId));
 
-            FileSystem srcFs = 
HadoopFileSystemCache.fileSystemForMrUser(srcPath.toUri(), cfg, jobId);
+            FileSystem srcFs = fsProvider.fileSystem(srcPath.toUri(), cfg);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
@@ -302,10 +303,10 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null) {
-                assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
-                    .equals(HadoopClassLoader.nameForJob(locNodeId));
+//                assert 
((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name()
+//                    .equals(HadoopClassLoader.nameForJob(locNodeId));
 
-                FileSystem fs = 
HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), 
jobId);
+                FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), 
ctx.getJobConf());
 
                 fs.delete(stagingDir, true);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 24293b8..f007038 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -45,6 +45,7 @@ import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*;
 import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
@@ -55,6 +56,20 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     /** */
     private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
 
+    /** Lazy per-user file system cache used by Hadoop tasks. */
+    private static final 
HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
+    /**
+     * Called via reflection on job finish, through the class loader of each task.
+     * Closes {@code fsMap}, cleaning up the file systems created for that task.
+     *
+     * @throws IgniteCheckedException On error.
+     */
+    public static void close() throws IgniteCheckedException {
+        fsMap.close();
+    }
+
     /**
      * Check for combiner grouping support (available since Hadoop 2.3).
      */
@@ -92,7 +107,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private volatile HadoopTask task;
 
     /** Local node ID */
-    private UUID locNodeId;
+    private final UUID locNodeId;
 
     /** Counters for task. */
     private final HadoopCounters cntrs = new HadoopCountersImpl();
@@ -434,7 +449,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
             // Task class loader.
             // We also cache Fs there, all them will be cleared explicitly 
upon the Job end.
-            fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), 
jobConf(), null);
+            fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), 
jobConf(), fsMap);
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index 93707fe..f59be19 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.util.*;
@@ -48,7 +49,7 @@ public class HadoopTasksV1Test extends 
HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(uuid, 0);
 
-        return jobInfo.createJob(uuid, jobId, log);
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index bd4c343..1570807 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.util.*;
 
@@ -66,7 +67,7 @@ public class HadoopTasksV2Test extends 
HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(uuid, 0);
 
-        return jobInfo.createJob(uuid, jobId, log);
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index a9fd2c9..b8f62e6 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopV2JobSelfTest extends 
HadoopAbstractSelfTest {
 
         HadoopJobId id = new HadoopJobId(uuid, 1);
 
-        HadoopJob job = info.createJob(uuid, id, log);
+        HadoopJob job = info.createJob(HadoopV2Job.class, id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new 
HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index 12b6611..3231134 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -136,7 +136,8 @@ public abstract class HadoopAbstractMapTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public HadoopJob createJob(UUID localNodeId, HadoopJobId 
jobId, IgniteLogger log) throws IgniteCheckedException {
+        @Override public HadoopJob createJob(Class<? extends HadoopJob> 
jobCls0,
+                HadoopJobId jobId, IgniteLogger log) throws 
IgniteCheckedException {
             assert false;
 
             return null;


Reply via email to