# IPC fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e168c633 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e168c633 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e168c633 Branch: refs/heads/ignite-341 Commit: e168c633e7d19569e62c46346941cbeea10f6af7 Parents: c620ee0 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Mar 24 12:16:41 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Mar 24 12:16:41 2015 -0700 ---------------------------------------------------------------------- .../shmem/IpcSharedMemoryServerEndpoint.java | 90 +++++++++++++------- .../IpcSharedMemoryCrashDetectionSelfTest.java | 24 +++++- 2 files changed, 83 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e168c633/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index 8c2fcb0..4f43474 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -118,7 +118,10 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { private final Collection<IpcSharedMemoryClientEndpoint> endpoints = new GridConcurrentHashSet<>(); - /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}. */ + /** + * Use this constructor when dependencies could be injected + * with {@link GridResourceProcessor#injectGeneric(Object)}. + */ public IpcSharedMemoryServerEndpoint() { // No-op. } @@ -524,38 +527,21 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { assert workTokDir != null; - while (!isCancelled()) { - U.sleep(GC_FREQ); - - if (log.isDebugEnabled()) - log.debug("Starting GC iteration."); - - RandomAccessFile lockFile = null; - - FileLock lock = null; + boolean lastRunNeeded = true; + while (true) { try { - lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); - - lock = lockFile.getChannel().lock(); - - if (lock != null) - processTokenDirectory(workTokDir); - else if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + Thread.sleep(GC_FREQ); } - catch (OverlappingFileLockException ignored) { - if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (IOException e) { - U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); - } - finally { - U.releaseQuiet(lock); - U.closeQuiet(lockFile); + catch (InterruptedException ignored) { + // No-op. } + if (log.isDebugEnabled()) + log.debug("Starting GC iteration."); + + cleanupResources(workTokDir); + // Process spaces created by this endpoint. if (log.isDebugEnabled()) log.debug("Processing local spaces."); @@ -571,10 +557,56 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { log.debug("Removed endpoint: " + e); } } + + if (isCancelled()) { + if (lastRunNeeded) + lastRunNeeded = false; + else { + Thread.currentThread().interrupt(); + + break; + } + } } } - /** @param workTokDir Token directory (common for multiple nodes). */ + /** + * @param workTokDir Token directory (common for multiple nodes). + */ + private void cleanupResources(File workTokDir) { + RandomAccessFile lockFile = null; + + FileLock lock = null; + + try { + lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); + + lock = lockFile.getChannel().lock(); + + if (lock != null) + processTokenDirectory(workTokDir); + else if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (OverlappingFileLockException ignored) { + if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (InterruptedIOException ignored) { + Thread.currentThread().interrupt(); + } + catch (IOException e) { + U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); + } + finally { + U.releaseQuiet(lock); + U.closeQuiet(lockFile); + } + } + + /** + * @param workTokDir Token directory (common for multiple nodes). + */ private void processTokenDirectory(File workTokDir) { for (File f : workTokDir.listFiles()) { if (!f.isDirectory()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e168c633/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java index 5cdb048..2ddf6f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -45,6 +45,25 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes IpcSharedMemoryNativeLoader.load(); } + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + // Start and stop server endpoint to let GC worker + // make a run and cleanup resources. + + U.setWorkDirectory(null, U.getIgniteHome()); + + IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(); + + new IgniteTestResources().inject(srv); + + try { + srv.start(); + } + finally { + srv.close(); + } + } + /** * @throws Exception If failed. */ @@ -207,7 +226,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes try { // Run client endpoint. client = (IpcSharedMemoryClientEndpoint) IpcEndpointFactory.connectEndpoint( - "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log); + "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log); OutputStream os = client.outputStream(); @@ -238,7 +257,8 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes assertTrue(i >= interactionsCntBeforeSrvKilling); assertTrue(X.hasCause(e, IgniteCheckedException.class)); - assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed")); + assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains( + "Shared memory segment has been closed")); } finally { U.closeQuiet(client);