Repository: incubator-ignite Updated Branches: refs/heads/ignite-45-ipc-debug f364b88f7 -> 76931497b
# IPC debug Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76931497 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76931497 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76931497 Branch: refs/heads/ignite-45-ipc-debug Commit: 76931497bb0b40e7cf7f61bcf4ff72d9854a5f70 Parents: f364b88 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Mar 23 19:05:41 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Mar 23 19:05:41 2015 -0700 ---------------------------------------------------------------------- .../shmem/IpcSharedMemoryServerEndpoint.java | 159 +----------------- .../util/ipc/shmem/IpcSharedMemoryUtils.java | 168 +++++++++++++++++++ .../IpcSharedMemoryCrashDetectionSelfTest.java | 3 + 3 files changed, 172 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76931497/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index 804b9fe..1aa8db1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -33,7 +33,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.net.*; -import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -67,9 +66,6 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { */ public static final String TOKEN_FILE_NAME = "gg-shmem-space-"; - /** Default lock file name. */ - private static final String LOCK_FILE_NAME = "lock.file"; - /** GC frequency. */ private static final long GC_FREQ = 10000; @@ -530,31 +526,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { if (log.isDebugEnabled()) log.debug("Starting GC iteration."); - RandomAccessFile lockFile = null; - - FileLock lock = null; - - try { - lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); - - lock = lockFile.getChannel().lock(); - - if (lock != null) - processTokenDirectory(workTokDir); - else if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (OverlappingFileLockException ignored) { - if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (IOException e) { - U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); - } - finally { - U.releaseQuiet(lock); - U.closeQuiet(lockFile); - } + IpcSharedMemoryUtils.cleanResources(workTokDir, tokDir, log); // Process spaces created by this endpoint. if (log.isDebugEnabled()) @@ -573,134 +545,5 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { } } } - - /** @param workTokDir Token directory (common for multiple nodes). */ - private void processTokenDirectory(File workTokDir) { - for (File f : workTokDir.listFiles()) { - if (!f.isDirectory()) { - if (!f.getName().equals(LOCK_FILE_NAME)) { - if (log.isDebugEnabled()) - log.debug("Unexpected file: " + f.getName()); - } - - continue; - } - - if (f.equals(tokDir)) { - if (log.isDebugEnabled()) - log.debug("Skipping own token directory: " + tokDir.getName()); - - continue; - } - - String name = f.getName(); - - int pid; - - try { - pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - // Is process alive? - if (IpcSharedMemoryUtils.alive(pid)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive node: " + pid); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token folder: " + f); - - // Process each token under stale token folder. - File[] shmemToks = f.listFiles(); - - if (shmemToks == null) - // Although this is strange, but is reproducible sometimes on linux. - return; - - int rmvCnt = 0; - - try { - for (File f0 : shmemToks) { - if (log.isDebugEnabled()) - log.debug("Processing token file: " + f0.getName()); - - if (f0.isDirectory()) { - if (log.isDebugEnabled()) - log.debug("Unexpected directory: " + f0.getName()); - } - - // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] - String[] toks = f0.getName().split("-"); - - if (toks.length != 6) { - if (log.isDebugEnabled()) - log.debug("Unrecognized token file: " + f0.getName()); - - continue; - } - - int pid0; - int size; - - try { - pid0 = Integer.parseInt(toks[4]); - size = Integer.parseInt(toks[5]); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - if (IpcSharedMemoryUtils.alive(pid0)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive process: " + pid0); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token file: " + f0); - - U.dumpStack(log, "Free [tok=" + f0.getAbsolutePath() + ']'); - - IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); - - if (f0.delete()) { - if (log.isDebugEnabled()) - log.debug("Deleted file: " + f0.getName()); - - rmvCnt++; - } - else if (!f0.exists()) { - if (log.isDebugEnabled()) - log.debug("File has been concurrently deleted: " + f0.getName()); - - rmvCnt++; - } - else if (log.isDebugEnabled()) - log.debug("Failed to delete file: " + f0.getName()); - } - } - finally { - // Assuming that no new files can appear, since - if (rmvCnt == shmemToks.length) { - U.delete(f); - - if (log.isDebugEnabled()) - log.debug("Deleted empty token directory: " + f.getName()); - } - } - } - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76931497/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java index 7d0abaa..169fc0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.lang.management.*; import java.nio.*; +import java.nio.channels.*; import java.util.*; /** @@ -30,6 +31,9 @@ import java.util.*; * IpcSharedMemoryNativeLoader#load()}. */ public class IpcSharedMemoryUtils { + /** Default lock file name. */ + private static final String LOCK_FILE_NAME = "lock.file"; + /** * Allocates shared memory segment and semaphores for IPC exchange. * @@ -239,4 +243,168 @@ public class IpcSharedMemoryUtils { else throw new IllegalStateException("This OS is not supported."); } + + /** + * @param workTokDir Work token directory. + * @param tokDir Current node token directory. + * @param log Logger. + */ + public static void cleanResources(File workTokDir, File tokDir, IgniteLogger log) { + RandomAccessFile lockFile = null; + + FileLock lock = null; + + try { + lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); + + lock = lockFile.getChannel().lock(); + + if (lock != null) + processTokenDirectory(workTokDir, tokDir, log); + else if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (OverlappingFileLockException ignored) { + if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (IOException e) { + U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); + } + finally { + U.releaseQuiet(lock); + U.closeQuiet(lockFile); + } + } + + /** + * @param workTokDir Token directory (common for multiple nodes). + */ + private static void processTokenDirectory(File workTokDir, File tokDir, IgniteLogger log) { + for (File f : workTokDir.listFiles()) { + if (!f.isDirectory()) { + if (!f.getName().equals(LOCK_FILE_NAME)) { + if (log.isDebugEnabled()) + log.debug("Unexpected file: " + f.getName()); + } + + continue; + } + + if (f.equals(tokDir)) { + if (log.isDebugEnabled()) + log.debug("Skipping own token directory: " + tokDir.getName()); + + continue; + } + + String name = f.getName(); + + int pid; + + try { + pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + // Is process alive? + if (IpcSharedMemoryUtils.alive(pid)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive node: " + pid); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token folder: " + f); + + // Process each token under stale token folder. + File[] shmemToks = f.listFiles(); + + if (shmemToks == null) + // Although this is strange, but is reproducible sometimes on linux. + return; + + int rmvCnt = 0; + + try { + for (File f0 : shmemToks) { + if (log.isDebugEnabled()) + log.debug("Processing token file: " + f0.getName()); + + if (f0.isDirectory()) { + if (log.isDebugEnabled()) + log.debug("Unexpected directory: " + f0.getName()); + } + + // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + String[] toks = f0.getName().split("-"); + + if (toks.length != 6) { + if (log.isDebugEnabled()) + log.debug("Unrecognized token file: " + f0.getName()); + + continue; + } + + int pid0; + int size; + + try { + pid0 = Integer.parseInt(toks[4]); + size = Integer.parseInt(toks[5]); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + if (IpcSharedMemoryUtils.alive(pid0)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive process: " + pid0); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token file: " + f0); + + U.dumpStack(log, "Free [tok=" + f0.getAbsolutePath() + ']'); + + IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); + + if (f0.delete()) { + if (log.isDebugEnabled()) + log.debug("Deleted file: " + f0.getName()); + + rmvCnt++; + } + else if (!f0.exists()) { + if (log.isDebugEnabled()) + log.debug("File has been concurrently deleted: " + f0.getName()); + + rmvCnt++; + } + else if (log.isDebugEnabled()) + log.debug("Failed to delete file: " + f0.getName()); + } + } + finally { + // Assuming that no new files can appear, since + if (rmvCnt == shmemToks.length) { + U.delete(f); + + if (log.isDebugEnabled()) + log.debug("Deleted empty token directory: " + f.getName()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76931497/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java index 5cdb048..0675866 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -86,6 +86,9 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes } finally { srv.close(); + + IpcSharedMemoryUtils.cleanResources(U.resolveWorkDirectory( + srv.getTokenDirectoryPath(), false).getParentFile(), null, log); } }