http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryClientEndpoint.java deleted file mode 100644 index 93590d5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryClientEndpoint.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.*; - -import java.io.*; -import java.net.*; - -/** - * IPC endpoint based on shared memory space. - */ -public class GridIpcSharedMemoryClientEndpoint implements GridIpcEndpoint { - /** In space. */ - private final GridIpcSharedMemorySpace inSpace; - - /** Out space. 
*/ - private final GridIpcSharedMemorySpace outSpace; - - /** In space. */ - private final GridIpcSharedMemoryInputStream in; - - /** Out space. */ - private final GridIpcSharedMemoryOutputStream out; - - /** */ - private boolean checkIn = true; - - /** */ - private boolean checkOut = true; - - /** */ - private final Thread checker; - - /** */ - private final IgniteLogger log; - - /** - * Creates connected client IPC endpoint. - * - * @param inSpace In space. - * @param outSpace Out space. - * @param parent Parent logger. - */ - public GridIpcSharedMemoryClientEndpoint(GridIpcSharedMemorySpace inSpace, GridIpcSharedMemorySpace outSpace, - IgniteLogger parent) { - assert inSpace != null; - assert outSpace != null; - - log = parent.getLogger(GridIpcSharedMemoryClientEndpoint.class); - - this.inSpace = inSpace; - this.outSpace = outSpace; - - in = new GridIpcSharedMemoryInputStream(inSpace); - out = new GridIpcSharedMemoryOutputStream(outSpace); - - checker = null; - } - - /** - * Creates and connects client IPC endpoint and starts background checker thread to avoid deadlocks on other party - * crash. Waits until port became available. - * - * @param port Port server endpoint bound to. - * @param parent Parent logger. - * @throws IgniteCheckedException If connection fails. - */ - public GridIpcSharedMemoryClientEndpoint(int port, IgniteLogger parent) throws IgniteCheckedException { - this(port, 0, parent); - } - - /** - * Creates and connects client IPC endpoint and starts background checker thread to avoid deadlocks on other party - * crash. - * - * @param port Port server endpoint bound to. - * @param timeout Connection timeout. - * @param parent Parent logger. - * @throws IgniteCheckedException If connection fails. 
- */ - @SuppressWarnings({"CallToThreadStartDuringObjectConstruction", "ErrorNotRethrown"}) - public GridIpcSharedMemoryClientEndpoint(int port, int timeout, IgniteLogger parent) throws IgniteCheckedException { - assert port > 0; - assert port < 0xffff; - - log = parent.getLogger(GridIpcSharedMemoryClientEndpoint.class); - - GridIpcSharedMemorySpace inSpace = null; - GridIpcSharedMemorySpace outSpace = null; - - Socket sock = new Socket(); - - Exception err = null; - boolean clear = true; - - try { - GridIpcSharedMemoryNativeLoader.load(); - - sock.connect(new InetSocketAddress("127.0.0.1", port), timeout); - - // Send request. - ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream()); - - int pid = IpcSharedMemoryUtils.pid(); - - out.writeObject(new GridIpcSharedMemoryInitRequest(pid)); - - ObjectInputStream in = new ObjectInputStream(sock.getInputStream()); - - GridIpcSharedMemoryInitResponse res = (GridIpcSharedMemoryInitResponse)in.readObject(); - - err = res.error(); - - if (err == null) { - String inTokFileName = res.inTokenFileName(); - - assert inTokFileName != null; - - inSpace = new GridIpcSharedMemorySpace(inTokFileName, res.pid(), pid, res.size(), true, - res.inSharedMemoryId(), log); - - String outTokFileName = res.outTokenFileName(); - - assert outTokFileName != null; - - outSpace = new GridIpcSharedMemorySpace(outTokFileName, pid, res.pid(), res.size(), false, - res.outSharedMemoryId(), log); - - // This is success ACK. 
- out.writeBoolean(true); - - out.flush(); - - clear = false; - } - } - catch (UnsatisfiedLinkError e) { - throw IpcSharedMemoryUtils.linkError(e); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to connect shared memory endpoint to port " + - "(is shared memory server endpoint up and running?): " + port, e); - } - catch (ClassNotFoundException | ClassCastException e) { - throw new IgniteCheckedException(e); - } - finally { - U.closeQuiet(sock); - - if (clear) { - if (inSpace != null) - inSpace.forceClose(); - - if (outSpace != null) - outSpace.forceClose(); - } - } - - if (err != null) // Error response. - throw new IgniteCheckedException(err); - - this.inSpace = inSpace; - this.outSpace = outSpace; - - in = new GridIpcSharedMemoryInputStream(inSpace); - out = new GridIpcSharedMemoryOutputStream(outSpace); - - checker = new Thread(new AliveChecker()); - - // Required for Hadoop 2.x - checker.setDaemon(true); - - checker.start(); - } - - /** {@inheritDoc} */ - @Override public InputStream inputStream() { - return in; - } - - /** {@inheritDoc} */ - @Override public OutputStream outputStream() { - return out; - } - - /** {@inheritDoc} */ - @Override public void close() { - U.closeQuiet(in); - U.closeQuiet(out); - - stopChecker(); - } - - /** - * Forcibly closes spaces and frees all system resources. <p> This method should be called with caution as it may - * result to the other-party process crash. It is intended to call when there was an IO error during handshake and - * other party has not yet attached to the space. - */ - public void forceClose() { - in.forceClose(); - out.forceClose(); - - stopChecker(); - } - - /** - * - */ - private void stopChecker() { - if (checker != null) { - checker.interrupt(); - - try { - checker.join(); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - } - - /** @return {@code True} if other party is alive and new invocation of this method needed. 
*/ - boolean checkOtherPartyAlive() { - if (checkIn) { - File tokFile = new File(inSpace.tokenFileName()); - - if (!tokFile.exists()) - checkIn = false; - } - - if (checkOut) { - File tokFile = new File(outSpace.tokenFileName()); - - if (!tokFile.exists()) - checkOut = false; - } - - if (!checkIn && !checkOut) - return false; - - if (!IpcSharedMemoryUtils.alive(inSpace.otherPartyPid())) { - U.warn(log, "Remote process is considered to be dead (shared memory space will be forcibly closed): " + - inSpace.otherPartyPid()); - - closeSpace(inSpace); - closeSpace(outSpace); - - return false; - } - - // Need to call this method again after timeout. - return true; - } - - /** - * This method is intended for test purposes only. - * - * @return In space. - */ - GridIpcSharedMemorySpace inSpace() { - return inSpace; - } - - /** - * This method is intended for test purposes only. - * - * @return Out space. - */ - GridIpcSharedMemorySpace outSpace() { - return outSpace; - } - - /** @param space Space to close. */ - private void closeSpace(GridIpcSharedMemorySpace space) { - assert space != null; - - space.forceClose(); - - File tokFile = new File(space.tokenFileName()); - - // Space is not usable at this point and all local threads - // are guaranteed to leave its methods (other party is not alive). - // So, we can cleanup resources without additional synchronization. - IpcSharedMemoryUtils.freeSystemResources(tokFile.getAbsolutePath(), space.size()); - - tokFile.delete(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcSharedMemoryClientEndpoint.class, this); - } - - /** - * - */ - private class AliveChecker implements Runnable { - /** Check frequency. 
*/ - private static final long CHECK_FREQ = 10000; - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(CHECK_FREQ); - } - catch (InterruptedException ignored) { - return; - } - - if (!checkOtherPartyAlive()) - // No need to check any more. - return; - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitRequest.java deleted file mode 100644 index 0e630a7..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitRequest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import java.io.*; - -/** - * - */ -public class GridIpcSharedMemoryInitRequest implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int pid; - - /** - * @param pid PID of the {@code client} party. - */ - public GridIpcSharedMemoryInitRequest(int pid) { - this.pid = pid; - } - - /** - * Required by {@code Externalizable}. - */ - public GridIpcSharedMemoryInitRequest() { - // No-op. 
- } - - /** - * @return Sender PID. - */ - public int pid() { - return pid; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(pid); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - pid = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "GridIpcSharedMemoryInitRequest [pid=" + pid + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitResponse.java deleted file mode 100644 index 59447da..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInitResponse.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * - */ -public class GridIpcSharedMemoryInitResponse implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private String inTokFileName; - - /** */ - private int inSharedMemId; - - /** */ - private String outTokFileName; - - /** */ - private int outSharedMemId; - - /** */ - private int pid; - - /** */ - private int size; - - /** */ - private Exception err; - - /** - * Constructs a successful response. - * - * @param inTokFileName In token. - * @param inSharedMemId In shared memory ID. - * @param outTokFileName Out token. - * @param outSharedMemId Out shared memory ID. - * @param pid PID of the {@code server} party. - * @param size Size. - */ - public GridIpcSharedMemoryInitResponse(String inTokFileName, int inSharedMemId, String outTokFileName, - int outSharedMemId, int pid, int size) { - this.inTokFileName = inTokFileName; - this.inSharedMemId = inSharedMemId; - this.outTokFileName = outTokFileName; - this.outSharedMemId = outSharedMemId; - this.pid = pid; - this.size = size; - } - - /** - * Constructs an error response. - * - * @param err Error cause. - */ - public GridIpcSharedMemoryInitResponse(Exception err) { - this.err = err; - } - - /** - * Required by {@code Externalizable}. - */ - public GridIpcSharedMemoryInitResponse() { - // No-op. - } - - /** - * @return In token file name or {@code null}, if this is an error response. - */ - @Nullable public String inTokenFileName() { - return inTokFileName; - } - - /** - * @return In shared memory ID. - */ - public int inSharedMemoryId() { - return inSharedMemId; - } - - /** - * @return Out token file name or {@code null}, if this is an error response. - */ - @Nullable public String outTokenFileName() { - return outTokFileName; - } - - /** - * @return Out shared memory ID. 
- */ - public int outSharedMemoryId() { - return outSharedMemId; - } - - /** - * @return Sender PID. - */ - public int pid() { - return pid; - } - - /** - * @return Space size. - */ - public int size() { - return size; - } - - /** - * @return Error message or {@code null}, if this is - * a successful response. - */ - @Nullable public Exception error() { - return err; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, inTokFileName); - out.writeInt(inSharedMemId); - U.writeString(out, outTokFileName); - out.writeInt(outSharedMemId); - out.writeObject(err); - out.writeInt(pid); - out.writeInt(size); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - inTokFileName = U.readString(in); - inSharedMemId = in.readInt(); - outTokFileName = U.readString(in); - outSharedMemId = in.readInt(); - err = (Exception)in.readObject(); - pid = in.readInt(); - size = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "GridIpcSharedMemoryInitResponse [err=" + err + - ", inTokFileName=" + inTokFileName + - ", inSharedMemId=" + inSharedMemId + - ", outTokFileName=" + outTokFileName + - ", outSharedMemId=" + outSharedMemId + - ", pid=" + pid + - ", size=" + size + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInputStream.java deleted file mode 100644 index 3c6be93..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryInputStream.java +++ 
/dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * - */ -public class GridIpcSharedMemoryInputStream extends InputStream { - /** */ - private final GridIpcSharedMemorySpace in; - - /** Stream instance is not thread-safe so we can cache buffer. */ - private byte[] buf = new byte[1]; - - /** - * @param in Space. 
- */ - public GridIpcSharedMemoryInputStream(GridIpcSharedMemorySpace in) { - assert in != null; - - this.in = in; - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - try { - int read = in.read(buf, 0, 1, 0); - - if (read < 0) - return read; - - return buf[0] & 0xFF; - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - try { - return in.read(b, off, len, 0); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public int available() throws IOException { - try { - return in.unreadCount(); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - in.close(); - } - - /** - * Forcibly closes spaces and frees all system resources. - * <p> - * This method should be called with caution as it may result to the other-party - * process crash. It is intended to call when there was an IO error during handshake - * and other party has not yet attached to the space. 
- */ - public void forceClose() { - in.forceClose(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcSharedMemoryInputStream.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java deleted file mode 100644 index 4ad95d4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.nio.channels.*; -import java.security.*; -import java.util.*; - -/** - * Shared memory native loader. - */ -@SuppressWarnings("ErrorNotRethrown") -public class GridIpcSharedMemoryNativeLoader { - /** Loaded flag. */ - private static volatile boolean loaded; - - /** Library name base. */ - private static final String LIB_NAME_BASE = "igniteshmem"; - - /** Lock file path. */ - private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"), "igniteshmem.lock"); - - /** Library name. */ - static final String LIB_NAME = LIB_NAME_BASE + "-" + GridProductImpl.VER; - - /** - * @return Operating system name to resolve path to library. - */ - private static String os() { - String name = System.getProperty("os.name").toLowerCase().trim(); - - if (name.startsWith("win")) - throw new IllegalStateException("IPC shared memory native loader should not be called on windows."); - - if (name.startsWith("linux")) - return "linux"; - - if (name.startsWith("mac os x")) - return "osx"; - - return name.replaceAll("\\W+", "_"); - } - - /** - * @return Platform. - */ - private static String platform() { - return os() + bitModel(); - } - - /** - * @return Bit model. - */ - private static int bitModel() { - String prop = System.getProperty("sun.arch.data.model"); - - if (prop == null) - prop = System.getProperty("com.ibm.vm.bitmode"); - - if (prop != null) - return Integer.parseInt(prop); - - // We don't know. - return -1; - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public static void load() throws IgniteCheckedException { - if (loaded) - return; - - synchronized (GridIpcSharedMemoryNativeLoader.class) { - if (loaded) - return; - - doLoad(); - - loaded = true; - } - } - - /** - * @throws IgniteCheckedException If failed. - */ - private static void doLoad() throws IgniteCheckedException { - assert Thread.holdsLock(GridIpcSharedMemoryNativeLoader.class); - - Collection<Throwable> errs = new ArrayList<>(); - - try { - // Load native library (the library directory should be in java.library.path). - System.loadLibrary(LIB_NAME); - - return; - } - catch (UnsatisfiedLinkError e) { - errs.add(e); - } - - // Obtain lock on file to prevent concurrent extracts. - try (RandomAccessFile randomAccessFile = new RandomAccessFile(LOCK_FILE, "rws"); - FileLock ignored = randomAccessFile.getChannel().lock()) { - if (extractAndLoad(errs, platformSpecificResourcePath())) - return; - - if (extractAndLoad(errs, osSpecificResourcePath())) - return; - - if (extractAndLoad(errs, resourcePath())) - return; - - // Failed to find the library. - assert !errs.isEmpty(); - - throw new IgniteCheckedException("Failed to load native IPC library: " + errs); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to obtain file lock: " + LOCK_FILE, e); - } - } - - /** - * @return OS resource path. - */ - private static String osSpecificResourcePath() { - return "META-INF/native/" + os() + "/" + mapLibraryName(LIB_NAME_BASE); - } - - /** - * @return Platform resource path. - */ - private static String platformSpecificResourcePath() { - return "META-INF/native/" + platform() + "/" + mapLibraryName(LIB_NAME_BASE); - } - - /** - * @return Resource path. - */ - private static String resourcePath() { - return "META-INF/native/" + mapLibraryName(LIB_NAME_BASE); - } - - /** - * @return Maps library name to file name. 
- */ - private static String mapLibraryName(String name) { - String libName = System.mapLibraryName(name); - - if (U.isMacOs() && libName.endsWith(".jnilib")) - return libName.substring(0, libName.length() - "jnilib".length()) + "dylib"; - - return libName; - } - - /** - * @param errs Errors collection. - * @param rsrcPath Path. - * @return {@code True} if library was found and loaded. - */ - private static boolean extractAndLoad(Collection<Throwable> errs, String rsrcPath) { - ClassLoader clsLdr = U.detectClassLoader(GridIpcSharedMemoryNativeLoader.class); - - URL rsrc = clsLdr.getResource(rsrcPath); - - if (rsrc != null) - return extract(errs, rsrc, new File(System.getProperty("java.io.tmpdir"), mapLibraryName(LIB_NAME))); - else { - errs.add(new IllegalStateException("Failed to find resource with specified class loader " + - "[rsrc=" + rsrcPath + ", clsLdr=" + clsLdr + ']')); - - return false; - } - } - - /** - * @param errs Errors collection. - * @param src Source. - * @param target Target. - * @return {@code True} if resource was found and loaded. - */ - @SuppressWarnings("ResultOfMethodCallIgnored") - private static boolean extract(Collection<Throwable> errs, URL src, File target) { - FileOutputStream os = null; - InputStream is = null; - - try { - if (!target.exists() || !haveEqualMD5(target, src)) { - is = src.openStream(); - - if (is != null) { - os = new FileOutputStream(target); - - int read; - - byte[] buf = new byte[4096]; - - while ((read = is.read(buf)) != -1) - os.write(buf, 0, read); - } - } - - // chmod 775. 
- if (!U.isWindows()) - Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor(); - - System.load(target.getPath()); - - return true; - } - catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) { - errs.add(e); - } - finally { - U.closeQuiet(os); - U.closeQuiet(is); - } - - return false; - } - - /** - * @param target - * @param src - * @return - * @throws NoSuchAlgorithmException - * @throws IOException - */ - private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException { - try (InputStream targetIS = new FileInputStream(target); - InputStream srcIS = src.openStream()) { - - String targetMD5 = U.calculateMD5(targetIS); - String srcMD5 = U.calculateMD5(srcIS); - - return targetMD5.equals(srcMD5); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException.java deleted file mode 100644 index 093f3d0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** - * Thrown when IPC operation (such as {@link GridIpcSharedMemorySpace#wait(long)}) - * has timed out. - */ -public class GridIpcSharedMemoryOperationTimedoutException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given error message. - * - * @param msg Error message. - */ - public GridIpcSharedMemoryOperationTimedoutException(String msg) { - super(msg); - } - - /** - * Creates new exception with given throwable as a cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public GridIpcSharedMemoryOperationTimedoutException(Throwable cause) { - super(cause); - } - - /** - * Creates new exception with given error message and optional nested exception. - * - * @param msg Error message. - * @param cause Optional nested exception (can be {@code null}). 
- */ - public GridIpcSharedMemoryOperationTimedoutException(String msg, @Nullable Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOutputStream.java deleted file mode 100644 index 8aa042c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOutputStream.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * - */ -public class GridIpcSharedMemoryOutputStream extends OutputStream { - /** */ - private final GridIpcSharedMemorySpace out; - - /** - * @param out Space. 
- */ - public GridIpcSharedMemoryOutputStream(GridIpcSharedMemorySpace out) { - assert out != null; - - this.out = out; - } - - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - byte[] buf = new byte[1]; - - buf[0] = (byte)b; - - write(buf, 0, 1); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) throws IOException { - try { - out.write(b, off, len, 0); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - out.close(); - } - - /** - * Forcibly closes spaces and frees all system resources. - * <p> - * This method should be called with caution as it may result to the other-party - * process crash. It is intended to call when there was an IO error during handshake - * and other party has not yet attached to the space. - */ - public void forceClose() { - out.forceClose(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcSharedMemoryOutputStream.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryServerEndpoint.java deleted file mode 100644 index c8d4f63..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryServerEndpoint.java +++ /dev/null @@ -1,707 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.thread.*; -import org.apache.ignite.internal.processors.resource.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.worker.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Server shared memory IPC endpoint. - */ -public class GridIpcSharedMemoryServerEndpoint implements GridIpcServerEndpoint { - /** Troubleshooting public wiki page. */ - public static final String TROUBLESHOOTING_URL = "http://bit.ly/GridGain-Troubleshooting"; - - /** IPC error message. */ - public static final String OUT_OF_RESOURCES_MSG = "Failed to allocate shared memory segment " + - "(for troubleshooting see " + TROUBLESHOOTING_URL + ')'; - - /** Default endpoint port number. */ - public static final int DFLT_IPC_PORT = 10500; - - /** Default shared memory space in bytes. 
*/ - public static final int DFLT_SPACE_SIZE = 256 * 1024; - - /** - * Default token directory. Note that this path is relative to {@code GRIDGAIN_HOME/work} folder - * if {@code GRIDGAIN_HOME} system or environment variable specified, otherwise it is relative to - * {@code work} folder under system {@code java.io.tmpdir} folder. - * - * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() - */ - public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem"; - - /** - * Shared memory token file name prefix. - * - * Token files are created and stored in the following manner: [tokDirPath]/[nodeId]-[current - * PID]/gg-shmem-space-[auto_idx]-[other_party_pid]-[size] - */ - public static final String TOKEN_FILE_NAME = "gg-shmem-space-"; - - /** Default lock file name. */ - private static final String LOCK_FILE_NAME = "lock.file"; - - /** GC frequency. */ - private static final long GC_FREQ = 10000; - - /** ID generator. */ - private static final AtomicLong tokIdxGen = new AtomicLong(); - - /** Port to bind socket to. */ - private int port = DFLT_IPC_PORT; - - /** Prefix. */ - private String tokDirPath = DFLT_TOKEN_DIR_PATH; - - /** Space size. */ - private int size = DFLT_SPACE_SIZE; - - /** Server socket. */ - @GridToStringExclude - private ServerSocket srvSock; - - /** Token directory. */ - private File tokDir; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Local node ID. */ - private UUID locNodeId; - - /** Grid name. */ - private String gridName; - - /** Flag allowing not to print out of resources warning. */ - private boolean omitOutOfResourcesWarn; - - /** GC worker. */ - private GridWorker gcWorker; - - /** Pid of the current process. */ - private int pid; - - /** Closed flag. */ - private volatile boolean closed; - - /** Spaces opened on with this endpoint. 
*/ - private final Collection<GridIpcSharedMemoryClientEndpoint> endpoints = - new GridConcurrentHashSet<>(); - - /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}. */ - public GridIpcSharedMemoryServerEndpoint() { - // No-op. - } - - /** - * Constructor to set dependencies explicitly. - * - * @param log Log. - * @param locNodeId Node id. - * @param gridName Grid name. - */ - public GridIpcSharedMemoryServerEndpoint(IgniteLogger log, UUID locNodeId, String gridName) { - this.log = log; - this.locNodeId = locNodeId; - this.gridName = gridName; - } - - /** @param omitOutOfResourcesWarn If {@code true}, out of resources warning will not be printed by server. */ - public void omitOutOfResourcesWarning(boolean omitOutOfResourcesWarn) { - this.omitOutOfResourcesWarn = omitOutOfResourcesWarn; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - GridIpcSharedMemoryNativeLoader.load(); - - pid = IpcSharedMemoryUtils.pid(); - - if (pid == -1) - throw new GridIpcEndpointBindException("Failed to get PID of the current process."); - - if (size <= 0) - throw new GridIpcEndpointBindException("Space size should be positive: " + size); - - String tokDirPath = this.tokDirPath; - - if (F.isEmpty(tokDirPath)) - throw new GridIpcEndpointBindException("Token directory path is empty."); - - tokDirPath = tokDirPath + '/' + locNodeId.toString() + '-' + IpcSharedMemoryUtils.pid(); - - tokDir = U.resolveWorkDirectory(tokDirPath, false); - - if (port <= 0 || port >= 0xffff) - throw new GridIpcEndpointBindException("Port value is illegal: " + port); - - try { - srvSock = new ServerSocket(); - - // Always bind to loopback. - srvSock.bind(new InetSocketAddress("127.0.0.1", port)); - } - catch (IOException e) { - // Although empty socket constructor never throws exception, close it just in case. 
- U.closeQuiet(srvSock); - - throw new GridIpcEndpointBindException("Failed to bind shared memory IPC endpoint (is port already " + - "in use?): " + port, e); - } - - gcWorker = new GcWorker(gridName, "ipc-shmem-gc", log); - - new IgniteThread(gcWorker).start(); - - if (log.isInfoEnabled()) - log.info("IPC shared memory server endpoint started [port=" + port + - ", tokDir=" + tokDir.getAbsolutePath() + ']'); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ErrorNotRethrown") - @Override public GridIpcEndpoint accept() throws IgniteCheckedException { - while (!Thread.currentThread().isInterrupted()) { - Socket sock = null; - - boolean accepted = false; - - try { - sock = srvSock.accept(); - - accepted = true; - - InputStream inputStream = sock.getInputStream(); - ObjectInputStream in = new ObjectInputStream(inputStream); - - ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream()); - - GridIpcSharedMemorySpace inSpace = null; - - GridIpcSharedMemorySpace outSpace = null; - - boolean err = true; - - try { - GridIpcSharedMemoryInitRequest req = (GridIpcSharedMemoryInitRequest)in.readObject(); - - if (log.isDebugEnabled()) - log.debug("Processing request: " + req); - - IgnitePair<String> p = inOutToken(req.pid(), size); - - String file1 = p.get1(); - String file2 = p.get2(); - - assert file1 != null; - assert file2 != null; - - // Create tokens. 
- new File(file1).createNewFile(); - new File(file2).createNewFile(); - - if (log.isDebugEnabled()) - log.debug("Created token files: " + p); - - inSpace = new GridIpcSharedMemorySpace( - file1, - req.pid(), - pid, - size, - true, - log); - - outSpace = new GridIpcSharedMemorySpace( - file2, - pid, - req.pid(), - size, - false, - log); - - GridIpcSharedMemoryClientEndpoint ret = new GridIpcSharedMemoryClientEndpoint(inSpace, outSpace, - log); - - out.writeObject(new GridIpcSharedMemoryInitResponse(file2, outSpace.sharedMemoryId(), - file1, inSpace.sharedMemoryId(), pid, size)); - - err = !in.readBoolean(); - - endpoints.add(ret); - - return ret; - } - catch (UnsatisfiedLinkError e) { - throw IpcSharedMemoryUtils.linkError(e); - } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to process incoming connection " + - "(was connection closed by another party):" + e.getMessage()); - } - catch (ClassNotFoundException e) { - U.error(log, "Failed to process incoming connection.", e); - } - catch (ClassCastException e) { - String msg = "Failed to process incoming connection (most probably, shared memory " + - "rest endpoint has been configured by mistake)."; - - LT.warn(log, null, msg); - - sendErrorResponse(out, e); - } - catch (GridIpcOutOfSystemResourcesException e) { - if (!omitOutOfResourcesWarn) - LT.warn(log, null, OUT_OF_RESOURCES_MSG); - - sendErrorResponse(out, e); - } - catch (IgniteCheckedException e) { - LT.error(log, e, "Failed to process incoming shared memory connection."); - - sendErrorResponse(out, e); - } - finally { - // Exception has been thrown, need to free system resources. - if (err) { - if (inSpace != null) - inSpace.forceClose(); - - // Safety. 
- if (outSpace != null) - outSpace.forceClose(); - } - } - } - catch (IOException e) { - if (!Thread.currentThread().isInterrupted() && !accepted) - throw new IgniteCheckedException("Failed to accept incoming connection.", e); - - if (!closed) - LT.error(log, null, "Failed to process incoming shared memory connection: " + e.getMessage()); - } - finally { - U.closeQuiet(sock); - } - } // while - - throw new IgniteInterruptedException("Socket accept was interrupted."); - } - - /** - * Injects resources. - * - * @param ignite Ignite - */ - @IgniteInstanceResource - private void injectResources(Ignite ignite){ - if (ignite != null) { - // Inject resources. - gridName = ignite.name(); - locNodeId = ignite.configuration().getNodeId(); - } - else { - // Cleanup resources. - gridName = null; - locNodeId = null; - } - } - - /** - * @param out Output stream. - * @param err Error cause. - */ - private void sendErrorResponse(ObjectOutput out, Exception err) { - try { - out.writeObject(new GridIpcSharedMemoryInitResponse(err)); - } - catch (IOException e) { - U.error(log, "Failed to send error response to client.", e); - } - } - - /** - * @param pid PID of the other party. - * @param size Size of the space. - * @return Token pair. - */ - private IgnitePair<String> inOutToken(int pid, int size) { - while (true) { - long idx = tokIdxGen.get(); - - if (tokIdxGen.compareAndSet(idx, idx + 2)) - return F.pair(new File(tokDir, TOKEN_FILE_NAME + idx + "-" + pid + "-" + size).getAbsolutePath(), - new File(tokDir, TOKEN_FILE_NAME + (idx + 1) + "-" + pid + "-" + size).getAbsolutePath()); - } - } - - /** {@inheritDoc} */ - @Override public int getPort() { - return port; - } - - /** {@inheritDoc} */ - @Nullable @Override public String getHost() { - return null; - } - - /** - * {@inheritDoc} - * - * @return {@code false} as shared memory endpoints can not be used for management. 
- */ - @Override public boolean isManagement() { - return false; - } - - /** - * Sets port endpoint will be bound to. - * - * @param port Port number. - */ - public void setPort(int port) { - this.port = port; - } - - /** - * Gets token directory path. - * - * @return Token directory path. - */ - public String getTokenDirectoryPath() { - return tokDirPath; - } - - /** - * Sets token directory path. - * - * @param tokDirPath Token directory path. - */ - public void setTokenDirectoryPath(String tokDirPath) { - this.tokDirPath = tokDirPath; - } - - /** - * Gets size of shared memory spaces that are created by the endpoint. - * - * @return Size of shared memory space. - */ - public int getSize() { - return size; - } - - /** - * Sets size of shared memory spaces that are created by the endpoint. - * - * @param size Size of shared memory space. - */ - public void setSize(int size) { - this.size = size; - } - - /** {@inheritDoc} */ - @Override public void close() { - closed = true; - - U.closeQuiet(srvSock); - - if (gcWorker != null) { - U.cancel(gcWorker); - - // This method may be called from already interrupted thread. - // Need to ensure cleaning on close. - boolean interrupted = Thread.interrupted(); - - try { - U.join(gcWorker); - } - catch (IgniteInterruptedException e) { - U.warn(log, "Interrupted when stopping GC worker.", e); - } - finally { - if (interrupted) - Thread.currentThread().interrupt(); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcSharedMemoryServerEndpoint.class, this); - } - - /** - * Sets configuration properties from the map. - * - * @param endpointCfg Map of properties. - * @throws IgniteCheckedException If invalid property name or value. 
- */ - public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException { - for (Map.Entry<String,String> e : endpointCfg.entrySet()) { - try { - switch (e.getKey()) { - case "type": - case "host": - case "management": - //Ignore these properties - break; - - case "port": - setPort(Integer.parseInt(e.getValue())); - break; - - case "size": - setSize(Integer.parseInt(e.getValue())); - break; - - case "tokenDirectoryPath": - setTokenDirectoryPath(e.getValue()); - break; - - default: - throw new IgniteCheckedException("Invalid property '" + e.getKey() + "' of " + getClass().getSimpleName()); - } - } - catch (Throwable t) { - if (t instanceof IgniteCheckedException) - throw t; - - throw new IgniteCheckedException("Invalid value '" + e.getValue() + "' of the property '" + e.getKey() + "' in " + - getClass().getSimpleName(), t); - } - } - } - - /** - * - */ - private class GcWorker extends GridWorker { - /** - * @param gridName Grid name. - * @param name Name. - * @param log Log. 
- */ - protected GcWorker(@Nullable String gridName, String name, IgniteLogger log) { - super(gridName, name, log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedException { - if (log.isDebugEnabled()) - log.debug("GC worker started."); - - File workTokDir = tokDir.getParentFile(); - - assert workTokDir != null; - - while (!isCancelled()) { - U.sleep(GC_FREQ); - - if (log.isDebugEnabled()) - log.debug("Starting GC iteration."); - - RandomAccessFile lockFile = null; - - FileLock lock = null; - - try { - lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); - - lock = lockFile.getChannel().lock(); - - if (lock != null) - processTokenDirectory(workTokDir); - else if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (OverlappingFileLockException ignored) { - if (log.isDebugEnabled()) - log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); - } - catch (IOException e) { - U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); - } - finally { - U.releaseQuiet(lock); - U.closeQuiet(lockFile); - } - - // Process spaces created by this endpoint. - if (log.isDebugEnabled()) - log.debug("Processing local spaces."); - - for (GridIpcSharedMemoryClientEndpoint e : endpoints) { - if (log.isDebugEnabled()) - log.debug("Processing endpoint: " + e); - - if (!e.checkOtherPartyAlive()) { - endpoints.remove(e); - - if (log.isDebugEnabled()) - log.debug("Removed endpoint: " + e); - } - } - } - } - - /** @param workTokDir Token directory (common for multiple nodes). 
*/ - private void processTokenDirectory(File workTokDir) { - for (File f : workTokDir.listFiles()) { - if (!f.isDirectory()) { - if (!f.getName().equals(LOCK_FILE_NAME)) { - if (log.isDebugEnabled()) - log.debug("Unexpected file: " + f.getName()); - } - - continue; - } - - if (f.equals(tokDir)) { - if (log.isDebugEnabled()) - log.debug("Skipping own token directory: " + tokDir.getName()); - - continue; - } - - String name = f.getName(); - - int pid; - - try { - pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - // Is process alive? - if (IpcSharedMemoryUtils.alive(pid)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive node: " + pid); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token folder: " + f); - - // Process each token under stale token folder. - File[] shmemToks = f.listFiles(); - - if (shmemToks == null) - // Although this is strange, but is reproducible sometimes on linux. 
- return; - - int rmvCnt = 0; - - try { - for (File f0 : shmemToks) { - if (log.isDebugEnabled()) - log.debug("Processing token file: " + f0.getName()); - - if (f0.isDirectory()) { - if (log.isDebugEnabled()) - log.debug("Unexpected directory: " + f0.getName()); - } - - // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] - String[] toks = f0.getName().split("-"); - - if (toks.length != 6) { - if (log.isDebugEnabled()) - log.debug("Unrecognized token file: " + f0.getName()); - - continue; - } - - int pid0; - int size; - - try { - pid0 = Integer.parseInt(toks[4]); - size = Integer.parseInt(toks[5]); - } - catch (NumberFormatException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to parse file name: " + name); - - continue; - } - - if (IpcSharedMemoryUtils.alive(pid0)) { - if (log.isDebugEnabled()) - log.debug("Skipping alive process: " + pid0); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Possibly stale token file: " + f0); - - IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); - - if (f0.delete()) { - if (log.isDebugEnabled()) - log.debug("Deleted file: " + f0.getName()); - - rmvCnt++; - } - else if (!f0.exists()) { - if (log.isDebugEnabled()) - log.debug("File has been concurrently deleted: " + f0.getName()); - - rmvCnt++; - } - else if (log.isDebugEnabled()) - log.debug("Failed to delete file: " + f0.getName()); - } - } - finally { - // Assuming that no new files can appear, since - if (rmvCnt == shmemToks.length) { - U.delete(f); - - if (log.isDebugEnabled()) - log.debug("Deleted empty token directory: " + f.getName()); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpace.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpace.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpace.java deleted file mode 100644 index 89d2d4e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpace.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.IgniteSystemProperties.*; - -/** - * - */ -@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions"}) -public class GridIpcSharedMemorySpace implements Closeable { - /** Debug flag (enable for testing). */ - private static final boolean DEBUG = Boolean.getBoolean(GG_IPC_SHMEM_SPACE_DEBUG); - - /** Shared memory segment size (operable). */ - private final int opSize; - - /** Shared memory native pointer. */ - private final long shmemPtr; - - /** Shared memory ID. */ - private final int shmemId; - - /** Semaphore set ID. */ - private final int semId; - - /** Local closed flag. 
*/ - private final AtomicBoolean closed = new AtomicBoolean(); - - /** {@code True} if space has been closed. */ - private final boolean isReader; - - /** Lock to protect readers and writers from concurrent close. */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** */ - private final int writerPid; - - /** */ - private final int readerPid; - - /** */ - private final String tokFileName; - - /** */ - private final IgniteLogger log; - - /** - * This will allocate system resources for the space. - * - * @param tokFileName Token filename. - * @param writerPid Writer PID. - * @param readerPid Reader PID. - * @param size Size in bytes. - * @param reader {@code True} if reader. - * @param parent Parent logger. - * @throws IgniteCheckedException If failed. - */ - public GridIpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, - IgniteLogger parent) throws IgniteCheckedException { - assert size > 0 : "Size cannot be less than 1 byte"; - - log = parent.getLogger(GridIpcSharedMemorySpace.class); - - opSize = size; - - shmemPtr = IpcSharedMemoryUtils.allocateSystemResources(tokFileName, size, DEBUG && log.isDebugEnabled()); - - shmemId = IpcSharedMemoryUtils.sharedMemoryId(shmemPtr); - semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); - - isReader = reader; - - this.tokFileName = tokFileName; - this.readerPid = readerPid; - this.writerPid = writerPid; - - if (DEBUG && log.isDebugEnabled()) - log.debug("Shared memory space has been created: " + this); - } - - /** - * This should be called in order to attach to already allocated system resources. - * - * @param tokFileName Token file name (for proper cleanup). - * @param writerPid Writer PID. - * @param readerPid Reader PID. - * @param size Size. - * @param reader Reader flag. - * @param shmemId Shared memory ID. - * @param parent Logger. - * @throws IgniteCheckedException If failed. 
- */ - public GridIpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, - int shmemId, IgniteLogger parent) throws IgniteCheckedException { - assert size > 0 : "Size cannot be less than 1 byte"; - - log = parent.getLogger(GridIpcSharedMemorySpace.class); - - opSize = size; - isReader = reader; - this.shmemId = shmemId; - this.writerPid = writerPid; - this.readerPid = readerPid; - this.tokFileName = tokFileName; - - shmemPtr = IpcSharedMemoryUtils.attach(shmemId, DEBUG && log.isDebugEnabled()); - - semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); - } - - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). - * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. - */ - public void write(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, - GridIpcSharedMemoryOperationTimedoutException { - assert buf != null; - assert len > 0; - assert buf.length >= off + len; - assert timeout >= 0; - - assert !isReader; - - lock.readLock().lock(); - - try { - if (closed.get()) - throw new IgniteCheckedException("Shared memory segment has been closed: " + this); - - IpcSharedMemoryUtils.writeSharedMemory(shmemPtr, buf, off, len, timeout); - } - finally { - lock.readLock().unlock(); - } - } - - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). - * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. 
- */ - public void write(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, - GridIpcSharedMemoryOperationTimedoutException { - assert buf != null; - assert len > 0; - assert buf.limit() >= off + len; - assert timeout >= 0; - assert !isReader; - - lock.readLock().lock(); - - try { - if (closed.get()) - throw new IgniteCheckedException("Shared memory segment has been closed: " + this); - - IpcSharedMemoryUtils.writeSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); - } - finally { - lock.readLock().unlock(); - } - } - - /** - * Blocks until at least 1 byte is read. - * - * @param buf Buffer. - * @param off Offset. - * @param len Length. - * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). - * @return Read bytes count. - * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. - */ - public int read(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, - GridIpcSharedMemoryOperationTimedoutException{ - assert buf != null; - assert len > 0; - assert buf.length >= off + len; - - assert isReader; - - lock.readLock().lock(); - - try { - if (closed.get()) - throw new IgniteCheckedException("Shared memory segment has been closed: " + this); - - return (int) IpcSharedMemoryUtils.readSharedMemory(shmemPtr, buf, off, len, timeout); - } - finally { - lock.readLock().unlock(); - } - } - - /** - * Blocks until at least 1 byte is read. - * - * @param buf Buffer. - * @param off Offset. - * @param len Length. - * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). - * @return Read bytes count. - * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. 
- */ - public int read(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, - GridIpcSharedMemoryOperationTimedoutException{ - assert buf != null; - assert len > 0; - assert buf.capacity() >= off + len; - assert isReader; - - lock.readLock().lock(); - - try { - if (closed.get()) - throw new IgniteCheckedException("Shared memory segment has been closed: " + this); - - return (int) IpcSharedMemoryUtils.readSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - close0(false); - } - - /** - * Forcibly closes the space and frees all system resources. - * <p> - * This method should be called with caution as it may result to the other-party - * process crash. It is intended to call when there was an IO error during handshake - * and other party has not yet attached to the space. - */ - public void forceClose() { - close0(true); - } - - /** - * @return Shared memory ID. - */ - public int sharedMemoryId() { - return shmemId; - } - - /** - * @return Semaphore set ID. - */ - public int semaphoreId() { - return semId; - } - - /** - * @param force {@code True} to close the space. - */ - private void close0(boolean force) { - if (!closed.compareAndSet(false, true)) - return; - - IpcSharedMemoryUtils.ipcClose(shmemPtr); - - // Wait all readers and writes to leave critical section. - lock.writeLock().lock(); - - try { - IpcSharedMemoryUtils.freeSystemResources(tokFileName, shmemPtr, force); - } - finally { - lock.writeLock().unlock(); - } - - if (DEBUG && log.isDebugEnabled()) - log.debug("Shared memory space has been closed: " + this); - } - - /** - * @return Bytes available for read. - * @throws IgniteCheckedException If failed. 
- */ - public int unreadCount() throws IgniteCheckedException { - lock.readLock().lock(); - - try { - if (closed.get()) - throw new IgniteCheckedException("Shared memory segment has been closed: " + this); - - return IpcSharedMemoryUtils.unreadCount(shmemPtr); - } - finally { - lock.readLock().unlock(); - } - } - - /** - * @return Shared memory pointer. - */ - public long sharedMemPointer() { - return shmemPtr; - } - - /** - * @return Reader PID. - */ - public int readerPid() { - return readerPid; - } - - /** - * @return Writer PID. - */ - public int writerPid() { - return writerPid; - } - - /** - * @return Vis-a-vis PID. - */ - public int otherPartyPid() { - return isReader ? writerPid : readerPid; - } - - /** - * @return Token file name used to create shared memory space. - */ - public String tokenFileName() { - return tokFileName; - } - - /** - * @return Space size. - */ - public int size() { - return opSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcSharedMemorySpace.class, this, "closed", closed.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcOutOfSystemResourcesException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcOutOfSystemResourcesException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcOutOfSystemResourcesException.java new file mode 100644 index 0000000..10aea80 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcOutOfSystemResourcesException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.ipc.shmem;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Thrown when IPC runs out of system resources (for example, no more free shared memory is
+ * available in operating system).
+ */
+public class IpcOutOfSystemResourcesException extends IgniteCheckedException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given error message.
+     *
+     * @param msg Error message.
+     */
+    public IpcOutOfSystemResourcesException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates new exception with given throwable as a cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public IpcOutOfSystemResourcesException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates new exception with given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IpcOutOfSystemResourcesException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
new file mode 100644
index 0000000..d421440
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.ipc.shmem;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.ipc.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Client IPC endpoint based on shared memory spaces: one for incoming and one for outgoing data.
+ */
+public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
+    /** In space. */
+    private final IpcSharedMemorySpace inSpace;
+
+    /** Out space. */
+    private final IpcSharedMemorySpace outSpace;
+
+    /** Input stream over the in space. */
+    private final IpcSharedMemoryInputStream in;
+
+    /** Output stream over the out space. */
+    private final IpcSharedMemoryOutputStream out;
+
+    /** Whether existence of the in space token file still has to be checked. */
+    private boolean checkIn = true;
+
+    /** Whether existence of the out space token file still has to be checked. */
+    private boolean checkOut = true;
+
+    /** Alive checker thread, {@code null} if endpoint was created over given spaces. */
+    private final Thread checker;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Creates connected client IPC endpoint over the given spaces. No background checker thread is started.
+     *
+     * @param inSpace In space.
+     * @param outSpace Out space.
+     * @param parent Parent logger.
+     */
+    public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace, IpcSharedMemorySpace outSpace,
+        IgniteLogger parent) {
+        assert inSpace != null;
+        assert outSpace != null;
+
+        log = parent.getLogger(IpcSharedMemoryClientEndpoint.class);
+
+        this.inSpace = inSpace;
+        this.outSpace = outSpace;
+
+        in = new IpcSharedMemoryInputStream(inSpace);
+        out = new IpcSharedMemoryOutputStream(outSpace);
+
+        checker = null;
+    }
+
+    /**
+     * Creates and connects client IPC endpoint and starts background checker thread to avoid deadlocks on other party
+     * crash. Connects with an infinite connection timeout.
+     *
+     * @param port Port server endpoint bound to.
+     * @param parent Parent logger.
+     * @throws IgniteCheckedException If connection fails.
+     */
+    public IpcSharedMemoryClientEndpoint(int port, IgniteLogger parent) throws IgniteCheckedException {
+        this(port, 0, parent);
+    }
+
+    /**
+     * Creates and connects client IPC endpoint and starts background checker thread to avoid deadlocks on other party
+     * crash.
+     *
+     * @param port Port server endpoint bound to.
+     * @param timeout Connection timeout in milliseconds ({@code 0} means infinite timeout).
+     * @param parent Parent logger.
+     * @throws IgniteCheckedException If connection fails.
+     */
+    @SuppressWarnings({"CallToThreadStartDuringObjectConstruction", "ErrorNotRethrown"})
+    public IpcSharedMemoryClientEndpoint(int port, int timeout, IgniteLogger parent) throws IgniteCheckedException {
+        assert port > 0;
+        assert port <= 0xffff; // 65535 is the highest valid TCP port.
+
+        log = parent.getLogger(IpcSharedMemoryClientEndpoint.class);
+
+        IpcSharedMemorySpace inSpace = null;
+        IpcSharedMemorySpace outSpace = null;
+
+        Socket sock = new Socket();
+
+        Exception err = null;
+        boolean clear = true;
+
+        try {
+            IpcSharedMemoryNativeLoader.load();
+
+            sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);
+
+            // Send init request carrying PID of this process.
+            ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream());
+
+            int pid = IpcSharedMemoryUtils.pid();
+
+            out.writeObject(new IpcSharedMemoryInitRequest(pid));
+
+            ObjectInputStream in = new ObjectInputStream(sock.getInputStream());
+
+            IpcSharedMemoryInitResponse res = (IpcSharedMemoryInitResponse)in.readObject();
+
+            err = res.error();
+
+            if (err == null) {
+                String inTokFileName = res.inTokenFileName();
+
+                assert inTokFileName != null;
+
+                inSpace = new IpcSharedMemorySpace(inTokFileName, res.pid(), pid, res.size(), true,
+                    res.inSharedMemoryId(), log);
+
+                String outTokFileName = res.outTokenFileName();
+
+                assert outTokFileName != null;
+
+                outSpace = new IpcSharedMemorySpace(outTokFileName, pid, res.pid(), res.size(), false,
+                    res.outSharedMemoryId(), log);
+
+                // This is success ACK, sent only after both spaces have been created.
+                out.writeBoolean(true);
+
+                out.flush();
+
+                clear = false;
+            }
+        }
+        catch (UnsatisfiedLinkError e) {
+            throw IpcSharedMemoryUtils.linkError(e);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to connect shared memory endpoint to port " +
+                "(is shared memory server endpoint up and running?): " + port, e);
+        }
+        catch (ClassNotFoundException | ClassCastException e) {
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            U.closeQuiet(sock);
+
+            if (clear) {
+                if (inSpace != null)
+                    inSpace.forceClose();
+
+                if (outSpace != null)
+                    outSpace.forceClose();
+            }
+        }
+
+        if (err != null) // Error response.
+            throw new IgniteCheckedException(err);
+
+        this.inSpace = inSpace;
+        this.outSpace = outSpace;
+
+        in = new IpcSharedMemoryInputStream(inSpace);
+        out = new IpcSharedMemoryOutputStream(outSpace);
+
+        checker = new Thread(new AliveChecker());
+
+        // Required for Hadoop 2.x
+        checker.setDaemon(true);
+
+        checker.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputStream inputStream() {
+        return in;
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream outputStream() {
+        return out;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        U.closeQuiet(in);
+        U.closeQuiet(out);
+
+        stopChecker();
+    }
+
+    /**
+     * Forcibly closes spaces and frees all system resources. <p> This method should be called with caution as it may
+     * result in the other-party process crash. It is intended to be called when there was an IO error during
+     * handshake and other party has not yet attached to the space.
+     */
+    public void forceClose() {
+        in.forceClose();
+        out.forceClose();
+
+        stopChecker();
+    }
+
+    /**
+     * Interrupts the checker thread, if there is one, and waits for it to terminate.
+     */
+    private void stopChecker() {
+        if (checker != null) {
+            checker.interrupt();
+
+            try {
+                checker.join();
+            }
+            catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** @return {@code True} if other party is alive and this method should be invoked again later. */
+    boolean checkOtherPartyAlive() {
+        if (checkIn) {
+            File tokFile = new File(inSpace.tokenFileName());
+
+            if (!tokFile.exists())
+                checkIn = false;
+        }
+
+        if (checkOut) {
+            File tokFile = new File(outSpace.tokenFileName());
+
+            if (!tokFile.exists())
+                checkOut = false;
+        }
+
+        if (!checkIn && !checkOut)
+            return false;
+
+        if (!IpcSharedMemoryUtils.alive(inSpace.otherPartyPid())) {
+            U.warn(log, "Remote process is considered to be dead (shared memory space will be forcibly closed): " +
+                inSpace.otherPartyPid());
+
+            closeSpace(inSpace);
+            closeSpace(outSpace);
+
+            return false;
+        }
+
+        // Need to call this method again after timeout.
+        return true;
+    }
+
+    /**
+     * This method is intended for test purposes only.
+     *
+     * @return In space.
+     */
+    IpcSharedMemorySpace inSpace() {
+        return inSpace;
+    }
+
+    /**
+     * This method is intended for test purposes only.
+     *
+     * @return Out space.
+     */
+    IpcSharedMemorySpace outSpace() {
+        return outSpace;
+    }
+
+    /** @param space Space to forcibly close; its system resources and token file are released too. */
+    private void closeSpace(IpcSharedMemorySpace space) {
+        assert space != null;
+
+        space.forceClose();
+
+        File tokFile = new File(space.tokenFileName());
+
+        // Space is not usable at this point and all local threads
+        // are guaranteed to leave its methods (other party is not alive).
+        // So, we can cleanup resources without additional synchronization.
+        IpcSharedMemoryUtils.freeSystemResources(tokFile.getAbsolutePath(), space.size());
+
+        tokFile.delete();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IpcSharedMemoryClientEndpoint.class, this);
+    }
+
+    /**
+     * Background task that periodically checks whether the other party is still alive.
+     */
+    private class AliveChecker implements Runnable {
+        /** Check frequency in milliseconds. */
+        private static final long CHECK_FREQ = 10000;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    Thread.sleep(CHECK_FREQ);
+                }
+                catch (InterruptedException ignored) {
+                    return;
+                }
+
+                if (!checkOtherPartyAlive())
+                    // No need to check any more.
+                    return;
+            }
+        }
+    }
+}