http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java new file mode 100644 index 0000000..a32ebb6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests for {@code IpcServerEndpointDeserializer}. + */ +public class IpcServerEndpointDeserializerSelfTest extends GridGgfsCommonAbstractTest { + /** */ + private Map<String,String> shmemSrvEndpoint; + + /** */ + private Map<String,String> tcpSrvEndpoint; + + /** + * Initialize test stuff. + */ + @Override protected void beforeTest() throws Exception { + shmemSrvEndpoint = new HashMap<>(); + shmemSrvEndpoint.put("port", "888"); + shmemSrvEndpoint.put("size", "111"); + shmemSrvEndpoint.put("tokenDirectoryPath", "test-my-path-baby"); + + tcpSrvEndpoint = new HashMap<>(); + tcpSrvEndpoint.put("port", "999"); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfCfgIsNull() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @SuppressWarnings("NullableProblems") + @Override public Object call() throws Exception { + return IpcServerEndpointDeserializer.deserialize(null); + } + }, NullPointerException.class, "Ouch! Argument cannot be null: endpointCfg"); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfShmemAndNoTypeInfoInJson() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return IpcServerEndpointDeserializer.deserialize(shmemSrvEndpoint); + } + }, IgniteCheckedException.class, "Failed to create server endpoint (type is not specified)"); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfShmemAndNoUnknownTypeInfoInJson() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + Map<String, String> endPnt = new HashMap<>(); + + endPnt.putAll(shmemSrvEndpoint); + endPnt.put("type", "unknownEndpointType"); + + return IpcServerEndpointDeserializer.deserialize(endPnt); + } + }, IgniteCheckedException.class, "Failed to create server endpoint (type is unknown): unknownEndpointType"); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfLoopbackAndJsonIsLightlyBroken() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return IpcServerEndpointDeserializer.deserialize(tcpSrvEndpoint); + } + }, IgniteCheckedException.class, null); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfShmemAndJsonIsOk() throws Exception { + Map<String, String> endPnt = new HashMap<>(); + + endPnt.putAll(shmemSrvEndpoint); + endPnt.put("type", "shmem"); + + IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); + + assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint); + + IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized; + + assertEquals(shmemSrvEndpoint.get("port"), String.valueOf(deserializedShmemEndpoint.getPort())); + assertEquals(shmemSrvEndpoint.get("size"), String.valueOf(deserializedShmemEndpoint.getSize())); + assertEquals(shmemSrvEndpoint.get("tokenDirectoryPath"), deserializedShmemEndpoint.getTokenDirectoryPath()); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfShmemAndJsonIsOkAndDefaultValuesAreSetToFields() throws Exception { + IpcSharedMemoryServerEndpoint defShmemSrvEndpoint = new IpcSharedMemoryServerEndpoint(); + defShmemSrvEndpoint.setPort(8); + + Map<String, String> endPnt = new HashMap<>(); + + endPnt.put("type", "shmem"); + endPnt.put("port", String.valueOf(defShmemSrvEndpoint.getPort())); + + IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); + + assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint); + + IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized; + + assertEquals(defShmemSrvEndpoint.getPort(), deserializedShmemEndpoint.getPort()); + assertEquals(defShmemSrvEndpoint.getSize(), deserializedShmemEndpoint.getSize()); + assertEquals(defShmemSrvEndpoint.getTokenDirectoryPath(), deserializedShmemEndpoint.getTokenDirectoryPath()); + } + + /** + * @throws Exception In case of any exception. + */ + public void testDeserializeIfLoopbackAndJsonIsOk() throws Exception { + Map<String, String> endPnt = new HashMap<>(); + + endPnt.putAll(tcpSrvEndpoint); + endPnt.put("type", "tcp"); + + IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); + + assertTrue(deserialized instanceof IpcServerTcpEndpoint); + + assertEquals(tcpSrvEndpoint.get("port"), String.valueOf(deserialized.getPort())); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestClient.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestClient.java new file mode 100644 index 0000000..a5633b3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestClient.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.internal.util.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.ipc.*; + +import java.io.*; + +/** + * Test-purposed app launching {@link IpcSharedMemoryClientEndpoint} and designed + * to be used with conjunction to {@link GridJavaProcess}. + */ +public class GgfsSharedMemoryTestClient { + /** + * Internal protocol message prefix saying that the next text in the outputted line + * are comma-separated shared memory ids. + */ + static final String SHMEM_IDS_MSG_PREFIX = "SHMEM_IDS_MSG_PREFIX"; + + /** + * @param args Args. + */ + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + public static void main(String[] args) { + X.println("Starting client ..."); + + // Tell our process PID to the wrapper. + X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); + + OutputStream os = null; + + try { + IpcSharedMemoryClientEndpoint client = (IpcSharedMemoryClientEndpoint) IpcEndpointFactory.connectEndpoint( + "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, new IgniteJavaLogger()); + + os = client.outputStream(); + + // Tell our shmem ids. + X.println(SHMEM_IDS_MSG_PREFIX + client.inSpace().sharedMemoryId() + "," + + client.outSpace().sharedMemoryId()); + + for (;;) { + X.println("Write: 123"); + + os.write(123); + + Thread.sleep(IpcSharedMemoryCrashDetectionSelfTest.RW_SLEEP_TIMEOUT); + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + U.closeQuiet(os); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestServer.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestServer.java new file mode 100644 index 0000000..b3d41b2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GgfsSharedMemoryTestServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.*; + +import java.io.*; + +/** + * Test-purposed app launching {@link IpcSharedMemoryServerEndpoint} and designed + * to be used with conjunction to {@link GridJavaProcess}. + */ +public class GgfsSharedMemoryTestServer { + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + public static void main(String[] args) throws IgniteCheckedException { + System.out.println("Starting server ..."); + + U.setWorkDirectory(null, U.getGridGainHome()); + + // Tell our process PID to the wrapper. + X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); + + InputStream is = null; + + try { + IpcServerEndpoint srv = new IpcSharedMemoryServerEndpoint(); + + new GridTestResources().inject(srv); + + srv.start(); + + IpcEndpoint clientEndpoint = srv.accept(); + + is = clientEndpoint.inputStream(); + + for (;;) { + X.println("Before read."); + + is.read(); + + Thread.sleep(IpcSharedMemoryCrashDetectionSelfTest.RW_SLEEP_TIMEOUT); + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + U.closeQuiet(is); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestClient.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestClient.java deleted file mode 100644 index cc1e626..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestClient.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.internal.util.*; -import org.apache.ignite.logger.java.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.*; - -import java.io.*; - -/** - * Test-purposed app launching {@link GridIpcSharedMemoryClientEndpoint} and designed - * to be used with conjunction to {@link GridJavaProcess}. - */ -public class GridGgfsSharedMemoryTestClient { - /** - * Internal protocol message prefix saying that the next text in the outputted line - * are comma-separated shared memory ids. - */ - static final String SHMEM_IDS_MSG_PREFIX = "SHMEM_IDS_MSG_PREFIX"; - - /** - * @param args Args. - */ - @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) - public static void main(String[] args) { - X.println("Starting client ..."); - - // Tell our process PID to the wrapper. - X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); - - OutputStream os = null; - - try { - GridIpcSharedMemoryClientEndpoint client = (GridIpcSharedMemoryClientEndpoint)GridIpcEndpointFactory.connectEndpoint( - "shmem:" + GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, new IgniteJavaLogger()); - - os = client.outputStream(); - - // Tell our shmem ids. - X.println(SHMEM_IDS_MSG_PREFIX + client.inSpace().sharedMemoryId() + "," + - client.outSpace().sharedMemoryId()); - - for (;;) { - X.println("Write: 123"); - - os.write(123); - - Thread.sleep(GridIpcSharedMemoryCrashDetectionSelfTest.RW_SLEEP_TIMEOUT); - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - U.closeQuiet(os); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestServer.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestServer.java deleted file mode 100644 index 05ddc3a..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridGgfsSharedMemoryTestServer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.*; - -import java.io.*; - -/** - * Test-purposed app launching {@link GridIpcSharedMemoryServerEndpoint} and designed - * to be used with conjunction to {@link GridJavaProcess}. - */ -public class GridGgfsSharedMemoryTestServer { - @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) - public static void main(String[] args) throws IgniteCheckedException { - System.out.println("Starting server ..."); - - U.setWorkDirectory(null, U.getGridGainHome()); - - // Tell our process PID to the wrapper. - X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); - - InputStream is = null; - - try { - GridIpcServerEndpoint srv = new GridIpcSharedMemoryServerEndpoint(); - - new GridTestResources().inject(srv); - - srv.start(); - - GridIpcEndpoint clientEndpoint = srv.accept(); - - is = clientEndpoint.inputStream(); - - for (;;) { - X.println("Before read."); - - is.read(); - - Thread.sleep(GridIpcSharedMemoryCrashDetectionSelfTest.RW_SLEEP_TIMEOUT); - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - U.closeQuiet(is); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryCrashDetectionSelfTest.java deleted file mode 100644 index 37468f1..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryCrashDetectionSelfTest.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.commons.collections.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Test shared memory endpoints crash detection. - */ -public class GridIpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTest { - /** Timeout in ms between read/write attempts in busy-wait loops. */ - public static final int RW_SLEEP_TIMEOUT = 50; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - GridIpcSharedMemoryNativeLoader.load(); - } - - /** - * @throws Exception If failed. - */ - public void testGgfsServerClientInteractionsUponClientKilling() throws Exception { - U.setWorkDirectory(null, U.getGridGainHome()); - - // Run server endpoint. - GridIpcSharedMemoryServerEndpoint srv = new GridIpcSharedMemoryServerEndpoint(); - - new GridTestResources().inject(srv); - - try { - srv.start(); - - info("Check that server gets correct exception upon client's killing."); - - info("Shared memory IDs before starting client endpoint: " + IpcSharedMemoryUtils.sharedMemoryIds()); - - Collection<Integer> shmemIdsWithinInteractions = interactWithClient(srv, true); - - Collection<Integer> shmemIdsAfterInteractions = null; - - // Give server endpoint some time to make resource clean up. See GridIpcSharedMemoryServerEndpoint.GC_FREQ. - for (int i = 0; i < 12; i++) { - shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); - info("Shared memory IDs after killing client endpoint: " + shmemIdsAfterInteractions); - - if (CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)) - U.sleep(1000); - else - break; - } - - assertFalse("List of shared memory IDs after killing client endpoint should not include IDs created " + - "within server-client interactions.", - CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); - } - finally { - srv.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testGgfsClientServerInteractionsUponServerKilling() throws Exception { - Collection<Integer> shmemIdsBeforeInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("Shared memory IDs before starting server-client interactions: " + shmemIdsBeforeInteractions); - - Collection<Integer> shmemIdsWithinInteractions = interactWithServer(); - - Collection<Integer> shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); - info("Shared memory IDs after server and client killing: " + shmemIdsAfterInteractions); - - if (!U.isLinux()) - assertTrue("List of shared memory IDs after server-client interactions should include IDs created within " + - "client-server interactions.", shmemIdsAfterInteractions.containsAll(shmemIdsWithinInteractions)); - else - assertFalse("List of shared memory IDs after server-client interactions should not include IDs created " + - "(on Linux): within client-server interactions.", - CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); - - ProcessStartResult srvStartRes = startSharedMemoryTestServer(); - - try { - // Give server endpoint some time to make resource clean up. See GridIpcSharedMemoryServerEndpoint.GC_FREQ. - for (int i = 0; i < 12; i++) { - shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("Shared memory IDs after server restart: " + shmemIdsAfterInteractions); - - if (CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)) - U.sleep(1000); - else - break; - } - - assertFalse("List of shared memory IDs after server endpoint restart should not include IDs created: " + - "within client-server interactions.", - CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); - } - finally { - srvStartRes.proc().kill(); - - srvStartRes.isKilledLatch().await(); - } - } - - /** - * @throws Exception If failed. - */ - public void testClientThrowsCorrectExceptionUponServerKilling() throws Exception { - info("Shared memory IDs before starting server-client interactions: " + - IpcSharedMemoryUtils.sharedMemoryIds()); - - Collection<Integer> shmemIdsWithinInteractions = checkClientThrowsCorrectExceptionUponServerKilling(); - - Collection<Integer> shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); - info("Shared memory IDs after server killing and client graceful termination: " + shmemIdsAfterInteractions); - - assertFalse("List of shared memory IDs after killing server endpoint should not include IDs created " + - "within server-client interactions.", - CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); - } - - /** - * Launches GridGgfsSharedMemoryTestServer and GridGgfsSharedMemoryTestClient. - * After successful connection kills firstly server and secondly client. - * - * @return Collection of shared memory IDs created while client-server interactions. - * @throws Exception In case of any exception happen. - */ - private Collection<Integer> interactWithServer() throws Exception { - ProcessStartResult srvStartRes = startSharedMemoryTestServer(); - - ProcessStartResult clientStartRes = startSharedMemoryTestClient(); - - // Wait until client and server start to talk. - clientStartRes.isReadyLatch().await(); - - info("Going to kill server."); - - srvStartRes.proc().kill(); - - srvStartRes.isKilledLatch().await(); - - info("Going to kill client."); - - clientStartRes.proc().kill(); - - clientStartRes.isKilledLatch().await(); - - return clientStartRes.shmemIds(); - } - - /** - * Launches GridGgfsSharedMemoryTestServer and connects to it with client endpoint. - * After couple of reads-writes kills the server and checks client throws correct exception. - * - * @return List of shared memory IDs created while client-server interactions. - * @throws Exception In case of any exception happen. - */ - @SuppressWarnings("BusyWait") - private Collection<Integer> checkClientThrowsCorrectExceptionUponServerKilling() throws Exception { - ProcessStartResult srvStartRes = startSharedMemoryTestServer(); - - Collection<Integer> shmemIds = new ArrayList<>(); - GridIpcSharedMemoryClientEndpoint client = null; - - int interactionsCntBeforeSrvKilling = 5; - int i = 1; - - try { - // Run client endpoint. - client = (GridIpcSharedMemoryClientEndpoint)GridIpcEndpointFactory.connectEndpoint( - "shmem:" + GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log); - - OutputStream os = client.outputStream(); - - shmemIds.add(client.inSpace().sharedMemoryId()); - shmemIds.add(client.outSpace().sharedMemoryId()); - - for (; i < interactionsCntBeforeSrvKilling * 2; i++) { - info("Write: 123"); - - os.write(123); - - Thread.sleep(RW_SLEEP_TIMEOUT); - - if (i == interactionsCntBeforeSrvKilling) { - info("Going to kill server."); - - srvStartRes.proc().kill(); - - info("Write 512k array to hang write procedure."); - - os.write(new byte[512 * 1024]); - } - } - - fail("Client should throw IOException upon server killing."); - } - catch (IOException e) { - assertTrue(i >= interactionsCntBeforeSrvKilling); - - assertTrue(X.hasCause(e, IgniteCheckedException.class)); - assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed")); - } - finally { - U.closeQuiet(client); - } - - srvStartRes.isKilledLatch().await(); - - return shmemIds; - } - - /** - * Creates client endpoint and launches interaction between the one and the given server endpoint. - * - * - * @param srv Server endpoint to interact with. - * @param killClient Whether or not kill client endpoint within interaction. - * @return List of shared memory IDs created while client-server interactions. - * @throws Exception In case of any exception happen. - */ - @SuppressWarnings({"BusyWait", "TypeMayBeWeakened"}) - private Collection<Integer> interactWithClient(GridIpcSharedMemoryServerEndpoint srv, boolean killClient) - throws Exception { - ProcessStartResult clientStartRes = startSharedMemoryTestClient(); - - GridIpcSharedMemoryClientEndpoint clientEndpoint = (GridIpcSharedMemoryClientEndpoint)srv.accept(); - - Collection<Integer> shmemIds = new ArrayList<>(); - InputStream is = null; - - int interactionsCntBeforeClientKilling = 5; - int i = 1; - - try { - is = clientEndpoint.inputStream(); - - shmemIds.add(clientEndpoint.inSpace().sharedMemoryId()); - shmemIds.add(clientEndpoint.outSpace().sharedMemoryId()); - - for (; i < interactionsCntBeforeClientKilling * 2; i++) { - info("Before read."); - - is.read(); - - Thread.sleep(RW_SLEEP_TIMEOUT); - - if (killClient && i == interactionsCntBeforeClientKilling) { - info("Going to kill client."); - - clientStartRes.proc().kill(); - } - } - } - catch (IOException e) { - assertTrue("No IOException should be thrown if we do not kill client.", killClient); - assertTrue("No IOException should be thrown before client is killed.", - i > interactionsCntBeforeClientKilling); - - assertTrue(X.hasCause(e, IgniteCheckedException.class)); - assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed")); - - clientStartRes.isKilledLatch().await(); - - return shmemIds; - } - finally { - U.closeQuiet(is); - } - - assertTrue( - "Interactions count should be bigger than interactionsCntBeforeClientKilling if we do not kill client.", - i > interactionsCntBeforeClientKilling); - - // Cleanup client. - clientStartRes.proc().kill(); - - clientStartRes.isKilledLatch().await(); - - assertFalse("No IOException have been thrown while the client should be killed.", killClient); - - return shmemIds; - } - - /** - * Starts {@code GridGgfsSharedMemoryTestClient}. The method doesn't wait while client being started. - * - * @return Start result of the {@code GridGgfsSharedMemoryTestClient}. - * @throws Exception In case of any exception happen. - */ - private ProcessStartResult startSharedMemoryTestClient() throws Exception { - /** */ - final CountDownLatch killedLatch = new CountDownLatch(1); - - /** */ - final CountDownLatch readyLatch = new CountDownLatch(1); - - /** */ - final ProcessStartResult res = new ProcessStartResult(); - - /** Process. */ - GridJavaProcess proc = GridJavaProcess.exec( - GridGgfsSharedMemoryTestClient.class, null, - log, - new CI1<String>() { - @Override public void apply(String s) { - info("Client process prints: " + s); - - if (s.startsWith(GridGgfsSharedMemoryTestClient.SHMEM_IDS_MSG_PREFIX)) { - res.shmemIds(s.substring(GridGgfsSharedMemoryTestClient.SHMEM_IDS_MSG_PREFIX.length())); - - readyLatch.countDown(); - } - } - }, - new CA() { - @Override public void apply() { - info("Client is killed"); - - killedLatch.countDown(); - } - }, - null, - System.getProperty("surefire.test.class.path") - ); - - res.proc(proc); - res.isKilledLatch(killedLatch); - res.isReadyLatch(readyLatch); - - return res; - } - - /** - * Starts {@code GridGgfsSharedMemoryTestServer}. The method waits while server being started. - * - * @return Start result of the {@code GridGgfsSharedMemoryTestServer}. - * @throws Exception In case of any exception happen. - */ - private ProcessStartResult startSharedMemoryTestServer() throws Exception { - final CountDownLatch srvReady = new CountDownLatch(1); - final CountDownLatch isKilledLatch = new CountDownLatch(1); - - GridJavaProcess proc = GridJavaProcess.exec( - GridGgfsSharedMemoryTestServer.class, null, - log, - new CI1<String>() { - @Override public void apply(String str) { - info("Server process prints: " + str); - - if (str.contains("IPC shared memory server endpoint started")) - srvReady.countDown(); - } - }, - new CA() { - @Override public void apply() { - info("Server is killed"); - - isKilledLatch.countDown(); - } - }, - null, - System.getProperty("surefire.test.class.path") - ); - - srvReady.await(); - - ProcessStartResult res = new ProcessStartResult(); - - res.proc(proc); - res.isKilledLatch(isKilledLatch); - - return res; - } - - /** - * Internal utility class to store results of running client/server in separate process. - */ - private static class ProcessStartResult { - /** Java process within which some class has been run. */ - private GridJavaProcess proc; - - /** Count down latch to signal when process termination will be detected. */ - private CountDownLatch killedLatch; - - /** Count down latch to signal when process is readiness (in terms of business logic) will be detected. */ - private CountDownLatch readyLatch; - - /** Shared memory IDs string read from system.input. */ - private Collection<Integer> shmemIds; - - /** - * @return Java process within which some class has been run. - */ - GridJavaProcess proc() { - return proc; - } - - /** - * Sets Java process within which some class has been run. - * - * @param proc Java process. - */ - void proc(GridJavaProcess proc) { - this.proc = proc; - } - - /** - * @return Latch to signal when process termination will be detected. - */ - CountDownLatch isKilledLatch() { - return killedLatch; - } - - /** - * Sets CountDownLatch to signal when process termination will be detected. - * - * @param killedLatch CountDownLatch - */ - void isKilledLatch(CountDownLatch killedLatch) { - this.killedLatch = killedLatch; - } - - /** - * @return Latch to signal when process is readiness (in terms of business logic) will be detected. - */ - CountDownLatch isReadyLatch() { - return readyLatch; - } - - /** - * Sets CountDownLatch to signal when process readiness (in terms of business logic) will be detected. - * - * @param readyLatch CountDownLatch - */ - void isReadyLatch(CountDownLatch readyLatch) { - this.readyLatch = readyLatch; - } - - /** - * @return Shared memory IDs string read from system.input. Nullable. - */ - @Nullable Collection<Integer> shmemIds() { - return shmemIds; - } - - /** - * Sets Shared memory IDs string read from system.input. - * - * @param shmemIds Shared memory IDs string. - */ - public void shmemIds(String shmemIds) { - this.shmemIds = (shmemIds == null) ? null : - F.transform(shmemIds.split(","), new C1<String, Integer>() { - @Override public Integer apply(String s) { - return Long.valueOf(s).intValue(); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryFakeClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryFakeClient.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryFakeClient.java deleted file mode 100644 index e45813f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryFakeClient.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.logger.java.*; -import org.apache.ignite.internal.util.ipc.*; - -/** - * - */ -public class GridIpcSharedMemoryFakeClient { - /** - * @param args Args. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception{ - GridIpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); - GridIpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); - GridIpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoaderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoaderSelfTest.java deleted file mode 100644 index fad783f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoaderSelfTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.apache.ignite.internal.util.ipc.shmem; - -import junit.framework.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -public class GridIpcSharedMemoryNativeLoaderSelfTest extends TestCase { - - public void testLoadWithCorruptedLibFile() throws Exception { - if (U.isWindows()) - return; - - Process ps = GridJavaProcess.exec( - LoadWithCorruptedLibFileTestRunner.class, - null, - null, - null, - null, - Collections.<String>emptyList(), - null - ).getProcess(); - - readStreams(ps); - - int code = ps.waitFor(); - - assertEquals("Returned code have to be 0.", 0, code); - } - - private void readStreams(Process proc) throws IOException { - BufferedReader stdOut = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - String s; - - while ((s = stdOut.readLine()) != null) - System.out.println("OUT>>>>>> " + s); - - BufferedReader errOut = new BufferedReader(new InputStreamReader(proc.getErrorStream())); - - while ((s = errOut.readLine()) != null) - System.out.println("ERR>>>>>> " + s); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNodeStartup.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNodeStartup.java deleted file mode 100644 index 878eb78..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNodeStartup.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * - */ -public class GridIpcSharedMemoryNodeStartup { - /** - * @param args Args. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception{ - IgniteConfiguration cfg = new IgniteConfiguration(); - - IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - - Map<String, String> endpointCfg = new HashMap<>(); - - endpointCfg.put("type", "shmem"); - endpointCfg.put("port", "10500"); - - ggfsCfg.setIpcEndpointConfiguration(endpointCfg); - - ggfsCfg.setDataCacheName("partitioned"); - ggfsCfg.setMetaCacheName("partitioned"); - ggfsCfg.setName("ggfs"); - - cfg.setGgfsConfiguration(ggfsCfg); - - CacheConfiguration cacheCfg = new CacheConfiguration(); - - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setDistributionMode(PARTITIONED_ONLY); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setEvictionPolicy(null); - cacheCfg.setBackups(0); - cacheCfg.setQueryIndexEnabled(false); - - cfg.setCacheConfiguration(cacheCfg); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - try (Ignite ignored = G.start(cfg)) { - X.println("Press any key to stop grid..."); - - System.in.read(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpaceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpaceSelfTest.java deleted file mode 100644 index f443122..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemorySpaceSelfTest.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridIpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest { - /** */ - public static final int DATA_LEN = 1024 * 1024; - - /** */ - private static final byte[] DATA = new byte[DATA_LEN]; - - /** - * - */ - static { - for (int i = 0; i < DATA_LEN; i++) - DATA[i] = (byte)i; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - GridIpcSharedMemoryNativeLoader.load(); - } - - /** - * @throws Exception If failed. - */ - public void testBasicOperations() throws Exception { - File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), UUID.randomUUID().toString()); - - assert tokFile.createNewFile(); - - final String tok = tokFile.getAbsolutePath(); - - info("Array length: " + DATA.length); - - final AtomicReference<GridIpcSharedMemorySpace> spaceRef = new AtomicReference<>(); - - IgniteFuture<?> fut1 = multithreadedAsync( - new Callable<Object>() { - @SuppressWarnings("TooBroadScope") - @Override public Object call() throws Exception { - try (GridIpcSharedMemorySpace space = new GridIpcSharedMemorySpace(tok, 0, 0, 128, false, - log)) { - spaceRef.set(space); - - int bytesWritten = 0; - - for (; ; ) { - int len = Math.min(DATA.length - bytesWritten, - ThreadLocalRandom8.current().nextInt(256) + 1); - - space.write(DATA, bytesWritten, len, 0); - - bytesWritten += len; - - if (bytesWritten == DATA.length) - break; - } - - info("Thread finished."); - - return null; - } - } - }, - 1, - "writer"); - - IgniteFuture<?> fut2 = multithreadedAsync( - new Callable<Object>() { - @SuppressWarnings({"TooBroadScope", "StatementWithEmptyBody"}) - @Override public Object call() throws Exception { - GridIpcSharedMemorySpace inSpace; - - while ((inSpace = spaceRef.get()) == null) { - // No-op; - } - - try (GridIpcSharedMemorySpace space = new GridIpcSharedMemorySpace(tok, 0, 0, 128, true, - inSpace.sharedMemoryId(), log)) { - byte[] buf = new byte[DATA_LEN]; - - int bytesRead = 0; - - for (; ; ) { - int len = Math.min(DATA.length - bytesRead, - ThreadLocalRandom8.current().nextInt(32) + 1); - - int len0 = space.read(buf, bytesRead, len, 0); - - assert len0 > 0; - - bytesRead += len0; - - if (bytesRead == DATA_LEN) - break; - } - - assertTrue(Arrays.equals(DATA, buf)); - - return null; - } - } - }, - 1, - "reader"); - - fut1.get(); - fut2.get(); - - assert !tokFile.exists(); - } - - /** - * @throws Exception If failed. - */ - public void testForceClose() throws Exception { - File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); - - assert tokFile.createNewFile() || tokFile.exists(); - - String tok = tokFile.getAbsolutePath(); - - info("Using token file: " + tok); - - Collection<Integer> ids = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("IDs in the system: " + ids); - - GridIpcSharedMemorySpace space = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, - false, log); - - ids = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("IDs in the system: " + ids); - - assert ids.contains(space.sharedMemoryId()); - - // Write some data to the space, but avoid blocking. - space.write(DATA, 0, 16, 0); - - int shmemId = space.sharedMemoryId(); - - space.forceClose(); - - ids = IpcSharedMemoryUtils.sharedMemoryIds(); - - info("IDs in the system: " + ids); - - assert !ids.contains(shmemId); - - assert !tokFile.exists(); - } - - /** - * @throws Exception If failed. - */ - public void testReadAfterClose() throws Exception { - File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); - - assert tokFile.createNewFile() || tokFile.exists(); - - String tok = tokFile.getAbsolutePath(); - - info("Using token file: " + tok); - - GridIpcSharedMemorySpace spaceOut = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, - false, log); - - try (GridIpcSharedMemorySpace spaceIn = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, - 128, true, spaceOut.sharedMemoryId(), log)) { - // Write some data to the space, but avoid blocking. - spaceOut.write(DATA, 0, 16, 0); - - spaceOut.close(); - - // Read after other party has already called "close()". - // Space has data available and should read it. - byte[] buf = new byte[16]; - - int len = spaceIn.read(buf, 0, 16, 0); - - assert len == 16; - - len = spaceIn.read(buf, 0, 16, 0); - - assert len == -1; - } - - assert !tokFile.exists(); - } - - /** - * @throws Exception If failed. - */ - public void testWriteAfterClose() throws Exception { - File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); - - assert tokFile.createNewFile() || tokFile.exists(); - - String tok = tokFile.getAbsolutePath(); - - info("Using token file: " + tok); - - try (GridIpcSharedMemorySpace spaceOut = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), - IpcSharedMemoryUtils.pid(), 128, false, log)) { - - try (GridIpcSharedMemorySpace spaceIn = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), - IpcSharedMemoryUtils.pid(), 128, true, spaceOut.sharedMemoryId(), log)) { - // Write some data to the space, but avoid blocking. - spaceOut.write(DATA, 0, 16, 0); - - spaceIn.close(); - - try { - spaceOut.write(DATA, 0, 16, 0); - - assert false; - } - catch (IgniteCheckedException e) { - info("Caught expected exception: " + e); - } - } - } - - assert !tokFile.exists(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java new file mode 100644 index 0000000..94c1f75 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.commons.collections.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Test shared memory endpoints crash detection. + */ +public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTest { + /** Timeout in ms between read/write attempts in busy-wait loops. */ + public static final int RW_SLEEP_TIMEOUT = 50; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + IpcSharedMemoryNativeLoader.load(); + } + + /** + * @throws Exception If failed. + */ + public void testGgfsServerClientInteractionsUponClientKilling() throws Exception { + U.setWorkDirectory(null, U.getGridGainHome()); + + // Run server endpoint. + IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(); + + new GridTestResources().inject(srv); + + try { + srv.start(); + + info("Check that server gets correct exception upon client's killing."); + + info("Shared memory IDs before starting client endpoint: " + IpcSharedMemoryUtils.sharedMemoryIds()); + + Collection<Integer> shmemIdsWithinInteractions = interactWithClient(srv, true); + + Collection<Integer> shmemIdsAfterInteractions = null; + + // Give server endpoint some time to make resource clean up. See IpcSharedMemoryServerEndpoint.GC_FREQ. + for (int i = 0; i < 12; i++) { + shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); + info("Shared memory IDs after killing client endpoint: " + shmemIdsAfterInteractions); + + if (CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)) + U.sleep(1000); + else + break; + } + + assertFalse("List of shared memory IDs after killing client endpoint should not include IDs created " + + "within server-client interactions.", + CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); + } + finally { + srv.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGgfsClientServerInteractionsUponServerKilling() throws Exception { + Collection<Integer> shmemIdsBeforeInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("Shared memory IDs before starting server-client interactions: " + shmemIdsBeforeInteractions); + + Collection<Integer> shmemIdsWithinInteractions = interactWithServer(); + + Collection<Integer> shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); + info("Shared memory IDs after server and client killing: " + shmemIdsAfterInteractions); + + if (!U.isLinux()) + assertTrue("List of shared memory IDs after server-client interactions should include IDs created within " + + "client-server interactions.", shmemIdsAfterInteractions.containsAll(shmemIdsWithinInteractions)); + else + assertFalse("List of shared memory IDs after server-client interactions should not include IDs created " + + "(on Linux): within client-server interactions.", + CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); + + ProcessStartResult srvStartRes = startSharedMemoryTestServer(); + + try { + // Give server endpoint some time to make resource clean up. See IpcSharedMemoryServerEndpoint.GC_FREQ. + for (int i = 0; i < 12; i++) { + shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("Shared memory IDs after server restart: " + shmemIdsAfterInteractions); + + if (CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)) + U.sleep(1000); + else + break; + } + + assertFalse("List of shared memory IDs after server endpoint restart should not include IDs created: " + + "within client-server interactions.", + CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); + } + finally { + srvStartRes.proc().kill(); + + srvStartRes.isKilledLatch().await(); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientThrowsCorrectExceptionUponServerKilling() throws Exception { + info("Shared memory IDs before starting server-client interactions: " + + IpcSharedMemoryUtils.sharedMemoryIds()); + + Collection<Integer> shmemIdsWithinInteractions = checkClientThrowsCorrectExceptionUponServerKilling(); + + Collection<Integer> shmemIdsAfterInteractions = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("Shared memory IDs created within interaction: " + shmemIdsWithinInteractions); + info("Shared memory IDs after server killing and client graceful termination: " + shmemIdsAfterInteractions); + + assertFalse("List of shared memory IDs after killing server endpoint should not include IDs created " + + "within server-client interactions.", + CollectionUtils.containsAny(shmemIdsAfterInteractions, shmemIdsWithinInteractions)); + } + + /** + * Launches GgfsSharedMemoryTestServer and GgfsSharedMemoryTestClient. + * After successful connection kills firstly server and secondly client. + * + * @return Collection of shared memory IDs created while client-server interactions. + * @throws Exception In case of any exception happen. + */ + private Collection<Integer> interactWithServer() throws Exception { + ProcessStartResult srvStartRes = startSharedMemoryTestServer(); + + ProcessStartResult clientStartRes = startSharedMemoryTestClient(); + + // Wait until client and server start to talk. + clientStartRes.isReadyLatch().await(); + + info("Going to kill server."); + + srvStartRes.proc().kill(); + + srvStartRes.isKilledLatch().await(); + + info("Going to kill client."); + + clientStartRes.proc().kill(); + + clientStartRes.isKilledLatch().await(); + + return clientStartRes.shmemIds(); + } + + /** + * Launches GgfsSharedMemoryTestServer and connects to it with client endpoint. + * After couple of reads-writes kills the server and checks client throws correct exception. + * + * @return List of shared memory IDs created while client-server interactions. + * @throws Exception In case of any exception happen. + */ + @SuppressWarnings("BusyWait") + private Collection<Integer> checkClientThrowsCorrectExceptionUponServerKilling() throws Exception { + ProcessStartResult srvStartRes = startSharedMemoryTestServer(); + + Collection<Integer> shmemIds = new ArrayList<>(); + IpcSharedMemoryClientEndpoint client = null; + + int interactionsCntBeforeSrvKilling = 5; + int i = 1; + + try { + // Run client endpoint. + client = (IpcSharedMemoryClientEndpoint) IpcEndpointFactory.connectEndpoint( + "shmem:" + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log); + + OutputStream os = client.outputStream(); + + shmemIds.add(client.inSpace().sharedMemoryId()); + shmemIds.add(client.outSpace().sharedMemoryId()); + + for (; i < interactionsCntBeforeSrvKilling * 2; i++) { + info("Write: 123"); + + os.write(123); + + Thread.sleep(RW_SLEEP_TIMEOUT); + + if (i == interactionsCntBeforeSrvKilling) { + info("Going to kill server."); + + srvStartRes.proc().kill(); + + info("Write 512k array to hang write procedure."); + + os.write(new byte[512 * 1024]); + } + } + + fail("Client should throw IOException upon server killing."); + } + catch (IOException e) { + assertTrue(i >= interactionsCntBeforeSrvKilling); + + assertTrue(X.hasCause(e, IgniteCheckedException.class)); + assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed")); + } + finally { + U.closeQuiet(client); + } + + srvStartRes.isKilledLatch().await(); + + return shmemIds; + } + + /** + * Creates client endpoint and launches interaction between the one and the given server endpoint. + * + * + * @param srv Server endpoint to interact with. + * @param killClient Whether or not kill client endpoint within interaction. + * @return List of shared memory IDs created while client-server interactions. + * @throws Exception In case of any exception happen. + */ + @SuppressWarnings({"BusyWait", "TypeMayBeWeakened"}) + private Collection<Integer> interactWithClient(IpcSharedMemoryServerEndpoint srv, boolean killClient) + throws Exception { + ProcessStartResult clientStartRes = startSharedMemoryTestClient(); + + IpcSharedMemoryClientEndpoint clientEndpoint = (IpcSharedMemoryClientEndpoint)srv.accept(); + + Collection<Integer> shmemIds = new ArrayList<>(); + InputStream is = null; + + int interactionsCntBeforeClientKilling = 5; + int i = 1; + + try { + is = clientEndpoint.inputStream(); + + shmemIds.add(clientEndpoint.inSpace().sharedMemoryId()); + shmemIds.add(clientEndpoint.outSpace().sharedMemoryId()); + + for (; i < interactionsCntBeforeClientKilling * 2; i++) { + info("Before read."); + + is.read(); + + Thread.sleep(RW_SLEEP_TIMEOUT); + + if (killClient && i == interactionsCntBeforeClientKilling) { + info("Going to kill client."); + + clientStartRes.proc().kill(); + } + } + } + catch (IOException e) { + assertTrue("No IOException should be thrown if we do not kill client.", killClient); + assertTrue("No IOException should be thrown before client is killed.", + i > interactionsCntBeforeClientKilling); + + assertTrue(X.hasCause(e, IgniteCheckedException.class)); + assertTrue(X.cause(e, IgniteCheckedException.class).getMessage().contains("Shared memory segment has been closed")); + + clientStartRes.isKilledLatch().await(); + + return shmemIds; + } + finally { + U.closeQuiet(is); + } + + assertTrue( + "Interactions count should be bigger than interactionsCntBeforeClientKilling if we do not kill client.", + i > interactionsCntBeforeClientKilling); + + // Cleanup client. + clientStartRes.proc().kill(); + + clientStartRes.isKilledLatch().await(); + + assertFalse("No IOException have been thrown while the client should be killed.", killClient); + + return shmemIds; + } + + /** + * Starts {@code GgfsSharedMemoryTestClient}. The method doesn't wait while client being started. + * + * @return Start result of the {@code GgfsSharedMemoryTestClient}. + * @throws Exception In case of any exception happen. + */ + private ProcessStartResult startSharedMemoryTestClient() throws Exception { + /** */ + final CountDownLatch killedLatch = new CountDownLatch(1); + + /** */ + final CountDownLatch readyLatch = new CountDownLatch(1); + + /** */ + final ProcessStartResult res = new ProcessStartResult(); + + /** Process. */ + GridJavaProcess proc = GridJavaProcess.exec( + GgfsSharedMemoryTestClient.class, null, + log, + new CI1<String>() { + @Override public void apply(String s) { + info("Client process prints: " + s); + + if (s.startsWith(GgfsSharedMemoryTestClient.SHMEM_IDS_MSG_PREFIX)) { + res.shmemIds(s.substring(GgfsSharedMemoryTestClient.SHMEM_IDS_MSG_PREFIX.length())); + + readyLatch.countDown(); + } + } + }, + new CA() { + @Override public void apply() { + info("Client is killed"); + + killedLatch.countDown(); + } + }, + null, + System.getProperty("surefire.test.class.path") + ); + + res.proc(proc); + res.isKilledLatch(killedLatch); + res.isReadyLatch(readyLatch); + + return res; + } + + /** + * Starts {@code GgfsSharedMemoryTestServer}. The method waits while server being started. + * + * @return Start result of the {@code GgfsSharedMemoryTestServer}. + * @throws Exception In case of any exception happen. + */ + private ProcessStartResult startSharedMemoryTestServer() throws Exception { + final CountDownLatch srvReady = new CountDownLatch(1); + final CountDownLatch isKilledLatch = new CountDownLatch(1); + + GridJavaProcess proc = GridJavaProcess.exec( + GgfsSharedMemoryTestServer.class, null, + log, + new CI1<String>() { + @Override public void apply(String str) { + info("Server process prints: " + str); + + if (str.contains("IPC shared memory server endpoint started")) + srvReady.countDown(); + } + }, + new CA() { + @Override public void apply() { + info("Server is killed"); + + isKilledLatch.countDown(); + } + }, + null, + System.getProperty("surefire.test.class.path") + ); + + srvReady.await(); + + ProcessStartResult res = new ProcessStartResult(); + + res.proc(proc); + res.isKilledLatch(isKilledLatch); + + return res; + } + + /** + * Internal utility class to store results of running client/server in separate process. + */ + private static class ProcessStartResult { + /** Java process within which some class has been run. */ + private GridJavaProcess proc; + + /** Count down latch to signal when process termination will be detected. */ + private CountDownLatch killedLatch; + + /** Count down latch to signal when process is readiness (in terms of business logic) will be detected. */ + private CountDownLatch readyLatch; + + /** Shared memory IDs string read from system.input. */ + private Collection<Integer> shmemIds; + + /** + * @return Java process within which some class has been run. + */ + GridJavaProcess proc() { + return proc; + } + + /** + * Sets Java process within which some class has been run. + * + * @param proc Java process. + */ + void proc(GridJavaProcess proc) { + this.proc = proc; + } + + /** + * @return Latch to signal when process termination will be detected. + */ + CountDownLatch isKilledLatch() { + return killedLatch; + } + + /** + * Sets CountDownLatch to signal when process termination will be detected. + * + * @param killedLatch CountDownLatch + */ + void isKilledLatch(CountDownLatch killedLatch) { + this.killedLatch = killedLatch; + } + + /** + * @return Latch to signal when process is readiness (in terms of business logic) will be detected. + */ + CountDownLatch isReadyLatch() { + return readyLatch; + } + + /** + * Sets CountDownLatch to signal when process readiness (in terms of business logic) will be detected. + * + * @param readyLatch CountDownLatch + */ + void isReadyLatch(CountDownLatch readyLatch) { + this.readyLatch = readyLatch; + } + + /** + * @return Shared memory IDs string read from system.input. Nullable. + */ + @Nullable Collection<Integer> shmemIds() { + return shmemIds; + } + + /** + * Sets Shared memory IDs string read from system.input. + * + * @param shmemIds Shared memory IDs string. + */ + public void shmemIds(String shmemIds) { + this.shmemIds = (shmemIds == null) ? null : + F.transform(shmemIds.split(","), new C1<String, Integer>() { + @Override public Integer apply(String s) { + return Long.valueOf(s).intValue(); + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryFakeClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryFakeClient.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryFakeClient.java new file mode 100644 index 0000000..c8046ca --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryFakeClient.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.logger.java.*; +import org.apache.ignite.internal.util.ipc.*; + +/** + * + */ +public class IpcSharedMemoryFakeClient { + /** + * @param args Args. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception{ + IpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); + IpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); + IpcEndpointFactory.connectEndpoint("shmem:10500", new IgniteJavaLogger()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoaderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoaderSelfTest.java new file mode 100644 index 0000000..e75eb04 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoaderSelfTest.java @@ -0,0 +1,46 @@ +package org.apache.ignite.internal.util.ipc.shmem; + +import junit.framework.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +public class IpcSharedMemoryNativeLoaderSelfTest extends TestCase { + + public void testLoadWithCorruptedLibFile() throws Exception { + if (U.isWindows()) + return; + + Process ps = GridJavaProcess.exec( + LoadWithCorruptedLibFileTestRunner.class, + null, + null, + null, + null, + Collections.<String>emptyList(), + null + ).getProcess(); + + readStreams(ps); + + int code = ps.waitFor(); + + assertEquals("Returned code have to be 0.", 0, code); + } + + private void readStreams(Process proc) throws IOException { + BufferedReader stdOut = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + String s; + + while ((s = stdOut.readLine()) != null) + System.out.println("OUT>>>>>> " + s); + + BufferedReader errOut = new BufferedReader(new InputStreamReader(proc.getErrorStream())); + + while ((s = errOut.readLine()) != null) + System.out.println("ERR>>>>>> " + s); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java new file mode 100644 index 0000000..64a0d5c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class IpcSharedMemoryNodeStartup { + /** + * @param args Args. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception{ + IgniteConfiguration cfg = new IgniteConfiguration(); + + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + + Map<String, String> endpointCfg = new HashMap<>(); + + endpointCfg.put("type", "shmem"); + endpointCfg.put("port", "10500"); + + ggfsCfg.setIpcEndpointConfiguration(endpointCfg); + + ggfsCfg.setDataCacheName("partitioned"); + ggfsCfg.setMetaCacheName("partitioned"); + ggfsCfg.setName("ggfs"); + + cfg.setGgfsConfiguration(ggfsCfg); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setEvictionPolicy(null); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + + cfg.setCacheConfiguration(cacheCfg); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + try (Ignite ignored = G.start(cfg)) { + X.println("Press any key to stop grid..."); + + System.in.read(); + } + } +}