# ignite-117 : rename classes from ipc package GridIpc..->Ipc..
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e50ca07d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e50ca07d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e50ca07d Branch: refs/heads/sprint-1 Commit: e50ca07de475612eb36e7706f6100e7c6f159826 Parents: 3789241 Author: Artem SHutak <ashu...@gridgain.com> Authored: Wed Jan 28 20:52:06 2015 +0300 Committer: Artem SHutak <ashu...@gridgain.com> Committed: Wed Jan 28 20:52:06 2015 +0300 ---------------------------------------------------------------------- .../internal/processors/fs/GridGgfsServer.java | 26 +- .../processors/fs/GridGgfsServerManager.java | 8 +- .../processors/fs/IgniteFsNoopProcessor.java | 2 +- .../processors/fs/IgniteFsProcessor.java | 4 +- .../processors/fs/IgniteFsProcessorAdapter.java | 2 +- .../internal/util/ipc/GridIpcEndpoint.java | 49 -- .../util/ipc/GridIpcEndpointBindException.java | 47 -- .../util/ipc/GridIpcEndpointFactory.java | 84 --- .../internal/util/ipc/GridIpcEndpointType.java | 29 - .../util/ipc/GridIpcServerEndpoint.java | 73 -- .../ipc/GridIpcServerEndpointDeserializer.java | 66 -- .../internal/util/ipc/GridIpcToNioAdapter.java | 250 ------- .../ignite/internal/util/ipc/IpcEndpoint.java | 49 ++ .../util/ipc/IpcEndpointBindException.java | 47 ++ .../internal/util/ipc/IpcEndpointFactory.java | 84 +++ .../internal/util/ipc/IpcEndpointType.java | 29 + .../internal/util/ipc/IpcServerEndpoint.java | 73 ++ .../util/ipc/IpcServerEndpointDeserializer.java | 66 ++ .../internal/util/ipc/IpcToNioAdapter.java | 250 +++++++ .../ipc/loopback/GridIpcClientTcpEndpoint.java | 87 --- .../ipc/loopback/GridIpcServerTcpEndpoint.java | 179 ----- .../util/ipc/loopback/IpcClientTcpEndpoint.java | 87 +++ .../util/ipc/loopback/IpcServerTcpEndpoint.java | 179 +++++ .../GridIpcOutOfSystemResourcesException.java | 59 -- .../GridIpcSharedMemoryClientEndpoint.java | 336 --------- .../shmem/GridIpcSharedMemoryInitRequest.java | 67 -- .../shmem/GridIpcSharedMemoryInitResponse.java | 171 ----- .../shmem/GridIpcSharedMemoryInputStream.java | 99 --- .../shmem/GridIpcSharedMemoryNativeLoader.java | 261 ------- ...cSharedMemoryOperationTimedoutException.java | 59 -- .../shmem/GridIpcSharedMemoryOutputStream.java | 80 --- .../GridIpcSharedMemoryServerEndpoint.java | 707 ------------------- .../ipc/shmem/GridIpcSharedMemorySpace.java | 374 ---------- .../shmem/IpcOutOfSystemResourcesException.java | 59 ++ .../shmem/IpcSharedMemoryClientEndpoint.java | 336 +++++++++ .../ipc/shmem/IpcSharedMemoryInitRequest.java | 67 ++ .../ipc/shmem/IpcSharedMemoryInitResponse.java | 171 +++++ .../ipc/shmem/IpcSharedMemoryInputStream.java | 99 +++ .../ipc/shmem/IpcSharedMemoryNativeLoader.java | 261 +++++++ ...cSharedMemoryOperationTimedoutException.java | 59 ++ .../ipc/shmem/IpcSharedMemoryOutputStream.java | 80 +++ .../shmem/IpcSharedMemoryServerEndpoint.java | 707 +++++++++++++++++++ .../util/ipc/shmem/IpcSharedMemorySpace.java | 374 ++++++++++ .../util/ipc/shmem/IpcSharedMemoryUtils.java | 18 +- .../util/nio/GridShmemCommunicationClient.java | 4 +- .../visor/node/VisorNodeDataCollectorJob.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 24 +- ...IpcEndpointRegistrationAbstractSelfTest.java | 4 +- ...pcEndpointRegistrationOnWindowsSelfTest.java | 4 +- ...idIpcServerEndpointDeserializerSelfTest.java | 160 ----- .../IpcServerEndpointDeserializerSelfTest.java | 160 +++++ .../ipc/shmem/GgfsSharedMemoryTestClient.java | 76 ++ .../ipc/shmem/GgfsSharedMemoryTestServer.java | 71 ++ .../shmem/GridGgfsSharedMemoryTestClient.java | 76 -- .../shmem/GridGgfsSharedMemoryTestServer.java | 71 -- ...idIpcSharedMemoryCrashDetectionSelfTest.java | 500 ------------- .../shmem/GridIpcSharedMemoryFakeClient.java | 36 - ...GridIpcSharedMemoryNativeLoaderSelfTest.java | 46 -- .../shmem/GridIpcSharedMemoryNodeStartup.java | 87 --- .../shmem/GridIpcSharedMemorySpaceSelfTest.java | 259 ------- .../IpcSharedMemoryCrashDetectionSelfTest.java | 500 +++++++++++++ .../ipc/shmem/IpcSharedMemoryFakeClient.java | 36 + .../IpcSharedMemoryNativeLoaderSelfTest.java | 46 ++ .../ipc/shmem/IpcSharedMemoryNodeStartup.java | 87 +++ .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 259 +++++++ .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 4 +- .../LoadWithCorruptedLibFileTestRunner.java | 4 +- .../GridIpcSharedMemoryBenchmarkParty.java | 35 - .../GridIpcSharedMemoryBenchmarkReader.java | 133 ---- .../GridIpcSharedMemoryBenchmarkWriter.java | 125 ---- .../IpcSharedMemoryBenchmarkParty.java | 35 + .../IpcSharedMemoryBenchmarkReader.java | 133 ++++ .../IpcSharedMemoryBenchmarkWriter.java | 125 ++++ .../ignite/testsuites/IgniteFsTestSuite.java | 2 +- .../IgniteIpcSharedMemorySelfTestSuite.java | 6 +- .../internal/fs/hadoop/GridGgfsHadoopIpcIo.java | 10 +- .../GridHadoopExternalCommunication.java | 26 +- .../GridHadoopIpcToNioAdapter.java | 6 +- ...doop20FileSystemLoopbackPrimarySelfTest.java | 2 +- ...sHadoop20FileSystemShmemPrimarySelfTest.java | 2 +- .../GridGgfsHadoopFileSystemClientSelfTest.java | 2 +- ...idGgfsHadoopFileSystemHandshakeSelfTest.java | 2 +- ...ridGgfsHadoopFileSystemIpcCacheSelfTest.java | 2 +- ...adoopFileSystemLoopbackAbstractSelfTest.java | 2 +- ...fsHadoopFileSystemShmemAbstractSelfTest.java | 8 +- .../fs/GridGgfsNearOnlyMultiNodeSelfTest.java | 4 +- .../ignite/fs/IgniteFsEventsTestSuite.java | 4 +- 87 files changed, 4698 insertions(+), 4698 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java index 73010c5..afe2c17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServer.java @@ -52,7 +52,7 @@ public class GridGgfsServer { private final Map<String,String> endpointCfg; /** Server endpoint. */ - private GridIpcServerEndpoint srvEndpoint; + private IpcServerEndpoint srvEndpoint; /** Server message handler. */ private GridGgfsServerHandler hnd; @@ -91,15 +91,15 @@ public class GridGgfsServer { * @throws IgniteCheckedException If failed. */ public void start() throws IgniteCheckedException { - srvEndpoint = GridIpcServerEndpointDeserializer.deserialize(endpointCfg); + srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg); - if (U.isWindows() && srvEndpoint instanceof GridIpcSharedMemoryServerEndpoint) - throw new IgniteCheckedException(GridIpcSharedMemoryServerEndpoint.class.getSimpleName() + + if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint) + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() + " should not be configured on Windows (configure " + - GridIpcServerTcpEndpoint.class.getSimpleName() + ")"); + IpcServerTcpEndpoint.class.getSimpleName() + ")"); - if (srvEndpoint instanceof GridIpcServerTcpEndpoint) { - GridIpcServerTcpEndpoint srvEndpoint0 = (GridIpcServerTcpEndpoint)srvEndpoint; + if (srvEndpoint instanceof IpcServerTcpEndpoint) { + IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint; srvEndpoint0.setManagement(mgmt); @@ -124,7 +124,7 @@ public class GridGgfsServer { srvEndpoint.start(); - // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. + // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. if (srvEndpoint.getPort() >= 0) ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); @@ -172,7 +172,7 @@ public class GridGgfsServer { U.join(clientWorkers, log); - // GridIpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. + // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. if (srvEndpoint.getPort() >= 0) ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); @@ -189,7 +189,7 @@ public class GridGgfsServer { * * @return IPC server endpoint. */ - public GridIpcServerEndpoint getIpcServerEndpoint() { + public IpcServerEndpoint getIpcServerEndpoint() { return srvEndpoint; } @@ -198,7 +198,7 @@ public class GridGgfsServer { */ private class ClientWorker extends GridWorker { /** Connected client endpoint. */ - private GridIpcEndpoint endpoint; + private IpcEndpoint endpoint; /** Data output stream. */ private final GridGgfsDataOutputStream out; @@ -216,7 +216,7 @@ public class GridGgfsServer { * @param endpoint Connected client endpoint. * @throws IgniteCheckedException If endpoint output stream cannot be obtained. */ - protected ClientWorker(GridIpcEndpoint endpoint, int idx) throws IgniteCheckedException { + protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException { super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log); this.endpoint = endpoint; @@ -391,7 +391,7 @@ public class GridGgfsServer { @Override protected void body() throws InterruptedException, IgniteInterruptedException { try { while (!Thread.currentThread().isInterrupted()) { - GridIpcEndpoint client = srvEndpoint.accept(); + IpcEndpoint client = srvEndpoint.accept(); if (log.isDebugEnabled()) log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java index f214550..1c0caa3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java @@ -95,7 +95,7 @@ public class GridGgfsServerManager extends GridGgfsManager { srvrs.add(ipcSrv); } - catch (GridIpcEndpointBindException ignored) { + catch (IpcEndpointBindException ignored) { int port = ipcSrv.getIpcServerEndpoint().getPort(); String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : ""; @@ -114,9 +114,9 @@ public class GridGgfsServerManager extends GridGgfsManager { /** * @return Collection of active endpoints. */ - public Collection<GridIpcServerEndpoint> endpoints() { - return F.viewReadOnly(srvrs, new C1<GridGgfsServer, GridIpcServerEndpoint>() { - @Override public GridIpcServerEndpoint apply(GridGgfsServer e) { + public Collection<IpcServerEndpoint> endpoints() { + return F.viewReadOnly(srvrs, new C1<GridGgfsServer, IpcServerEndpoint>() { + @Override public IpcServerEndpoint apply(GridGgfsServer e) { return e.getIpcServerEndpoint(); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopProcessor.java index d23cb13..f744a5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsNoopProcessor.java @@ -59,7 +59,7 @@ public class IgniteFsNoopProcessor extends IgniteFsProcessorAdapter { } /** {@inheritDoc} */ - @Override public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { + @Override public Collection<IpcServerEndpoint> endpoints(@Nullable String name) { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java index cc21789..9d4c99e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessor.java @@ -222,10 +222,10 @@ public class IgniteFsProcessor extends IgniteFsProcessorAdapter { } /** {@inheritDoc} */ - @Override @Nullable public Collection<GridIpcServerEndpoint> endpoints(@Nullable String name) { + @Override @Nullable public Collection<IpcServerEndpoint> endpoints(@Nullable String name) { GridGgfsContext ggfsCtx = ggfsCache.get(maskName(name)); - return ggfsCtx == null ? Collections.<GridIpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); + return ggfsCtx == null ? Collections.<IpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessorAdapter.java index 47034ec..ef73ff6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgniteFsProcessorAdapter.java @@ -62,7 +62,7 @@ public abstract class IgniteFsProcessorAdapter extends GridProcessorAdapter { * @param name GGFS name. * @return Collection of endpoints or {@code null} in case GGFS is not defined. */ - public abstract Collection<GridIpcServerEndpoint> endpoints(@Nullable String name); + public abstract Collection<IpcServerEndpoint> endpoints(@Nullable String name); /** * Create compute job for the given GGFS job. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpoint.java deleted file mode 100644 index 9477e23..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpoint.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; - -import java.io.*; - -/** - * GGFS IPC endpoint used for point-to-point communication. - */ -public interface GridIpcEndpoint extends Closeable { - /** - * Gets input stream associated with this IPC endpoint. - * - * @return IPC input stream. - * @throws IgniteCheckedException If error occurred. - */ - public InputStream inputStream() throws IgniteCheckedException; - - /** - * Gets output stream associated with this IPC endpoint. - * - * @return IPC output stream. - * @throws IgniteCheckedException If error occurred. - */ - public OutputStream outputStream() throws IgniteCheckedException; - - /** - * Closes endpoint. Note that IPC endpoint may acquire native resources so it must be always closed - * once it is not needed. - */ - @Override public void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointBindException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointBindException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointBindException.java deleted file mode 100644 index 81669cc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointBindException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; - -/** - * Represents exception occurred during IPC endpoint binding. - */ -public class GridIpcEndpointBindException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Constructor. - * - * @param msg Message. - */ - public GridIpcEndpointBindException(String msg) { - super(msg); - } - - /** - * Constructor. - * - * @param msg Message. - * @param cause Cause. - */ - public GridIpcEndpointBindException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointFactory.java deleted file mode 100644 index 83e089f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointFactory.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; - -/** - * Ggfs endpoint factory for inter-process communication. - */ -public class GridIpcEndpointFactory { - /** - * Connects to open server IPC endpoint. - * - * @param endpointAddr Endpoint address. - * @param log Log. - * @return Connected client endpoint. - * @throws IgniteCheckedException If failed to establish connection. - */ - public static GridIpcEndpoint connectEndpoint(String endpointAddr, IgniteLogger log) throws IgniteCheckedException { - A.notNull(endpointAddr, "endpointAddr"); - - String[] split = endpointAddr.split(":"); - - int port; - - if (split.length == 2) { - try { - port = Integer.parseInt(split[1]); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Failed to parse port number: " + endpointAddr, e); - } - } - else - // Use default port. - port = -1; - - return "shmem".equalsIgnoreCase(split[0]) ? - connectSharedMemoryEndpoint(port > 0 ? port : GridIpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log) : - connectTcpEndpoint(split[0], port > 0 ? port : GridIpcServerTcpEndpoint.DFLT_IPC_PORT); - } - - /** - * Connects loopback IPC endpoint. - * - * @param host Loopback host. - * @param port Loopback endpoint port. - * @return Connected client endpoint. - * @throws IgniteCheckedException If connection failed. - */ - private static GridIpcEndpoint connectTcpEndpoint(String host, int port) throws IgniteCheckedException { - return new GridIpcClientTcpEndpoint(host, port); - } - - /** - * Connects IPC shared memory endpoint. - * - * @param port Endpoint port. - * @param log Log. - * @return Connected client endpoint. - * @throws IgniteCheckedException If connection failed. - */ - private static GridIpcEndpoint connectSharedMemoryEndpoint(int port, IgniteLogger log) throws IgniteCheckedException { - return new GridIpcSharedMemoryClientEndpoint(port, log); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointType.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointType.java deleted file mode 100644 index ba05f3a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcEndpointType.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -/** - * IPC endpoint type. - */ -public enum GridIpcEndpointType { - /** TCP loopback socket. Supported on all platforms. */ - TCP_LOOPBACK, - - /** Shared memory region. Supported on POSIX-compliant OSes. */ - SHARED_MEMORY -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpoint.java deleted file mode 100644 index c09fca5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpoint.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * IPC server endpoint that is capable for client connections accepting. - */ -public interface GridIpcServerEndpoint extends Closeable { - /** - * Accepts client IPC connection. After client connection is accepted, it can be used - * for IPC. This method will block until client connects to IPC server endpoint. - * - * @return Accepted client connection. - * @throws IgniteCheckedException If accept failed and the endpoint is not usable anymore. - */ - public GridIpcEndpoint accept() throws IgniteCheckedException; - - /** - * Starts configured endpoint implementation. - * - * @throws IgniteCheckedException If failed to start server endpoint. - */ - public void start() throws IgniteCheckedException; - - /** - * Closes server IPC. After IPC is closed, no further operations can be performed on this - * object. - */ - @Override public void close(); - - /** - * Gets port endpoint is bound to. - * Endpoints who does not bind to any port should return -1. - * - * @return Port number. - */ - public int getPort(); - - /** - * Gets host endpoint is bound to. - * Endpoints who does not bind to any port should return {@code null}. - * - * @return Host. - */ - @Nullable public String getHost(); - - /** - * Indicates if this endpoint is a management endpoint. - * - * @return {@code true} if it's a management endpoint. - */ - public boolean isManagement(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializer.java deleted file mode 100644 index 3a663a1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcServerEndpointDeserializer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Grid GridIpcServerEndpoint configuration deserializer. - */ -public class GridIpcServerEndpointDeserializer { - /** - * Deserializes IPC server endpoint config into concrete - * instance of {@link GridIpcServerEndpoint}. - * - * @param endpointCfg Map with properties of the IPC server endpoint config. - * @return Deserialized instance of {@link GridIpcServerEndpoint}. - * @throws IgniteCheckedException If any problem with configuration properties setting has happened. - */ - public static GridIpcServerEndpoint deserialize(Map<String,String> endpointCfg) throws IgniteCheckedException { - A.notNull(endpointCfg, "endpointCfg"); - - String endpointType = endpointCfg.get("type"); - - if (endpointType == null) - throw new IgniteCheckedException("Failed to create server endpoint (type is not specified)"); - - switch (endpointType) { - case "shmem": { - GridIpcSharedMemoryServerEndpoint endpoint = new GridIpcSharedMemoryServerEndpoint(); - - endpoint.setupConfiguration(endpointCfg); - - return endpoint; - } - case "tcp": { - GridIpcServerTcpEndpoint endpoint = new GridIpcServerTcpEndpoint(); - - endpoint.setupConfiguration(endpointCfg); - - return endpoint; - } - default: - throw new IgniteCheckedException("Failed to create server endpoint (type is unknown): " + endpointType); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcToNioAdapter.java deleted file mode 100644 index 50dc9c6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/GridIpcToNioAdapter.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.nio.*; - -import java.io.*; -import java.nio.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC) - * communications. - * - * Note that this class consumes an entire thread inside {@link #serve()} method - * in order to serve one {@link GridIpcEndpoint}. - */ -public class GridIpcToNioAdapter<T> { - /** */ - private final GridIpcEndpoint endp; - - /** */ - private final GridNioFilterChain<T> chain; - - /** */ - private final GridNioSessionImpl ses; - - /** */ - private final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); - - /** */ - private final ByteBuffer writeBuf; - - /** */ - private final GridNioMetricsListener metricsLsnr; - - /** */ - private final GridNioMessageWriter msgWriter; - - /** - * @param metricsLsnr Metrics listener. - * @param log Log. - * @param endp Endpoint. - * @param msgWriter Message writer. - * @param lsnr Listener. - * @param filters Filters. - */ - public GridIpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, GridIpcEndpoint endp, - GridNioMessageWriter msgWriter, GridNioServerListener<T> lsnr, GridNioFilter... filters) { - assert metricsLsnr != null; - assert msgWriter != null; - - this.metricsLsnr = metricsLsnr; - this.endp = endp; - this.msgWriter = msgWriter; - - chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); - ses = new GridNioSessionImpl(chain, null, null, true); - - writeBuf = ByteBuffer.allocate(8 << 10); - - writeBuf.order(ByteOrder.nativeOrder()); - } - - /** - * Serves given set of listeners repeatedly reading data from the endpoint. - * - * @throws InterruptedException If interrupted. - */ - public void serve() throws InterruptedException { - try { - chain.onSessionOpened(ses); - - InputStream in = endp.inputStream(); - - ByteBuffer readBuf = ByteBuffer.allocate(8 << 10); - - readBuf.order(ByteOrder.nativeOrder()); - - assert readBuf.hasArray(); - - while (!Thread.interrupted()) { - int pos = readBuf.position(); - - int read = in.read(readBuf.array(), pos, readBuf.remaining()); - - if (read > 0) { - metricsLsnr.onBytesReceived(read); - - readBuf.position(0); - readBuf.limit(pos + read); - - chain.onMessageReceived(ses, readBuf); - - if (readBuf.hasRemaining()) - readBuf.compact(); - else - readBuf.clear(); - - CountDownLatch latch = latchRef.get(); - - if (latch != null) - latch.await(); - } - else if (read < 0) { - endp.close(); - - break; // And close below. - } - } - } - catch (Exception e) { - chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e)); - } - finally { - try { - // Assuming remote end closed connection - pushing event from head to tail. - chain.onSessionClosed(ses); - } - catch (IgniteCheckedException e) { - chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to process session close event " + - "for IPC endpoint.", e)); - } - } - } - - /** - * Handles write events on chain. - * - * @param msg Buffer to send. - * @return Send result. - */ - private GridNioFuture<?> send(GridTcpCommunicationMessageAdapter msg) { - assert writeBuf.hasArray(); - - try { - // This method is called only on handshake, - // so we don't need to provide node ID for - // rolling updates support. - int cnt = msgWriter.writeFully(null, msg, endp.outputStream(), writeBuf); - - metricsLsnr.onBytesSent(cnt); - } - catch (IOException | IgniteCheckedException e) { - return new GridNioFinishedFuture<Object>(e); - } - - return new GridNioFinishedFuture<>((Object)null); - } - - /** - * Filter forwarding messages from chain's head to this server. - */ - private class HeadFilter extends GridNioFilterAdapter { - /** - * Assigns filter name. - */ - protected HeadFilter() { - super("HeadFilter"); - } - - /** {@inheritDoc} */ - @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException { - proceedSessionOpened(ses); - } - - /** {@inheritDoc} */ - @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException { - proceedSessionClosed(ses); - } - - /** {@inheritDoc} */ - @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException { - proceedExceptionCaught(ses, ex); - } - - /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { - assert ses == GridIpcToNioAdapter.this.ses; - - return send((GridTcpCommunicationMessageAdapter)msg); - } - - /** {@inheritDoc} */ - @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { - proceedMessageReceived(ses, msg); - } - - /** {@inheritDoc} */ - @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException { - // This call should be synced externally to avoid races. - boolean b = latchRef.compareAndSet(null, new CountDownLatch(1)); - - assert b; - - return new GridNioFinishedFuture<>(b); - } - - /** {@inheritDoc} */ - @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException { - // This call should be synced externally to avoid races. - CountDownLatch latch = latchRef.getAndSet(null); - - if (latch != null) - latch.countDown(); - - return new GridNioFinishedFuture<Object>(latch != null); - } - - /** {@inheritDoc} */ - @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) { - assert ses == GridIpcToNioAdapter.this.ses; - - boolean closed = GridIpcToNioAdapter.this.ses.setClosed(); - - if (closed) - endp.close(); - - return new GridNioFinishedFuture<>(closed); - } - - /** {@inheritDoc} */ - @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException { - proceedSessionIdleTimeout(ses); - } - - /** {@inheritDoc} */ - @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { - proceedSessionWriteTimeout(ses); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpoint.java new file mode 100644 index 0000000..3e91100 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpoint.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; + +import java.io.*; + +/** + * GGFS IPC endpoint used for point-to-point communication. + */ +public interface IpcEndpoint extends Closeable { + /** + * Gets input stream associated with this IPC endpoint. + * + * @return IPC input stream. + * @throws IgniteCheckedException If error occurred. + */ + public InputStream inputStream() throws IgniteCheckedException; + + /** + * Gets output stream associated with this IPC endpoint. + * + * @return IPC output stream. + * @throws IgniteCheckedException If error occurred. + */ + public OutputStream outputStream() throws IgniteCheckedException; + + /** + * Closes endpoint. Note that IPC endpoint may acquire native resources so it must be always closed + * once it is not needed. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointBindException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointBindException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointBindException.java new file mode 100644 index 0000000..ab1e3cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointBindException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; + +/** + * Represents exception occurred during IPC endpoint binding. + */ +public class IpcEndpointBindException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * + * @param msg Message. + */ + public IpcEndpointBindException(String msg) { + super(msg); + } + + /** + * Constructor. + * + * @param msg Message. + * @param cause Cause. + */ + public IpcEndpointBindException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java new file mode 100644 index 0000000..d0067c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; + +/** + * Ggfs endpoint factory for inter-process communication. + */ +public class IpcEndpointFactory { + /** + * Connects to open server IPC endpoint. + * + * @param endpointAddr Endpoint address. + * @param log Log. + * @return Connected client endpoint. + * @throws IgniteCheckedException If failed to establish connection. + */ + public static IpcEndpoint connectEndpoint(String endpointAddr, IgniteLogger log) throws IgniteCheckedException { + A.notNull(endpointAddr, "endpointAddr"); + + String[] split = endpointAddr.split(":"); + + int port; + + if (split.length == 2) { + try { + port = Integer.parseInt(split[1]); + } + catch (NumberFormatException e) { + throw new IgniteCheckedException("Failed to parse port number: " + endpointAddr, e); + } + } + else + // Use default port. + port = -1; + + return "shmem".equalsIgnoreCase(split[0]) ? + connectSharedMemoryEndpoint(port > 0 ? port : IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, log) : + connectTcpEndpoint(split[0], port > 0 ? port : IpcServerTcpEndpoint.DFLT_IPC_PORT); + } + + /** + * Connects loopback IPC endpoint. + * + * @param host Loopback host. + * @param port Loopback endpoint port. + * @return Connected client endpoint. + * @throws IgniteCheckedException If connection failed. + */ + private static IpcEndpoint connectTcpEndpoint(String host, int port) throws IgniteCheckedException { + return new IpcClientTcpEndpoint(host, port); + } + + /** + * Connects IPC shared memory endpoint. + * + * @param port Endpoint port. + * @param log Log. + * @return Connected client endpoint. + * @throws IgniteCheckedException If connection failed. + */ + private static IpcEndpoint connectSharedMemoryEndpoint(int port, IgniteLogger log) throws IgniteCheckedException { + return new IpcSharedMemoryClientEndpoint(port, log); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointType.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointType.java new file mode 100644 index 0000000..6636d94 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +/** + * IPC endpoint type. + */ +public enum IpcEndpointType { + /** TCP loopback socket. Supported on all platforms. */ + TCP_LOOPBACK, + + /** Shared memory region. Supported on POSIX-compliant OSes. */ + SHARED_MEMORY +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpoint.java new file mode 100644 index 0000000..ec13308 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpoint.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * IPC server endpoint that is capable for client connections accepting. + */ +public interface IpcServerEndpoint extends Closeable { + /** + * Accepts client IPC connection. After client connection is accepted, it can be used + * for IPC. This method will block until client connects to IPC server endpoint. + * + * @return Accepted client connection. + * @throws IgniteCheckedException If accept failed and the endpoint is not usable anymore. + */ + public IpcEndpoint accept() throws IgniteCheckedException; + + /** + * Starts configured endpoint implementation. + * + * @throws IgniteCheckedException If failed to start server endpoint. + */ + public void start() throws IgniteCheckedException; + + /** + * Closes server IPC. After IPC is closed, no further operations can be performed on this + * object. + */ + @Override public void close(); + + /** + * Gets port endpoint is bound to. + * Endpoints who does not bind to any port should return -1. + * + * @return Port number. + */ + public int getPort(); + + /** + * Gets host endpoint is bound to. + * Endpoints who does not bind to any port should return {@code null}. + * + * @return Host. + */ + @Nullable public String getHost(); + + /** + * Indicates if this endpoint is a management endpoint. + * + * @return {@code true} if it's a management endpoint. + */ + public boolean isManagement(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java new file mode 100644 index 0000000..07bc28b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Grid IpcServerEndpoint configuration deserializer. + */ +public class IpcServerEndpointDeserializer { + /** + * Deserializes IPC server endpoint config into concrete + * instance of {@link IpcServerEndpoint}. + * + * @param endpointCfg Map with properties of the IPC server endpoint config. + * @return Deserialized instance of {@link IpcServerEndpoint}. + * @throws IgniteCheckedException If any problem with configuration properties setting has happened. + */ + public static IpcServerEndpoint deserialize(Map<String,String> endpointCfg) throws IgniteCheckedException { + A.notNull(endpointCfg, "endpointCfg"); + + String endpointType = endpointCfg.get("type"); + + if (endpointType == null) + throw new IgniteCheckedException("Failed to create server endpoint (type is not specified)"); + + switch (endpointType) { + case "shmem": { + IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint(); + + endpoint.setupConfiguration(endpointCfg); + + return endpoint; + } + case "tcp": { + IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint(); + + endpoint.setupConfiguration(endpointCfg); + + return endpoint; + } + default: + throw new IgniteCheckedException("Failed to create server endpoint (type is unknown): " + endpointType); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java new file mode 100644 index 0000000..5097db7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.nio.*; + +import java.io.*; +import java.nio.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC) + * communications. + * + * Note that this class consumes an entire thread inside {@link #serve()} method + * in order to serve one {@link IpcEndpoint}. + */ +public class IpcToNioAdapter<T> { + /** */ + private final IpcEndpoint endp; + + /** */ + private final GridNioFilterChain<T> chain; + + /** */ + private final GridNioSessionImpl ses; + + /** */ + private final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); + + /** */ + private final ByteBuffer writeBuf; + + /** */ + private final GridNioMetricsListener metricsLsnr; + + /** */ + private final GridNioMessageWriter msgWriter; + + /** + * @param metricsLsnr Metrics listener. + * @param log Log. + * @param endp Endpoint. + * @param msgWriter Message writer. + * @param lsnr Listener. + * @param filters Filters. + */ + public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp, + GridNioMessageWriter msgWriter, GridNioServerListener<T> lsnr, GridNioFilter... filters) { + assert metricsLsnr != null; + assert msgWriter != null; + + this.metricsLsnr = metricsLsnr; + this.endp = endp; + this.msgWriter = msgWriter; + + chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); + ses = new GridNioSessionImpl(chain, null, null, true); + + writeBuf = ByteBuffer.allocate(8 << 10); + + writeBuf.order(ByteOrder.nativeOrder()); + } + + /** + * Serves given set of listeners repeatedly reading data from the endpoint. + * + * @throws InterruptedException If interrupted. + */ + public void serve() throws InterruptedException { + try { + chain.onSessionOpened(ses); + + InputStream in = endp.inputStream(); + + ByteBuffer readBuf = ByteBuffer.allocate(8 << 10); + + readBuf.order(ByteOrder.nativeOrder()); + + assert readBuf.hasArray(); + + while (!Thread.interrupted()) { + int pos = readBuf.position(); + + int read = in.read(readBuf.array(), pos, readBuf.remaining()); + + if (read > 0) { + metricsLsnr.onBytesReceived(read); + + readBuf.position(0); + readBuf.limit(pos + read); + + chain.onMessageReceived(ses, readBuf); + + if (readBuf.hasRemaining()) + readBuf.compact(); + else + readBuf.clear(); + + CountDownLatch latch = latchRef.get(); + + if (latch != null) + latch.await(); + } + else if (read < 0) { + endp.close(); + + break; // And close below. + } + } + } + catch (Exception e) { + chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e)); + } + finally { + try { + // Assuming remote end closed connection - pushing event from head to tail. + chain.onSessionClosed(ses); + } + catch (IgniteCheckedException e) { + chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to process session close event " + + "for IPC endpoint.", e)); + } + } + } + + /** + * Handles write events on chain. + * + * @param msg Buffer to send. + * @return Send result. + */ + private GridNioFuture<?> send(GridTcpCommunicationMessageAdapter msg) { + assert writeBuf.hasArray(); + + try { + // This method is called only on handshake, + // so we don't need to provide node ID for + // rolling updates support. + int cnt = msgWriter.writeFully(null, msg, endp.outputStream(), writeBuf); + + metricsLsnr.onBytesSent(cnt); + } + catch (IOException | IgniteCheckedException e) { + return new GridNioFinishedFuture<Object>(e); + } + + return new GridNioFinishedFuture<>((Object)null); + } + + /** + * Filter forwarding messages from chain's head to this server. + */ + private class HeadFilter extends GridNioFilterAdapter { + /** + * Assigns filter name. + */ + protected HeadFilter() { + super("HeadFilter"); + } + + /** {@inheritDoc} */ + @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException { + proceedSessionOpened(ses); + } + + /** {@inheritDoc} */ + @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException { + proceedSessionClosed(ses); + } + + /** {@inheritDoc} */ + @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException { + proceedExceptionCaught(ses, ex); + } + + /** {@inheritDoc} */ + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + assert ses == IpcToNioAdapter.this.ses; + + return send((GridTcpCommunicationMessageAdapter)msg); + } + + /** {@inheritDoc} */ + @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { + proceedMessageReceived(ses, msg); + } + + /** {@inheritDoc} */ + @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException { + // This call should be synced externally to avoid races. + boolean b = latchRef.compareAndSet(null, new CountDownLatch(1)); + + assert b; + + return new GridNioFinishedFuture<>(b); + } + + /** {@inheritDoc} */ + @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException { + // This call should be synced externally to avoid races. + CountDownLatch latch = latchRef.getAndSet(null); + + if (latch != null) + latch.countDown(); + + return new GridNioFinishedFuture<Object>(latch != null); + } + + /** {@inheritDoc} */ + @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) { + assert ses == IpcToNioAdapter.this.ses; + + boolean closed = IpcToNioAdapter.this.ses.setClosed(); + + if (closed) + endp.close(); + + return new GridNioFinishedFuture<>(closed); + } + + /** {@inheritDoc} */ + @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException { + proceedSessionIdleTimeout(ses); + } + + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { + proceedSessionWriteTimeout(ses); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcClientTcpEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcClientTcpEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcClientTcpEndpoint.java deleted file mode 100644 index 4586ba4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcClientTcpEndpoint.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.loopback; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; - -/** - * Loopback IPC endpoint based on socket. - */ -public class GridIpcClientTcpEndpoint implements GridIpcEndpoint { - /** Client socket. */ - private Socket clientSock; - - /** - * Creates connected client IPC endpoint. - * - * @param clientSock Connected client socket. - */ - public GridIpcClientTcpEndpoint(Socket clientSock) { - assert clientSock != null; - - this.clientSock = clientSock; - } - - /** - * Creates and connects client IPC endpoint. - * - * @param port Port. - * @param host Host. - * @throws IgniteCheckedException If connection fails. - */ - public GridIpcClientTcpEndpoint(String host, int port) throws IgniteCheckedException { - clientSock = new Socket(); - - try { - clientSock.connect(new InetSocketAddress(host, port)); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to connect to endpoint [host=" + host + ", port=" + port + ']', e); - } - } - - /** {@inheritDoc} */ - @Override public InputStream inputStream() throws IgniteCheckedException { - try { - return clientSock.getInputStream(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream outputStream() throws IgniteCheckedException { - try { - return clientSock.getOutputStream(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - U.closeQuiet(clientSock); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcServerTcpEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcServerTcpEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcServerTcpEndpoint.java deleted file mode 100644 index 695b66b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/GridIpcServerTcpEndpoint.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.loopback; - -import org.apache.ignite.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.ipc.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Server loopback IPC endpoint. - */ -public class GridIpcServerTcpEndpoint implements GridIpcServerEndpoint { - /** Default endpoint port number. */ - public static final int DFLT_IPC_PORT = 10500; - - /** Server socket. */ - private ServerSocket srvSock; - - /** Port to bind socket to. */ - private int port = DFLT_IPC_PORT; - - /** Host to bind socket to. */ - private String host; - - /** Management endpoint flag. */ - private boolean mgmt; - - /** Logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (port <= 0 || port >= 0xffff) - throw new GridIpcEndpointBindException("Port value is illegal: " + port); - - try { - srvSock = new ServerSocket(); - - assert host != null; - - srvSock.bind(new InetSocketAddress(U.resolveLocalHost(host), port)); - - if (log.isInfoEnabled()) - log.info("IPC server loopback endpoint started [port=" + port + ']'); - } - catch (IOException e) { - if (srvSock != null) - U.closeQuiet(srvSock); - - throw new GridIpcEndpointBindException("Failed to bind loopback IPC endpoint (is port already in " + - "use?): " + port, e); - } - } - - /** {@inheritDoc} */ - @Override public GridIpcEndpoint accept() throws IgniteCheckedException { - try { - Socket sock = srvSock.accept(); - - return new GridIpcClientTcpEndpoint(sock); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - U.closeQuiet(srvSock); - } - - /** {@inheritDoc} */ - @Override public int getPort() { - return port; - } - - /** - * Sets port endpoint will be bound to. - * - * @param port Port number. - */ - public void setPort(int port) { - this.port = port; - } - - /** {@inheritDoc} */ - @Override public String getHost() { - return host; - } - - /** - * Sets host endpoint will be bound to. - * - * @param host Host. - */ - public void setHost(String host) { - this.host = host; - } - - /** {@inheritDoc} */ - @Override public boolean isManagement() { - return mgmt; - } - - /** - * Sets management property. - * - * @param mgmt flag. - */ - public void setManagement(boolean mgmt) { - this.mgmt = mgmt; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridIpcServerTcpEndpoint.class, this); - } - - /** - * Sets configuration properties from the map. - * - * @param endpointCfg Map of properties. - * @throws IgniteCheckedException If invalid property name or value. - */ - public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException { - for (Map.Entry<String,String> e : endpointCfg.entrySet()) { - try { - switch (e.getKey()) { - case "type": - //Ignore this property - break; - - case "port": - setPort(Integer.parseInt(e.getValue())); - break; - - case "host": - setHost(e.getValue()); - break; - - case "management": - setManagement(Boolean.valueOf(e.getValue())); - break; - - default: - throw new IgniteCheckedException("Invalid property '" + e.getKey() + "' of " + getClass().getSimpleName()); - } - } - catch (Throwable t) { - if (t instanceof IgniteCheckedException) - throw t; - - throw new IgniteCheckedException("Invalid value '" + e.getValue() + "' of the property '" + e.getKey() + "' in " + - getClass().getSimpleName(), t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcClientTcpEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcClientTcpEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcClientTcpEndpoint.java new file mode 100644 index 0000000..ab787b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcClientTcpEndpoint.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.loopback; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; + +/** + * Loopback IPC endpoint based on socket. + */ +public class IpcClientTcpEndpoint implements IpcEndpoint { + /** Client socket. */ + private Socket clientSock; + + /** + * Creates connected client IPC endpoint. + * + * @param clientSock Connected client socket. + */ + public IpcClientTcpEndpoint(Socket clientSock) { + assert clientSock != null; + + this.clientSock = clientSock; + } + + /** + * Creates and connects client IPC endpoint. + * + * @param port Port. + * @param host Host. + * @throws IgniteCheckedException If connection fails. + */ + public IpcClientTcpEndpoint(String host, int port) throws IgniteCheckedException { + clientSock = new Socket(); + + try { + clientSock.connect(new InetSocketAddress(host, port)); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to connect to endpoint [host=" + host + ", port=" + port + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public InputStream inputStream() throws IgniteCheckedException { + try { + return clientSock.getInputStream(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream outputStream() throws IgniteCheckedException { + try { + return clientSock.getOutputStream(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + U.closeQuiet(clientSock); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcServerTcpEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcServerTcpEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcServerTcpEndpoint.java new file mode 100644 index 0000000..f201b9b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/loopback/IpcServerTcpEndpoint.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.ipc.loopback; + +import org.apache.ignite.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.ipc.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Server loopback IPC endpoint. + */ +public class IpcServerTcpEndpoint implements IpcServerEndpoint { + /** Default endpoint port number. */ + public static final int DFLT_IPC_PORT = 10500; + + /** Server socket. */ + private ServerSocket srvSock; + + /** Port to bind socket to. */ + private int port = DFLT_IPC_PORT; + + /** Host to bind socket to. */ + private String host; + + /** Management endpoint flag. */ + private boolean mgmt; + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (port <= 0 || port >= 0xffff) + throw new IpcEndpointBindException("Port value is illegal: " + port); + + try { + srvSock = new ServerSocket(); + + assert host != null; + + srvSock.bind(new InetSocketAddress(U.resolveLocalHost(host), port)); + + if (log.isInfoEnabled()) + log.info("IPC server loopback endpoint started [port=" + port + ']'); + } + catch (IOException e) { + if (srvSock != null) + U.closeQuiet(srvSock); + + throw new IpcEndpointBindException("Failed to bind loopback IPC endpoint (is port already in " + + "use?): " + port, e); + } + } + + /** {@inheritDoc} */ + @Override public IpcEndpoint accept() throws IgniteCheckedException { + try { + Socket sock = srvSock.accept(); + + return new IpcClientTcpEndpoint(sock); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + U.closeQuiet(srvSock); + } + + /** {@inheritDoc} */ + @Override public int getPort() { + return port; + } + + /** + * Sets port endpoint will be bound to. + * + * @param port Port number. + */ + public void setPort(int port) { + this.port = port; + } + + /** {@inheritDoc} */ + @Override public String getHost() { + return host; + } + + /** + * Sets host endpoint will be bound to. + * + * @param host Host. + */ + public void setHost(String host) { + this.host = host; + } + + /** {@inheritDoc} */ + @Override public boolean isManagement() { + return mgmt; + } + + /** + * Sets management property. + * + * @param mgmt flag. + */ + public void setManagement(boolean mgmt) { + this.mgmt = mgmt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IpcServerTcpEndpoint.class, this); + } + + /** + * Sets configuration properties from the map. + * + * @param endpointCfg Map of properties. + * @throws IgniteCheckedException If invalid property name or value. + */ + public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException { + for (Map.Entry<String,String> e : endpointCfg.entrySet()) { + try { + switch (e.getKey()) { + case "type": + //Ignore this property + break; + + case "port": + setPort(Integer.parseInt(e.getValue())); + break; + + case "host": + setHost(e.getValue()); + break; + + case "management": + setManagement(Boolean.valueOf(e.getValue())); + break; + + default: + throw new IgniteCheckedException("Invalid property '" + e.getKey() + "' of " + getClass().getSimpleName()); + } + } + catch (Throwable t) { + if (t instanceof IgniteCheckedException) + throw t; + + throw new IgniteCheckedException("Invalid value '" + e.getValue() + "' of the property '" + e.getKey() + "' in " + + getClass().getSimpleName(), t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ca07d/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException.java deleted file mode 100644 index 93dba67..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc.shmem; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** - * Thrown when IPC runs out of system resources (for example, no more free shared memory is - * available in operating system). - */ -public class GridIpcOutOfSystemResourcesException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given error message. - * - * @param msg Error message. - */ - public GridIpcOutOfSystemResourcesException(String msg) { - super(msg); - } - - /** - * Creates new exception with given throwable as a cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public GridIpcOutOfSystemResourcesException(Throwable cause) { - super(cause); - } - - /** - * Creates new exception with given error message and optional nested exception. - * - * @param msg Error message. - * @param cause Optional nested exception (can be {@code null}). - */ - public GridIpcOutOfSystemResourcesException(String msg, @Nullable Throwable cause) { - super(msg, cause); - } -}