Repository: incubator-ignite Updated Branches: refs/heads/ignite-45-ipc-debug f884b267e -> 75fdb5d46
# IPC fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cb3e4da0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cb3e4da0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cb3e4da0 Branch: refs/heads/ignite-45-ipc-debug Commit: cb3e4da012ac760dd6f5799bf3dc25e553b98710 Parents: f884b26 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Mar 24 11:54:36 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Mar 24 11:54:36 2015 -0700 ---------------------------------------------------------------------- .../shmem/IpcSharedMemoryClientEndpoint.java | 2 - .../shmem/IpcSharedMemoryServerEndpoint.java | 168 +++++++++++++++++- .../util/ipc/shmem/IpcSharedMemorySpace.java | 4 - .../util/ipc/shmem/IpcSharedMemoryUtils.java | 170 ------------------- .../IpcSharedMemoryCrashDetectionSelfTest.java | 4 +- .../IgniteIpcSharedMemorySelfTestSuite.java | 6 +- 6 files changed, 172 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java index d76d7a2..27a234f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java @@ -296,8 +296,6 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint { File tokFile = new File(space.tokenFileName()); - U.dumpStack(log, "Free [tok=" + tokFile + ']'); - // Space is not usable at this point and all local threads // are guaranteed to leave its methods (other party is not alive). // So, we can cleanup resources without additional synchronization. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index bda5fd6..81592d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -33,6 +33,7 @@ import org.jetbrains.annotations.*; import java.io.*; import java.net.*; +import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -40,6 +41,9 @@ import java.util.concurrent.atomic.*; * Server shared memory IPC endpoint. */ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { + /** Default lock file name. */ + private static final String LOCK_FILE_NAME = "lock.file"; + /** IPC error message. */ public static final String OUT_OF_RESOURCES_MSG = "Failed to allocate shared memory segment"; @@ -536,7 +540,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { if (log.isDebugEnabled()) log.debug("Starting GC iteration."); - IpcSharedMemoryUtils.cleanResources(workTokDir, tokDir, log); + cleanupResources(workTokDir); // Process spaces created by this endpoint. if (log.isDebugEnabled()) @@ -565,5 +569,167 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { } } } + + /** + * @param workTokDir Token directory (common for multiple nodes). + */ + private void cleanupResources(File workTokDir) { + RandomAccessFile lockFile = null; + + FileLock lock = null; + + try { + lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); + + lock = lockFile.getChannel().lock(); + + if (lock != null) + processTokenDirectory(workTokDir); + else if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (OverlappingFileLockException ignored) { + if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (IOException e) { + U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); + } + finally { + U.releaseQuiet(lock); + U.closeQuiet(lockFile); + } + } + + /** + * @param workTokDir Token directory (common for multiple nodes). + */ + private void processTokenDirectory(File workTokDir) { + for (File f : workTokDir.listFiles()) { + if (!f.isDirectory()) { + if (!f.getName().equals(LOCK_FILE_NAME)) { + if (log.isDebugEnabled()) + log.debug("Unexpected file: " + f.getName()); + } + + continue; + } + + if (f.equals(tokDir)) { + if (log.isDebugEnabled()) + log.debug("Skipping own token directory: " + tokDir.getName()); + + continue; + } + + String name = f.getName(); + + int pid; + + try { + pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + // Is process alive? + if (IpcSharedMemoryUtils.alive(pid)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive node: " + pid); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token folder: " + f); + + // Process each token under stale token folder. + File[] shmemToks = f.listFiles(); + + if (shmemToks == null) + // Although this is strange, but is reproducible sometimes on linux. + return; + + int rmvCnt = 0; + + try { + for (File f0 : shmemToks) { + if (log.isDebugEnabled()) + log.debug("Processing token file: " + f0.getName()); + + if (f0.isDirectory()) { + if (log.isDebugEnabled()) + log.debug("Unexpected directory: " + f0.getName()); + } + + // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + String[] toks = f0.getName().split("-"); + + if (toks.length != 6) { + if (log.isDebugEnabled()) + log.debug("Unrecognized token file: " + f0.getName()); + + continue; + } + + int pid0; + int size; + + try { + pid0 = Integer.parseInt(toks[4]); + size = Integer.parseInt(toks[5]); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + if (IpcSharedMemoryUtils.alive(pid0)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive process: " + pid0); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token file: " + f0); + + U.dumpStack(log, "Free [tok=" + f0.getAbsolutePath() + ']'); + + IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); + + if (f0.delete()) { + if (log.isDebugEnabled()) + log.debug("Deleted file: " + f0.getName()); + + rmvCnt++; + } + else if (!f0.exists()) { + if (log.isDebugEnabled()) + log.debug("File has been concurrently deleted: " + f0.getName()); + + rmvCnt++; + } + else if (log.isDebugEnabled()) + log.debug("Failed to delete file: " + f0.getName()); + } + } + finally { + // Assuming that no new files can appear, since + if (rmvCnt == shmemToks.length) { + U.delete(f); + + if (log.isDebugEnabled()) + log.debug("Deleted empty token directory: " + f.getName()); + } + } + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java index bbd31da..249d995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java @@ -87,8 +87,6 @@ public class IpcSharedMemorySpace implements Closeable { opSize = size; - U.dumpStack(log, "Allocate [tok=" + tokFileName + ", size=" + size + ']'); - shmemPtr = IpcSharedMemoryUtils.allocateSystemResources(tokFileName, size, DEBUG && log.isDebugEnabled()); shmemId = IpcSharedMemoryUtils.sharedMemoryId(shmemPtr); @@ -299,8 +297,6 @@ public class IpcSharedMemorySpace implements Closeable { lock.writeLock().lock(); try { - U.dumpStack(log, "Free [tok=" + tokFileName + ", ptr=" + shmemPtr + ", force=" + force + ']'); - IpcSharedMemoryUtils.freeSystemResources(tokFileName, shmemPtr, force); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java index 77c73d1..7d0abaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.lang.management.*; import java.nio.*; -import java.nio.channels.*; import java.util.*; /** @@ -31,9 +30,6 @@ import java.util.*; * IpcSharedMemoryNativeLoader#load()}. */ public class IpcSharedMemoryUtils { - /** Default lock file name. */ - private static final String LOCK_FILE_NAME = "lock.file"; - /** * Allocates shared memory segment and semaphores for IPC exchange. * @@ -243,170 +239,4 @@ public class IpcSharedMemoryUtils { else throw new IllegalStateException("This OS is not supported."); } - - /** - * @param workTokDir Token directory (common for multiple nodes). - * @param tokDir Current node token directory. - * @param log Logger. - */ - public static void cleanResources(File workTokDir, File tokDir, IgniteLogger log) { - RandomAccessFile lockFile = null; - - FileLock lock = null; - - try { - lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); - - lock = lockFile.getChannel().lock(); - - if (lock != null) - processTokenDirectory(workTokDir, tokDir, log); - else if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (OverlappingFileLockException ignored) { - if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (IOException e) { - U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); - } - finally { - U.releaseQuiet(lock); - U.closeQuiet(lockFile); - } - } - - /** - * @param workTokDir Token directory (common for multiple nodes). - * @param tokDir Current node token directory. - * @param log Logger. - */ - private static void processTokenDirectory(File workTokDir, File tokDir, IgniteLogger log) { - for (File f : workTokDir.listFiles()) { - if (!f.isDirectory()) { - if (!f.getName().equals(LOCK_FILE_NAME)) { - if (log.isDebugEnabled()) - log.debug("Unexpected file: " + f.getName()); - } - - continue; - } - - if (f.equals(tokDir)) { - if (log.isDebugEnabled()) - log.debug("Skipping own token directory: " + tokDir.getName()); - - continue; - } - - String name = f.getName(); - - int pid; - - try { - pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - // Is process alive? - if (IpcSharedMemoryUtils.alive(pid)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive node: " + pid); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token folder: " + f); - - // Process each token under stale token folder. - File[] shmemToks = f.listFiles(); - - if (shmemToks == null) - // Although this is strange, but is reproducible sometimes on linux. - return; - - int rmvCnt = 0; - - try { - for (File f0 : shmemToks) { - if (log.isDebugEnabled()) - log.debug("Processing token file: " + f0.getName()); - - if (f0.isDirectory()) { - if (log.isDebugEnabled()) - log.debug("Unexpected directory: " + f0.getName()); - } - - // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] - String[] toks = f0.getName().split("-"); - - if (toks.length != 6) { - if (log.isDebugEnabled()) - log.debug("Unrecognized token file: " + f0.getName()); - - continue; - } - - int pid0; - int size; - - try { - pid0 = Integer.parseInt(toks[4]); - size = Integer.parseInt(toks[5]); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - if (IpcSharedMemoryUtils.alive(pid0)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive process: " + pid0); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token file: " + f0); - - U.dumpStack(log, "Free [tok=" + f0.getAbsolutePath() + ']'); - - IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); - - if (f0.delete()) { - if (log.isDebugEnabled()) - log.debug("Deleted file: " + f0.getName()); - - rmvCnt++; - } - else if (!f0.exists()) { - if (log.isDebugEnabled()) - log.debug("File has been concurrently deleted: " + f0.getName()); - - rmvCnt++; - } - else if (log.isDebugEnabled()) - log.debug("Failed to delete file: " + f0.getName()); - } - } - finally { - // Assuming that no new files can appear, since - if (rmvCnt == shmemToks.length) { - U.delete(f); - - if (log.isDebugEnabled()) - log.debug("Deleted empty token directory: " + f.getName()); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java index 4e8338d..2ddf6f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -67,7 +67,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void _testIgfsServerClientInteractionsUponClientKilling() throws Exception { + public void testIgfsServerClientInteractionsUponClientKilling() throws Exception { U.setWorkDirectory(null, U.getIgniteHome()); // Run server endpoint. @@ -160,7 +160,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void _testClientThrowsCorrectExceptionUponServerKilling() throws Exception { + public void testClientThrowsCorrectExceptionUponServerKilling() throws Exception { info("Shared memory IDs before starting server-client interactions: " + IpcSharedMemoryUtils.sharedMemoryIds()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb3e4da0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java index f07d687..cf45fad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java @@ -31,10 +31,10 @@ public class IgniteIpcSharedMemorySelfTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite IPC Shared Memory Test Suite."); -// suite.addTest(new TestSuite(IpcSharedMemorySpaceSelfTest.class)); -// suite.addTest(new TestSuite(IpcSharedMemoryUtilsSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemorySpaceSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemoryUtilsSelfTest.class)); suite.addTest(new TestSuite(IpcSharedMemoryCrashDetectionSelfTest.class)); -// suite.addTest(new TestSuite(IpcSharedMemoryNativeLoaderSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemoryNativeLoaderSelfTest.class)); return suite; }