# IGNITE-465: Done.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/030c373f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/030c373f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/030c373f Branch: refs/heads/master Commit: 030c373fa6e95d3a44295cb42ce5c821dfe5bdd5 Parents: 10f6643 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Mar 12 18:18:39 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Mar 12 18:18:39 2015 +0300 ---------------------------------------------------------------------- config/hadoop/default-config.xml | 10 +- examples/config/filesystem/example-igfs.xml | 13 +- .../configuration/FileSystemConfiguration.java | 55 ++--- .../igfs/IgfsIpcEndpointConfiguration.java | 241 +++++++++++++++++++ .../apache/ignite/igfs/IgfsIpcEndpointType.java | 29 +++ .../internal/processors/igfs/IgfsServer.java | 49 +++- .../processors/igfs/IgfsServerManager.java | 35 ++- .../util/ipc/IpcServerEndpointDeserializer.java | 66 ----- .../visor/node/VisorIgfsConfiguration.java | 5 +- modules/core/src/test/config/igfs-loopback.xml | 6 +- modules/core/src/test/config/igfs-shmem.xml | 8 +- .../processors/igfs/IgfsAbstractSelfTest.java | 24 +- ...sCachePerBlockLruEvictionPolicySelfTest.java | 12 +- .../processors/igfs/IgfsMetricsSelfTest.java | 12 +- .../processors/igfs/IgfsModesSelfTest.java | 9 +- ...IpcEndpointRegistrationAbstractSelfTest.java | 21 +- ...dpointRegistrationOnLinuxAndMacSelfTest.java | 7 +- ...pcEndpointRegistrationOnWindowsSelfTest.java | 5 +- .../IpcServerEndpointDeserializerSelfTest.java | 160 ------------ .../ipc/shmem/IpcSharedMemoryNodeStartup.java | 9 +- .../ignite/testsuites/IgniteIgfsTestSuite.java | 2 - .../hadoop/igfs/HadoopIgfsEndpoint.java | 5 +- .../HadoopIgfs20FileSystemAbstractSelfTest.java | 4 +- ...Igfs20FileSystemLoopbackPrimarySelfTest.java | 14 +- ...oopIgfs20FileSystemShmemPrimarySelfTest.java | 14 +- .../igfs/HadoopIgfsDualAbstractSelfTest.java | 25 +- ...oopSecondaryFileSystemConfigurationTest.java | 25 +- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 44 ++-- .../igfs/IgfsNearOnlyMultiNodeSelfTest.java | 10 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 16 +- .../IgniteHadoopFileSystemClientSelfTest.java | 11 +- ...IgniteHadoopFileSystemHandshakeSelfTest.java | 14 +- .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 10 +- ...niteHadoopFileSystemLoggerStateSelfTest.java | 12 +- ...adoopFileSystemLoopbackAbstractSelfTest.java | 12 +- ...teHadoopFileSystemSecondaryModeSelfTest.java | 23 +- ...teHadoopFileSystemShmemAbstractSelfTest.java | 12 +- .../hadoop/HadoopPopularWordsTest.java | 4 +- 38 files changed, 580 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/config/hadoop/default-config.xml ---------------------------------------------------------------------- diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml index 65a281e..e45311e 100644 --- a/config/hadoop/default-config.xml +++ b/config/hadoop/default-config.xml @@ -123,11 +123,11 @@ <!-- Configure TCP endpoint for communication with the file system instance. --> <property name="ipcEndpointConfiguration"> - <map> - <entry key="type" value="tcp"/> - <entry key="host" value="0.0.0.0"/> - <entry key="port" value="10500"/> - </map> + <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration"> + <property name="type" value="TCP" /> + <property name="host" value="0.0.0.0" /> + <property name="port" value="10500" /> + </bean> </property> <!-- Example secondary file system configuration (IGFS configured over Hadoop HDFS): --> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/examples/config/filesystem/example-igfs.xml ---------------------------------------------------------------------- diff --git a/examples/config/filesystem/example-igfs.xml b/examples/config/filesystem/example-igfs.xml index d8ccd34..7334fee 100644 --- a/examples/config/filesystem/example-igfs.xml +++ b/examples/config/filesystem/example-igfs.xml @@ -90,9 +90,9 @@ --> <!-- <property name="ipcEndpointConfiguration"> - <map> - <entry key="type" value="tcp"/> - </map> + <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration"> + <property name="type" value="TCP" /> + </bean> </property> --> @@ -101,10 +101,9 @@ --> <!-- <property name="ipcEndpointConfiguration"> - <map> - <entry key="type" value="shmem"/> - <entry key="port" value="10500"/> - </map> + <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration"> + <property name="type" value="SHMEM" /> + </bean> </property> --> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java index f679fc0..5793df1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java @@ -34,9 +34,6 @@ public class FileSystemConfiguration { /** Default file system user name. */ public static final String DFLT_USER_NAME = System.getProperty("user.name", "anonymous"); - /** Default IPC port. */ - public static final int DFLT_IPC_PORT = 10500; - /** Default fragmentizer throttling block length. */ public static final long DFLT_FRAGMENTIZER_THROTTLING_BLOCK_LENGTH = 16 * 1024 * 1024; @@ -109,8 +106,8 @@ public class FileSystemConfiguration { /** Per node parallel operations. */ private int perNodeParallelBatchCnt = DFLT_PER_NODE_PARALLEL_BATCH_CNT; - /** IPC endpoint properties to publish IGFS over. */ - private Map<String, String> ipcEndpointCfg; + /** IPC endpoint configuration. */ + private IgfsIpcEndpointConfiguration ipcEndpointCfg; /** IPC endpoint enabled flag. */ private boolean ipcEndpointEnabled = DFLT_IPC_ENDPOINT_ENABLED; @@ -401,52 +398,35 @@ public class FileSystemConfiguration { } /** - * Gets map of IPC endpoint configuration properties. There are 2 different - * types of endpoint supported: {@code shared-memory}, and {@code TCP}. - * <p> - * The following configuration properties are supported for {@code shared-memory} - * endpoint: - * <ul> - * <li>{@code type} - value is {@code shmem} to specify {@code shared-memory} approach.</li> - * <li>{@code port} - endpoint port.</li> - * <li>{@code size} - memory size allocated for single endpoint communication.</li> - * <li> - * {@code tokenDirectoryPath} - path, either absolute or relative to {@code IGNITE_HOME} to - * store shared memory tokens. - * </li> - * </ul> - * <p> - * The following configuration properties are supported for {@code TCP} approach: - * <ul> - * <li>{@code type} - value is {@code tcp} to specify {@code TCP} approach.</li> - * <li>{@code port} - endpoint bind port.</li> - * <li> - * {@code host} - endpoint bind host. If omitted '127.0.0.1' will be used. - * </li> - * </ul> + * Gets IPC endpoint configuration. * <p> - * Note that {@code shared-memory} approach is not supported on Windows environments. - * In case IGFS is failed to bind to particular port, further attempts will be performed every 3 seconds. + * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem} shipped with <b>Ignite + * Hadoop Accelerator</b>. * - * @return Map of IPC endpoint configuration properties. In case the value is not set, defaults will be used. Default - * type for Windows is "tcp", for all other platforms - "shmem". Default port is {@link #DFLT_IPC_PORT}. + * @return IPC endpoint configuration. */ - @Nullable public Map<String,String> getIpcEndpointConfiguration() { + @Nullable public IgfsIpcEndpointConfiguration getIpcEndpointConfiguration() { return ipcEndpointCfg; } /** - * Sets IPC endpoint configuration to publish IGFS over. + * Sets IPC endpoint configuration. + * <p> + * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem} shipped with <b>Ignite + * Hadoop Accelerator</b>. * - * @param ipcEndpointCfg Map of IPC endpoint config properties. + * @param ipcEndpointCfg IPC endpoint configuration. */ - public void setIpcEndpointConfiguration(@Nullable Map<String,String> ipcEndpointCfg) { + public void setIpcEndpointConfiguration(@Nullable IgfsIpcEndpointConfiguration ipcEndpointCfg) { this.ipcEndpointCfg = ipcEndpointCfg; } /** * Get IPC endpoint enabled flag. In case it is set to {@code true} endpoint will be created and bound to specific * port. Otherwise endpoint will not be created. Default value is {@link #DFLT_IPC_ENDPOINT_ENABLED}. + * <p> + * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem} shipped with <b>Ignite + * Hadoop Accelerator</b>. * * @return {@code True} in case endpoint is enabled. */ @@ -456,6 +436,9 @@ public class FileSystemConfiguration { /** * Set IPC endpoint enabled flag. See {@link #isIpcEndpointEnabled()}. + * <p> + * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem} shipped with <b>Ignite + * Hadoop Accelerator</b>. * * @param ipcEndpointEnabled IPC endpoint enabled flag. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java new file mode 100644 index 0000000..6aaf739 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import static org.apache.ignite.igfs.IgfsIpcEndpointType.*; + +/** + * IGFS IPC endpoint configuration. + */ +public class IgfsIpcEndpointConfiguration { + /** Default endpoint type; TCP for Windows, SHMEM otherwise. */ + public static IgfsIpcEndpointType DFLT_TYP = U.isWindows() ? TCP : SHMEM; + + /** Default host. */ + public static String DFLT_HOST = "127.0.0.1"; + + /** Default port. */ + public static int DFLT_PORT = 10500; + + /** Default shared memory space in bytes. */ + public static final int DFLT_MEM_SIZE = 256 * 1024; + + /** + * Default token directory. Note that this path is relative to {@code IGNITE_HOME/work} folder + * if {@code IGNITE_HOME} system or environment variable specified, otherwise it is relative to + * {@code work} folder under system {@code java.io.tmpdir} folder. + * + * @see IgniteConfiguration#getWorkDirectory() + */ + public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem"; + + /** Endpoint type. */ + private IgfsIpcEndpointType typ = DFLT_TYP; + + /** Host. */ + private String host = DFLT_HOST; + + /** Port. */ + private int port = DFLT_PORT; + + /** Space size. */ + private int memSize = DFLT_MEM_SIZE; + + /** Token directory path. */ + private String tokenDirPath = DFLT_TOKEN_DIR_PATH; + + /** + * Default constructor. + */ + public IgfsIpcEndpointConfiguration() { + // No-op. + } + + /** + * Copying constructor. + * + * @param cfg Configuration to copy. + */ + public IgfsIpcEndpointConfiguration(IgfsIpcEndpointConfiguration cfg) { + typ = cfg.getType(); + host = cfg.getHost(); + port = cfg.getPort(); + memSize = cfg.getMemorySize(); + tokenDirPath = cfg.getTokenDirectoryPath(); + } + + /** + * Gets endpoint type. There are two endpoints types: {@code SHMEM} working over shared memory, and {@code TCP} + * working over sockets. + * <p> + * Shared memory is recommended approach for Linux-based systems. For Windows TCP is the only available option. + * <p> + * Defaults to {@link #DFLT_TYP}. + * + * @return Endpoint type. + */ + public IgfsIpcEndpointType getType() { + return typ; + } + + /** + * Sets endpoint type. There are two endpoints types: {@link IgfsIpcEndpointType#SHMEM} working over shared memory, + * and {@link IgfsIpcEndpointType#TCP} working over sockets. + * <p> + * Shared memory is recommended approach for Linux-based systems. For Windows TCP is the only available option. + * <p> + * Defaults to {@link #DFLT_TYP}. + * + * @param typ Endpoint type. + */ + public void setType(IgfsIpcEndpointType typ) { + this.typ = typ; + } + + /** + * Gets the host endpoint is bound to. + * <p> + * For {@link IgfsIpcEndpointType#TCP} endpoint this is the network interface server socket is bound to. + * <p> + * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to perform an initial handshake. + * All further communication is performed over shared memory. Therefore, for {@code SHMEM} this value is ignored + * and socket will be always bound to {@link #DFLT_HOST}. + * <p> + * Defaults to {@link #DFLT_HOST}. + * + * @return Host. + */ + public String getHost() { + return host; + } + + /** + * Sets the host endpoint is bound to. + * <p> + * For {@link IgfsIpcEndpointType#TCP} endpoint this is the network interface server socket is bound to. + * <p> + * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to perform an initial handshake. + * All further communication is performed over shared memory. Therefore, for {@code SHMEM} this value is ignored + * and socket will be always bound to {@link #DFLT_HOST}. + * <p> + * Defaults to {@link #DFLT_HOST}. + * + * @param host Host. + */ + public void setHost(String host) { + this.host = host; + } + + /** + * Gets the port endpoint is bound to. + * <p> + * For {@link IgfsIpcEndpointType#TCP} endpoint this is the port server socket is bound to. + * <p> + * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to perform an initial handshake. + * All further communication is performed over shared memory. + * <p> + * Defaults to {@link #DFLT_PORT}. + * + * @return Port. + */ + public int getPort() { + return port; + } + + /** + * Sets the port endpoint is bound to. + * <p> + * For {@link IgfsIpcEndpointType#TCP} endpoint this is the port server socket is bound to. + * <p> + * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to perform an initial handshake. + * All further communication is performed over shared memory. + * <p> + * Defaults to {@link #DFLT_PORT}. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Gets shared memory size in bytes allocated for endpoint communication. + * <p> + * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint. + * <p> + * Defaults to {@link #DFLT_MEM_SIZE}. + * + * @return Shared memory size. + */ + public int getMemorySize() { + return memSize; + } + + /** + * Sets shared memory size in bytes allocated for endpoint communication. + * <p> + * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint. + * <p> + * Defaults to {@link #DFLT_MEM_SIZE}. + * + * @param memSize Shared memory size. + */ + public void setMemorySize(int memSize) { + this.memSize = memSize; + } + + /** + * Gets directory where shared memory tokens are stored. + * <p> + * Note that this path is relative to {@code IGNITE_HOME/work} folder if {@code IGNITE_HOME} system or environment + * variable specified, otherwise it is relative to {@code work} folder under system {@code java.io.tmpdir} folder. + * <p> + * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint. + * <p> + * Defaults to {@link #DFLT_TOKEN_DIR_PATH}. + * + * @return Directory where shared memory tokens are stored. + */ + public String getTokenDirectoryPath() { + return tokenDirPath; + } + + /** + * Sets directory where shared memory tokens are stored. + * <p> + * Note that this path is relative to {@code IGNITE_HOME/work} folder if {@code IGNITE_HOME} system or environment + * variable specified, otherwise it is relative to {@code work} folder under system {@code java.io.tmpdir} folder. + * <p> + * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint. + * <p> + * Defaults to {@link #DFLT_TOKEN_DIR_PATH}. + * + * @param tokenDirPath Directory where shared memory tokens are stored. + */ + public void setTokenDirectoryPath(String tokenDirPath) { + this.tokenDirPath = tokenDirPath; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsIpcEndpointConfiguration.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointType.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointType.java new file mode 100644 index 0000000..475d36f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +/** + * IGFS endpoint type. + */ +public enum IgfsIpcEndpointType { + /** Shared memory endpoint. */ + SHMEM, + + /** TCP endpoint. */ + TCP; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 1146812..0cd1a62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.util.ipc.*; @@ -31,7 +32,6 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; -import java.util.*; import static org.apache.ignite.spi.IgnitePortProtocol.*; @@ -49,7 +49,7 @@ public class IgfsServer { private final IgfsMarshaller marsh; /** Endpoint configuration. */ - private final Map<String,String> endpointCfg; + private final IgfsIpcEndpointConfiguration endpointCfg; /** Server endpoint. */ private IpcServerEndpoint srvEndpoint; @@ -72,7 +72,7 @@ public class IgfsServer { * @param endpointCfg Endpoint configuration to start. * @param mgmt Management flag - if true, server is intended to be started for Visor. */ - public IgfsServer(IgfsContext igfsCtx, Map<String, String> endpointCfg, boolean mgmt) { + public IgfsServer(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) { assert igfsCtx != null; assert endpointCfg != null; @@ -91,7 +91,7 @@ public class IgfsServer { * @throws IgniteCheckedException If failed. */ public void start() throws IgniteCheckedException { - srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg); + srvEndpoint = createEndpoint(endpointCfg, mgmt); if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint) throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() + @@ -135,6 +135,47 @@ public class IgfsServer { } /** + * Create server IPC endpoint. + * + * @param endpointCfg Endpoint configuration. + * @param mgmt Management flag. + * @return Server endpoint. + * @throws IgniteCheckedException If failed. + */ + private static IpcServerEndpoint createEndpoint(IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) + throws IgniteCheckedException { + A.notNull(endpointCfg, "endpointCfg"); + + IgfsIpcEndpointType typ = endpointCfg.getType(); + + if (typ == null) + throw new IgniteCheckedException("Failed to create server endpoint (type is not specified)"); + + switch (typ) { + case SHMEM: { + IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint(); + + endpoint.setPort(endpointCfg.getPort()); + endpoint.setSize(endpointCfg.getMemorySize()); + endpoint.setTokenDirectoryPath(endpointCfg.getTokenDirectoryPath()); + + return endpoint; + } + case TCP: { + IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint(); + + endpoint.setHost(endpointCfg.getHost()); + endpoint.setPort(endpointCfg.getPort()); + endpoint.setManagement(mgmt); + + return endpoint; + } + default: + throw new IgniteCheckedException("Failed to create server endpoint (type is unknown): " + typ); + } + } + + /** * Callback that is invoked when kernal is ready. */ public void onKernalStart() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java index 643eeff..2cd51f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -30,6 +31,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.configuration.FileSystemConfiguration.*; +import static org.apache.ignite.igfs.IgfsIpcEndpointType.*; /** * IGFS server manager. @@ -50,26 +52,23 @@ public class IgfsServerManager extends IgfsManager { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = igfsCtx.configuration(); - Map<String,String> cfg = igfsCfg.getIpcEndpointConfiguration(); - if (F.isEmpty(cfg)) { - // Set default configuration. - cfg = new HashMap<>(); + if (igfsCfg.isIpcEndpointEnabled()) { + IgfsIpcEndpointConfiguration ipcCfg = igfsCfg.getIpcEndpointConfiguration(); - cfg.put("type", U.isWindows() ? "tcp" : "shmem"); - cfg.put("port", String.valueOf(DFLT_IPC_PORT)); - } + if (ipcCfg == null) + ipcCfg = new IgfsIpcEndpointConfiguration(); - if (igfsCfg.isIpcEndpointEnabled()) - bind(cfg, /*management*/false); + bind(ipcCfg, /*management*/false); + } if (igfsCfg.getManagementPort() >= 0) { - cfg = new HashMap<>(); + IgfsIpcEndpointConfiguration mgmtIpcCfg = new IgfsIpcEndpointConfiguration(); - cfg.put("type", "tcp"); - cfg.put("port", String.valueOf(igfsCfg.getManagementPort())); + mgmtIpcCfg.setType(TCP); + mgmtIpcCfg.setPort(igfsCfg.getManagementPort()); - bind(cfg, /*management*/true); + bind(mgmtIpcCfg, /*management*/true); } if (bindWorker != null) @@ -84,7 +83,7 @@ public class IgfsServerManager extends IgfsManager { * @param mgmt {@code True} if endpoint is management. * @throws IgniteCheckedException If failed. */ - private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException { + private void bind(final IgfsIpcEndpointConfiguration endpointCfg, final boolean mgmt) throws IgniteCheckedException { if (srvrs == null) srvrs = new ConcurrentLinkedQueue<>(); @@ -155,7 +154,7 @@ public class IgfsServerManager extends IgfsManager { @SuppressWarnings("BusyWait") private class BindWorker extends GridWorker { /** Configurations to bind. */ - private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>(); + private Collection<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> bindCfgs = new LinkedList<>(); /** * Constructor. @@ -170,7 +169,7 @@ public class IgfsServerManager extends IgfsManager { * @param cfg Configuration. * @param mgmt Management flag. */ - public void addConfiguration(Map<String, String> cfg, boolean mgmt) { + public void addConfiguration(IgfsIpcEndpointConfiguration cfg, boolean mgmt) { bindCfgs.add(F.t(cfg, mgmt)); } @@ -181,10 +180,10 @@ public class IgfsServerManager extends IgfsManager { while (!isCancelled()) { Thread.sleep(REBIND_INTERVAL); - Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator(); + Iterator<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> it = bindCfgs.iterator(); while (it.hasNext()) { - IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next(); + IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean> cfg = it.next(); IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java deleted file mode 100644 index 07bc28b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Grid IpcServerEndpoint configuration deserializer. - */ -public class IpcServerEndpointDeserializer { - /** - * Deserializes IPC server endpoint config into concrete - * instance of {@link IpcServerEndpoint}. - * - * @param endpointCfg Map with properties of the IPC server endpoint config. - * @return Deserialized instance of {@link IpcServerEndpoint}. - * @throws IgniteCheckedException If any problem with configuration properties setting has happened. - */ - public static IpcServerEndpoint deserialize(Map<String,String> endpointCfg) throws IgniteCheckedException { - A.notNull(endpointCfg, "endpointCfg"); - - String endpointType = endpointCfg.get("type"); - - if (endpointType == null) - throw new IgniteCheckedException("Failed to create server endpoint (type is not specified)"); - - switch (endpointType) { - case "shmem": { - IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint(); - - endpoint.setupConfiguration(endpointCfg); - - return endpoint; - } - case "tcp": { - IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint(); - - endpoint.setupConfiguration(endpointCfg); - - return endpoint; - } - default: - throw new IgniteCheckedException("Failed to create server endpoint (type is unknown): " + endpointType); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java index d589138..a1e3c52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java @@ -154,7 +154,8 @@ public class VisorIgfsConfiguration implements Serializable { cfg.fragmentizerThrottlingBlockLen = igfs.getFragmentizerThrottlingBlockLength(); cfg.fragmentizerThrottlingDelay = igfs.getFragmentizerThrottlingDelay(); - Map<String, String> endpointCfg = igfs.getIpcEndpointConfiguration(); + IgfsIpcEndpointConfiguration endpointCfg = igfs.getIpcEndpointConfiguration(); + cfg.ipcEndpointCfg = endpointCfg != null ? endpointCfg.toString() : null; cfg.ipcEndpointEnabled = igfs.isIpcEndpointEnabled(); @@ -332,7 +333,7 @@ public class VisorIgfsConfiguration implements Serializable { } /** - * @return IPC endpoint config (in JSON format) to publish IGFS over. + * @return IPC endpoint config to publish IGFS over. */ @Nullable public String ipcEndpointConfiguration() { return ipcEndpointCfg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/config/igfs-loopback.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/igfs-loopback.xml b/modules/core/src/test/config/igfs-loopback.xml index 46377d9..4092c42 100644 --- a/modules/core/src/test/config/igfs-loopback.xml +++ b/modules/core/src/test/config/igfs-loopback.xml @@ -95,9 +95,9 @@ <!-- Loopback endpoint. --> <property name="ipcEndpointConfiguration"> - <map> - <entry key="type" value="tcp"/> - </map> + <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration"> + <property name="type" value="TCP" /> + </bean> </property> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/config/igfs-shmem.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/igfs-shmem.xml b/modules/core/src/test/config/igfs-shmem.xml index c08f78d..a64d695 100644 --- a/modules/core/src/test/config/igfs-shmem.xml +++ b/modules/core/src/test/config/igfs-shmem.xml @@ -95,10 +95,10 @@ <!-- Shared memory endpoint. --> <property name="ipcEndpointConfiguration"> - <map> - <entry key="type" value="shmem"/> - <entry key="port" value="10500"/> - </map> + <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration"> + <property name="type" value="SHMEM" /> + <property name="port" value="10500" /> + </bean> </property> </bean> </list> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 9dde005..016807e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -68,16 +68,10 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { protected static final int SEQ_READS_BEFORE_PREFETCH = 2; /** Primary file system REST endpoint configuration map. */ - protected static final Map<String, String> PRIMARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "10500"); - }}; + protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG; /** Secondary file system REST endpoint configuration map. */ - protected static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; /** Directory. */ protected static final IgfsPath DIR = new IgfsPath("/dir"); @@ -127,6 +121,18 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { /** Memory mode. */ protected final CacheMemoryMode memoryMode; + static { + PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + PRIMARY_REST_CFG.setPort(10500); + + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + /** * Constructor. * @@ -183,7 +189,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, - @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable Map<String, String> restCfg) throws Exception { + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("dataCache"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java index fd590f5..f9c83d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsCachePerBlockLruEvictionPolicySelfTest.java @@ -50,10 +50,7 @@ public class IgfsCachePerBlockLruEvictionPolicySelfTest extends IgfsCommonAbstra private static final String IGFS_SECONDARY = "igfs-secondary"; /** Secondary file system REST endpoint configuration map. */ - private static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; + private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; /** File working in PRIMARY mode. */ public static final IgfsPath FILE = new IgfsPath("/file"); @@ -73,6 +70,13 @@ public class IgfsCachePerBlockLruEvictionPolicySelfTest extends IgfsCommonAbstra /** Eviction policy */ private static CacheIgfsPerBlockLruEvictionPolicy evictPlc; + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + /** * Start a grid with the primary file system. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java index 0af1dea..65098e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java @@ -44,10 +44,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { private static final String IGFS_SECONDARY = "igfs-secondary"; /** Secondary file system REST endpoint configuration map. */ - private static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>(){{ - put("type", "tcp"); - put("port", "11500"); - }}; + private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; /** Test nodes count. */ private static final int NODES_CNT = 3; @@ -67,6 +64,13 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { /** Secondary file system block size. */ public static final int SECONDARY_BLOCK_SIZE = 512; + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { startSecondary(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java index 4a58285..c084046 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java @@ -151,10 +151,11 @@ public class IgfsModesSelfTest extends IgfsCommonAbstractTest { igfsCfg.setName("igfs-secondary"); igfsCfg.setBlockSize(512 * 1024); igfsCfg.setDefaultMode(PRIMARY); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); CacheConfiguration cacheCfg = defaultCacheConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java index 694d5c3..431baba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java @@ -59,7 +59,7 @@ public abstract class IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest e IgniteConfiguration cfg = gridConfiguration(); cfg.setFileSystemConfiguration( - igfsConfiguration("tcp", DFLT_IPC_PORT, null) + igfsConfiguration(IgfsIpcEndpointType.TCP, IgfsIpcEndpointConfiguration.DFLT_PORT, null) ); G.start(cfg); @@ -78,8 +78,9 @@ public abstract class IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest e IgniteConfiguration cfg = gridConfiguration(); cfg.setFileSystemConfiguration( - igfsConfiguration("tcp", DFLT_IPC_PORT, "127.0.0.1"), - igfsConfiguration("tcp", DFLT_IPC_PORT + 1, U.getLocalHost().getHostName())); + igfsConfiguration(IgfsIpcEndpointType.TCP, IgfsIpcEndpointConfiguration.DFLT_PORT, "127.0.0.1"), + igfsConfiguration(IgfsIpcEndpointType.TCP, IgfsIpcEndpointConfiguration.DFLT_PORT + 1, + U.getLocalHost().getHostName())); G.start(cfg); @@ -154,20 +155,20 @@ public abstract class IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest e * @param endPntHost End point host. * @return test-purposed IgfsConfiguration. */ - protected FileSystemConfiguration igfsConfiguration(@Nullable String endPntType, @Nullable Integer endPntPort, - @Nullable String endPntHost) throws IgniteCheckedException { - HashMap<String, String> endPntCfg = null; + protected FileSystemConfiguration igfsConfiguration(@Nullable IgfsIpcEndpointType endPntType, + @Nullable Integer endPntPort, @Nullable String endPntHost) throws IgniteCheckedException { + IgfsIpcEndpointConfiguration endPntCfg = null; if (endPntType != null) { - endPntCfg = new HashMap<>(); + endPntCfg = new IgfsIpcEndpointConfiguration(); - endPntCfg.put("type", endPntType); + endPntCfg.setType(endPntType); if (endPntPort != null) - endPntCfg.put("port", String.valueOf(endPntPort)); + endPntCfg.setPort(endPntPort); if (endPntHost != null) - endPntCfg.put("host", endPntHost); + endPntCfg.setHost(endPntHost); } FileSystemConfiguration igfsConfiguration = new FileSystemConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java index 57f10d9..a0f24fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java @@ -18,10 +18,9 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.typedef.*; -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - /** * Tests for {@link IgfsServer} that checks all IPC endpoint registration types * permitted for Linux and Mac OS. @@ -36,8 +35,8 @@ public class IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest cfg.setFileSystemConfiguration( igfsConfiguration(null, null, null), // Check null IPC endpoint config won't bring any hassles. - igfsConfiguration("tcp", DFLT_IPC_PORT + 1, null), - igfsConfiguration("shmem", DFLT_IPC_PORT + 2, null)); + igfsConfiguration(IgfsIpcEndpointType.TCP, IgfsIpcEndpointConfiguration.DFLT_PORT + 1, null), + igfsConfiguration(IgfsIpcEndpointType.SHMEM, IgfsIpcEndpointConfiguration.DFLT_PORT + 2, null)); G.start(cfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java index 4f18aff..c9c03ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.ipc.loopback.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; @@ -40,8 +41,8 @@ public class IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest @Override public Object call() throws Exception { IgniteConfiguration cfg = gridConfiguration(); - cfg.setFileSystemConfiguration(igfsConfiguration("shmem", IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, - null)); + cfg.setFileSystemConfiguration(igfsConfiguration(IgfsIpcEndpointType.SHMEM, + IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, null)); return G.start(cfg); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java deleted file mode 100644 index f5aa591..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.ipc; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.testframework.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Tests for {@code IpcServerEndpointDeserializer}. - */ -public class IpcServerEndpointDeserializerSelfTest extends IgfsCommonAbstractTest { - /** */ - private Map<String,String> shmemSrvEndpoint; - - /** */ - private Map<String,String> tcpSrvEndpoint; - - /** - * Initialize test stuff. - */ - @Override protected void beforeTest() throws Exception { - shmemSrvEndpoint = new HashMap<>(); - shmemSrvEndpoint.put("port", "888"); - shmemSrvEndpoint.put("size", "111"); - shmemSrvEndpoint.put("tokenDirectoryPath", "test-my-path-baby"); - - tcpSrvEndpoint = new HashMap<>(); - tcpSrvEndpoint.put("port", "999"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfCfgIsNull() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @SuppressWarnings("NullableProblems") - @Override public Object call() throws Exception { - return IpcServerEndpointDeserializer.deserialize(null); - } - }, NullPointerException.class, "Ouch! Argument cannot be null: endpointCfg"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndNoTypeInfoInJson() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - return IpcServerEndpointDeserializer.deserialize(shmemSrvEndpoint); - } - }, IgniteCheckedException.class, "Failed to create server endpoint (type is not specified)"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndNoUnknownTypeInfoInJson() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(shmemSrvEndpoint); - endPnt.put("type", "unknownEndpointType"); - - return IpcServerEndpointDeserializer.deserialize(endPnt); - } - }, IgniteCheckedException.class, "Failed to create server endpoint (type is unknown): unknownEndpointType"); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfLoopbackAndJsonIsLightlyBroken() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - return IpcServerEndpointDeserializer.deserialize(tcpSrvEndpoint); - } - }, IgniteCheckedException.class, null); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndJsonIsOk() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(shmemSrvEndpoint); - endPnt.put("type", "shmem"); - - IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint); - - IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized; - - assertEquals(shmemSrvEndpoint.get("port"), String.valueOf(deserializedShmemEndpoint.getPort())); - assertEquals(shmemSrvEndpoint.get("size"), String.valueOf(deserializedShmemEndpoint.getSize())); - assertEquals(shmemSrvEndpoint.get("tokenDirectoryPath"), deserializedShmemEndpoint.getTokenDirectoryPath()); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfShmemAndJsonIsOkAndDefaultValuesAreSetToFields() throws Exception { - IpcSharedMemoryServerEndpoint defShmemSrvEndpoint = new IpcSharedMemoryServerEndpoint(); - defShmemSrvEndpoint.setPort(8); - - Map<String, String> endPnt = new HashMap<>(); - - endPnt.put("type", "shmem"); - endPnt.put("port", String.valueOf(defShmemSrvEndpoint.getPort())); - - IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint); - - IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized; - - assertEquals(defShmemSrvEndpoint.getPort(), deserializedShmemEndpoint.getPort()); - assertEquals(defShmemSrvEndpoint.getSize(), deserializedShmemEndpoint.getSize()); - assertEquals(defShmemSrvEndpoint.getTokenDirectoryPath(), deserializedShmemEndpoint.getTokenDirectoryPath()); - } - - /** - * @throws Exception In case of any exception. - */ - public void testDeserializeIfLoopbackAndJsonIsOk() throws Exception { - Map<String, String> endPnt = new HashMap<>(); - - endPnt.putAll(tcpSrvEndpoint); - endPnt.put("type", "tcp"); - - IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt); - - assertTrue(deserialized instanceof IpcServerTcpEndpoint); - - assertEquals(tcpSrvEndpoint.get("port"), String.valueOf(deserialized.getPort())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java index 8051a3e..6bbee42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java @@ -19,12 +19,11 @@ package org.apache.ignite.internal.util.ipc.shmem; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import java.util.*; - import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -49,10 +48,10 @@ public class IpcSharedMemoryNodeStartup { cfg.setDiscoverySpi(discoSpi); - Map<String, String> endpointCfg = new HashMap<>(); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - endpointCfg.put("type", "shmem"); - endpointCfg.put("port", "10500"); + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(10500); igfsCfg.setIpcEndpointConfiguration(endpointCfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java index e67e661..c0893cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java @@ -21,7 +21,6 @@ import junit.framework.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.igfs.split.*; -import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.typedef.internal.*; /** @@ -52,7 +51,6 @@ public class IgniteIgfsTestSuite extends TestSuite { suite.addTest(new TestSuite(IgfsStreamsSelfTest.class)); suite.addTest(new TestSuite(IgfsModesSelfTest.class)); - suite.addTest(new TestSuite(IpcServerEndpointDeserializerSelfTest.class)); suite.addTest(new TestSuite(IgfsMetricsSelfTest.class)); suite.addTest(new TestSuite(IgfsPrimarySelfTest.class)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java index 7502f57..26d1940 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.ignite.*; +import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -26,8 +27,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.net.*; -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - /** * IGFS endpoint abstraction. */ @@ -148,7 +147,7 @@ public class HadoopIgfsEndpoint { int port; if (tokens.length == 1) - port = DFLT_IPC_PORT; + port = IgfsIpcEndpointConfiguration.DFLT_PORT; else if (tokens.length == 2) { String portStr = tokens[1]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index d907a6c..361d2ff 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -107,7 +107,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA * @param gridName Grid name. * @return IPC primary endpoint configuration. */ - protected abstract Map<String, String> primaryIpcEndpointConfiguration(String gridName); + protected abstract IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(String gridName); /** * Gets secondary file system URI path. @@ -128,7 +128,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA * * @return Secondary IPC endpoint configuration. */ - protected abstract Map<String, String> secondaryIpcEndpointConfiguration(); + protected abstract IgfsIpcEndpointConfiguration secondaryIpcEndpointConfiguration(); /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemLoopbackPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemLoopbackPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemLoopbackPrimarySelfTest.java index 2be65fd..8a5b12f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemLoopbackPrimarySelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemLoopbackPrimarySelfTest.java @@ -44,11 +44,13 @@ public class HadoopIgfs20FileSystemLoopbackPrimarySelfTest extends HadoopIgfs20F } /** {@inheritDoc} */ - @Override protected Map<String, String> primaryIpcEndpointConfiguration(final String gridName) { - return new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName))); - }}; + @Override protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); + + cfg.setType(IgfsIpcEndpointType.TCP); + cfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return cfg; } /** {@inheritDoc} */ @@ -66,7 +68,7 @@ public class HadoopIgfs20FileSystemLoopbackPrimarySelfTest extends HadoopIgfs20F } /** {@inheritDoc} */ - @Override protected Map<String, String> secondaryIpcEndpointConfiguration() { + @Override protected IgfsIpcEndpointConfiguration secondaryIpcEndpointConfiguration() { assert false; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemShmemPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemShmemPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemShmemPrimarySelfTest.java index 93f2d4a..d38c23d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemShmemPrimarySelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemShmemPrimarySelfTest.java @@ -44,11 +44,13 @@ public class HadoopIgfs20FileSystemShmemPrimarySelfTest extends HadoopIgfs20File } /** {@inheritDoc} */ - @Override protected Map<String, String> primaryIpcEndpointConfiguration(final String gridName) { - return new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName))); - }}; + @Override protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); + + cfg.setType(IgfsIpcEndpointType.SHMEM); + cfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return cfg; } /** {@inheritDoc} */ @@ -66,7 +68,7 @@ public class HadoopIgfs20FileSystemShmemPrimarySelfTest extends HadoopIgfs20File } /** {@inheritDoc} */ - @Override protected Map<String, String> secondaryIpcEndpointConfiguration() { + @Override protected IgfsIpcEndpointConfiguration secondaryIpcEndpointConfiguration() { assert false; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java index e89d015..b3912e5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java @@ -35,7 +35,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.net.*; -import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; @@ -70,16 +69,10 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml"; /** Primary file system REST endpoint configuration map. */ - protected static final Map<String, String> PRIMARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "10500"); - }}; + protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG; /** Secondary file system REST endpoint configuration map. */ - protected static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; /** Directory. */ protected static final IgfsPath DIR = new IgfsPath("/dir"); @@ -102,6 +95,18 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT /** IGFS mode. */ protected final IgfsMode mode; + static { + PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + PRIMARY_REST_CFG.setPort(10500); + + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + /** * Constructor. * @@ -124,7 +129,7 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT * @throws Exception If failed. */ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, - @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable Map<String, String> restCfg) throws Exception { + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("dataCache"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 11279ca..dbe2449 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -40,7 +40,6 @@ import org.apache.ignite.testframework.*; import java.io.*; import java.net.*; -import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; @@ -69,10 +68,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; /** Secondary endpoint configuration. */ - protected static final Map<String, String> SECONDARY_ENDPOINT_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; + protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; /** Group size. */ public static final int GRP_SIZE = 128; @@ -125,6 +121,13 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra /** Skip local shmem flag. */ private final boolean skipLocShmem; + static { + SECONDARY_ENDPOINT_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_ENDPOINT_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_ENDPOINT_CFG.setPort(11500); + } + /** * Constructor. * @@ -291,11 +294,13 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra * @param gridName Grid name. * @return IPC primary endpoint configuration. */ - protected Map<String, String> primaryIpcEndpointConfiguration(final String gridName) { - return new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName))); - }}; + protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { + IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); + + cfg.setType(IgfsIpcEndpointType.TCP); + cfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); + + return cfg; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index f6f5bae..c082fe0 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -26,8 +26,6 @@ import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; -import java.util.*; - import static org.apache.ignite.igfs.IgfsMode.*; /** @@ -79,10 +77,12 @@ public class IgfsEventsTestSuite extends TestSuite { @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); - }}); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); return igfsCfg; } @@ -96,10 +96,12 @@ public class IgfsEventsTestSuite extends TestSuite { @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); - }}); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); return igfsCfg; } @@ -131,10 +133,13 @@ public class IgfsEventsTestSuite extends TestSuite { igfsCfg.setName("igfs-secondary"); igfsCfg.setDefaultMode(PRIMARY); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>(){{ - put("type", "tcp"); - put("port", "11500"); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); return igfsCfg; } @@ -228,10 +233,13 @@ public class IgfsEventsTestSuite extends TestSuite { igfsCfg.setName("igfs-secondary"); igfsCfg.setDefaultMode(PRIMARY); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(11500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); return igfsCfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java index d128731..eece455 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java @@ -81,10 +81,12 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { igfsCfg.setMetaCacheName("partitioned"); igfsCfg.setName("igfs"); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); - }}); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index be25c61..4c59b05 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.hadoop.fs.v1.*; import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; @@ -75,10 +75,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; /** Secondary endpoint configuration. */ - protected static final Map<String, String> SECONDARY_ENDPOINT_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; + protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; /** Group size. */ public static final int GRP_SIZE = 128; @@ -116,6 +113,13 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA /** Primary file system configuration. */ protected Configuration primaryFsCfg; + static { + SECONDARY_ENDPOINT_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_ENDPOINT_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_ENDPOINT_CFG.setPort(11500); + } + /** File statuses comparator. */ private static final Comparator<FileStatus> STATUS_COMPARATOR = new Comparator<FileStatus>() { @SuppressWarnings("deprecation") @@ -260,7 +264,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA * @param gridName Grid name. * @return IPC primary endpoint configuration. */ - protected abstract Map<String, String> primaryIpcEndpointConfiguration(String gridName); + protected abstract IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(String gridName); /** {@inheritDoc} */ @Override public String getTestGridName() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index 29dd996..15a3706 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -72,10 +72,13 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest igfsCfg.setMetaCacheName("replicated"); igfsCfg.setName("igfs"); igfsCfg.setBlockSize(512 * 1024); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(DFLT_IPC_PORT)); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(DFLT_IPC_PORT); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); cfg.setCacheConfiguration(cacheConfiguration()); cfg.setFileSystemConfiguration(igfsCfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java index 3b4c5c2..6502c93 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.hadoop.fs.v2.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.spi.communication.tcp.*; @@ -33,7 +33,6 @@ import org.apache.ignite.testframework.*; import java.io.*; import java.net.*; -import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; @@ -234,10 +233,13 @@ public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractT igfsCfg.setName(dfltIgfsName ? null : IGFS_NAME); igfsCfg.setPrefetchBlocks(1); igfsCfg.setDefaultMode(PRIMARY); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(DFLT_IPC_PORT)); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(DFLT_IPC_PORT); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); igfsCfg.setManagementPort(-1); igfsCfg.setBlockSize(512 * 1024); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java index 135a488..aa1b083 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -72,10 +72,12 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe igfsCfg.setName("igfs"); igfsCfg.setManagementPort(FileSystemConfiguration.DFLT_MGMT_PORT + cnt); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); - }}); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.SHMEM); + endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/030c373f/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java index 1f6a204..c671a40 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java @@ -33,7 +33,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import java.lang.reflect.*; import java.net.*; import java.nio.file.*; -import java.util.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; @@ -82,10 +81,13 @@ public class IgniteHadoopFileSystemLoggerStateSelfTest extends IgfsCommonAbstrac igfsCfg.setName("igfs"); igfsCfg.setBlockSize(512 * 1024); igfsCfg.setDefaultMode(PRIMARY); - igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "10500"); - }}); + + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setPort(10500); + + igfsCfg.setIpcEndpointConfiguration(endpointCfg); CacheConfiguration cacheCfg = defaultCacheConfiguration();