Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-494 [created] b6d0bed9f


[IGNITE-494]: workable fix with PeerCache thread stopping logic. (draft)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/867ef8d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/867ef8d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/867ef8d0

Branch: refs/heads/ignite-494
Commit: 867ef8d03b884a3568127004263a48ff356d59e4
Parents: d578683
Author: iveselovskiy <[email protected]>
Authored: Thu Mar 26 12:35:46 2015 +0300
Committer: iveselovskiy <[email protected]>
Committed: Thu Mar 26 12:35:46 2015 +0300

----------------------------------------------------------------------
 .../util/CallableValuedThreadLocal.java         | 27 +++++++
 .../ignite/internal/util/ValuedThreadLocal.java | 21 +++++
 .../processors/hadoop/HadoopClassLoader.java    | 28 ++++++-
 .../processors/hadoop/HadoopDefaultJobInfo.java |  2 +-
 .../hadoop/jobtracker/HadoopJobTracker.java     |  4 +-
 .../processors/hadoop/v2/HadoopV2Job.java       | 82 +++++++++++++++++---
 .../hadoop/v2/HadoopV2TaskContext.java          | 11 +++
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |  4 +-
 .../hadoop/HadoopClassLoaderTest.java           |  2 +-
 .../testsuites/IgniteHadoopTestSuite.java       |  2 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java       |  2 +-
 11 files changed, 166 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java
new file mode 100644
index 0000000..c11eafa
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java
@@ -0,0 +1,27 @@
+//package org.apache.ignite.internal.util;
+//
+//import java.util.concurrent.*;
+//
+///**
+// *
+// */
+//public class CallableValuedThreadLocal <T>  extends ThreadLocal <T> {
+//    private Callable<T> callable;
+//
+//    public CallableValuedThreadLocal(Callable<T> callable) {
+//        this.callable = callable;
+//    }
+//
+//    @Override public T initialValue() {
+//        try {
+//            return callable == null ? null : callable.call();
+//        } catch (Exception e) {
+//            throw new RuntimeException(e);
+//        }
+//    }
+//
+//    public void dispose() {
+//        callable = null;
+//    }
+//}
+//

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java
new file mode 100644
index 0000000..fe73c5b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java
@@ -0,0 +1,21 @@
+//package org.apache.ignite.internal.util;
+//
+///**
+// * Preferable over anonymous ThreadLocal inner subclass since it does not
+// * hold reference to the parent class.
+// */
+//public class ValuedThreadLocal <T>  extends ThreadLocal <T> {
+//    private T initialValue;
+//
+//    public ValuedThreadLocal(T initialValue) {
+//        this.initialValue = initialValue;
+//    }
+//
+//    @Override public T initialValue() {
+//        return initialValue;
+//    }
+//
+//    public void dispose() {
+//        initialValue = null;
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 1856e41..102ddab 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 import org.objectweb.asm.*;
 import org.objectweb.asm.commons.*;
 
 import java.io.*;
+import java.lang.reflect.*;
 import java.net.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -44,6 +46,9 @@ public class HadoopClassLoader extends URLClassLoader {
     }
 
     /** */
+    private static final AtomicLong instances = new AtomicLong(0);
+
+    /** */
     private static final URLClassLoader APP_CLS_LDR = 
(URLClassLoader)HadoopClassLoader.class.getClassLoader();
 
     /** */
