# IG-980: implemenmted 1 job class loader per node variant.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5285b72c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5285b72c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5285b72c Branch: refs/heads/ignite-980 Commit: 5285b72c3341eebb555a3562d0395459b654fe94 Parents: acc2cb9 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri Jun 19 21:30:23 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri Jun 19 21:30:23 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopJobInfo.java | 3 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 3 +- .../processors/hadoop/HadoopDefaultJobInfo.java | 57 +++++++++++---- .../internal/processors/hadoop/HadoopUtils.java | 77 ++++++++------------ .../hadoop/jobtracker/HadoopJobTracker.java | 5 +- .../child/HadoopChildProcessRunner.java | 5 +- .../processors/hadoop/v2/HadoopV2Job.java | 65 ++++++++++++----- .../hadoop/v2/HadoopV2JobResourceManager.java | 7 +- .../hadoop/v2/HadoopV2TaskContext.java | 2 +- .../processors/hadoop/HadoopTasksV1Test.java | 6 +- .../processors/hadoop/HadoopTasksV2Test.java | 6 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 6 +- .../collections/HadoopAbstractMapTest.java | 2 +- 13 files changed, 144 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index 51faf5d..e676cbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.jetbrains.annotations.*; import java.io.*; +import java.util.*; /** * Compact job description. @@ -60,7 +61,7 @@ public interface HadoopJobInfo extends Serializable { * @return Job. * @throws IgniteCheckedException If failed. */ - HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 420cece..597ff8a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -72,8 +72,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try { hadoopCfg.set(MRJobConfig.USER_NAME, user); - FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, - jobId.toString(), null); + FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, jobId); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 2e855d0..9e685ea 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -18,13 +18,17 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; +import org.jsr166.*; import java.io.*; import java.lang.reflect.*; import java.util.*; +import java.util.concurrent.*; /** * Hadoop job info based on default Hadoop configuration. @@ -49,7 +53,38 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { private String user; /** */ - private static volatile Class<?> jobCls; + private static class CloseableClass implements Closeable { + private final Class<?> clazz; + + CloseableClass(Class<?> c) { + clazz = c; + } + + /** {@inheritDoc} */ + @Override public void close() { + // Noop + } + + public Class<?> clazz() { + return clazz; + } + } + + /** */ + private static final HadoopLazyConcurrentMap<UUID, CloseableClass> hadoopV2JobClasses = + new HadoopLazyConcurrentMap<>(new HadoopLazyConcurrentMap.ValueFactory<UUID, CloseableClass>() { + @Override public CloseableClass createValue(UUID key) { + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job-node-" + key); + + try { + Class<?> jobCls = ldr.loadClass(HadoopV2Job.class.getName()); + + return new CloseableClass(jobCls); + } catch (Exception ioe) { + throw new IgniteException(ioe); + } + } + }); /** * Default constructor required by {@link Externalizable}. @@ -82,24 +117,16 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(UUID nodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { try { - Class<?> jobCls0 = jobCls; - - if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. - synchronized (HadoopDefaultJobInfo.class) { - if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); + Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz(); - jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); - } - } - } + X.println("#### Creating job: HadoopJob: " + nodeId + ", class = " + jobCls0); - Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, - IgniteLogger.class); + Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, UUID.class, + HadoopDefaultJobInfo.class, IgniteLogger.class); - return (HadoopJob)constructor.newInstance(jobId, this, log); + return (HadoopJob)constructor.newInstance(jobId, nodeId, this, log); } // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index f50f0b3..2234549 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -33,10 +33,12 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; +import org.jsr166.*; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.*; /** * Hadoop utility methods. @@ -68,8 +70,8 @@ public class HadoopUtils { = createHadoopLazyConcurrentMap(); /** File system cache for jobs. */ - private static final Map<String, T2<HadoopLazyConcurrentMap<FsCacheKey,FileSystem>, Set<String>>> jobFsMap - = new HashMap<>(16); + private static final ConcurrentMap<HadoopJobId, HadoopLazyConcurrentMap<FsCacheKey,FileSystem>> jobFsMap + = new ConcurrentHashMap8<>(); /** * Creates HadoopLazyConcurrentMap. @@ -414,8 +416,8 @@ public class HadoopUtils { * @return the file system * @throws IOException */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, - @Nullable String jobId, @Nullable String locId) throws IOException { + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, @Nullable HadoopJobId jobId) + throws IOException { final String usr = getMrHadoopUser(cfg); assert usr != null; @@ -426,7 +428,7 @@ public class HadoopUtils { final FileSystem fs; try { - fs = getWithCaching(uri, cfg, usr, jobId, locId); + fs = getWithCaching(uri, cfg, usr, jobId); } catch (IgniteException ie) { throw new IOException(ie); @@ -545,14 +547,13 @@ public class HadoopUtils { * @param usr The user to create file system for. * @return The file system: either created, or taken from the cache. */ - private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, - @Nullable String jobId, @Nullable String locId) { + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, @Nullable HadoopJobId jobId) { final FsCacheKey key = new FsCacheKey(uri, usr, cfg); if (jobId == null) return fileSysLazyMap.getOrCreate(key); else { - HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId, locId); + HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId); return lm.getOrCreate(key); } @@ -562,67 +563,47 @@ public class HadoopUtils { * Gets Fs map for given job Id and localNodeId. If local node Id not null, registers this * local node id to track subsequent removal. * @param jobId The job Id. - * @param locId The local node id. * @return File system map. */ - private static synchronized HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(String jobId, - @Nullable String locId) { + private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(final HadoopJobId jobId) { assert jobId != null; - T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> t2 = jobFsMap.get(jobId); + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.get(jobId); - if (t2 == null) { - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap(); + if (lazy != null) + return lazy; - t2 = new T2<>(newLM, (Set<String>)new HashSet<String>()); + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap(); - T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> pushedT2 = jobFsMap.put(jobId, t2); + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = jobFsMap.putIfAbsent(jobId, newLM); - assert pushedT2 == null; - } - - if (locId != null) { - // If local node Id is given, register this local Id for this job: - boolean added = t2.get2().add(locId); + if (pushedT2 == null) + lazy = newLM; + else { + lazy = pushedT2; - // new locId appears in HadoopV2Job#initialize(), no job with the same locId should be registered: - assert added; + try { + newLM.close(); + } catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } } - return t2.get1(); + return lazy; } /** * Closes file system map for this job Id and local node id. * @param jobId The job id. - * @param locId The local node id. * @throws IgniteCheckedException */ - public static synchronized void close(final String jobId, final String locId) throws IgniteCheckedException { + public static synchronized void close(final HadoopJobId jobId) throws IgniteCheckedException { assert jobId != null; - assert locId != null; - - final T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> t2 = jobFsMap.get(jobId); - - if (t2 != null) { - Set<String> locIds = t2.get2(); - - boolean rm = locIds.remove(locId); - assert rm; + final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.remove(jobId); - final int usageCnt = locIds.size(); - - assert usageCnt >= 0 : "negative usage count " + usageCnt + ", map: " + t2.get1(); - - if (usageCnt == 0) { - T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> rmT2 = jobFsMap.remove(jobId); - - assert rmT2 == t2; - - t2.get1().close(); - } - } + if (lazy != null) + lazy.close(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 2f07817..b7377d4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -224,6 +224,7 @@ public class HadoopJobTracker extends HadoopComponent { // Fail all pending futures. for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values()) fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); + } /** @@ -986,7 +987,9 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = meta.jobInfo(); } - job = jobInfo.createJob(jobId, log); + UUID nodeId = ctx.localNodeId(); + + job = jobInfo.createJob(nodeId, jobId, log); job.initialize(false, ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 040552a..cfa393e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -115,7 +116,9 @@ public class HadoopChildProcessRunner { assert job == null; - job = req.jobInfo().createJob(req.jobId(), log); + UUID nodeId = nodeDesc.parentNodeId(); + + job = req.jobInfo().createJob(nodeId, req.jobId(), log); job.initialize(true, nodeDesc.processId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index f48db98..6e957df 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -74,7 +74,7 @@ public class HadoopV2Job implements HadoopJob { private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); /** Local node ID */ - private volatile UUID locNodeId; + private final UUID locNodeId; /** Serialized JobConf. */ private volatile byte[] jobConfData; @@ -87,12 +87,18 @@ public class HadoopV2Job implements HadoopJob { * @param jobInfo Job info. * @param log Logger. */ - public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { + public HadoopV2Job(HadoopJobId jobId, UUID locNodeId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { assert jobId != null; + assert locNodeId != null; assert jobInfo != null; this.jobId = jobId; this.jobInfo = jobInfo; + this.locNodeId = locNodeId; + + // TODO: debug: + assert getClass().getClassLoader() instanceof HadoopClassLoader; + assert getClass().getClassLoader().toString().contains(locNodeId.toString()); hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); @@ -138,7 +144,7 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); try { - FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, jobId.toString(), null); + FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, jobId); JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -248,17 +254,15 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { assert locNodeId != null; - assert this.locNodeId == null; - - this.locNodeId = locNodeId; + assert this.locNodeId.equals(locNodeId); Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); - try { - HadoopUtils.fileSystemForMrUser(null, jobConf, jobId.toString(), this.locNodeId.toString()); - } catch (IOException ioe) { - throw new IgniteCheckedException(ioe); - } +// try { +// HadoopUtils.fileSystemForMrUser(null, jobConf); +// } catch (IOException ioe) { +// throw new IgniteCheckedException(ioe); +// } try { rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); @@ -273,6 +277,8 @@ public class HadoopV2Job implements HadoopJob { @Override public void dispose(boolean external) throws IgniteCheckedException { boolean dsp = disposed.compareAndSet(false, true); + X.println("###### Dispose: " + locNodeId + ", ldr = " + getClass().getClassLoader()); + if (!dsp) return; @@ -300,11 +306,8 @@ public class HadoopV2Job implements HadoopJob { try { final ClassLoader ldr = cls.getClassLoader(); - Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); - - Method m = daemonCls.getMethod("dequeueAndStopAll"); - - m.invoke(null); + // Stop Hadoop daemons for this *task*: + stopHadoopFsDaemons(ldr); // Also close all the FileSystems cached in // HadoopLazyConcurrentMap for this *task* class loader: @@ -322,25 +325,47 @@ public class HadoopV2Job implements HadoopJob { assert fullCtxClsQueue.isEmpty(); // Close all cached Fs for this Job: - HadoopUtils.close(jobId.toString(), locNodeId.toString()); + //HadoopUtils.close(jobId.toString(), locNodeId.toString()); + //closeCachedFileSystems(getClass().getClassLoader()); + HadoopUtils.close(jobId); - int i = 0; - while (i++ < 5) - System.gc(); + invokeGc(); if (err != null) throw U.cast(err); } } + // TODO: remove + private void invokeGc() { + int i = 0; + + while (i++ < 5) + System.gc(); + } + /** {@inheritDoc} */ @Override protected void finalize() throws Throwable { super.finalize(); + // TODO: remove dispose(false); } /** + * Stops Hadoop Fs daemon threads. + * @param ldr The task ClassLoader to stop the daemons for. + * @throws Exception On error. + */ + private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception { + Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); + + Method m = daemonCls.getMethod("dequeueAndStopAll"); + + m.invoke(null); + } + + /** * Closes all the file systems user by task * @param ldr The task class loader. * @throws Exception On error. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 7d13fe9..b86f16d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -115,7 +115,7 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId.toString(), null); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, jobId); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -210,7 +210,7 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId.toString(), null); + FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -293,8 +293,7 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) { - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), - jobId.toString(), null); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), jobId); fs.delete(stagingDir, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 3452678..7012566 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -428,7 +428,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { try { // Task class loader. // We also cache Fs there, all them will be cleared explicitly upon the Job end. - fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null, null); + fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null); } catch (IOException e) { throw new IgniteCheckedException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 48e83cc..93707fe 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -44,9 +44,11 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); - HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + UUID uuid = new UUID(0, 0); - return jobInfo.createJob(jobId, log); + HadoopJobId jobId = new HadoopJobId(uuid, 0); + + return jobInfo.createJob(uuid, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index e73fae3..bd4c343 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -62,9 +62,11 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); - HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + UUID uuid = new UUID(0, 0); - return jobInfo.createJob(jobId, log); + HadoopJobId jobId = new HadoopJobId(uuid, 0); + + return jobInfo.createJob(uuid, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index f3b9307..a9fd2c9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -68,9 +68,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopDefaultJobInfo info = createJobInfo(cfg); - HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + final UUID uuid = UUID.randomUUID(); - HadoopJob job = info.createJob(id, log); + HadoopJobId id = new HadoopJobId(uuid, 1); + + HadoopJob job = info.createJob(uuid, id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 9395c5e..12b6611 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -136,7 +136,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { assert false; return null;