Repository: incubator-ignite Updated Branches: refs/heads/ignite-980 776b6c8c5 -> f27987549
# 980: avoided static class map. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2798754 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2798754 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2798754 Branch: refs/heads/ignite-980 Commit: f2798754985ead00ffae90db6df2d237260bd975 Parents: 776b6c8 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Mon Jun 22 22:09:33 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Mon Jun 22 22:09:33 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopJobInfo.java | 3 +- .../hadoop/counter/HadoopCounterWriter.java | 5 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 9 +- .../processors/hadoop/HadoopDefaultJobInfo.java | 48 +------- .../hadoop/fs/HadoopFileSystemCache.java | 117 +++---------------- .../hadoop/jobtracker/HadoopJobTracker.java | 26 ++++- .../child/HadoopChildProcessRunner.java | 6 +- .../processors/hadoop/v2/HadoopV2Job.java | 63 ++++++---- .../hadoop/v2/HadoopV2JobResourceManager.java | 27 ++--- .../hadoop/v2/HadoopV2TaskContext.java | 19 ++- .../processors/hadoop/HadoopTasksV1Test.java | 3 +- .../processors/hadoop/HadoopTasksV2Test.java | 3 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 2 +- .../collections/HadoopAbstractMapTest.java | 3 +- 14 files changed, 126 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index e676cbd..5876262 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -61,7 +61,8 @@ public interface HadoopJobInfo extends Serializable { * @return Job. * @throws IgniteCheckedException If failed. */ - public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + public HadoopJob createJob(Class<? extends HadoopJob> jobCls0, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java index ce67c57..f21a1e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java @@ -28,10 +28,9 @@ public interface HadoopCounterWriter { /** * Writes counters of given job to some statistics storage. * - * @param jobInfo Job info. - * @param jobId Job id. + * @param job The job. * @param cntrs Counters. * @throws IgniteCheckedException If failed. */ - public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException; + public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 0ba4da4..b9232c9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -25,7 +25,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; @@ -49,11 +49,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; /** {@inheritDoc} */ - @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) + @Override public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); + final HadoopJobInfo jobInfo = job.info(); + final HadoopJobId jobId = job.id(); + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) hadoopCfg.set(e.getKey(), e.getValue()); @@ -73,7 +76,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try { hadoopCfg.set(MRJobConfig.USER_NAME, user); - FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, jobId); + FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index a31ada5..a5f9913 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -18,17 +18,12 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import java.io.*; import java.lang.reflect.*; import java.util.*; -import java.util.concurrent.*; /** * Hadoop job info based on default Hadoop configuration. @@ -52,40 +47,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** User name. */ private String user; - /** */ - private static class CloseableClass implements Closeable { - private final Class<?> clazz; - - CloseableClass(Class<?> c) { - clazz = c; - } - - /** {@inheritDoc} */ - @Override public void close() { - // Noop - } - - public Class<?> clazz() { - return clazz; - } - } - - /** */ - private static final HadoopLazyConcurrentMap<UUID, CloseableClass> hadoopV2JobClasses = - new HadoopLazyConcurrentMap<>(new HadoopLazyConcurrentMap.ValueFactory<UUID, CloseableClass>() { - @Override public CloseableClass createValue(UUID key) { - HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(key)); - - try { - Class<?> jobCls = ldr.loadClass(HadoopV2Job.class.getName()); - - return new CloseableClass(jobCls); - } catch (Exception ioe) { - throw new IgniteException(ioe); - } - } - }); - /** * Default constructor required by {@link Externalizable}. */ @@ -117,14 +78,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(UUID nodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls0, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { try { - Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz(); - - Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, UUID.class, + Constructor<? extends HadoopJob> constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, IgniteLogger.class); - return (HadoopJob)constructor.newInstance(jobId, nodeId, this, log); + return constructor.newInstance(jobId, this, log); } // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java index cac248b..96b32db 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java @@ -22,34 +22,23 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.hadoop.fs.v1.*; -import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import java.io.*; import java.net.*; -import java.util.concurrent.*; /** * Static caches of file systems used by Map-Reduce tasks and jobs. * This class */ public class HadoopFileSystemCache { - /** Lazy per-user file system cache used by Hadoop tasks. */ - private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> taskFsMap - = createHadoopLazyConcurrentMap(); - - /** File system cache for used by Hadoop jobs. */ - private static final ConcurrentMap<HadoopJobId, HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap - = new ConcurrentHashMap8<>(); - /** * Creates HadoopLazyConcurrentMap. * @return a new HadoopLazyConcurrentMap. */ - private static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { return new HadoopLazyConcurrentMap<>( new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { @Override public FileSystem createValue(FsCacheKey key) { @@ -96,22 +85,20 @@ public class HadoopFileSystemCache { * Common method to get the V1 file system in MapRed engine. * It gets the filesystem for the user specified in the * configuration with {@link MRJobConfig#USER_NAME} property. - * The file systems are created and cached upon first request. - * - * <p/> The behavior of this method relies upon class loader structure of map-red engine. - * In particular, file system for a job must be requested by Job {@link HadoopClassLoader} specific - * for local node id (grid instance). The file system for a task must be requested by Task {@link HadoopClassLoader} - * specific for that task or reused from another task of the same job. + * The file systems are created and cached in the given map upon first request. * - * @param uri the file system uri. - * @param cfg the configuration. - * @param jobId The job id, if file system is requested for a job, or null if the file system is requested for - * a task. + * @param uri The file system uri. + * @param cfg The configuration. + * @param map The caching map. * @return The file system. * @throws IOException On error. */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, @Nullable HadoopJobId jobId) + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) throws IOException { + assert map != null; + assert cfg != null; + final String usr = getMrHadoopUser(cfg); assert usr != null; @@ -122,7 +109,9 @@ public class HadoopFileSystemCache { final FileSystem fs; try { - fs = getWithCaching(uri, cfg, usr, jobId); + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + fs = map.getOrCreate(key); } catch (IgniteException ie) { throw new IOException(ie); @@ -135,78 +124,6 @@ public class HadoopFileSystemCache { } /** - * Gets FileSystem caching it in static Ignite cache. The cache is a singleton - * for each class loader. - * - * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. - * The Configuration is not a part of the key. This means that for the given key file system is - * initialized only once with the Configuration passed in upon the file system creation. - * - * @param uri The file system URI. - * @param cfg The configuration. - * @param usr The user to create file system for. - * @return The file system: either created, or taken from the cache. - */ - private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, @Nullable HadoopJobId jobId) { - final FsCacheKey key = new FsCacheKey(uri, usr, cfg); - - if (jobId == null) - return taskFsMap.getOrCreate(key); - else { - HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId); - - return lm.getOrCreate(key); - } - } - - /** - * Gets Fs map for given job Id and localNodeId. If local node Id not null, registers this - * local node id to track subsequent removal. - * @param jobId The job Id. - * @return File system map. - */ - private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(final HadoopJobId jobId) { - assert jobId != null; - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = jobFsMap.get(jobId); - - if (map != null) - return map; - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap(); - - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = jobFsMap.putIfAbsent(jobId, newLM); - - if (pushedT2 == null) - map = newLM; - else { - map = pushedT2; - - try { - newLM.close(); - } catch (IgniteCheckedException ice) { - throw new IgniteException(ice); - } - } - - return map; - } - - /** - * Closes file system map for this job Id and local node id. - * @param jobId The job id. - * @throws IgniteCheckedException - */ - public static synchronized void close(final HadoopJobId jobId) throws IgniteCheckedException { - assert jobId != null; - - final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map = jobFsMap.remove(jobId); - - if (map != null) - map.close(); - } - - /** * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). * @param uri0 The uri. * @param cfg The cfg. @@ -230,14 +147,6 @@ public class HadoopFileSystemCache { } /** - * This method is called with reflection upon Job finish. This will clean up all the Fs created for tasks. - * @throws IgniteCheckedException - */ - public static void close() throws IgniteCheckedException { - taskFsMap.close(); - } - - /** * Note that configuration is not a part of the key. * It is used solely to initialize the first instance * that is created for the key. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index b7377d4..ae8e107 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; @@ -82,6 +83,9 @@ public class HadoopJobTracker extends HadoopComponent { /** Component busy lock. */ private GridSpinReadWriteLock busyLock; + /** Class to create HadoopJob instances from. */ + private volatile Class<? extends HadoopJob> jobCls; + /** Closure to check result of async transform of system cache. */ private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> gridFut) { @@ -95,12 +99,25 @@ public class HadoopJobTracker extends HadoopComponent { }; /** {@inheritDoc} */ - @Override public void start(HadoopContext ctx) throws IgniteCheckedException { + @SuppressWarnings("unchecked") + @Override public void start(final HadoopContext ctx) throws IgniteCheckedException { super.start(ctx); busyLock = new GridSpinReadWriteLock(); evtProcSvc = Executors.newFixedThreadPool(1); + + UUID nodeId = ctx.localNodeId(); + + assert jobCls == null; + + HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId)); + + try { + jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName()); + } catch (Exception ioe) { + throw new IgniteException(ioe); + } } /** @@ -224,7 +241,6 @@ public class HadoopJobTracker extends HadoopComponent { // Fail all pending futures. for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values()) fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); - } /** @@ -839,7 +855,7 @@ public class HadoopJobTracker extends HadoopComponent { HadoopCounters cntrs = meta.counters(); - writer.write(job.info(), jobId, cntrs); + writer.write(job, cntrs); } } catch (Exception e) { @@ -987,9 +1003,7 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = meta.jobInfo(); } - UUID nodeId = ctx.localNodeId(); - - job = jobInfo.createJob(nodeId, jobId, log); + job = jobInfo.createJob(jobCls, jobId, log); job.initialize(false, ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index cfa393e..b0b0b8c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -25,13 +25,13 @@ import org.apache.ignite.internal.processors.hadoop.shuffle.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.offheap.unsafe.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -116,9 +116,7 @@ public class HadoopChildProcessRunner { assert job == null; - UUID nodeId = nodeDesc.parentNodeId(); - - job = req.jobInfo().createJob(nodeId, req.jobId(), log); + job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log); job.initialize(true, nodeDesc.processId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 7e70865..319640d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.v2; +import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; @@ -30,16 +31,19 @@ import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import org.jsr166.*; import java.io.*; import java.lang.reflect.*; +import java.net.*; import java.util.*; import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*; /** * Hadoop job implementation for v2 API. @@ -71,14 +75,18 @@ public class HadoopV2Job implements HadoopJob { private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. */ - private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); /** Local node ID */ - private final UUID locNodeId; + private volatile UUID locNodeId; /** Serialized JobConf. */ private volatile byte[] jobConfData; + /** File system cache map. */ + private final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap + = createHadoopLazyConcurrentMap(); + /** Disposal guard. */ private final AtomicBoolean disposed = new AtomicBoolean(); @@ -87,16 +95,16 @@ public class HadoopV2Job implements HadoopJob { * @param jobInfo Job info. * @param log Logger. */ - public HadoopV2Job(HadoopJobId jobId, UUID locNodeId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { assert jobId != null; - assert locNodeId != null; + //assert locNodeId != null; assert jobInfo != null; this.jobId = jobId; this.jobInfo = jobInfo; - this.locNodeId = locNodeId; + //this.locNodeId = locNodeId; - assert ((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId)); + //assert ((HadoopClassLoader)getClass().getClassLoader()).name().equals(HadoopClassLoader.nameForJob(locNodeId)); hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); @@ -111,7 +119,7 @@ public class HadoopV2Job implements HadoopJob { jobCtx = new JobContextImpl(jobConf, hadoopJobID); - rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, locNodeId); + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); } /** {@inheritDoc} */ @@ -142,10 +150,7 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); try { - assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() - .equals(HadoopClassLoader.nameForJob(locNodeId)); - - FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf, jobId); + FileSystem fs = fileSystem(jobDir.toUri(), jobConf); JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -191,6 +196,7 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); @@ -255,15 +261,13 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { assert locNodeId != null; - assert this.locNodeId.equals(locNodeId); - Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); + this.locNodeId = locNodeId; -// try { -// HadoopUtils.fileSystemForMrUser(null, jobConf); -// } catch (IOException ioe) { -// throw new IgniteCheckedException(ioe); -// } + assert ((HadoopClassLoader)getClass().getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(this.locNodeId)); + + Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); try { rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); @@ -297,7 +301,7 @@ public class HadoopV2Job implements HadoopJob { // Stop the daemon threads that have been created // with the task class loaders: while (true) { - Class<?> cls = fullCtxClsQueue.poll(); + Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll(); if (cls == null) break; @@ -310,7 +314,7 @@ public class HadoopV2Job implements HadoopJob { // Also close all the FileSystems cached in // HadoopLazyConcurrentMap for this *task* class loader: - closeCachedFileSystems(ldr); + closeCachedTaskFileSystems(ldr); } catch (Throwable e) { if (err == null) @@ -323,8 +327,8 @@ public class HadoopV2Job implements HadoopJob { assert fullCtxClsQueue.isEmpty(); - // Close all cached file systems for this Job: - HadoopFileSystemCache.close(jobId); + // Close all cached file systems for this *Job*: + fsMap.close(); if (err != null) throw U.cast(err); @@ -349,8 +353,8 @@ public class HadoopV2Job implements HadoopJob { * @param ldr The task class loader. * @throws Exception On error. */ - private void closeCachedFileSystems(ClassLoader ldr) throws Exception { - Class<?> clazz = ldr.loadClass(HadoopFileSystemCache.class.getName()); + private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception { + Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName()); Method m = clazz.getMethod("close"); @@ -386,4 +390,15 @@ public class HadoopV2Job implements HadoopJob { public JobConf jobConf() { return jobConf; } + + /** + * Gets file system for this job. + * @param uri The uri. + * @param cfg The configuration. + * @return The file system. + * @throws IOException On error. + */ + public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { + return HadoopFileSystemCache.fileSystemForMrUser(uri, cfg, fsMap); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 05d61fc..55a31c6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -39,7 +39,7 @@ import java.util.*; * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional * files are needed to be placed on local files system. */ -public class HadoopV2JobResourceManager { +class HadoopV2JobResourceManager { /** File type Fs disable caching property name. */ private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopFileSystemsUtils.disableFsCachePropertyName("file"); @@ -61,7 +61,8 @@ public class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; - private final UUID locNodeId; + /** TODO */ + private final HadoopV2Job fsProvider; /** * Creates new instance. @@ -69,11 +70,11 @@ public class HadoopV2JobResourceManager { * @param ctx Hadoop job context. * @param log Logger. */ - public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, UUID locNodeId) { + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job fsProvider) { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); - this.locNodeId = locNodeId; + this.fsProvider = fsProvider; } /** @@ -118,10 +119,10 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() - .equals(HadoopClassLoader.nameForJob(locNodeId)); +// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() +// .equals(HadoopClassLoader.nameForJob(locNodeId)); - FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId); + FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), cfg); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -216,10 +217,10 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() - .equals(HadoopClassLoader.nameForJob(locNodeId)); +// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() +// .equals(HadoopClassLoader.nameForJob(locNodeId)); - FileSystem srcFs = HadoopFileSystemCache.fileSystemForMrUser(srcPath.toUri(), cfg, jobId); + FileSystem srcFs = fsProvider.fileSystem(srcPath.toUri(), cfg); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -302,10 +303,10 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) { - assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() - .equals(HadoopClassLoader.nameForJob(locNodeId)); +// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() +// .equals(HadoopClassLoader.nameForJob(locNodeId)); - FileSystem fs = HadoopFileSystemCache.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), jobId); + FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), ctx.getJobConf()); fs.delete(stagingDir, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 24293b8..f007038 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -45,6 +45,7 @@ import java.security.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -55,6 +56,20 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** */ private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; + /** Lazy per-user file system cache used by Hadoop tasks. */ + private static final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap + = createHadoopLazyConcurrentMap(); + + /** + * This method is called with reflection upon Job finish with class loader of each task. + * This will clean up all the Fs created for specific task. + * + * @throws IgniteCheckedException On error. + */ + public static void close() throws IgniteCheckedException { + fsMap.close(); + } + /** * Check for combiner grouping support (available since Hadoop 2.3). */ @@ -92,7 +107,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private volatile HadoopTask task; /** Local node ID */ - private UUID locNodeId; + private final UUID locNodeId; /** Counters for task. */ private final HadoopCounters cntrs = new HadoopCountersImpl(); @@ -434,7 +449,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { // Task class loader. // We also cache Fs there, all them will be cleared explicitly upon the Job end. - fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), null); + fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), fsMap); } catch (IOException e) { throw new IgniteCheckedException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 93707fe..f59be19 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -48,7 +49,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(uuid, jobId, log); + return jobInfo.createJob(HadoopV2Job.class, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index bd4c343..1570807 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -66,7 +67,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(uuid, jobId, log); + return jobInfo.createJob(HadoopV2Job.class, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index a9fd2c9..b8f62e6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -72,7 +72,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopJobId id = new HadoopJobId(uuid, 1); - HadoopJob job = info.createJob(uuid, id, log); + HadoopJob job = info.createJob(HadoopV2Job.class, id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2798754/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 12b6611..3231134 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -136,7 +136,8 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls0, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { assert false; return null;