# IG-980: HadoopV2Job#finalize() also disposes the job, num instances limit for tests decreased to 4;
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d772d449 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d772d449 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d772d449 Branch: refs/heads/ignite-980 Commit: d772d449e798f7494abc8b52200cf7c04f4f79a0 Parents: 3df0112 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri Jun 19 12:09:17 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri Jun 19 12:09:17 2015 +0300 ---------------------------------------------------------------------- .../ignite/igfs/IgfsEventsAbstractSelfTest.java | 3 ++ .../processors/igfs/IgfsAbstractSelfTest.java | 2 +- .../internal/processors/hadoop/HadoopUtils.java | 36 ++++++++------------ .../processors/hadoop/v2/HadoopV2Job.java | 27 ++++++++++----- .../igfs/HadoopIgfsDualAbstractSelfTest.java | 6 ++++ .../apache/ignite/igfs/IgfsEventsTestSuite.java | 1 + .../hadoop/HadoopAbstractSelfTest.java | 29 ++++++++++++---- .../testsuites/IgniteHadoopTestSuite.java | 7 ++++ 8 files changed, 74 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java index 07cd1e6..3bdd7e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java @@ -174,6 +174,9 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { stopGrid(1); + + if (igfs != null) + igfs.stop(false); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 90768db..a8a8957 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -786,7 +786,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ @SuppressWarnings("ConstantConditions") - public void _testFormat() throws Exception { + public void testFormat() throws Exception { // Test works too long and fails. fail("https://issues.apache.org/jira/browse/IGNITE-586"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 4802b64..bf63591 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -605,11 +605,7 @@ public class HadoopUtils { X.println("### DECREMENT: usage count == " + usageCount + ", jobId = " + jobId + ", locId = " + locId); - if (usageCount < 0) { - X.println("negative usage count map: " + t2.get1()); - - assert false; - } + assert usageCount >= 0 : "negative usage count " + usageCount + ", map: " + t2.get1(); if (usageCount == 0) { T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> removedT2 = jobFsMap.remove(jobId); @@ -618,24 +614,22 @@ public class HadoopUtils { t2.get1().close(); } - else - X.println("### Not closing Fs since usage count == " + usageCount); } } - /** - * Diagnostic method. - */ - public static synchronized void dump() { - System.out.println("DUMP: ##################### main map: " + fileSysLazyMap); - if (!jobFsMap.isEmpty()) { - System.out.println("##################### job map: " + jobFsMap.size()); - for (Map.Entry<String, T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>> e : jobFsMap.entrySet()) { - T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> t2 = e.getValue(); - System.out.println("###### job = " + e.getKey() + ", cnt = " + t2.get2() + ", map = " + t2.get1()); - } - } - } +// /** +// * Diagnostic method. +// */ +// public static synchronized void dump() { +// System.out.println("DUMP: ##################### main map: " + fileSysLazyMap); +// if (!jobFsMap.isEmpty()) { +// System.out.println("##################### job map: " + jobFsMap.size()); +// for (Map.Entry<String, T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>> e : jobFsMap.entrySet()) { +// T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>> t2 = e.getValue(); +// System.out.println("###### job = " + e.getKey() + ", cnt = " + t2.get2() + ", map = " + t2.get1()); +// } +// } +// } /** * Gets the property name to disable file system cache. @@ -676,7 +670,5 @@ public class HadoopUtils { */ public static void close() throws IgniteCheckedException { fileSysLazyMap.close(); - - dump(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index cff27eb..287bedb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -285,14 +285,13 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @SuppressWarnings("ThrowFromFinallyBlock") @Override public void dispose(boolean external) throws IgniteCheckedException { - X.println("############# DISPOSE: jod = " + jobId + ", locId = " + locNodeId.toString() + ", this = " + this); - //X.println(" loc node id = " + locNodeId); - X.println(" XXXXXX this id = " + System.identityHashCode(this)); - assert initialized.get(); boolean dsp = disposed.compareAndSet(false, true); - assert dsp; + if (!dsp) + return; + + X.println("############# DISPOSE: jod = " + jobId + ", locId = " + locNodeId.toString() + ", this = " + this); try { if (rsrcMgr != null && !external) { @@ -328,11 +327,12 @@ public class HadoopV2Job implements HadoopJob { // HadoopLazyConcurrentMap for this *task* class loader: closeCachedFileSystems(ldr); - // assert getClass().getClassLoader() instanceof HadoopClassLoader; // assert getClass().getClassLoader().toString().contains("hadoop-job"); } catch (Throwable e) { + e.printStackTrace(); + if (err == null) err = e; @@ -346,7 +346,9 @@ public class HadoopV2Job implements HadoopJob { // Close all cached Fs for this Job: HadoopUtils.close(jobId.toString(), locNodeId.toString()); - for (int q=0; q<10; q++) + int i = 0; + + while (i++ < 5) System.gc(); if (err != null) @@ -354,7 +356,14 @@ public class HadoopV2Job implements HadoopJob { } } - private void closeCachedFileSystems(ClassLoader ldr) { + /** {@inheritDoc} */ + @Override protected void finalize() throws Throwable { + super.finalize(); + + dispose(false); + } + + private void closeCachedFileSystems(ClassLoader ldr) throws Exception { try { Class clazz = ldr.loadClass(HadoopUtils.class.getName()); @@ -364,6 +373,8 @@ public class HadoopV2Job implements HadoopJob { } catch (Exception e) { e.printStackTrace(); + + throw e; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java index 00f0b5e..ed8e36d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java @@ -204,6 +204,12 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { G.stopAll(true); + + if (igfs != null) + igfs.stop(false); + + if (igfsSecondary != null) + igfsSecondary.stop(false); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index e7c7f8a..375a949 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -21,6 +21,7 @@ import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index 8d5faa9..27355ea 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -99,13 +98,13 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { initCp = null; + // Ensure the file systems are cleared: int inst = HadoopLazyConcurrentMap.getNumberOfInstances(); - X.println("HLCM instances: " + inst); - - // TODO: harden this contraint. It looks like sometimes Job#dispose(boolean) is not caled, so not all - // file systems are cleared: - assertTrue("HadoopLazyConcurrentMap instances: " + inst, inst <= 8); + if (inst <= 4) + log().info("HadoopLazyConcurrentMap instances: " + inst); + else + log().warning("#### HadoopLazyConcurrentMap instances: " + inst); } /** {@inheritDoc} */ @@ -239,4 +238,22 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { protected String igfsScheme() { return "igfs://:" + getTestGridName(0) + "@/"; } + + /** + * + */ + public static class CheckFsInstancesLeakTest extends GridCommonAbstractTest { + /** + * + * @throws Exception + */ + public void testLeak() throws Exception { + // Ensure the file systems are cleared: + int inst = HadoopLazyConcurrentMap.getNumberOfInstances(); + + log().info("HadoopLazyConcurrentMap instances: " + inst); + + assertTrue("HadoopLazyConcurrentMap instances: " + inst, inst <= 4); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d772d449/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index f5def91..3ce60d2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -77,9 +77,14 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualSyncSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualAsyncSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopAbstractSelfTest.CheckFsInstancesLeakTest.class.getName()))); // ok + suite.addTest(IgfsEventsTestSuite.suiteNoarchOnly()); + suite.addTest(new TestSuite(ldr.loadClass(HadoopAbstractSelfTest.CheckFsInstancesLeakTest.class.getName()))); + // ok: suite.addTest(new TestSuite(ldr.loadClass(HadoopFileSystemsTest.class.getName()))); @@ -126,6 +131,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopAbstractSelfTest.CheckFsInstancesLeakTest.class.getName()))); + return suite; }