# IGNITE-494: Applied patch from Ivan for permgen leak.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5657b41a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5657b41a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5657b41a Branch: refs/heads/ignite-424 Commit: 5657b41a55054d26c53beb1322207dc7a10e402b Parents: 2230fe7 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Mar 27 18:22:35 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Mar 27 18:22:35 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopClassLoader.java | 21 +++- .../processors/hadoop/HadoopDefaultJobInfo.java | 2 +- .../hadoop/jobtracker/HadoopJobTracker.java | 4 +- .../processors/hadoop/v2/HadoopDaemon.java | 125 +++++++++++++++++++ .../processors/hadoop/v2/HadoopV2Job.java | 58 +++++++-- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 4 +- .../hadoop/HadoopClassLoaderTest.java | 2 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- .../IgniteIgfsLinuxAndMacOSTestSuite.java | 2 +- 9 files changed, 203 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 38c683f..eb98ff9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import org.jsr166.*; import org.objectweb.asm.*; @@ -43,6 +44,9 @@ public class HadoopClassLoader extends URLClassLoader { registerAsParallelCapable(); } + /** Name of the Hadoop daemon class. */ + public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon"; + /** */ private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); @@ -58,13 +62,19 @@ public class HadoopClassLoader extends URLClassLoader { /** */ private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); + /** Diagnostic name of this class loader. */ + @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) + private final String name; + /** * @param urls Urls. */ - public HadoopClassLoader(URL[] urls) { + public HadoopClassLoader(URL[] urls, String name) { super(addHadoopUrls(urls), APP_CLS_LDR); assert !(getParent() instanceof HadoopClassLoader); + + this.name = name; } /** @@ -96,6 +106,10 @@ public class HadoopClassLoader extends URLClassLoader { return loadFromBytes(name, HadoopShutdownHookManager.class.getName()); else if (name.endsWith(".util.NativeCodeLoader")) return loadFromBytes(name, HadoopNativeCodeLoader.class.getName()); + else if (name.equals(HADOOP_DAEMON_CLASS_NAME)) + // We replace this in order to be able to forcibly stop some daemon threads + // that otherwise never stop (e.g. PeerCache runnables): + return loadFromBytes(name, HadoopDaemon.class.getName()); return loadClassExplicitly(name, resolve); } @@ -549,4 +563,9 @@ public class HadoopClassLoader extends URLClassLoader { return hadoopUrls; } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopClassLoader.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 77eb6d2..76e9a43 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 9e959a2..8ec4aed 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -826,8 +826,6 @@ public class HadoopJobTracker extends HadoopComponent { jobs.remove(jobId); - job.dispose(false); - if (ctx.jobUpdateLeader()) { ClassLoader ldr = job.getClass().getClassLoader(); @@ -849,6 +847,8 @@ public class HadoopJobTracker extends HadoopComponent { } } + job.dispose(false); + break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java new file mode 100644 index 0000000..5a8ac64 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.util.*; + +/** + * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class. + */ +@SuppressWarnings("UnusedDeclaration") +public class HadoopDaemon extends Thread { + /** Lock object used for synchronization. */ + private static final Object lock = new Object(); + + /** Collection to hold the threads to be stopped. */ + private static Collection<HadoopDaemon> daemons = new LinkedList<>(); + + { + setDaemon(true); // always a daemon + } + + /** Runnable of this thread, may be this. */ + final Runnable runnable; + + /** + * Construct a daemon thread. + */ + public HadoopDaemon() { + super(); + + runnable = this; + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread. + */ + public HadoopDaemon(Runnable runnable) { + super(runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread to be part of a specified thread group. + */ + public HadoopDaemon(ThreadGroup grp, Runnable runnable) { + super(grp, runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Getter for the runnable. May return this. + * + * @return the runnable + */ + public Runnable getRunnable() { + return runnable; + } + + /** + * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable. + * + * @param r the runnable. + * @return true if it is. + */ + private static boolean isPeerCacheRunnable(Runnable r) { + String name = r.getClass().getName(); + + return name.startsWith("org.apache.hadoop.hdfs.PeerCache"); + } + + /** + * Enqueue this thread if it should be stopped upon the task end. + */ + private void enqueueIfNeeded() { + synchronized (lock) { + if (daemons == null) + throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " + + "[classLoader=" + getClass().getClassLoader() + ']'); + + if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable)) + daemons.add(this); + } + } + + /** + * Stops all the registered threads. + */ + public static void dequeueAndStopAll() { + synchronized (lock) { + if (daemons != null) { + for (HadoopDaemon daemon : daemons) + daemon.interrupt(); + + daemons = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index f35c2f8..e3c2bfa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -55,7 +55,7 @@ public class HadoopV2Job implements HadoopJob { private final HadoopJobId jobId; /** Job info. */ - protected HadoopJobInfo jobInfo; + protected final HadoopJobInfo jobInfo; /** */ private final JobID hadoopJobID; @@ -70,8 +70,11 @@ public class HadoopV2Job implements HadoopJob { /** Pooling task context class and thus class loading environment. */ private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + /** All created contexts. */ + private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + /** Local node ID */ - private UUID locNodeId; + private volatile UUID locNodeId; /** Serialized JobConf. */ private volatile byte[] jobConfData; @@ -196,9 +199,14 @@ public class HadoopV2Job implements HadoopJob { try { if (cls == null) { // If there is no pooled class, then load new one. - HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath()); + // Note that the classloader identified by the task it was initially created for, + // but later it may be reused for other tasks. + HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), + "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + + fullCtxClsQueue.add(cls); } Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, @@ -246,12 +254,46 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ + @SuppressWarnings("ThrowFromFinallyBlock") @Override public void dispose(boolean external) throws IgniteCheckedException { - if (rsrcMgr != null && !external) { - File jobLocDir = jobLocalDir(locNodeId, jobId); + try { + if (rsrcMgr != null && !external) { + File jobLocDir = jobLocalDir(locNodeId, jobId); + + if (jobLocDir.exists()) + U.delete(jobLocDir); + } + } + finally { + taskCtxClsPool.clear(); + + Throwable err = null; + + // Stop the daemon threads that have been created + // with the task class loaders: + while (true) { + Class<?> cls = fullCtxClsQueue.poll(); + + if (cls == null) + break; + + try { + Class<?> daemonCls = cls.getClassLoader().loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); + + Method m = daemonCls.getMethod("dequeueAndStopAll"); + + m.invoke(null); + } + catch (Throwable e) { + if (err == null) + err = e; + } + } + + assert fullCtxClsQueue.isEmpty(); - if (jobLocDir.exists()) - U.delete(jobLocDir); + if (err != null) + throw U.cast(err); } } @@ -264,7 +306,7 @@ public class HadoopV2Job implements HadoopJob { @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); - taskCtxClsPool.offer(ctx.getClass()); + taskCtxClsPool.add(ctx.getClass()); File locDir = taskLocalDir(locNodeId, info); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index c082fe0..fb21e2d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -38,7 +38,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suite() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); @@ -58,7 +58,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suiteNoarchOnly() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java index a3289cb..3b55fa5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.*; */ public class HadoopClassLoaderTest extends TestCase { /** */ - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); /** * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 985dbb2..183087c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite { downloadHadoop(); downloadHive(); - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5657b41a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java index 87233fc..8982d83 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -36,7 +36,7 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends TestSuite { public static TestSuite suite() throws Exception { downloadHadoop(); - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And Mac OS");