# IGNITE-494: Applied patch from Ivan for permgen leak.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5657b41a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5657b41a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5657b41a

Branch: refs/heads/ignite-424
Commit: 5657b41a55054d26c53beb1322207dc7a10e402b
Parents: 2230fe7
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Fri Mar 27 18:22:35 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Fri Mar 27 18:22:35 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    |  21 +++-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../hadoop/jobtracker/HadoopJobTracker.java     |   4 +-
 .../processors/hadoop/v2/HadoopDaemon.java      | 125 +++++++++++++++++++
 .../processors/hadoop/v2/HadoopV2Job.java       |  58 +++++++--
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |   4 +-
 .../hadoop/HadoopClassLoaderTest.java           |   2 +-
 .../testsuites/IgniteHadoopTestSuite.java       |   2 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java       |   2 +-
 9 files changed, 203 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 38c683f..eb98ff9 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 import org.objectweb.asm.*;
@@ -43,6 +44,9 @@ public class HadoopClassLoader extends URLClassLoader {
         registerAsParallelCapable();
     }
 
+    /** Name of the Hadoop daemon class. */
+    public static final String HADOOP_DAEMON_CLASS_NAME = 
"org.apache.hadoop.util.Daemon";
+
     /** */
     private static final URLClassLoader APP_CLS_LDR = 
(URLClassLoader)HadoopClassLoader.class.getClassLoader();
 
