Repository: incubator-ignite Updated Branches: refs/heads/ignite-494b 856738f5f -> 2340f5f26
[IGNITE-494]: corrected after review. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2340f5f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2340f5f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2340f5f2 Branch: refs/heads/ignite-494b Commit: 2340f5f26d095e79a5a9baa0e4297037706e052c Parents: 856738f Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri Mar 27 15:43:48 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri Mar 27 15:43:48 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopClassLoader.java | 26 +++---- .../processors/hadoop/v2/HadoopDaemon.java | 78 +++++++++++--------- .../processors/hadoop/v2/HadoopV2Job.java | 63 +++++++++------- 3 files changed, 91 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 819bf42..cc011ae 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import 
org.jsr166.*; import org.objectweb.asm.*; @@ -43,8 +44,8 @@ public class HadoopClassLoader extends URLClassLoader { registerAsParallelCapable(); } - /** Diagnostic instance counter. */ - private static final AtomicLong instances = new AtomicLong(0); + /** Name of the Hadoop daemon class. */ + public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon"; /** */ private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); @@ -63,6 +64,7 @@ public class HadoopClassLoader extends URLClassLoader { /** Diagnostic name of this class loader. */ private final String name; + /** * @param urls Urls. */ @@ -72,9 +74,6 @@ public class HadoopClassLoader extends URLClassLoader { assert !(getParent() instanceof HadoopClassLoader); this.name = name; - - long numInstances = instances.incrementAndGet(); - //U.debug("++: " + numInstances); } /** @@ -106,7 +105,9 @@ public class HadoopClassLoader extends URLClassLoader { return loadFromBytes(name, HadoopShutdownHookManager.class.getName()); else if (name.endsWith(".util.NativeCodeLoader")) return loadFromBytes(name, HadoopNativeCodeLoader.class.getName()); - else if (name.equals("org.apache.hadoop.util.Daemon")) + else if (name.equals(HADOOP_DAEMON_CLASS_NAME)) + // We replace this in order to be able to forcibly stop some daemon threads + // that otherwise never stop (e.g. 
PeerCache runnables): return loadFromBytes(name, HadoopDaemon.class.getName()); return loadClassExplicitly(name, resolve); @@ -562,17 +563,8 @@ public class HadoopClassLoader extends URLClassLoader { } } - @Override protected void finalize() throws Throwable { - super.finalize(); - long numInstances = instances.decrementAndGet(); - -// Field f = ClassLoader.class.getDeclaredField("classes"); -// f.setAccessible(true); -// Vector v = (Vector)f.get(this); -// U.debug("--: " + numInstances + ", classes = " + v.size()); - } - + /** {@inheritDoc} */ @Override public String toString() { - return super.toString() + ": " + name; + return S.toString(HadoopClassLoader.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java index 1daa458..b7b7af4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.ignite.internal.util.typedef.internal.*; - import java.util.*; import java.util.concurrent.*; @@ -31,8 +28,11 @@ import java.util.concurrent.*; * * A thread that has called {@link Thread#setDaemon(boolean) } with true. */ public class HadoopDaemon extends Thread { - /** queue to hold the threads to be stopped. */ - private static final Queue<HadoopDaemon> daemonQueue = new ConcurrentLinkedQueue<>(); + /** Lock object used for synchronization. 
*/ + private static final Object lock = new Object(); + + /** Queue to hold the threads to be stopped. */ + private static Queue<HadoopDaemon> daemonQueue = new ConcurrentLinkedQueue<>(); { setDaemon(true); // always a daemon @@ -44,23 +44,31 @@ public class HadoopDaemon extends Thread { /** Construct a daemon thread. */ public HadoopDaemon() { super(); + runnable = this; + enqueueIfNeeded(); } /** Construct a daemon thread. */ public HadoopDaemon(Runnable runnable) { super(runnable); + this.runnable = runnable; + this.setName(runnable.toString()); + enqueueIfNeeded(); } /** Construct a daemon thread to be part of a specified thread group. */ public HadoopDaemon(ThreadGroup group, Runnable runnable) { super(group, runnable); + this.runnable = runnable; + this.setName(runnable.toString()); + enqueueIfNeeded(); } @@ -79,6 +87,7 @@ public class HadoopDaemon extends Thread { */ private static boolean isPeerCacheRunnable(Runnable r) { String name = r.getClass().getName(); + return name.startsWith("org.apache.hadoop.hdfs.PeerCache"); } @@ -89,7 +98,9 @@ public class HadoopDaemon extends Thread { */ private boolean belongsToThisClassLoader(Runnable x) { ClassLoader cl = x.getClass().getClassLoader(); + ClassLoader thisCl = getClass().getClassLoader(); + return cl == thisCl; } @@ -97,48 +108,49 @@ public class HadoopDaemon extends Thread { * Enqueues this thread if it should be stopped upon the task end. */ private void enqueueIfNeeded() { - if (isPeerCacheRunnable(runnable) - && belongsToThisClassLoader(runnable)) - daemonQueue.add(this); + synchronized (lock) { + Queue<HadoopDaemon> queue = daemonQueue; + + if (queue == null) + throw new RuntimeException("Failed to create HadoopDaemon: its registry is already cleared. " + + "ClassLoader: " + getClass().getClassLoader()); + + if (isPeerCacheRunnable(runnable) && belongsToThisClassLoader(runnable)) + queue.add(this); + } } /** * Stops all the enqueued threads. 
*/ public static void dequeueAndStopAll() { - HadoopDaemon daemon; + final Queue<HadoopDaemon> queue; - while (true) { - daemon = daemonQueue.poll(); + synchronized (lock) { + queue = daemonQueue; - if (daemon == null) - break; + if (queue == null) + return; // any subsequent call to dequeueAndStopAll() is ignored. - stopImpl(daemon); - } + daemonQueue = null; - assert daemonQueue.isEmpty(); - } + HadoopDaemon daemon; - /** - * Stops the given thread by interrupting and joining it. - * @param t the thread to stop. - */ - private static void stopImpl(Thread t) { - int attempts = 0; + while (true) { + daemon = queue.poll(); - while (t.isAlive() && attempts <= 5) { - t.interrupt(); + if (daemon == null) + break; // end of queue - try { - t.join(500L); - } - catch (InterruptedException ie) { - U.debug(ie.toString()); + // NB: we don't join the thread assuming one interrupt() is quite enough. + // This is true for PeerCache Runnable implementation, though may not be true + // for other threads: + daemon.interrupt(); } - - attempts++; } - //U.debug("Daemon thread " + t + " stopped: " + !t.isAlive() + " in " + attempts + " attempt(s)."); + + assert daemonQueue == null; + + assert queue.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 73143bd..8a61c6a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -46,22 +46,22 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; */ public class 
HadoopV2Job implements HadoopJob { /** */ - private JobConf jobConf; + private final JobConf jobConf; /** */ - private JobContextImpl jobCtx; + private final JobContextImpl jobCtx; /** Hadoop job ID. */ - private HadoopJobId jobId; + private final HadoopJobId jobId; /** Job info. */ - protected HadoopJobInfo jobInfo; + protected final HadoopJobInfo jobInfo; /** */ private final JobID hadoopJobID; /** */ - private HadoopV2JobResourceManager rsrcMgr; + private final HadoopV2JobResourceManager rsrcMgr; /** */ private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = @@ -70,8 +70,13 @@ public class HadoopV2Job implements HadoopJob { /** Pooling task context class and thus class loading environment. */ private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + /** Will store all the created context classes there. Potentially all the + * contexts are pooled in taskCtxClsPool, but upon some abnormal conditions + * some of them may not be returned to the pool.*/ + private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + /** Local node ID */ - private UUID locNodeId; + private volatile UUID locNodeId; /** Serialized JobConf. */ private volatile byte[] jobConfData; @@ -196,10 +201,14 @@ public class HadoopV2Job implements HadoopJob { try { if (cls == null) { // If there is no pooled class, then load new one. + // Note that the classloader is identified by the task it was created for, + // but later it may be reused for other tasks. 
HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-" + info.jobId() + "-" + info.taskNumber()); + "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + + fullCtxClsQueue.add(cls); } Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, @@ -255,24 +264,23 @@ public class HadoopV2Job implements HadoopJob { if (jobLocDir.exists()) U.delete(jobLocDir); } + + // Stop the daemon threads that have been created + // with the task class loaders: + while (true) { + Class<?> c = fullCtxClsQueue.poll(); + + if (c == null) + break; + + clearDaemons(c); + } } finally { - dispose0(); - } - } + taskCtxClsPool.clear(); - /** - * Zero some references that may hold some objects reachable. - */ - private void dispose0() { - jobConf = null; - jobCtx = null; - jobId = null; - jobInfo = null; - rsrcMgr = null; - ctxs.clear(); - taskCtxClsPool.clear(); - jobConfData = null; + fullCtxClsQueue.clear(); + } } /** {@inheritDoc} */ @@ -284,17 +292,20 @@ public class HadoopV2Job implements HadoopJob { @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); - taskCtxClsPool.offer(ctx.getClass()); + taskCtxClsPool.add(ctx.getClass()); File locDir = taskLocalDir(locNodeId, info); if (locDir.exists()) U.delete(locDir); + } - // Since the task is finished, we should stop the daemon threads that have been created - // with this class loader: + /** Clear the daemon threads that belong to the classloader of the given class. 
*/ + private void clearDaemons(Class<?> clazz) throws IgniteCheckedException { try { - Class<?> c = ctx.getClass().getClassLoader().loadClass("org.apache.hadoop.util.Daemon"); + Class<?> c = clazz.getClassLoader().loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); + // Of coarse, org.apache.hadoop.util.Daemon does not have method dequeueAndStopAll, + // but we replaced this class with another class HadoopDaemon, which does: Method m = c.getMethod("dequeueAndStopAll"); m.invoke(null); }