Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-494b 856738f5f -> 2340f5f26


[IGNITE-494]: corrected after review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2340f5f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2340f5f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2340f5f2

Branch: refs/heads/ignite-494b
Commit: 2340f5f26d095e79a5a9baa0e4297037706e052c
Parents: 856738f
Author: iveselovskiy <iveselovs...@gridgain.com>
Authored: Fri Mar 27 15:43:48 2015 +0300
Committer: iveselovskiy <iveselovs...@gridgain.com>
Committed: Fri Mar 27 15:43:48 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    | 26 +++----
 .../processors/hadoop/v2/HadoopDaemon.java      | 78 +++++++++++---------
 .../processors/hadoop/v2/HadoopV2Job.java       | 63 +++++++++-------
 3 files changed, 91 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 819bf42..cc011ae 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 import org.objectweb.asm.*;
@@ -43,8 +44,8 @@ public class HadoopClassLoader extends URLClassLoader {
         registerAsParallelCapable();
     }
 
-    /** Diagnostic instance counter. */
-    private static final AtomicLong instances = new AtomicLong(0);
+    /** Name of the Hadoop daemon class. */
+    public static final String HADOOP_DAEMON_CLASS_NAME = 
"org.apache.hadoop.util.Daemon";
 
     /** */
     private static final URLClassLoader APP_CLS_LDR = 
(URLClassLoader)HadoopClassLoader.class.getClassLoader();
@@ -63,6 +64,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
     /** Diagnostic name of this class loader. */
     private final String name;
+
     /**
      * @param urls Urls.
      */
