Repository: incubator-ignite Updated Branches: refs/heads/ignite-494 [created] b6d0bed9f
[IGNITE-494]: workable fix with PeerCache thread stopping logic. (draft) Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/867ef8d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/867ef8d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/867ef8d0 Branch: refs/heads/ignite-494 Commit: 867ef8d03b884a3568127004263a48ff356d59e4 Parents: d578683 Author: iveselovskiy <[email protected]> Authored: Thu Mar 26 12:35:46 2015 +0300 Committer: iveselovskiy <[email protected]> Committed: Thu Mar 26 12:35:46 2015 +0300 ---------------------------------------------------------------------- .../util/CallableValuedThreadLocal.java | 27 +++++++ .../ignite/internal/util/ValuedThreadLocal.java | 21 +++++ .../processors/hadoop/HadoopClassLoader.java | 28 ++++++- .../processors/hadoop/HadoopDefaultJobInfo.java | 2 +- .../hadoop/jobtracker/HadoopJobTracker.java | 4 +- .../processors/hadoop/v2/HadoopV2Job.java | 82 +++++++++++++++++--- .../hadoop/v2/HadoopV2TaskContext.java | 11 +++ .../apache/ignite/igfs/IgfsEventsTestSuite.java | 4 +- .../hadoop/HadoopClassLoaderTest.java | 2 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- .../IgniteIgfsLinuxAndMacOSTestSuite.java | 2 +- 11 files changed, 166 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java new file mode 100644 index 0000000..c11eafa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/CallableValuedThreadLocal.java @@ -0,0 +1,27 @@ +//package org.apache.ignite.internal.util; +// +//import java.util.concurrent.*; +// +///** +// * +// */ +//public class CallableValuedThreadLocal <T> extends ThreadLocal <T> { +// private Callable<T> callable; +// +// public CallableValuedThreadLocal(Callable<T> callable) { +// this.callable = callable; +// } +// +// @Override public T initialValue() { +// try { +// return callable == null ? null : callable.call(); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +// +// public void dispose() { +// callable = null; +// } +//} +// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java new file mode 100644 index 0000000..fe73c5b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ValuedThreadLocal.java @@ -0,0 +1,21 @@ +//package org.apache.ignite.internal.util; +// +///** +// * Preferable over anonymous ThreadLocal inner subclass since it does not +// * hold reference to the parent class. +// */ +//public class ValuedThreadLocal <T> extends ThreadLocal <T> { +// private T initialValue; +// +// public ValuedThreadLocal(T initialValue) { +// this.initialValue = initialValue; +// } +// +// @Override public T initialValue() { +// return initialValue; +// } +// +// public void dispose() { +// initialValue = null; +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 1856e41..102ddab 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; import org.objectweb.asm.*; import org.objectweb.asm.commons.*; import java.io.*; +import java.lang.reflect.*; import java.net.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -44,6 +46,9 @@ public class HadoopClassLoader extends URLClassLoader { } /** */ + private static final AtomicLong instances = new AtomicLong(0); + + /** */ private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); /** */ @@ -58,13 +63,20 @@ public class HadoopClassLoader extends URLClassLoader { /** */ private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); + /** Name of this classloader -- for diagnostic purposes. */ + private final String name; /** * @param urls Urls. */ - public HadoopClassLoader(URL[] urls) { + public HadoopClassLoader(URL[] urls, String name) { super(addHadoopUrls(urls), APP_CLS_LDR); assert !(getParent() instanceof HadoopClassLoader); + + this.name = name; + + long numInstances = instances.incrementAndGet(); + U.debug("++: " + numInstances); } /** @@ -549,4 +561,18 @@ public class HadoopClassLoader extends URLClassLoader { return hadoopUrls; } } + + @Override protected void finalize() throws Throwable { + super.finalize(); + long numInstances = instances.decrementAndGet(); + + Field f = ClassLoader.class.getDeclaredField("classes"); + f.setAccessible(true); + Vector v = (Vector)f.get(this); + U.debug("--: " + numInstances + ", classes = " + v.size()); + } + + @Override public String toString() { + return super.toString() + ": " + name; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 77eb6d2..76e9a43 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 0ca61bc..c2bb5be 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -826,8 +826,6 @@ public class HadoopJobTracker extends HadoopComponent { jobs.remove(jobId); - job.dispose(false); - if (ctx.jobUpdateLeader()) { ClassLoader ldr = job.getClass().getClassLoader(); @@ -849,6 +847,8 @@ public class HadoopJobTracker extends HadoopComponent { } } + job.dispose(false); // job may get unusable after #dispose() + break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index f2f0cab..6c60538 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -46,13 +46,13 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; */ public class HadoopV2Job implements HadoopJob { /** */ - private final JobConf jobConf; + private JobConf jobConf; /** */ - private final JobContextImpl jobCtx; + private JobContextImpl jobCtx; /** Hadoop job ID. */ - private final HadoopJobId jobId; + private HadoopJobId jobId; /** Job info. */ protected HadoopJobInfo jobInfo; @@ -61,7 +61,7 @@ public class HadoopV2Job implements HadoopJob { private final JobID hadoopJobID; /** */ - private final HadoopV2JobResourceManager rsrcMgr; + private HadoopV2JobResourceManager rsrcMgr; /** */ private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = @@ -196,7 +196,8 @@ public class HadoopV2Job implements HadoopJob { try { if (cls == null) { // If there is no pooled class, then load new one. - HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath()); + HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), + "hadoop-" + info.jobId() + "-" + info.taskNumber()); cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); } @@ -247,11 +248,69 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void dispose(boolean external) throws IgniteCheckedException { - if (rsrcMgr != null && !external) { - File jobLocDir = jobLocalDir(locNodeId, jobId); + try { + if (rsrcMgr != null && !external) { + File jobLocDir = jobLocalDir(locNodeId, jobId); + + if (jobLocDir.exists()) + U.delete(jobLocDir); + } + } + finally { + // free up the references that may hold classloader: + dispose0(); + + //stopPeerCacheThreads0(); + } + } - if (jobLocDir.exists()) - U.delete(jobLocDir); + private void dispose0() { + jobConf = null; + jobCtx = null; + jobId = null; + jobInfo = null; + rsrcMgr = null; + ctxs.clear(); + taskCtxClsPool.clear(); + jobConfData = null; + } + + private void stopPeerCacheThreads0(ClassLoader taskClassLoader) { + // Find out the top thread group: + ThreadGroup tg = Thread.currentThread().getThreadGroup(); + while (true) { + ThreadGroup g1 = tg.getParent(); + if (g1 == null) + break; + tg = g1; + } + + int total = tg.activeCount(); + // Total is only a rough estimate of the thread number: + Thread[] tt = new Thread[(total * 5)/4]; + tg.enumerate(tt, true); + + for (Thread t: tt) { + if (t == null) + break; + + if (t.isDaemon() + && t.getName().startsWith("org.apache.hadoop.hdfs.PeerCache@") + // we should identify if the thread belongs to this Hadoop task: + && t.getContextClassLoader() == taskClassLoader) { + // shutdown the thread: + int attempts = 0; + while (t.isAlive() && attempts <= 5) { + t.interrupt(); + try { + t.join(500L); + } catch (InterruptedException ie) { + U.debug(ie.toString()); + } + attempts++; + } + U.debug("Thread " + t + " stopped: " + !t.isAlive() + " in " + attempts + " attempt(s)."); + } } } @@ -264,12 +323,15 @@ public class HadoopV2Job implements HadoopJob { @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); - taskCtxClsPool.offer(ctx.getClass()); + Class<?> clazz = ctx.getClass(); + taskCtxClsPool.offer(clazz); File locDir = taskLocalDir(locNodeId, info); if (locDir.exists()) U.delete(locDir); + + stopPeerCacheThreads0(clazz.getClassLoader()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 24f10a6..11f1a5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -252,6 +252,17 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** {@inheritDoc} */ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { job().cleanupTaskEnvironment(taskInfo()); + // TODO: Not sure this is needed: +// try { +// FileSystem fs = FileSystem.get(jobConf()); +// fs.close(); +// +// LocalFileSystem locFs = FileSystem.getLocal(jobConf()); +// locFs.close(); +// } +// catch (IOException ioe) { +// throw transformException(ioe); +// } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index c082fe0..fb21e2d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -38,7 +38,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suite() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); @@ -58,7 +58,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suiteNoarchOnly() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java index a3289cb..3b55fa5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.*; */ public class HadoopClassLoaderTest extends TestCase { /** */ - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); /** * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 985dbb2..183087c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite { downloadHadoop(); downloadHive(); - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/867ef8d0/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java index 87233fc..8982d83 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -36,7 +36,7 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends TestSuite { public static TestSuite suite() throws Exception { downloadHadoop(); - HadoopClassLoader ldr = new HadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And Mac OS");
