http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java new file mode 100644 index 0000000..fab41e7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest { + /** */ + public static final int DATA_LEN = 1024 * 1024; + + /** */ + private static final byte[] DATA = new byte[DATA_LEN]; + + /** + * + */ + static { + for (int i = 0; i < DATA_LEN; i++) + DATA[i] = (byte)i; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + IpcSharedMemoryNativeLoader.load(); + } + + /** + * @throws Exception If failed. + */ + public void testBasicOperations() throws Exception { + File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), UUID.randomUUID().toString()); + + assert tokFile.createNewFile(); + + final String tok = tokFile.getAbsolutePath(); + + info("Array length: " + DATA.length); + + final AtomicReference<IpcSharedMemorySpace> spaceRef = new AtomicReference<>(); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @SuppressWarnings("TooBroadScope") + @Override public Object call() throws Exception { + try (IpcSharedMemorySpace space = new IpcSharedMemorySpace(tok, 0, 0, 128, false, + log)) { + spaceRef.set(space); + + int bytesWritten = 0; + + for (; ; ) { + int len = Math.min(DATA.length - bytesWritten, + ThreadLocalRandom8.current().nextInt(256) + 1); + + space.write(DATA, bytesWritten, len, 0); + + bytesWritten += len; + + if (bytesWritten == DATA.length) + break; + } + + info("Thread finished."); + + return null; + } + } + }, + 1, + "writer"); + + IgniteFuture<?> fut2 = multithreadedAsync( + new Callable<Object>() { + @SuppressWarnings({"TooBroadScope", "StatementWithEmptyBody"}) + @Override public Object call() throws Exception { + IpcSharedMemorySpace inSpace; + + while ((inSpace = spaceRef.get()) == null) { + // No-op; + } + + try (IpcSharedMemorySpace space = new IpcSharedMemorySpace(tok, 0, 0, 128, true, + inSpace.sharedMemoryId(), log)) { + byte[] buf = new byte[DATA_LEN]; + + int bytesRead = 0; + + for (; ; ) { + int len = Math.min(DATA.length - bytesRead, + ThreadLocalRandom8.current().nextInt(32) + 1); + + int len0 = space.read(buf, bytesRead, len, 0); + + assert len0 > 0; + + bytesRead += len0; + + if (bytesRead == DATA_LEN) + break; + } + + assertTrue(Arrays.equals(DATA, buf)); + + return null; + } + } + }, + 1, + "reader"); + + fut1.get(); + fut2.get(); + + assert !tokFile.exists(); + } + + /** + * @throws Exception If failed. + */ + public void testForceClose() throws Exception { + File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); + + assert tokFile.createNewFile() || tokFile.exists(); + + String tok = tokFile.getAbsolutePath(); + + info("Using token file: " + tok); + + Collection<Integer> ids = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("IDs in the system: " + ids); + + IpcSharedMemorySpace space = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, + false, log); + + ids = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("IDs in the system: " + ids); + + assert ids.contains(space.sharedMemoryId()); + + // Write some data to the space, but avoid blocking. + space.write(DATA, 0, 16, 0); + + int shmemId = space.sharedMemoryId(); + + space.forceClose(); + + ids = IpcSharedMemoryUtils.sharedMemoryIds(); + + info("IDs in the system: " + ids); + + assert !ids.contains(shmemId); + + assert !tokFile.exists(); + } + + /** + * @throws Exception If failed. + */ + public void testReadAfterClose() throws Exception { + File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); + + assert tokFile.createNewFile() || tokFile.exists(); + + String tok = tokFile.getAbsolutePath(); + + info("Using token file: " + tok); + + IpcSharedMemorySpace spaceOut = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, + false, log); + + try (IpcSharedMemorySpace spaceIn = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, + 128, true, spaceOut.sharedMemoryId(), log)) { + // Write some data to the space, but avoid blocking. + spaceOut.write(DATA, 0, 16, 0); + + spaceOut.close(); + + // Read after other party has already called "close()". + // Space has data available and should read it. + byte[] buf = new byte[16]; + + int len = spaceIn.read(buf, 0, 16, 0); + + assert len == 16; + + len = spaceIn.read(buf, 0, 16, 0); + + assert len == -1; + } + + assert !tokFile.exists(); + } + + /** + * @throws Exception If failed. + */ + public void testWriteAfterClose() throws Exception { + File tokFile = new File(IgniteSystemProperties.getString("java.io.tmpdir"), getTestGridName()); + + assert tokFile.createNewFile() || tokFile.exists(); + + String tok = tokFile.getAbsolutePath(); + + info("Using token file: " + tok); + + try (IpcSharedMemorySpace spaceOut = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), + IpcSharedMemoryUtils.pid(), 128, false, log)) { + + try (IpcSharedMemorySpace spaceIn = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), + IpcSharedMemoryUtils.pid(), 128, true, spaceOut.sharedMemoryId(), log)) { + // Write some data to the space, but avoid blocking. + spaceOut.write(DATA, 0, 16, 0); + + spaceIn.close(); + + try { + spaceOut.write(DATA, 0, 16, 0); + + assert false; + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + } + + assert !tokFile.exists(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java index 028cae9..4c5413c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java @@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - GridIpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(); } /** @@ -58,7 +58,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest { String tok = tokFile.getAbsolutePath(); - GridIpcSharedMemorySpace space = new GridIpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, + IpcSharedMemorySpace space = new IpcSharedMemorySpace(tok, IpcSharedMemoryUtils.pid(), 0, 128, false, log); info("Space: " + space); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java index acee749..a3b4a0e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java @@ -6,14 +6,14 @@ import java.io.IOException; public class LoadWithCorruptedLibFileTestRunner { public static final String TMP_DIR_FOR_TEST = System.getProperty("user.home"); - public static final String LOADED_LIB_FILE_NAME = System.mapLibraryName(GridIpcSharedMemoryNativeLoader.LIB_NAME); + public static final String LOADED_LIB_FILE_NAME = System.mapLibraryName(IpcSharedMemoryNativeLoader.LIB_NAME); public static void main(String[] args) throws Exception { System.setProperty("java.io.tmpdir", TMP_DIR_FOR_TEST); createCorruptedLibFile(); - GridIpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(); } private static void createCorruptedLibFile() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkParty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkParty.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkParty.java deleted file mode 100644 index eb07b97..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkParty.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem.benchmark; - -import java.io.*; - -/** - * - */ -interface GridIpcSharedMemoryBenchmarkParty { - /** */ - public static final int DFLT_SPACE_SIZE = 512 * 1024; - - /** */ - public static final int DFLT_BUF_SIZE = 8 * 1024; - - /** */ - public static final String DFLT_TOKEN = - new File(System.getProperty("java.io.tmpdir"), "benchmark").getAbsolutePath(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkReader.java deleted file mode 100644 index 2583245..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkReader.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem.benchmark; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.*; - -import javax.swing.*; -import java.io.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridIpcSharedMemoryBenchmarkReader implements GridIpcSharedMemoryBenchmarkParty { - /** Destination buffer size. */ - public static final int DST_BUFFER_SIZE = 512 * 1024 * 1024; - - /** */ - private static volatile boolean done; - - /** - * @param args Args. - * @throws IgniteCheckedException If failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - GridIpcSharedMemoryNativeLoader.load(); - - int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - - final AtomicLong transferCntr = new AtomicLong(); - - Thread collector = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - try { - while (!done) { - Thread.sleep(5000); - - X.println("Transfer rate: " + transferCntr.getAndSet(0) / (1024 * 1024 * 5) + " MB/sec"); - } - } - catch (InterruptedException ignored) { - // No-op. - } - - } - }); - - collector.start(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - System.out.println("Shutting down..."); - - done = true; - } - }); - - try (GridIpcSharedMemoryServerEndpoint srv = new GridIpcSharedMemoryServerEndpoint()) { - new GridTestResources().inject(srv); - - srv.start(); - - for (int i = 0; i < nThreads; i++) { - final GridIpcEndpoint endPnt = srv.accept(); - - new Thread(new Runnable() { - @Override - public void run() { - InputStream space = null; - - try { - space = endPnt.inputStream(); - - byte[] buf = new byte[DST_BUFFER_SIZE]; - - int pos = 0; - - while (!done) { - int maxRead = Math.min(buf.length - pos, DFLT_BUF_SIZE); - - int read = space.read(buf, pos, maxRead); - - if (read == -1) { - X.println("Space has been closed"); - - return; - } - - transferCntr.addAndGet(read); - - pos += read; - - if (pos >= buf.length) - pos = 0; - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - U.closeQuiet(space); - } - } - }).start(); - } - } - - JOptionPane.showMessageDialog(null, "Press OK to stop READER."); - - done = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkWriter.java deleted file mode 100644 index c0c0dd6..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/GridIpcSharedMemoryBenchmarkWriter.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem.benchmark; - -import org.apache.ignite.*; -import org.apache.ignite.logger.java.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.*; - -import javax.swing.*; -import java.io.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridIpcSharedMemoryBenchmarkWriter implements GridIpcSharedMemoryBenchmarkParty { - /** Large send buffer size. */ - public static final int SRC_BUFFER_SIZE = 512 * 1024 * 1024; - - /** */ - private static volatile boolean done; - - /** - * @param args Args. - * @throws IgniteCheckedException If failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - GridIpcSharedMemoryNativeLoader.load(); - - int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1; - - final AtomicLong transferCntr = new AtomicLong(); - - Thread collector = new Thread(new Runnable() { - @SuppressWarnings("BusyWait") - @Override public void run() { - try { - while (!done) { - Thread.sleep(5000); - - X.println("Transfer rate: " + transferCntr.getAndSet(0) / (1024 * 1024 * 5) + " MB/sec"); - } - } - catch (InterruptedException ignored) { - // No-op. - } - - } - }); - - collector.start(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - System.out.println("Shutting down..."); - - done = true; - } - }); - - for (int i = 0; i < nThreads; i++) { - final int threadNum = i; - - new Thread(new Runnable() { - @Override public void run() { - GridIpcEndpoint client = null; - - try { - client = GridIpcEndpointFactory.connectEndpoint("shmem:" + - GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, new IgniteJavaLogger()); - - OutputStream space = client.outputStream(); - - byte[] buf = new byte[SRC_BUFFER_SIZE]; - - int pos = 0; - - while (!done) { - int snd = Math.min(DFLT_BUF_SIZE, buf.length - pos); - - space.write(buf, pos, snd); - - // Measure only 1 client. - if (threadNum == 0) - transferCntr.addAndGet(snd); - - pos += snd; - - if (pos >= buf.length) - pos = 0; - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - if (client != null) - client.close(); - } - } - }).start(); - } - - JOptionPane.showMessageDialog(null, "Press OK to stop WRITER."); - - done = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkParty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkParty.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkParty.java new file mode 100644 index 0000000..99e3644 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkParty.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem.benchmark; + +import java.io.*; + +/** + * + */ +interface IpcSharedMemoryBenchmarkParty { + /** */ + public static final int DFLT_SPACE_SIZE = 512 * 1024; + + /** */ + public static final int DFLT_BUF_SIZE = 8 * 1024; + + /** */ + public static final String DFLT_TOKEN = + new File(System.getProperty("java.io.tmpdir"), "benchmark").getAbsolutePath(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java new file mode 100644 index 0000000..0ac3ef7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem.benchmark; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.*; + +import javax.swing.*; +import java.io.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkParty { + /** Destination buffer size. */ + public static final int DST_BUFFER_SIZE = 512 * 1024 * 1024; + + /** */ + private static volatile boolean done; + + /** + * @param args Args. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + IpcSharedMemoryNativeLoader.load(); + + int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1); + + final AtomicLong transferCntr = new AtomicLong(); + + Thread collector = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + try { + while (!done) { + Thread.sleep(5000); + + X.println("Transfer rate: " + transferCntr.getAndSet(0) / (1024 * 1024 * 5) + " MB/sec"); + } + } + catch (InterruptedException ignored) { + // No-op. + } + + } + }); + + collector.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + System.out.println("Shutting down..."); + + done = true; + } + }); + + try (IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint()) { + new GridTestResources().inject(srv); + + srv.start(); + + for (int i = 0; i < nThreads; i++) { + final IpcEndpoint endPnt = srv.accept(); + + new Thread(new Runnable() { + @Override + public void run() { + InputStream space = null; + + try { + space = endPnt.inputStream(); + + byte[] buf = new byte[DST_BUFFER_SIZE]; + + int pos = 0; + + while (!done) { + int maxRead = Math.min(buf.length - pos, DFLT_BUF_SIZE); + + int read = space.read(buf, pos, maxRead); + + if (read == -1) { + X.println("Space has been closed"); + + return; + } + + transferCntr.addAndGet(read); + + pos += read; + + if (pos >= buf.length) + pos = 0; + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + U.closeQuiet(space); + } + } + }).start(); + } + } + + JOptionPane.showMessageDialog(null, "Press OK to stop READER."); + + done = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java new file mode 100644 index 0000000..f3a9aad --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.shmem.benchmark; + +import org.apache.ignite.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.*; + +import javax.swing.*; +import java.io.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkParty { + /** Large send buffer size. */ + public static final int SRC_BUFFER_SIZE = 512 * 1024 * 1024; + + /** */ + private static volatile boolean done; + + /** + * @param args Args. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + IpcSharedMemoryNativeLoader.load(); + + int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1; + + final AtomicLong transferCntr = new AtomicLong(); + + Thread collector = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + try { + while (!done) { + Thread.sleep(5000); + + X.println("Transfer rate: " + transferCntr.getAndSet(0) / (1024 * 1024 * 5) + " MB/sec"); + } + } + catch (InterruptedException ignored) { + // No-op. + } + + } + }); + + collector.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + System.out.println("Shutting down..."); + + done = true; + } + }); + + for (int i = 0; i < nThreads; i++) { + final int threadNum = i; + + new Thread(new Runnable() { + @Override public void run() { + IpcEndpoint client = null; + + try { + client = IpcEndpointFactory.connectEndpoint("shmem:" + + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, new IgniteJavaLogger()); + + OutputStream space = client.outputStream(); + + byte[] buf = new byte[SRC_BUFFER_SIZE]; + + int pos = 0; + + while (!done) { + int snd = Math.min(DFLT_BUF_SIZE, buf.length - pos); + + space.write(buf, pos, snd); + + // Measure only 1 client. + if (threadNum == 0) + transferCntr.addAndGet(snd); + + pos += snd; + + if (pos >= buf.length) + pos = 0; + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + if (client != null) + client.close(); + } + } + }).start(); + } + + JOptionPane.showMessageDialog(null, "Press OK to stop WRITER."); + + done = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java index dbc5488..1375b87 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteFsTestSuite.java @@ -52,7 +52,7 @@ public class IgniteFsTestSuite extends TestSuite { suite.addTest(new TestSuite(GridGgfsStreamsSelfTest.class)); suite.addTest(new TestSuite(GridGgfsModesSelfTest.class)); - suite.addTest(new TestSuite(GridIpcServerEndpointDeserializerSelfTest.class)); + suite.addTest(new TestSuite(IpcServerEndpointDeserializerSelfTest.class)); suite.addTest(new TestSuite(GridGgfsMetricsSelfTest.class)); suite.addTest(new TestSuite(GridGgfsPrimarySelfTest.class)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java index 6b6d49d..cf45fad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIpcSharedMemorySelfTestSuite.java @@ -31,10 +31,10 @@ public class IgniteIpcSharedMemorySelfTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite IPC Shared Memory Test Suite."); - suite.addTest(new TestSuite(GridIpcSharedMemorySpaceSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemorySpaceSelfTest.class)); suite.addTest(new TestSuite(IpcSharedMemoryUtilsSelfTest.class)); - suite.addTest(new TestSuite(GridIpcSharedMemoryCrashDetectionSelfTest.class)); - suite.addTest(new TestSuite(GridIpcSharedMemoryNativeLoaderSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemoryCrashDetectionSelfTest.class)); + suite.addTest(new TestSuite(IpcSharedMemoryNativeLoaderSelfTest.class)); return suite; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopIpcIo.java index 9214bc7..44b8a5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/GridGgfsHadoopIpcIo.java @@ -51,7 +51,7 @@ public class GridGgfsHadoopIpcIo implements GridGgfsHadoopIo { private AtomicLong reqIdCnt = new AtomicLong(); /** Endpoint. */ - private GridIpcEndpoint endpoint; + private IpcEndpoint endpoint; /** Endpoint output stream. */ private GridGgfsDataOutputStream out; @@ -236,8 +236,8 @@ public class GridGgfsHadoopIpcIo implements GridGgfsHadoopIo { boolean success = false; try { - endpoint = GridIpcEndpointFactory.connectEndpoint( - endpointAddr, new GridLoggerProxy(new GridGgfsHadoopJclLogger(log), null, null, "")); + endpoint = IpcEndpointFactory.connectEndpoint( + endpointAddr, new GridLoggerProxy(new GridGgfsHadoopJclLogger(log), null, null, "")); out = new GridGgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); @@ -251,10 +251,10 @@ public class GridGgfsHadoopIpcIo implements GridGgfsHadoopIo { success = true; } catch (IgniteCheckedException e) { - GridIpcOutOfSystemResourcesException resEx = e.getCause(GridIpcOutOfSystemResourcesException.class); + IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); if (resEx != null) - throw new IgniteCheckedException(GridIpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); throw e; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java index 19ccdbc..27279cb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java @@ -46,7 +46,7 @@ public class GridHadoopExternalCommunication { /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower). For troubleshooting see " + - GridIpcSharedMemoryServerEndpoint.TROUBLESHOOTING_URL; + IpcSharedMemoryServerEndpoint.TROUBLESHOOTING_URL; /** Default port which node sets listener to (value is <tt>47100</tt>). */ public static final int DFLT_PORT = 27100; @@ -187,7 +187,7 @@ public class GridHadoopExternalCommunication { private GridNioServer<GridHadoopMessage> nioSrvr; /** Shared memory server. */ - private GridIpcSharedMemoryServerEndpoint shmemSrv; + private IpcSharedMemoryServerEndpoint shmemSrv; /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -643,7 +643,7 @@ public class GridHadoopExternalCommunication { * @return Server. * @throws IgniteCheckedException If failed. */ - @Nullable private GridIpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { + @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { if (boundTcpShmemPort >= 0) throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); @@ -655,8 +655,8 @@ public class GridHadoopExternalCommunication { // If configured TCP port is busy, find first available in range. for (int port = shmemPort; port < shmemPort + locPortRange; port++) { try { - GridIpcSharedMemoryServerEndpoint srv = new GridIpcSharedMemoryServerEndpoint( - log.getLogger(GridIpcSharedMemoryServerEndpoint.class), + IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint( + log.getLogger(IpcSharedMemoryServerEndpoint.class), locProcDesc.processId(), gridName); srv.setPort(port); @@ -808,8 +808,8 @@ public class GridHadoopExternalCommunication { return createShmemClient(desc, shmemPort); } catch (IgniteCheckedException e) { - if (e.hasCause(GridIpcOutOfSystemResourcesException.class)) - // Has cause or is itself the GridIpcOutOfSystemResourcesException. + if (e.hasCause(IpcOutOfSystemResourcesException.class)) + // Has cause or is itself the IpcOutOfSystemResourcesException. LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); else if (log.isDebugEnabled()) log.debug("Failed to establish shared memory connection with local hadoop process: " + @@ -835,10 +835,10 @@ public class GridHadoopExternalCommunication { long connTimeout0 = connTimeout; while (true) { - GridIpcEndpoint clientEndpoint; + IpcEndpoint clientEndpoint; try { - clientEndpoint = new GridIpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); + clientEndpoint = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); } catch (IgniteCheckedException e) { // Reconnect for the second time, if connection is not established. @@ -1077,12 +1077,12 @@ public class GridHadoopExternalCommunication { */ private class ShmemAcceptWorker extends GridWorker { /** */ - private final GridIpcSharedMemoryServerEndpoint srv; + private final IpcSharedMemoryServerEndpoint srv; /** * @param srv Server. */ - ShmemAcceptWorker(GridIpcSharedMemoryServerEndpoint srv) { + ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { super(gridName, "shmem-communication-acceptor", log); this.srv = srv; @@ -1121,7 +1121,7 @@ public class GridHadoopExternalCommunication { */ private class ShmemWorker extends GridWorker { /** */ - private final GridIpcEndpoint endpoint; + private final IpcEndpoint endpoint; /** Adapter. */ private GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter; @@ -1129,7 +1129,7 @@ public class GridHadoopExternalCommunication { /** * @param endpoint Endpoint. */ - private ShmemWorker(GridIpcEndpoint endpoint, boolean accepted) { + private ShmemWorker(IpcEndpoint endpoint, boolean accepted) { super(gridName, "shmem-worker", log); this.endpoint = endpoint; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopIpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopIpcToNioAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopIpcToNioAdapter.java index 35aa982..a39451d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopIpcToNioAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopIpcToNioAdapter.java @@ -31,11 +31,11 @@ import java.util.concurrent.atomic.*; * communications. * * Note that this class consumes an entire thread inside {@link #serve()} method - * in order to serve one {@link GridIpcEndpoint}. + * in order to serve one {@link org.apache.ignite.internal.util.ipc.IpcEndpoint}. */ public class GridHadoopIpcToNioAdapter<T> { /** */ - private final GridIpcEndpoint endp; + private final IpcEndpoint endp; /** */ private final GridNioFilterChain<T> chain; @@ -55,7 +55,7 @@ public class GridHadoopIpcToNioAdapter<T> { * @param lsnr Listener. * @param filters Filters. */ - public GridHadoopIpcToNioAdapter(IgniteLogger log, GridIpcEndpoint endp, boolean accepted, + public GridHadoopIpcToNioAdapter(IgniteLogger log, IpcEndpoint endp, boolean accepted, GridNioServerListener<T> lsnr, GridNioFilter... filters) { this.endp = endp; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.java index e09820e..d090055 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.fs; import java.util.*; import static org.apache.ignite.fs.IgniteFsMode.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * Tests Hadoop 2.x file system in primary mode. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemShmemPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemShmemPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemShmemPrimarySelfTest.java index 8b0a699..82c1610 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemShmemPrimarySelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoop20FileSystemShmemPrimarySelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.fs; import java.util.*; import static org.apache.ignite.fs.IgniteFsMode.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * Tests Hadoop 2.x file system in primary mode. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemClientSelfTest.java index 13618fc..2a98fcb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemClientSelfTest.java @@ -39,7 +39,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * Test interaction between a GGFS client and a GGFS server. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemHandshakeSelfTest.java index b76084c..c231b27 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemHandshakeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemHandshakeSelfTest.java @@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.fs.IgniteFsMode.*; import static org.apache.ignite.internal.fs.hadoop.GridGgfsHadoopUtils.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * Tests for GGFS file system handshake. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemIpcCacheSelfTest.java index be58d5c..1779a78 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemIpcCacheSelfTest.java @@ -74,7 +74,7 @@ public class GridGgfsHadoopFileSystemIpcCacheSelfTest extends GridGgfsCommonAbst ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "shmem"); - put("port", String.valueOf(GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); + put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); }}); ggfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemLoopbackAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemLoopbackAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemLoopbackAbstractSelfTest.java index d515cb4..bfe0dfd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemLoopbackAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemLoopbackAbstractSelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.fs; import java.util.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * GGFS Hadoop file system IPC loopback self test. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemShmemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemShmemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemShmemAbstractSelfTest.java index 0e9f2db..f62c1a4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemShmemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsHadoopFileSystemShmemAbstractSelfTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.testframework.*; import java.util.*; import java.util.concurrent.*; -import static org.apache.ignite.internal.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** * GGFS Hadoop file system IPC self test. @@ -57,14 +57,14 @@ public abstract class GridGgfsHadoopFileSystemShmemAbstractSelfTest extends Grid */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testOutOfResources() throws Exception { - final Collection<GridIpcEndpoint> eps = new LinkedList<>(); + final Collection<IpcEndpoint> eps = new LinkedList<>(); try { IgniteCheckedException e = (IgniteCheckedException)GridTestUtils.assertThrows(log, new Callable<Object>() { @SuppressWarnings("InfiniteLoopStatement") @Override public Object call() throws Exception { while (true) { - GridIpcEndpoint ep = GridIpcEndpointFactory.connectEndpoint("shmem:10500", log); + IpcEndpoint ep = IpcEndpointFactory.connectEndpoint("shmem:10500", log); eps.add(ep); } @@ -81,7 +81,7 @@ public abstract class GridGgfsHadoopFileSystemShmemAbstractSelfTest extends Grid msg.contains("(error code: 12)")); } finally { - for (GridIpcEndpoint ep : eps) + for (IpcEndpoint ep : eps) ep.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java index 99e18f4..ec74ad5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java @@ -83,7 +83,7 @@ public class GridGgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "shmem"); - put("port", String.valueOf(GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); + put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); }}); ggfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. @@ -146,7 +146,7 @@ public class GridGgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { */ protected URI getFileSystemURI(int grid) { try { - return new URI("ggfs://127.0.0.1:" + (GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid)); + return new URI("ggfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid)); } catch (URISyntaxException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java index 8ad8a43..b5eab88 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java @@ -81,7 +81,7 @@ public class IgniteFsEventsTestSuite extends TestSuite { ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "shmem"); - put("port", String.valueOf(GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); + put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); }}); return ggfsCfg; @@ -98,7 +98,7 @@ public class IgniteFsEventsTestSuite extends TestSuite { ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "tcp"); - put("port", String.valueOf(GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); + put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); }}); return ggfsCfg;