@@ -58,13 +62,19 @@ public class HadoopClassLoader extends URLClassLoader {
     /** */
     private static final Map<String, byte[]> bytesCache = new 
ConcurrentHashMap8<>();
 
+    /** Diagnostic name of this class loader. */
+    @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
+    private final String name;
+
     /**
      * @param urls Urls.
      */
-    public HadoopClassLoader(URL[] urls) {
+    public HadoopClassLoader(URL[] urls, String name) {
         super(addHadoopUrls(urls), APP_CLS_LDR);
 
         assert !(getParent() instanceof HadoopClassLoader);
+
+        this.name = name;
     }
 
     /**
@@ -96,6 +106,10 @@ public class HadoopClassLoader extends URLClassLoader {
                     return loadFromBytes(name, 
HadoopShutdownHookManager.class.getName());
                 else if (name.endsWith(".util.NativeCodeLoader"))
                     return loadFromBytes(name, 
HadoopNativeCodeLoader.class.getName());
+                else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
+                    // We replace this in order to be able to forcibly stop 
some daemon threads
+                    // that otherwise never stop (e.g. PeerCache runnables):
+                    return loadFromBytes(name, HadoopDaemon.class.getName());
 
                 return loadClassExplicitly(name, resolve);
             }
@@ -549,4 +563,9 @@ public class HadoopClassLoader extends URLClassLoader {
             return hadoopUrls;
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopClassLoader.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 77eb6d2..76e9a43 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, 
Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class 
loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null);
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, 
"hadoop-main");
 
                         jobCls = jobCls0 = 
ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 9e959a2..8ec4aed 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -826,8 +826,6 @@ public class HadoopJobTracker extends HadoopComponent {
 
                 jobs.remove(jobId);
 
-                job.dispose(false);
-
                 if (ctx.jobUpdateLeader()) {
                     ClassLoader ldr = job.getClass().getClassLoader();
 
@@ -849,6 +847,8 @@ public class HadoopJobTracker extends HadoopComponent {
                     }
                 }
 
+                job.dispose(false);
+
                 break;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
new file mode 100644
index 0000000..5a8ac64
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.util.*;
+
+/**
+ * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class HadoopDaemon extends Thread {
+    /** Lock object used for synchronization. */
+    private static final Object lock = new Object();
+
+    /** Collection to hold the threads to be stopped. */
+    private static Collection<HadoopDaemon> daemons = new LinkedList<>();
+
+    {
+        setDaemon(true); // always a daemon
+    }
+
+    /** Runnable of this thread, may be this. */
+    final Runnable runnable;
+
+    /**
+     * Construct a daemon thread.
+     */
+    public HadoopDaemon() {
+        super();
+
+        runnable = this;
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Construct a daemon thread.
+     */
+    public HadoopDaemon(Runnable runnable) {
+        super(runnable);
+
+        this.runnable = runnable;
+
+        this.setName(runnable.toString());
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Construct a daemon thread to be part of a specified thread group.
+     */
+    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
+        super(grp, runnable);
+
+        this.runnable = runnable;
+
+        this.setName(runnable.toString());
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Getter for the runnable. May return this.
+     *
+     * @return the runnable
+     */
+    public Runnable getRunnable() {
+        return runnable;
+    }
+
+    /**
+     * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable.
+     *
+     * @param r the runnable.
+     * @return true if it is.
+     */
+    private static boolean isPeerCacheRunnable(Runnable r) {
+        String name = r.getClass().getName();
+
+        return name.startsWith("org.apache.hadoop.hdfs.PeerCache");
+    }
+
+    /**
+     * Enqueue this thread if it should be stopped upon the task end.
+     */
+    private void enqueueIfNeeded() {
+        synchronized (lock) {
+            if (daemons == null)
+                throw new RuntimeException("Failed to create HadoopDaemon (its 
registry is already cleared): " +
+                    "[classLoader=" + getClass().getClassLoader() + ']');
+
+            if (runnable.getClass().getClassLoader() == 
getClass().getClassLoader() && isPeerCacheRunnable(runnable))
+                daemons.add(this);
+        }
+    }
+
+    /**
+     * Stops all the registered threads.
+     */
+    public static void dequeueAndStopAll() {
+        synchronized (lock) {
+            if (daemons != null) {
+                for (HadoopDaemon daemon : daemons)
+                    daemon.interrupt();
+
+                daemons = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index f35c2f8..e3c2bfa 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -55,7 +55,7 @@ public class HadoopV2Job implements HadoopJob {
     private final HadoopJobId jobId;
 
     /** Job info. */
-    protected HadoopJobInfo jobInfo;
+    protected final HadoopJobInfo jobInfo;
 
     /** */
     private final JobID hadoopJobID;
@@ -70,8 +70,11 @@ public class HadoopV2Job implements HadoopJob {
     /** Pooling task context class and thus class loading environment. */
     private final Queue<Class<?>> taskCtxClsPool = new 
ConcurrentLinkedQueue<>();
 
+    /** All created contexts. */
+    private final Queue<Class<?>> fullCtxClsQueue = new 
ConcurrentLinkedDeque<>();
+
     /** Local node ID */
-    private UUID locNodeId;
+    private volatile UUID locNodeId;
 
     /** Serialized JobConf. */
     private volatile byte[] jobConfData;
@@ -196,9 +199,14 @@ public class HadoopV2Job implements HadoopJob {
         try {
             if (cls == null) {
                 // If there is no pooled class, then load new one.
-                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath());
+                // Note that the classloader identified by the task it was 
initially created for,
+                // but later it may be reused for other tasks.
+                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
+                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber());
 
                 cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+
+                fullCtxClsQueue.add(cls);
             }
 
             Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, 
HadoopJob.class,
@@ -246,12 +254,46 @@ public class HadoopV2Job implements HadoopJob {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("ThrowFromFinallyBlock")
     @Override public void dispose(boolean external) throws 
IgniteCheckedException {
-        if (rsrcMgr != null && !external) {
-            File jobLocDir = jobLocalDir(locNodeId, jobId);
+        try {
+            if (rsrcMgr != null && !external) {
+                File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+                if (jobLocDir.exists())
+                    U.delete(jobLocDir);
+            }
+        }
+        finally {
+            taskCtxClsPool.clear();
+
+            Throwable err = null;
+
+            // Stop the daemon threads that have been created
+            // with the task class loaders:
+            while (true) {
+                Class<?> cls = fullCtxClsQueue.poll();
+
+                if (cls == null)
+                    break;
+
+                try {
+                    Class<?> daemonCls = 
cls.getClassLoader().loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME);
+
+                    Method m = daemonCls.getMethod("dequeueAndStopAll");
+
+                    m.invoke(null);
+                }
+                catch (Throwable e) {
+                    if (err == null)
+                        err = e;
+                }
+            }
+
+            assert fullCtxClsQueue.isEmpty();
 
-            if (jobLocDir.exists())
-                U.delete(jobLocDir);
+            if (err != null)
+                throw U.cast(err);
         }
     }
 
@@ -264,7 +306,7 @@ public class HadoopV2Job implements HadoopJob {
     @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
         HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), 
info.taskNumber())).get();
 
-        taskCtxClsPool.offer(ctx.getClass());
+        taskCtxClsPool.add(ctx.getClass());
 
         File locDir = taskLocalDir(locNodeId, info);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index c082fe0..fb21e2d 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -38,7 +38,7 @@ public class IgfsEventsTestSuite extends TestSuite {
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suite() throws Exception {
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
 
@@ -58,7 +58,7 @@ public class IgfsEventsTestSuite extends TestSuite {
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suiteNoarchOnly() throws Exception {
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch 
Only");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
index a3289cb..3b55fa5 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.*;
  */
 public class HadoopClassLoaderTest extends TestCase {
     /** */
-    HadoopClassLoader ldr = new HadoopClassLoader(null);
+    HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
     /**
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 985dbb2..183087c 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         downloadHadoop();
         downloadHive();
 
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
index 87233fc..8982d83 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java
@@ -36,7 +36,7 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends 
TestSuite {
     public static TestSuite suite() throws Exception {
         downloadHadoop();
 
-        HadoopClassLoader ldr = new HadoopClassLoader(null);
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And 
Mac OS");
 

Reply via email to