http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitRequest.java new file mode 100644 index 0000000..c187dbc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitRequest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import java.io.*; + +/** + * + */ +public class IpcSharedMemoryInitRequest implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int pid; + + /** + * @param pid PID of the {@code client} party. + */ + public IpcSharedMemoryInitRequest(int pid) { + this.pid = pid; + } + + /** + * Required by {@code Externalizable}. + */ + public IpcSharedMemoryInitRequest() { + // No-op. + } + + /** + * @return Sender PID. + */ + public int pid() { + return pid; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(pid); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + pid = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IpcSharedMemoryInitRequest [pid=" + pid + ']'; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitResponse.java new file mode 100644 index 0000000..9f2ed24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInitResponse.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public class IpcSharedMemoryInitResponse implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String inTokFileName; + + /** */ + private int inSharedMemId; + + /** */ + private String outTokFileName; + + /** */ + private int outSharedMemId; + + /** */ + private int pid; + + /** */ + private int size; + + /** */ + private Exception err; + + /** + * Constructs a successful response. + * + * @param inTokFileName In token. + * @param inSharedMemId In shared memory ID. + * @param outTokFileName Out token. + * @param outSharedMemId Out shared memory ID. + * @param pid PID of the {@code server} party. + * @param size Size. + */ + public IpcSharedMemoryInitResponse(String inTokFileName, int inSharedMemId, String outTokFileName, + int outSharedMemId, int pid, int size) { + this.inTokFileName = inTokFileName; + this.inSharedMemId = inSharedMemId; + this.outTokFileName = outTokFileName; + this.outSharedMemId = outSharedMemId; + this.pid = pid; + this.size = size; + } + + /** + * Constructs an error response. + * + * @param err Error cause. + */ + public IpcSharedMemoryInitResponse(Exception err) { + this.err = err; + } + + /** + * Required by {@code Externalizable}. + */ + public IpcSharedMemoryInitResponse() { + // No-op. + } + + /** + * @return In token file name or {@code null}, if this is an error response. + */ + @Nullable public String inTokenFileName() { + return inTokFileName; + } + + /** + * @return In shared memory ID. + */ + public int inSharedMemoryId() { + return inSharedMemId; + } + + /** + * @return Out token file name or {@code null}, if this is an error response. + */ + @Nullable public String outTokenFileName() { + return outTokFileName; + } + + /** + * @return Out shared memory ID. + */ + public int outSharedMemoryId() { + return outSharedMemId; + } + + /** + * @return Sender PID. + */ + public int pid() { + return pid; + } + + /** + * @return Space size. + */ + public int size() { + return size; + } + + /** + * @return Error message or {@code null}, if this is + * a successful response. + */ + @Nullable public Exception error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, inTokFileName); + out.writeInt(inSharedMemId); + U.writeString(out, outTokFileName); + out.writeInt(outSharedMemId); + out.writeObject(err); + out.writeInt(pid); + out.writeInt(size); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + inTokFileName = U.readString(in); + inSharedMemId = in.readInt(); + outTokFileName = U.readString(in); + outSharedMemId = in.readInt(); + err = (Exception)in.readObject(); + pid = in.readInt(); + size = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IpcSharedMemoryInitResponse [err=" + err + + ", inTokFileName=" + inTokFileName + + ", inSharedMemId=" + inSharedMemId + + ", outTokFileName=" + outTokFileName + + ", outSharedMemId=" + outSharedMemId + + ", pid=" + pid + + ", size=" + size + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInputStream.java new file mode 100644 index 0000000..0909da0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryInputStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * + */ +public class IpcSharedMemoryInputStream extends InputStream { + /** */ + private final IpcSharedMemorySpace in; + + /** Stream instance is not thread-safe so we can cache buffer. */ + private byte[] buf = new byte[1]; + + /** + * @param in Space. + */ + public IpcSharedMemoryInputStream(IpcSharedMemorySpace in) { + assert in != null; + + this.in = in; + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + try { + int read = in.read(buf, 0, 1, 0); + + if (read < 0) + return read; + + return buf[0] & 0xFF; + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + try { + return in.read(b, off, len, 0); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public int available() throws IOException { + try { + return in.unreadCount(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + in.close(); + } + + /** + * Forcibly closes spaces and frees all system resources. + * <p> + * This method should be called with caution as it may result to the other-party + * process crash. It is intended to call when there was an IO error during handshake + * and other party has not yet attached to the space. + */ + public void forceClose() { + in.forceClose(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemoryInputStream.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java new file mode 100644 index 0000000..a835c96 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.nio.channels.*; +import java.security.*; +import java.util.*; + +/** + * Shared memory native loader. + */ +@SuppressWarnings("ErrorNotRethrown") +public class IpcSharedMemoryNativeLoader { + /** Loaded flag. */ + private static volatile boolean loaded; + + /** Library name base. */ + private static final String LIB_NAME_BASE = "igniteshmem"; + + /** Lock file path. */ + private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"), "igniteshmem.lock"); + + /** Library name. */ + static final String LIB_NAME = LIB_NAME_BASE + "-" + GridProductImpl.VER; + + /** + * @return Operating system name to resolve path to library. + */ + private static String os() { + String name = System.getProperty("os.name").toLowerCase().trim(); + + if (name.startsWith("win")) + throw new IllegalStateException("IPC shared memory native loader should not be called on windows."); + + if (name.startsWith("linux")) + return "linux"; + + if (name.startsWith("mac os x")) + return "osx"; + + return name.replaceAll("\\W+", "_"); + } + + /** + * @return Platform. + */ + private static String platform() { + return os() + bitModel(); + } + + /** + * @return Bit model. + */ + private static int bitModel() { + String prop = System.getProperty("sun.arch.data.model"); + + if (prop == null) + prop = System.getProperty("com.ibm.vm.bitmode"); + + if (prop != null) + return Integer.parseInt(prop); + + // We don't know. + return -1; + } + + /** + * @throws IgniteCheckedException If failed. + */ + public static void load() throws IgniteCheckedException { + if (loaded) + return; + + synchronized (IpcSharedMemoryNativeLoader.class) { + if (loaded) + return; + + doLoad(); + + loaded = true; + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private static void doLoad() throws IgniteCheckedException { + assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class); + + Collection<Throwable> errs = new ArrayList<>(); + + try { + // Load native library (the library directory should be in java.library.path). + System.loadLibrary(LIB_NAME); + + return; + } + catch (UnsatisfiedLinkError e) { + errs.add(e); + } + + // Obtain lock on file to prevent concurrent extracts. + try (RandomAccessFile randomAccessFile = new RandomAccessFile(LOCK_FILE, "rws"); + FileLock ignored = randomAccessFile.getChannel().lock()) { + if (extractAndLoad(errs, platformSpecificResourcePath())) + return; + + if (extractAndLoad(errs, osSpecificResourcePath())) + return; + + if (extractAndLoad(errs, resourcePath())) + return; + + // Failed to find the library. + assert !errs.isEmpty(); + + throw new IgniteCheckedException("Failed to load native IPC library: " + errs); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to obtain file lock: " + LOCK_FILE, e); + } + } + + /** + * @return OS resource path. + */ + private static String osSpecificResourcePath() { + return "META-INF/native/" + os() + "/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Platform resource path. + */ + private static String platformSpecificResourcePath() { + return "META-INF/native/" + platform() + "/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Resource path. + */ + private static String resourcePath() { + return "META-INF/native/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Maps library name to file name. + */ + private static String mapLibraryName(String name) { + String libName = System.mapLibraryName(name); + + if (U.isMacOs() && libName.endsWith(".jnilib")) + return libName.substring(0, libName.length() - "jnilib".length()) + "dylib"; + + return libName; + } + + /** + * @param errs Errors collection. + * @param rsrcPath Path. + * @return {@code True} if library was found and loaded. + */ + private static boolean extractAndLoad(Collection<Throwable> errs, String rsrcPath) { + ClassLoader clsLdr = U.detectClassLoader(IpcSharedMemoryNativeLoader.class); + + URL rsrc = clsLdr.getResource(rsrcPath); + + if (rsrc != null) + return extract(errs, rsrc, new File(System.getProperty("java.io.tmpdir"), mapLibraryName(LIB_NAME))); + else { + errs.add(new IllegalStateException("Failed to find resource with specified class loader " + + "[rsrc=" + rsrcPath + ", clsLdr=" + clsLdr + ']')); + + return false; + } + } + + /** + * @param errs Errors collection. + * @param src Source. + * @param target Target. + * @return {@code True} if resource was found and loaded. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static boolean extract(Collection<Throwable> errs, URL src, File target) { + FileOutputStream os = null; + InputStream is = null; + + try { + if (!target.exists() || !haveEqualMD5(target, src)) { + is = src.openStream(); + + if (is != null) { + os = new FileOutputStream(target); + + int read; + + byte[] buf = new byte[4096]; + + while ((read = is.read(buf)) != -1) + os.write(buf, 0, read); + } + } + + // chmod 775. + if (!U.isWindows()) + Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor(); + + System.load(target.getPath()); + + return true; + } + catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) { + errs.add(e); + } + finally { + U.closeQuiet(os); + U.closeQuiet(is); + } + + return false; + } + + /** + * @param target + * @param src + * @return + * @throws NoSuchAlgorithmException + * @throws IOException + */ + private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException { + try (InputStream targetIS = new FileInputStream(target); + InputStream srcIS = src.openStream()) { + + String targetMD5 = U.calculateMD5(targetIS); + String srcMD5 = U.calculateMD5(srcIS); + + return targetMD5.equals(srcMD5); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOperationTimedoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOperationTimedoutException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOperationTimedoutException.java new file mode 100644 index 0000000..f0c3f53 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOperationTimedoutException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +/** + * Thrown when IPC operation (such as {@link IpcSharedMemorySpace#wait(long)}) + * has timed out. + */ +public class IpcSharedMemoryOperationTimedoutException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given error message. + * + * @param msg Error message. + */ + public IpcSharedMemoryOperationTimedoutException(String msg) { + super(msg); + } + + /** + * Creates new exception with given throwable as a cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public IpcSharedMemoryOperationTimedoutException(Throwable cause) { + super(cause); + } + + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IpcSharedMemoryOperationTimedoutException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOutputStream.java new file mode 100644 index 0000000..eb85fc1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryOutputStream.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * + */ +public class IpcSharedMemoryOutputStream extends OutputStream { + /** */ + private final IpcSharedMemorySpace out; + + /** + * @param out Space. + */ + public IpcSharedMemoryOutputStream(IpcSharedMemorySpace out) { + assert out != null; + + this.out = out; + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + byte[] buf = new byte[1]; + + buf[0] = (byte)b; + + write(buf, 0, 1); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) throws IOException { + try { + out.write(b, off, len, 0); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + out.close(); + } + + /** + * Forcibly closes spaces and frees all system resources. + * <p> + * This method should be called with caution as it may result to the other-party + * process crash. It is intended to call when there was an IO error during handshake + * and other party has not yet attached to the space. + */ + public void forceClose() { + out.forceClose(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemoryOutputStream.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java new file mode 100644 index 0000000..d22f2c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -0,0 +1,707 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.internal.processors.resource.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.worker.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Server shared memory IPC endpoint. + */ +public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { + /** Troubleshooting public wiki page. */ + public static final String TROUBLESHOOTING_URL = "http://bit.ly/GridGain-Troubleshooting"; + + /** IPC error message. */ + public static final String OUT_OF_RESOURCES_MSG = "Failed to allocate shared memory segment " + + "(for troubleshooting see " + TROUBLESHOOTING_URL + ')'; + + /** Default endpoint port number. */ + public static final int DFLT_IPC_PORT = 10500; + + /** Default shared memory space in bytes. */ + public static final int DFLT_SPACE_SIZE = 256 * 1024; + + /** + * Default token directory. Note that this path is relative to {@code GRIDGAIN_HOME/work} folder + * if {@code GRIDGAIN_HOME} system or environment variable specified, otherwise it is relative to + * {@code work} folder under system {@code java.io.tmpdir} folder. + * + * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory() + */ + public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem"; + + /** + * Shared memory token file name prefix. + * + * Token files are created and stored in the following manner: [tokDirPath]/[nodeId]-[current + * PID]/gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + */ + public static final String TOKEN_FILE_NAME = "gg-shmem-space-"; + + /** Default lock file name. */ + private static final String LOCK_FILE_NAME = "lock.file"; + + /** GC frequency. */ + private static final long GC_FREQ = 10000; + + /** ID generator. */ + private static final AtomicLong tokIdxGen = new AtomicLong(); + + /** Port to bind socket to. */ + private int port = DFLT_IPC_PORT; + + /** Prefix. */ + private String tokDirPath = DFLT_TOKEN_DIR_PATH; + + /** Space size. */ + private int size = DFLT_SPACE_SIZE; + + /** Server socket. */ + @GridToStringExclude + private ServerSocket srvSock; + + /** Token directory. */ + private File tokDir; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Local node ID. */ + private UUID locNodeId; + + /** Grid name. */ + private String gridName; + + /** Flag allowing not to print out of resources warning. */ + private boolean omitOutOfResourcesWarn; + + /** GC worker. */ + private GridWorker gcWorker; + + /** Pid of the current process. */ + private int pid; + + /** Closed flag. */ + private volatile boolean closed; + + /** Spaces opened on with this endpoint. */ + private final Collection<IpcSharedMemoryClientEndpoint> endpoints = + new GridConcurrentHashSet<>(); + + /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}. */ + public IpcSharedMemoryServerEndpoint() { + // No-op. + } + + /** + * Constructor to set dependencies explicitly. + * + * @param log Log. + * @param locNodeId Node id. + * @param gridName Grid name. + */ + public IpcSharedMemoryServerEndpoint(IgniteLogger log, UUID locNodeId, String gridName) { + this.log = log; + this.locNodeId = locNodeId; + this.gridName = gridName; + } + + /** @param omitOutOfResourcesWarn If {@code true}, out of resources warning will not be printed by server. */ + public void omitOutOfResourcesWarning(boolean omitOutOfResourcesWarn) { + this.omitOutOfResourcesWarn = omitOutOfResourcesWarn; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + IpcSharedMemoryNativeLoader.load(); + + pid = IpcSharedMemoryUtils.pid(); + + if (pid == -1) + throw new IpcEndpointBindException("Failed to get PID of the current process."); + + if (size <= 0) + throw new IpcEndpointBindException("Space size should be positive: " + size); + + String tokDirPath = this.tokDirPath; + + if (F.isEmpty(tokDirPath)) + throw new IpcEndpointBindException("Token directory path is empty."); + + tokDirPath = tokDirPath + '/' + locNodeId.toString() + '-' + IpcSharedMemoryUtils.pid(); + + tokDir = U.resolveWorkDirectory(tokDirPath, false); + + if (port <= 0 || port >= 0xffff) + throw new IpcEndpointBindException("Port value is illegal: " + port); + + try { + srvSock = new ServerSocket(); + + // Always bind to loopback. + srvSock.bind(new InetSocketAddress("127.0.0.1", port)); + } + catch (IOException e) { + // Although empty socket constructor never throws exception, close it just in case. + U.closeQuiet(srvSock); + + throw new IpcEndpointBindException("Failed to bind shared memory IPC endpoint (is port already " + + "in use?): " + port, e); + } + + gcWorker = new GcWorker(gridName, "ipc-shmem-gc", log); + + new IgniteThread(gcWorker).start(); + + if (log.isInfoEnabled()) + log.info("IPC shared memory server endpoint started [port=" + port + + ", tokDir=" + tokDir.getAbsolutePath() + ']'); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ErrorNotRethrown") + @Override public IpcEndpoint accept() throws IgniteCheckedException { + while (!Thread.currentThread().isInterrupted()) { + Socket sock = null; + + boolean accepted = false; + + try { + sock = srvSock.accept(); + + accepted = true; + + InputStream inputStream = sock.getInputStream(); + ObjectInputStream in = new ObjectInputStream(inputStream); + + ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream()); + + IpcSharedMemorySpace inSpace = null; + + IpcSharedMemorySpace outSpace = null; + + boolean err = true; + + try { + IpcSharedMemoryInitRequest req = (IpcSharedMemoryInitRequest)in.readObject(); + + if (log.isDebugEnabled()) + log.debug("Processing request: " + req); + + IgnitePair<String> p = inOutToken(req.pid(), size); + + String file1 = p.get1(); + String file2 = p.get2(); + + assert file1 != null; + assert file2 != null; + + // Create tokens. + new File(file1).createNewFile(); + new File(file2).createNewFile(); + + if (log.isDebugEnabled()) + log.debug("Created token files: " + p); + + inSpace = new IpcSharedMemorySpace( + file1, + req.pid(), + pid, + size, + true, + log); + + outSpace = new IpcSharedMemorySpace( + file2, + pid, + req.pid(), + size, + false, + log); + + IpcSharedMemoryClientEndpoint ret = new IpcSharedMemoryClientEndpoint(inSpace, outSpace, + log); + + out.writeObject(new IpcSharedMemoryInitResponse(file2, outSpace.sharedMemoryId(), + file1, inSpace.sharedMemoryId(), pid, size)); + + err = !in.readBoolean(); + + endpoints.add(ret); + + return ret; + } + catch (UnsatisfiedLinkError e) { + throw IpcSharedMemoryUtils.linkError(e); + } + catch (IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to process incoming connection " + + "(was connection closed by another party):" + e.getMessage()); + } + catch (ClassNotFoundException e) { + U.error(log, "Failed to process incoming connection.", e); + } + catch (ClassCastException e) { + String msg = "Failed to process incoming connection (most probably, shared memory " + + "rest endpoint has been configured by mistake)."; + + LT.warn(log, null, msg); + + sendErrorResponse(out, e); + } + catch (IpcOutOfSystemResourcesException e) { + if (!omitOutOfResourcesWarn) + LT.warn(log, null, OUT_OF_RESOURCES_MSG); + + sendErrorResponse(out, e); + } + catch (IgniteCheckedException e) { + LT.error(log, e, "Failed to process incoming shared memory connection."); + + sendErrorResponse(out, e); + } + finally { + // Exception has been thrown, need to free system resources. + if (err) { + if (inSpace != null) + inSpace.forceClose(); + + // Safety. + if (outSpace != null) + outSpace.forceClose(); + } + } + } + catch (IOException e) { + if (!Thread.currentThread().isInterrupted() && !accepted) + throw new IgniteCheckedException("Failed to accept incoming connection.", e); + + if (!closed) + LT.error(log, null, "Failed to process incoming shared memory connection: " + e.getMessage()); + } + finally { + U.closeQuiet(sock); + } + } // while + + throw new IgniteInterruptedException("Socket accept was interrupted."); + } + + /** + * Injects resources. + * + * @param ignite Ignite + */ + @IgniteInstanceResource + private void injectResources(Ignite ignite){ + if (ignite != null) { + // Inject resources. + gridName = ignite.name(); + locNodeId = ignite.configuration().getNodeId(); + } + else { + // Cleanup resources. + gridName = null; + locNodeId = null; + } + } + + /** + * @param out Output stream. + * @param err Error cause. + */ + private void sendErrorResponse(ObjectOutput out, Exception err) { + try { + out.writeObject(new IpcSharedMemoryInitResponse(err)); + } + catch (IOException e) { + U.error(log, "Failed to send error response to client.", e); + } + } + + /** + * @param pid PID of the other party. + * @param size Size of the space. + * @return Token pair. + */ + private IgnitePair<String> inOutToken(int pid, int size) { + while (true) { + long idx = tokIdxGen.get(); + + if (tokIdxGen.compareAndSet(idx, idx + 2)) + return F.pair(new File(tokDir, TOKEN_FILE_NAME + idx + "-" + pid + "-" + size).getAbsolutePath(), + new File(tokDir, TOKEN_FILE_NAME + (idx + 1) + "-" + pid + "-" + size).getAbsolutePath()); + } + } + + /** {@inheritDoc} */ + @Override public int getPort() { + return port; + } + + /** {@inheritDoc} */ + @Nullable @Override public String getHost() { + return null; + } + + /** + * {@inheritDoc} + * + * @return {@code false} as shared memory endpoints can not be used for management. + */ + @Override public boolean isManagement() { + return false; + } + + /** + * Sets port endpoint will be bound to. + * + * @param port Port number. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Gets token directory path. + * + * @return Token directory path. + */ + public String getTokenDirectoryPath() { + return tokDirPath; + } + + /** + * Sets token directory path. + * + * @param tokDirPath Token directory path. + */ + public void setTokenDirectoryPath(String tokDirPath) { + this.tokDirPath = tokDirPath; + } + + /** + * Gets size of shared memory spaces that are created by the endpoint. + * + * @return Size of shared memory space. + */ + public int getSize() { + return size; + } + + /** + * Sets size of shared memory spaces that are created by the endpoint. + * + * @param size Size of shared memory space. + */ + public void setSize(int size) { + this.size = size; + } + + /** {@inheritDoc} */ + @Override public void close() { + closed = true; + + U.closeQuiet(srvSock); + + if (gcWorker != null) { + U.cancel(gcWorker); + + // This method may be called from already interrupted thread. + // Need to ensure cleaning on close. + boolean interrupted = Thread.interrupted(); + + try { + U.join(gcWorker); + } + catch (IgniteInterruptedException e) { + U.warn(log, "Interrupted when stopping GC worker.", e); + } + finally { + if (interrupted) + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemoryServerEndpoint.class, this); + } + + /** + * Sets configuration properties from the map. + * + * @param endpointCfg Map of properties. + * @throws IgniteCheckedException If invalid property name or value. + */ + public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException { + for (Map.Entry<String,String> e : endpointCfg.entrySet()) { + try { + switch (e.getKey()) { + case "type": + case "host": + case "management": + //Ignore these properties + break; + + case "port": + setPort(Integer.parseInt(e.getValue())); + break; + + case "size": + setSize(Integer.parseInt(e.getValue())); + break; + + case "tokenDirectoryPath": + setTokenDirectoryPath(e.getValue()); + break; + + default: + throw new IgniteCheckedException("Invalid property '" + e.getKey() + "' of " + getClass().getSimpleName()); + } + } + catch (Throwable t) { + if (t instanceof IgniteCheckedException) + throw t; + + throw new IgniteCheckedException("Invalid value '" + e.getValue() + "' of the property '" + e.getKey() + "' in " + + getClass().getSimpleName(), t); + } + } + } + + /** + * + */ + private class GcWorker extends GridWorker { + /** + * @param gridName Grid name. + * @param name Name. + * @param log Log. + */ + protected GcWorker(@Nullable String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedException { + if (log.isDebugEnabled()) + log.debug("GC worker started."); + + File workTokDir = tokDir.getParentFile(); + + assert workTokDir != null; + + while (!isCancelled()) { + U.sleep(GC_FREQ); + + if (log.isDebugEnabled()) + log.debug("Starting GC iteration."); + + RandomAccessFile lockFile = null; + + FileLock lock = null; + + try { + lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME), "rw"); + + lock = lockFile.getChannel().lock(); + + if (lock != null) + processTokenDirectory(workTokDir); + else if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (OverlappingFileLockException ignored) { + if (log.isDebugEnabled()) + log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); + } + catch (IOException e) { + U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(), e); + } + finally { + U.releaseQuiet(lock); + U.closeQuiet(lockFile); + } + + // Process spaces created by this endpoint. + if (log.isDebugEnabled()) + log.debug("Processing local spaces."); + + for (IpcSharedMemoryClientEndpoint e : endpoints) { + if (log.isDebugEnabled()) + log.debug("Processing endpoint: " + e); + + if (!e.checkOtherPartyAlive()) { + endpoints.remove(e); + + if (log.isDebugEnabled()) + log.debug("Removed endpoint: " + e); + } + } + } + } + + /** @param workTokDir Token directory (common for multiple nodes). */ + private void processTokenDirectory(File workTokDir) { + for (File f : workTokDir.listFiles()) { + if (!f.isDirectory()) { + if (!f.getName().equals(LOCK_FILE_NAME)) { + if (log.isDebugEnabled()) + log.debug("Unexpected file: " + f.getName()); + } + + continue; + } + + if (f.equals(tokDir)) { + if (log.isDebugEnabled()) + log.debug("Skipping own token directory: " + tokDir.getName()); + + continue; + } + + String name = f.getName(); + + int pid; + + try { + pid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + // Is process alive? + if (IpcSharedMemoryUtils.alive(pid)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive node: " + pid); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token folder: " + f); + + // Process each token under stale token folder. + File[] shmemToks = f.listFiles(); + + if (shmemToks == null) + // Although this is strange, but is reproducible sometimes on linux. + return; + + int rmvCnt = 0; + + try { + for (File f0 : shmemToks) { + if (log.isDebugEnabled()) + log.debug("Processing token file: " + f0.getName()); + + if (f0.isDirectory()) { + if (log.isDebugEnabled()) + log.debug("Unexpected directory: " + f0.getName()); + } + + // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size] + String[] toks = f0.getName().split("-"); + + if (toks.length != 6) { + if (log.isDebugEnabled()) + log.debug("Unrecognized token file: " + f0.getName()); + + continue; + } + + int pid0; + int size; + + try { + pid0 = Integer.parseInt(toks[4]); + size = Integer.parseInt(toks[5]); + } + catch (NumberFormatException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to parse file name: " + name); + + continue; + } + + if (IpcSharedMemoryUtils.alive(pid0)) { + if (log.isDebugEnabled()) + log.debug("Skipping alive process: " + pid0); + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Possibly stale token file: " + f0); + + IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), size); + + if (f0.delete()) { + if (log.isDebugEnabled()) + log.debug("Deleted file: " + f0.getName()); + + rmvCnt++; + } + else if (!f0.exists()) { + if (log.isDebugEnabled()) + log.debug("File has been concurrently deleted: " + f0.getName()); + + rmvCnt++; + } + else if (log.isDebugEnabled()) + log.debug("Failed to delete file: " + f0.getName()); + } + } + finally { + // Assuming that no new files can appear, since + if (rmvCnt == shmemToks.length) { + U.delete(f); + + if (log.isDebugEnabled()) + log.debug("Deleted empty token directory: " + f.getName()); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java new file mode 100644 index 0000000..ba4be48 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.nio.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * + */ +@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions"}) +public class IpcSharedMemorySpace implements Closeable { + /** Debug flag (enable for testing). */ + private static final boolean DEBUG = Boolean.getBoolean(GG_IPC_SHMEM_SPACE_DEBUG); + + /** Shared memory segment size (operable). */ + private final int opSize; + + /** Shared memory native pointer. */ + private final long shmemPtr; + + /** Shared memory ID. */ + private final int shmemId; + + /** Semaphore set ID. */ + private final int semId; + + /** Local closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(); + + /** {@code True} if space has been closed. */ + private final boolean isReader; + + /** Lock to protect readers and writers from concurrent close. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** */ + private final int writerPid; + + /** */ + private final int readerPid; + + /** */ + private final String tokFileName; + + /** */ + private final IgniteLogger log; + + /** + * This will allocate system resources for the space. + * + * @param tokFileName Token filename. + * @param writerPid Writer PID. + * @param readerPid Reader PID. + * @param size Size in bytes. + * @param reader {@code True} if reader. + * @param parent Parent logger. + * @throws IgniteCheckedException If failed. + */ + public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, + IgniteLogger parent) throws IgniteCheckedException { + assert size > 0 : "Size cannot be less than 1 byte"; + + log = parent.getLogger(IpcSharedMemorySpace.class); + + opSize = size; + + shmemPtr = IpcSharedMemoryUtils.allocateSystemResources(tokFileName, size, DEBUG && log.isDebugEnabled()); + + shmemId = IpcSharedMemoryUtils.sharedMemoryId(shmemPtr); + semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); + + isReader = reader; + + this.tokFileName = tokFileName; + this.readerPid = readerPid; + this.writerPid = writerPid; + + if (DEBUG && log.isDebugEnabled()) + log.debug("Shared memory space has been created: " + this); + } + + /** + * This should be called in order to attach to already allocated system resources. + * + * @param tokFileName Token file name (for proper cleanup). + * @param writerPid Writer PID. + * @param readerPid Reader PID. + * @param size Size. + * @param reader Reader flag. + * @param shmemId Shared memory ID. + * @param parent Logger. + * @throws IgniteCheckedException If failed. + */ + public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader, + int shmemId, IgniteLogger parent) throws IgniteCheckedException { + assert size > 0 : "Size cannot be less than 1 byte"; + + log = parent.getLogger(IpcSharedMemorySpace.class); + + opSize = size; + isReader = reader; + this.shmemId = shmemId; + this.writerPid = writerPid; + this.readerPid = readerPid; + this.tokFileName = tokFileName; + + shmemPtr = IpcSharedMemoryUtils.attach(shmemId, DEBUG && log.isDebugEnabled()); + + semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr); + } + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public void write(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.length >= off + len; + assert timeout >= 0; + + assert !isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + IpcSharedMemoryUtils.writeSharedMemory(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public void write(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.limit() >= off + len; + assert timeout >= 0; + assert !isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + IpcSharedMemoryUtils.writeSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Blocks until at least 1 byte is read. + * + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @return Read bytes count. + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public int read(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.length >= off + len; + + assert isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return (int) IpcSharedMemoryUtils.readSharedMemory(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Blocks until at least 1 byte is read. + * + * @param buf Buffer. + * @param off Offset. + * @param len Length. + * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever). + * @return Read bytes count. + * @throws IgniteCheckedException If space has been closed. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. + */ + public int read(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException, + IpcSharedMemoryOperationTimedoutException { + assert buf != null; + assert len > 0; + assert buf.capacity() >= off + len; + assert isReader; + + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return (int) IpcSharedMemoryUtils.readSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + close0(false); + } + + /** + * Forcibly closes the space and frees all system resources. + * <p> + * This method should be called with caution as it may result to the other-party + * process crash. It is intended to call when there was an IO error during handshake + * and other party has not yet attached to the space. + */ + public void forceClose() { + close0(true); + } + + /** + * @return Shared memory ID. + */ + public int sharedMemoryId() { + return shmemId; + } + + /** + * @return Semaphore set ID. + */ + public int semaphoreId() { + return semId; + } + + /** + * @param force {@code True} to close the space. + */ + private void close0(boolean force) { + if (!closed.compareAndSet(false, true)) + return; + + IpcSharedMemoryUtils.ipcClose(shmemPtr); + + // Wait all readers and writes to leave critical section. + lock.writeLock().lock(); + + try { + IpcSharedMemoryUtils.freeSystemResources(tokFileName, shmemPtr, force); + } + finally { + lock.writeLock().unlock(); + } + + if (DEBUG && log.isDebugEnabled()) + log.debug("Shared memory space has been closed: " + this); + } + + /** + * @return Bytes available for read. + * @throws IgniteCheckedException If failed. + */ + public int unreadCount() throws IgniteCheckedException { + lock.readLock().lock(); + + try { + if (closed.get()) + throw new IgniteCheckedException("Shared memory segment has been closed: " + this); + + return IpcSharedMemoryUtils.unreadCount(shmemPtr); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @return Shared memory pointer. + */ + public long sharedMemPointer() { + return shmemPtr; + } + + /** + * @return Reader PID. + */ + public int readerPid() { + return readerPid; + } + + /** + * @return Writer PID. + */ + public int writerPid() { + return writerPid; + } + + /** + * @return Vis-a-vis PID. + */ + public int otherPartyPid() { + return isReader ? writerPid : readerPid; + } + + /** + * @return Token file name used to create shared memory space. + */ + public String tokenFileName() { + return tokFileName; + } + + /** + * @return Space size. + */ + public int size() { + return opSize; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcSharedMemorySpace.class, this, "closed", closed.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java index ec0566e..7d0abaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java @@ -27,7 +27,7 @@ import java.util.*; /** * NOTE: Native library should be loaded, before methods of this class are called. Native library is loaded with: {@link - * GridIpcSharedMemoryNativeLoader#load()}. + * IpcSharedMemoryNativeLoader#load()}. */ public class IpcSharedMemoryUtils { /** @@ -84,10 +84,10 @@ public class IpcSharedMemoryUtils { * @param timeout Operation timeout. * @return Read bytes count. * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. */ static native long readSharedMemory(long shMemPtr, byte dest[], long dOff, long size, long timeout) - throws IgniteCheckedException, GridIpcSharedMemoryOperationTimedoutException; + throws IgniteCheckedException, IpcSharedMemoryOperationTimedoutException; /** * @param shmemPtr Shared memory pointer. @@ -115,10 +115,10 @@ public class IpcSharedMemoryUtils { * @param timeout Operation timeout. * @return Read bytes count. * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. */ static native long readSharedMemoryByteBuffer(long shMemPtr, ByteBuffer dest, long dOff, long size, long timeout) - throws IgniteCheckedException, GridIpcSharedMemoryOperationTimedoutException; + throws IgniteCheckedException, IpcSharedMemoryOperationTimedoutException; /** * @param shMemPtr Shared memory pointer @@ -127,10 +127,10 @@ public class IpcSharedMemoryUtils { * @param size Size. * @param timeout Operation timeout. * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. */ static native void writeSharedMemory(long shMemPtr, byte src[], long sOff, long size, long timeout) - throws IgniteCheckedException, GridIpcSharedMemoryOperationTimedoutException; + throws IgniteCheckedException, IpcSharedMemoryOperationTimedoutException; /** * @param shMemPtr Shared memory pointer @@ -139,10 +139,10 @@ public class IpcSharedMemoryUtils { * @param size Size. * @param timeout Operation timeout. * @throws IgniteCheckedException If space has been closed. - * @throws GridIpcSharedMemoryOperationTimedoutException If operation times out. + * @throws IpcSharedMemoryOperationTimedoutException If operation times out. */ static native void writeSharedMemoryByteBuffer(long shMemPtr, ByteBuffer src, long sOff, long size, long timeout) - throws IgniteCheckedException, GridIpcSharedMemoryOperationTimedoutException; + throws IgniteCheckedException, IpcSharedMemoryOperationTimedoutException; /** @return PID of the current process (-1 on error). */ public static int pid() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 63b5682..5cbc506 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -33,7 +33,7 @@ import java.util.*; */ public class GridShmemCommunicationClient extends GridAbstractCommunicationClient { /** */ - private final GridIpcSharedMemoryClientEndpoint shmem; + private final IpcSharedMemoryClientEndpoint shmem; /** */ private final ByteBuffer writeBuf; @@ -59,7 +59,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien assert port > 0 && port < 0xffff; assert connTimeout >= 0; - shmem = new GridIpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); + shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); this.msgWriter = msgWriter; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index f00f47b..fdc8dd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -129,10 +129,10 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa long start0 = U.currentTimeMillis(); try { - Collection<GridIpcServerEndpoint> endPoints = ggfsProc.endpoints(ggfs.name()); + Collection<IpcServerEndpoint> endPoints = ggfsProc.endpoints(ggfs.name()); if (endPoints != null) { - for (GridIpcServerEndpoint ep : endPoints) + for (IpcServerEndpoint ep : endPoints) if (ep.isManagement()) res.ggfsEndpoints().add(new VisorGgfsEndpoint(ggfs.name(), g.name(), ep.getHost(), ep.getPort())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4e9369a..afaae4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -147,7 +147,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower). For troubleshooting see " + - GridIpcSharedMemoryServerEndpoint.TROUBLESHOOTING_URL; + IpcSharedMemoryServerEndpoint.TROUBLESHOOTING_URL; /** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */ public static final String ATTR_ADDRS = "comm.tcp.addrs"; @@ -688,7 +688,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private GridNioServer<GridTcpCommunicationMessageAdapter> nioSrvr; /** Shared memory server. */ - private GridIpcSharedMemoryServerEndpoint shmemSrv; + private IpcSharedMemoryServerEndpoint shmemSrv; /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -1609,7 +1609,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Server. * @throws IgniteCheckedException If failed. */ - @Nullable private GridIpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { + @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { if (boundTcpShmemPort >= 0) throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); @@ -1621,8 +1621,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // If configured TCP port is busy, find first available in range. for (int port = shmemPort; port < shmemPort + locPortRange; port++) { try { - GridIpcSharedMemoryServerEndpoint srv = - new GridIpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName); + IpcSharedMemoryServerEndpoint srv = + new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName); srv.setPort(port); @@ -1907,8 +1907,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return createShmemClient(node, shmemPort); } catch (IgniteCheckedException e) { - if (e.hasCause(GridIpcOutOfSystemResourcesException.class)) - // Has cause or is itself the GridIpcOutOfSystemResourcesException. + if (e.hasCause(IpcOutOfSystemResourcesException.class)) + // Has cause or is itself the IpcOutOfSystemResourcesException. LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); else if (getSpiContext().node(node.id()) != null) LT.warn(log, null, e.getMessage()); @@ -2453,12 +2453,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private class ShmemAcceptWorker extends GridWorker { /** */ - private final GridIpcSharedMemoryServerEndpoint srv; + private final IpcSharedMemoryServerEndpoint srv; /** * @param srv Server. */ - ShmemAcceptWorker(GridIpcSharedMemoryServerEndpoint srv) { + ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { super(gridName, "shmem-communication-acceptor", log); this.srv = srv; @@ -2497,12 +2497,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private class ShmemWorker extends GridWorker { /** */ - private final GridIpcEndpoint endpoint; + private final IpcEndpoint endpoint; /** * @param endpoint Endpoint. */ - private ShmemWorker(GridIpcEndpoint endpoint) { + private ShmemWorker(IpcEndpoint endpoint) { super(gridName, "shmem-worker", log); this.endpoint = endpoint; @@ -2511,7 +2511,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { - GridIpcToNioAdapter<GridTcpCommunicationMessageAdapter> adapter = new GridIpcToNioAdapter<>( + IpcToNioAdapter<GridTcpCommunicationMessageAdapter> adapter = new IpcToNioAdapter<>( metricsLsnr, log, endpoint, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java index 9761287..037d0bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java @@ -102,9 +102,9 @@ public abstract class GridGgfsServerManagerIpcEndpointRegistrationAbstractSelfTe int shmem = 0; for (GridPortRecord record : ctx.ports().records()) { - if (record.clazz() == GridIpcSharedMemoryServerEndpoint.class) + if (record.clazz() == IpcSharedMemoryServerEndpoint.class) shmem++; - else if (record.clazz() == GridIpcServerTcpEndpoint.class) + else if (record.clazz() == IpcServerTcpEndpoint.class) tcp++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java index a759266..3920ad6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java @@ -40,7 +40,7 @@ public class GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest @Override public Object call() throws Exception { IgniteConfiguration cfg = gridConfiguration(); - cfg.setGgfsConfiguration(igniteFsConfiguration("shmem", GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, + cfg.setGgfsConfiguration(igniteFsConfiguration("shmem", IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, null)); return G.start(cfg); @@ -48,6 +48,6 @@ public class GridGgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest }, IgniteCheckedException.class, null); assert e.getCause().getMessage().contains(" should not be configured on Windows (configure " + - GridIpcServerTcpEndpoint.class.getSimpleName() + ")"); + IpcServerTcpEndpoint.class.getSimpleName() + ")"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializerSelfTest.java deleted file mode 100644 index eb79a68..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializerSelfTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.testframework.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Tests for {@code GridIpcServerEndpointDeserializer}. - */ -public class GridIpcServerEndpointDeserializerSelfTest extends GridGgfsCommonAbstractTest { - /** */ - private Map<String,String> shmemSrvEndpoint; - - /** */ - private Map<String,String> tcpSrvEndpoint; - - /** - * Initialize test stuff. - */ - @Override protected void beforeTest() throws Exception { - shmemSrvEndpoint = new HashMap<>(); - shmemSrvEndpoint.put("port", "888"); - shmemSrvEndpoint.put("size", "111"); - shmemSrvEndpoint.put("tokenDirectoryPath", "test-my-path-baby"); - - tcpSrvEndpoint = new HashMap<>(); - tcpSrvEndpoint.put("port", "999"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfCfgIsNull() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @SuppressWarnings("NullableProblems") - @Override public Object call() throws Exception { - return GridIpcServerEndpointDeserializer.deserialize(null); - } - }, NullPointerException.class, "Ouch! Argument cannot be null: endpointCfg"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndNoTypeInfoInJson() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - return GridIpcServerEndpointDeserializer.deserialize(shmemSrvEndpoint); - } - }, IgniteCheckedException.class, "Failed to create server endpoint (type is not specified)"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndNoUnknownTypeInfoInJson() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(shmemSrvEndpoint); - endPnt.put("type", "unknownEndpointType"); - - return GridIpcServerEndpointDeserializer.deserialize(endPnt); - } - }, IgniteCheckedException.class, "Failed to create server endpoint (type is unknown): unknownEndpointType"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfLoopbackAndJsonIsLightlyBroken() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - return GridIpcServerEndpointDeserializer.deserialize(tcpSrvEndpoint); - } - }, IgniteCheckedException.class, null); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndJsonIsOk() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(shmemSrvEndpoint); - endPnt.put("type", "shmem"); - - GridIpcServerEndpoint deserialized = GridIpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof GridIpcSharedMemoryServerEndpoint); - - GridIpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (GridIpcSharedMemoryServerEndpoint)deserialized; - - assertEquals(shmemSrvEndpoint.get("port"), String.valueOf(deserializedShmemEndpoint.getPort())); - assertEquals(shmemSrvEndpoint.get("size"), String.valueOf(deserializedShmemEndpoint.getSize())); - assertEquals(shmemSrvEndpoint.get("tokenDirectoryPath"), deserializedShmemEndpoint.getTokenDirectoryPath()); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndJsonIsOkAndDefaultValuesAreSetToFields() throws Exception { - GridIpcSharedMemoryServerEndpoint defShmemSrvEndpoint = new GridIpcSharedMemoryServerEndpoint(); - defShmemSrvEndpoint.setPort(8); - - Map<String, String> endPnt = new HashMap<>(); - - endPnt.put("type", "shmem"); - endPnt.put("port", String.valueOf(defShmemSrvEndpoint.getPort())); - - GridIpcServerEndpoint deserialized = GridIpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof GridIpcSharedMemoryServerEndpoint); - - GridIpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (GridIpcSharedMemoryServerEndpoint)deserialized; - - assertEquals(defShmemSrvEndpoint.getPort(), deserializedShmemEndpoint.getPort()); - assertEquals(defShmemSrvEndpoint.getSize(), deserializedShmemEndpoint.getSize()); - assertEquals(defShmemSrvEndpoint.getTokenDirectoryPath(), deserializedShmemEndpoint.getTokenDirectoryPath()); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfLoopbackAndJsonIsOk() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(tcpSrvEndpoint); - endPnt.put("type", "tcp"); - - GridIpcServerEndpoint deserialized = GridIpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof GridIpcServerTcpEndpoint); - - assertEquals(tcpSrvEndpoint.get("port"), String.valueOf(deserialized.getPort())); - } -}