# IPC fixes

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e168c633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e168c633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e168c633

Branch: refs/heads/ignite-471
Commit: e168c633e7d19569e62c46346941cbeea10f6af7
Parents: c620ee0
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Tue Mar 24 12:16:41 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Tue Mar 24 12:16:41 2015 -0700

----------------------------------------------------------------------
 .../shmem/IpcSharedMemoryServerEndpoint.java    | 90 +++++++++++++-------
 .../IpcSharedMemoryCrashDetectionSelfTest.java  | 24 +++++-
 2 files changed, 83 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e168c633/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 8c2fcb0..4f43474 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -118,7 +118,10 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
     private final Collection<IpcSharedMemoryClientEndpoint> endpoints =
         new GridConcurrentHashSet<>();
 
-    /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}. */
+    /**
+     * Use this constructor when dependencies could be injected
+     * with {@link GridResourceProcessor#injectGeneric(Object)}.
+     */
     public IpcSharedMemoryServerEndpoint() {
         // No-op.
     }
@@ -524,38 +527,21 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
 
             assert workTokDir != null;
 
-            while (!isCancelled()) {
-                U.sleep(GC_FREQ);
-
-                if (log.isDebugEnabled())
-                    log.debug("Starting GC iteration.");
-
-                RandomAccessFile lockFile = null;
-
-                FileLock lock = null;
+            boolean lastRunNeeded = true;
 
+            while (true) {
                 try {
-                    lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw");
-
-                    lock = lockFile.getChannel().lock();
-
-                    if (lock != null)
-                        processTokenDirectory(workTokDir);
-                    else if (log.isDebugEnabled())
-                        log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
+                    Thread.sleep(GC_FREQ);
                 }
-                catch (OverlappingFileLockException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
-                }
-                catch (IOException e) {
-                    U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e);
-                }
-                finally {
-                    U.releaseQuiet(lock);
-                    U.closeQuiet(lockFile);
+                catch (InterruptedException ignored) {
+                    // No-op.
                 }
 
+                if (log.isDebugEnabled())
+                    log.debug("Starting GC iteration.");
+
+                cleanupResources(workTokDir);
+
                 // Process spaces created by this endpoint.
                 if (log.isDebugEnabled())
                     log.debug("Processing local spaces.");
@@ -571,10 +557,56 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
                             log.debug("Removed endpoint: " + e);
                     }
                 }
+
+                if (isCancelled()) {
+                    if (lastRunNeeded)
+                        lastRunNeeded = false;
+                    else {
+                        Thread.currentThread().interrupt();
+
+                        break;
+                    }
+                }
             }
         }
 
-        /** @param workTokDir Token directory (common for multiple nodes). */
+        /**
+         * @param workTokDir Token directory (common for multiple nodes).
+         */
+        private void cleanupResources(File workTokDir) {
+            RandomAccessFile lockFile = null;
+
+            FileLock lock = null;
+
+            try {
+                lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw");
+
+                lock = lockFile.getChannel().lock();
+
+                if (lock != null)
+                    processTokenDirectory(workTokDir);
+                else if (log.isDebugEnabled())
+                    log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
+            }
+            catch (OverlappingFileLockException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath());
+            }
+            catch (InterruptedIOException ignored) {
+                Thread.currentThread().interrupt();
+            }
+            catch (IOException e) {
+                U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e);
+            }
+            finally {
+                U.releaseQuiet(lock);
+                U.closeQuiet(lockFile);
+            }
+        }
+
+        /**
+         * @param workTokDir Token directory (common for multiple nodes).
+         */
         private void processTokenDirectory(File workTokDir) {
             for (File f : workTokDir.listFiles()) {
                 if (!f.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e168c633/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 5cdb048..2ddf6f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -45,6 +45,25 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
         IpcSharedMemoryNativeLoader.load();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Start and stop server endpoint to let GC worker
+        // make a run and cleanup resources.
+
+        U.setWorkDirectory(null, U.getIgniteHome());
+
+        IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint();
+
+        new IgniteTestResources().inject(srv);
+
+        try {
+            srv.start();
+        }
+        finally {
+            srv.close();
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -207,7 +226,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
         try {
             // Run client endpoint.
             client = (IpcSharedMemoryClientEndpoint) IpcEndpointFactory.connectEndpoint(
-                    "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log);
+                "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log);
 
             OutputStream os = client.outputStream();
 
@@ -238,7 +257,8 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
             assertTrue(i >= interactionsCntBeforeSrvKilling);
 
             assertTrue(X.hasCause(e, IgniteCheckedException.class));
-            assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed"));
+            assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains(
+                "Shared memory segment has been closed"));
         }
         finally {
             U.closeQuiet(client);

Reply via email to