@@ -72,9 +74,6 @@ public class HadoopClassLoader extends URLClassLoader {
         assert !(getParent() instanceof HadoopClassLoader);
 
         this.name = name;
-
-        long numInstances = instances.incrementAndGet();
-        //U.debug("++: " + numInstances);
     }
 
     /**
@@ -106,7 +105,9 @@ public class HadoopClassLoader extends URLClassLoader {
                     return loadFromBytes(name, 
HadoopShutdownHookManager.class.getName());
                 else if (name.endsWith(".util.NativeCodeLoader"))
                     return loadFromBytes(name, 
HadoopNativeCodeLoader.class.getName());
-                else if (name.equals("org.apache.hadoop.util.Daemon"))
+                else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
+                    // We replace this in order to be able to forcibly stop 
some daemon threads
+                    // that otherwise never stop (e.g. PeerCache runnables):
                     return loadFromBytes(name, HadoopDaemon.class.getName());
 
                 return loadClassExplicitly(name, resolve);
@@ -562,17 +563,8 @@ public class HadoopClassLoader extends URLClassLoader {
         }
     }
 
-    @Override protected void finalize() throws Throwable {
-        super.finalize();
-        long numInstances = instances.decrementAndGet();
-
-//        Field f = ClassLoader.class.getDeclaredField("classes");
-//        f.setAccessible(true);
-//        Vector v = (Vector)f.get(this);
-//        U.debug("--: " + numInstances + ", classes = " + v.size());
-    }
-
+    /** {@inheritDoc} */
     @Override public String toString() {
-        return super.toString() + ": " + name;
+        return S.toString(HadoopClassLoader.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
index 1daa458..b7b7af4 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -31,8 +28,11 @@ import java.util.concurrent.*;
  *
  * A thread that has called {@link Thread#setDaemon(boolean) } with true. */
 public class HadoopDaemon extends Thread {
-    /** queue to hold the threads to be stopped. */
-    private static final Queue<HadoopDaemon> daemonQueue = new 
ConcurrentLinkedQueue<>();
+    /** Lock object used for synchronization. */
+    private static final Object lock = new Object();
+
+    /** Queue to hold the threads to be stopped. */
+    private static Queue<HadoopDaemon> daemonQueue = new 
ConcurrentLinkedQueue<>();
 
     {
         setDaemon(true); // always a daemon
@@ -44,23 +44,31 @@ public class HadoopDaemon extends Thread {
     /** Construct a daemon thread. */
     public HadoopDaemon() {
         super();
+
         runnable = this;
+
         enqueueIfNeeded();
     }
 
     /** Construct a daemon thread. */
     public HadoopDaemon(Runnable runnable) {
         super(runnable);
+
         this.runnable = runnable;
+
         this.setName(runnable.toString());
+
         enqueueIfNeeded();
     }
 
     /** Construct a daemon thread to be part of a specified thread group. */
     public HadoopDaemon(ThreadGroup group, Runnable runnable) {
         super(group, runnable);
+
         this.runnable = runnable;
+
         this.setName(runnable.toString());
+
         enqueueIfNeeded();
     }
 
@@ -79,6 +87,7 @@ public class HadoopDaemon extends Thread {
      */
     private static boolean isPeerCacheRunnable(Runnable r) {
         String name = r.getClass().getName();
+
         return name.startsWith("org.apache.hadoop.hdfs.PeerCache");
     }
 
@@ -89,7 +98,9 @@ public class HadoopDaemon extends Thread {
      */
     private boolean belongsToThisClassLoader(Runnable x) {
         ClassLoader cl = x.getClass().getClassLoader();
+
         ClassLoader thisCl = getClass().getClassLoader();
+
         return cl == thisCl;
     }
 
@@ -97,48 +108,49 @@ public class HadoopDaemon extends Thread {
      * Enqueues this thread if it should be stopped upon the task end.
      */
     private void enqueueIfNeeded() {
-        if (isPeerCacheRunnable(runnable)
-                && belongsToThisClassLoader(runnable))
-            daemonQueue.add(this);
+        synchronized (lock) {
+            Queue<HadoopDaemon> queue = daemonQueue;
+
+            if (queue == null)
+                throw new RuntimeException("Failed to create HadoopDaemon: its 
registry is already cleared. " +
+                    "ClassLoader: " + getClass().getClassLoader());
+
+            if (isPeerCacheRunnable(runnable) && 
belongsToThisClassLoader(runnable))
+                queue.add(this);
+        }
     }
 
     /**
      * Stops all the enqueued threads.
      */
     public static void dequeueAndStopAll() {
-        HadoopDaemon daemon;
+        final Queue<HadoopDaemon> queue;
 
-        while (true) {
-            daemon = daemonQueue.poll();
+        synchronized (lock) {
+            queue = daemonQueue;
 
-            if (daemon == null)
-                break;
+            if (queue == null)
+                return; // any subsequent call to dequeueAndStopAll() is 
ignored.
 
-            stopImpl(daemon);
-        }
+            daemonQueue = null;
 
-        assert daemonQueue.isEmpty();
-    }
+            HadoopDaemon daemon;
 
-    /**
-     * Stops the given thread by interrupting and joining it.
-     * @param t the thread to stop.
-     */
-    private static void stopImpl(Thread t) {
-        int attempts = 0;
+            while (true) {
+                daemon = queue.poll();
 
-        while (t.isAlive() && attempts <= 5) {
-            t.interrupt();
+                if (daemon == null)
+                    break; // end of queue
 
-            try {
-                t.join(500L);
-            }
-            catch (InterruptedException ie) {
-                U.debug(ie.toString());
+                // NB: we don't join the thread assuming one interrupt() is 
quite enough.
+                // This is true for PeerCache Runnable implementation, though 
may not be true
+                // for other threads:
+                daemon.interrupt();
             }
-
-            attempts++;
         }
-        //U.debug("Daemon thread " + t + " stopped: " + !t.isAlive() + " in " 
+ attempts + " attempt(s).");
+
+        assert daemonQueue == null;
+
+        assert queue.isEmpty();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2340f5f2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 73143bd..8a61c6a 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -46,22 +46,22 @@ import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
  */
 public class HadoopV2Job implements HadoopJob {
     /** */
-    private JobConf jobConf;
+    private final JobConf jobConf;
 
     /** */
-    private JobContextImpl jobCtx;
+    private final JobContextImpl jobCtx;
 
     /** Hadoop job ID. */
-    private HadoopJobId jobId;
+    private final HadoopJobId jobId;
 
     /** Job info. */
-    protected HadoopJobInfo jobInfo;
+    protected final HadoopJobInfo jobInfo;
 
     /** */
     private final JobID hadoopJobID;
 
     /** */
-    private HadoopV2JobResourceManager rsrcMgr;
+    private final HadoopV2JobResourceManager rsrcMgr;
 
     /** */
     private final ConcurrentMap<T2<HadoopTaskType, Integer>, 
GridFutureAdapter<HadoopTaskContext>> ctxs =
@@ -70,8 +70,13 @@ public class HadoopV2Job implements HadoopJob {
     /** Pooling task context class and thus class loading environment. */
     private final Queue<Class<?>> taskCtxClsPool = new 
ConcurrentLinkedQueue<>();
 
+    /** Will store all the created context classes there. Potentially all the
+     * contexts are pooled in taskCtxClsPool, but upon some abnormal conditions
+     * some of them may not be returned to the pool. */
+    private final Queue<Class<?>> fullCtxClsQueue = new 
ConcurrentLinkedDeque<>();
+
     /** Local node ID */
-    private UUID locNodeId;
+    private volatile UUID locNodeId;
 
     /** Serialized JobConf. */
     private volatile byte[] jobConfData;
@@ -196,10 +201,14 @@ public class HadoopV2Job implements HadoopJob {
         try {
             if (cls == null) {
                 // If there is no pooled class, then load new one.
+                // Note that the classloader is identified by the task it was created for,
+                // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.taskNumber());
+                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
 
                 cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+
+                fullCtxClsQueue.add(cls);
             }
 
             Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, 
HadoopJob.class,
@@ -255,24 +264,23 @@ public class HadoopV2Job implements HadoopJob {
                 if (jobLocDir.exists())
                     U.delete(jobLocDir);
             }
+
+            // Stop the daemon threads that have been created
+            // with the task class loaders:
+            while (true) {
+                Class<?> c = fullCtxClsQueue.poll();
+
+                if (c == null)
+                    break;
+
+                clearDaemons(c);
+            }
         }
         finally {
-            dispose0();
-        }
-    }
+            taskCtxClsPool.clear();
 
-    /**
-     * Zero some references that may hold some objects reachable.
-     */
-    private void dispose0() {
-        jobConf = null;
-        jobCtx = null;
-        jobId = null;
-        jobInfo = null;
-        rsrcMgr = null;
-        ctxs.clear();
-        taskCtxClsPool.clear();
-        jobConfData = null;
+            fullCtxClsQueue.clear();
+        }
     }
 
     /** {@inheritDoc} */
@@ -284,17 +292,20 @@ public class HadoopV2Job implements HadoopJob {
     @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
         HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), 
info.taskNumber())).get();
 
-        taskCtxClsPool.offer(ctx.getClass());
+        taskCtxClsPool.add(ctx.getClass());
 
         File locDir = taskLocalDir(locNodeId, info);
 
         if (locDir.exists())
             U.delete(locDir);
+    }
 
-        // Since the task is finished, we should stop the daemon threads that 
have been created
-        // with this class loader:
+    /** Clear the daemon threads that belong to the classloader of the given 
class. */
+    private void clearDaemons(Class<?> clazz) throws IgniteCheckedException {
         try {
-            Class<?> c = 
ctx.getClass().getClassLoader().loadClass("org.apache.hadoop.util.Daemon");
+            Class<?> c = 
clazz.getClassLoader().loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME);
+            // Of course, org.apache.hadoop.util.Daemon does not have method dequeueAndStopAll,
+            // but we replaced this class with another class HadoopDaemon, which does:
             Method m = c.getMethod("dequeueAndStopAll");
             m.invoke(null);
         }

Reply via email to