@@ -58,13 +63,22 @@ public class HadoopClassLoader extends URLClassLoader {
     /** */
     private static final Map<String, byte[]> bytesCache = new 
ConcurrentHashMap8<>();
 
+    /** Name of this classloader -- for diagnostic purposes. */
+    private final String name;
+
     /**
      * @param urls Urls.
+     * @param name Name of this class loader; appended to {@link #toString()} for diagnostics.
      */
-    public HadoopClassLoader(URL[] urls) {
+    public HadoopClassLoader(URL[] urls, String name) {
         super(addHadoopUrls(urls), APP_CLS_LDR);
 
         assert !(getParent() instanceof HadoopClassLoader);
+
+        this.name = name;
+
+        long numInstances = instances.incrementAndGet();
+        U.debug("++: " + numInstances);
     }
 
     /**
@@ -549,4 +561,46 @@ public class HadoopClassLoader extends URLClassLoader {
             return hadoopUrls;
         }
     }
+
+    /**
+     * Diagnostic hook: decrements the live instance counter and logs it together with the number of
+     * classes recorded by this loader.
+     * <p>
+     * The class count is read reflectively from the private {@code ClassLoader.classes} field, which is a
+     * JVM implementation detail. A failure to read it is logged and never aborts finalization.
+     *
+     * @throws Throwable If {@code super.finalize()} fails.
+     */
+    @Override protected void finalize() throws Throwable {
+        try {
+            long numInstances = instances.decrementAndGet();
+
+            int clsCnt = -1; // Stays -1 if the count cannot be determined.
+
+            try {
+                Field f = ClassLoader.class.getDeclaredField("classes");
+
+                f.setAccessible(true);
+
+                Object classes = f.get(this);
+
+                if (classes instanceof Collection)
+                    clsCnt = ((Collection<?>)classes).size();
+            }
+            catch (Exception e) {
+                // The field is missing or inaccessible on this JVM: report it, keep the counter output.
+                U.debug("Failed to read the number of loaded classes: " + e);
+            }
+
+            U.debug("--: " + numInstances + ", classes = " + clsCnt);
+        }
+        finally {
+            super.finalize();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return super.toString() + ": " + name;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 77eb6d2..76e9a43 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, 
Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class 
loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null);
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, 
"hadoop-main");
 
                         jobCls = jobCls0 = 
ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 0ca61bc..c2bb5be 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -826,8 +826,6 @@ public class HadoopJobTracker extends HadoopComponent {
 
                 jobs.remove(jobId);
 
-                job.dispose(false);
-
                 if (ctx.jobUpdateLeader()) {
                     ClassLoader ldr = job.getClass().getClassLoader();
 
@@ -849,6 +847,8 @@ public class HadoopJobTracker extends HadoopComponent {
                     }
                 }
 
+                job.dispose(false); // job may get unusable after #dispose()
+
                 break;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index f2f0cab..6c60538 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -46,13 +46,13 @@ import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
  */
 public class HadoopV2Job implements HadoopJob {
     /** */
-    private final JobConf jobConf;
+    private JobConf jobConf;
 
     /** */
-    private final JobContextImpl jobCtx;
+    private JobContextImpl jobCtx;
 
     /** Hadoop job ID. */
-    private final HadoopJobId jobId;
+    private HadoopJobId jobId;
 
     /** Job info. */
     protected HadoopJobInfo jobInfo;
@@ -61,7 +61,7 @@ public class HadoopV2Job implements HadoopJob {
     private final JobID hadoopJobID;
 
     /** */
-    private final HadoopV2JobResourceManager rsrcMgr;
+    private HadoopV2JobResourceManager rsrcMgr;
 
     /** */
     private final ConcurrentMap<T2<HadoopTaskType, Integer>, 
GridFutureAdapter<HadoopTaskContext>> ctxs =
@@ -196,7 +196,8 @@ public class HadoopV2Job implements HadoopJob {
         try {
             if (cls == null) {
                 // If there is no pooled class, then load new one.
-                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath());
+                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
+                    "hadoop-" + info.jobId() + "-" + info.taskNumber());
 
                 cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
             }
@@ -247,11 +248,110 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public void dispose(boolean external) throws 
IgniteCheckedException {
-        if (rsrcMgr != null && !external) {
-            File jobLocDir = jobLocalDir(locNodeId, jobId);
+        try {
+            if (rsrcMgr != null && !external) {
+                File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+                if (jobLocDir.exists())
+                    U.delete(jobLocDir);
+            }
+        }
+        finally {
+            // Release the references that may hold the class loader,
+            // even if removing the local directory failed.
+            dispose0();
+        }
+    }
 
-            if (jobLocDir.exists())
-                U.delete(jobLocDir);
+    /**
+     * Drops the references this job holds to its configuration, context, job info, resource manager and
+     * task contexts. Called from {@link #dispose(boolean)}; the job may be unusable afterwards.
+     */
+    private void dispose0() {
+        jobConf = null;
+        jobCtx = null;
+        jobId = null;
+        jobInfo = null;
+        rsrcMgr = null;
+        ctxs.clear();
+        taskCtxClsPool.clear();
+        jobConfData = null;
+    }
+
+    /**
+     * Takes a snapshot of all live threads reachable from the top-level thread group.
+     *
+     * @return Array of the enumerated threads, without {@code null} elements.
+     */
+    private static Thread[] allThreads() {
+        // Find out the top thread group.
+        ThreadGroup root = Thread.currentThread().getThreadGroup();
+
+        while (root.getParent() != null)
+            root = root.getParent();
+
+        // activeCount() is only an estimate and enumerate() silently ignores threads that do not fit,
+        // so grow the buffer until the snapshot leaves at least one free slot.
+        Thread[] buf = new Thread[Math.max(16, root.activeCount() * 2)];
+
+        int cnt;
+
+        while ((cnt = root.enumerate(buf, true)) >= buf.length)
+            buf = new Thread[buf.length * 2];
+
+        Thread[] res = new Thread[cnt];
+
+        System.arraycopy(buf, 0, res, 0, cnt);
+
+        return res;
+    }
+
+    /**
+     * Interrupts the daemon {@code org.apache.hadoop.hdfs.PeerCache} threads whose context class loader is
+     * the given task class loader and waits a bounded time for each of them to terminate.
+     * <p>
+     * If the calling thread is interrupted while waiting, the remaining matching threads are still
+     * interrupted, but no longer waited for, and the caller's interrupt status is restored on return.
+     *
+     * @param taskClassLoader Class loader of the task whose PeerCache threads should be stopped.
+     */
+    private void stopPeerCacheThreads0(ClassLoader taskClassLoader) {
+        boolean callerInterrupted = false;
+
+        for (Thread t : allThreads()) {
+            // The context class loader identifies the threads that belong to this Hadoop task.
+            if (!t.isDaemon()
+                || !t.getName().startsWith("org.apache.hadoop.hdfs.PeerCache@")
+                || t.getContextClassLoader() != taskClassLoader)
+                continue;
+
+            int attempts = 0;
+
+            while (t.isAlive() && attempts <= 5) {
+                t.interrupt();
+
+                attempts++;
+
+                if (callerInterrupted)
+                    break; // Caller was asked to stop: signal the thread, but do not block any longer.
+
+                try {
+                    t.join(500L);
+                }
+                catch (InterruptedException ignored) {
+                    // Do not swallow the interruption: remember it and stop blocking.
+                    callerInterrupted = true;
+
+                    break;
+                }
+            }
+
+            U.debug("Thread " + t + " stopped: " + !t.isAlive() + " in " + attempts + " attempt(s).");
+        }
+
+        if (callerInterrupted) {
+            // Restore the interrupt status cleared by join() so that the caller can observe it.
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -264,12 +323,15 @@ public class HadoopV2Job implements HadoopJob {
     @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
         HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), 
info.taskNumber())).get();
 
-        taskCtxClsPool.offer(ctx.getClass());
+        Class<?> clazz = ctx.getClass();
+        taskCtxClsPool.offer(clazz);
 
         File locDir = taskLocalDir(locNodeId, info);
 
         if (locDir.exists())
             U.delete(locDir);
+
+        stopPeerCacheThreads0(clazz.getClassLoader());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 24f10a6..11f1a5b 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -252,6 +252,17 @@ public class HadoopV2TaskContext extends HadoopTaskContext 
{
     /** {@inheritDoc} */
     @Override public void cleanupTaskEnvironment() throws 
IgniteCheckedException {
         job().cleanupTaskEnvironment(taskInfo());
+        // TODO: Not sure this is needed:
+//        try {
+//            FileSystem fs = FileSystem.get(jobConf());
+//            fs.close();
+//
+//            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+//            locFs.close();
+//        }
+//        catch (IOException ioe) {
+//            throw transformException(ioe);
+//        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index c082fe0..fb21e2d 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -38,7 +38,7 @@ public class IgfsEventsTestSuite extends TestSuite {
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suite() throws Exception {
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
 
@@ -58,7 +58,7 @@ public class IgfsEventsTestSuite extends TestSuite {
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suiteNoarchOnly() throws Exception {
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch 
Only");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
index a3289cb..3b55fa5 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.*;
  */
 public class HadoopClassLoaderTest extends TestCase {
     /** */
-    HadoopClassLoader ldr = new HadoopClassLoader(null);
+    HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
     /**
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 985dbb2..183087c 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         downloadHadoop();
         downloadHive();
 
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
index 87233fc..8982d83 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
@@ -36,7 +36,7 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends 
TestSuite {
     public static TestSuite suite() throws Exception {
         downloadHadoop();
 
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And 
Mac OS");
 

Reply via email to