http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java new file mode 100644 index 0000000..68dd143 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientDefaultCacheSelfTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.client.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.internal.client.GridClientProtocol.*; +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Tests that client is able to connect to a grid with only default cache enabled. + */ +public class ClientDefaultCacheSelfTest extends GridCommonAbstractTest { + /** Path to jetty config configured with SSL. */ + private static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Host. */ + private static final String HOST = "127.0.0.1"; + + /** Port. */ + private static final int TCP_PORT = 11211; + + /** Cached local node id. */ + private UUID locNodeId; + + /** Http port. 
*/ + private static final int HTTP_PORT = 8081; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IGNITE_JETTY_PORT, String.valueOf(HTTP_PORT)); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + + System.clearProperty (IGNITE_JETTY_PORT); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + locNodeId = grid().localNode().id(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assert cfg.getClientConnectionConfiguration() == null; + + ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); + + clientCfg.setRestJettyPath(REST_JETTY_CFG); + + cfg.setClientConnectionConfiguration(clientCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(defaultCacheConfiguration()); + + return cfg; + } + + /** + * @return Client. + * @throws GridClientException In case of error. + */ + private GridClient clientTcp() throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setProtocol(TCP); + cfg.setServers(getServerList(TCP_PORT)); + cfg.setDataConfigurations(Collections.singleton(new GridClientDataConfiguration())); + + GridClient gridClient = GridClientFactory.start(cfg); + + assert F.exist(gridClient.compute().nodes(), new IgnitePredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode n) { + return n.nodeId().equals(locNodeId); + } + }); + + return gridClient; + } + + /** + * Builds list of connection strings with few different ports. + * Used to avoid possible failures in case of port range active. + * + * @param startPort Port to start list from. 
+ * @return List of client connection strings. + */ + private Collection<String> getServerList(int startPort) { + Collection<String> srvs = new ArrayList<>(); + + for (int i = startPort; i < startPort + 10; i++) + srvs.add(HOST + ":" + i); + + return srvs; + } + + /** + * @throws Exception If failed. + */ + public void testTcp() throws Exception { + try { + boolean putRes = cache().putx("key", 1); + + assert putRes : "Put operation failed"; + + GridClient client = clientTcp(); + + Integer val = client.data().<String, Integer>get("key"); + + assert val != null; + + assert val == 1; + } + finally { + GridClientFactory.stopAll(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java new file mode 100644 index 0000000..f3b5490 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientGetAffinityTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Get affinity for task argument. + */ +public class ClientGetAffinityTask extends TaskSingleJobSplitAdapter<String, Integer> { + /** Grid. 
*/ + @IgniteInstanceResource + private transient Ignite ignite; + + /** {@inheritDoc} */ + @Override protected Object executeJob(int gridSize, String arg) throws IgniteCheckedException { + A.notNull(arg, "task argument"); + + String[] split = arg.split(":", 2); + + A.ensure(split.length == 2, "Task argument should have format 'cacheName:affinityKey'."); + + String cacheName = split[0]; + String affKey = split[1]; + + if ("null".equals(cacheName)) + cacheName = null; + + ClusterNode node = ignite.cluster().mapKeyToNode(cacheName, affKey); + + return node.id().toString(); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java new file mode 100644 index 0000000..ee2a31b --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientHttpTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import net.sf.json.*; +import org.apache.ignite.*; +import org.apache.ignite.compute.*; + +import java.util.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + +/** + * Test task summarizes length of all strings in the arguments list. + * <p> + * The argument of the task is JSON-serialized array of objects to calculate string length sum of. + */ +public class ClientHttpTask extends ComputeTaskSplitAdapter<String, Integer> { + /** Task delegate. */ + private final ClientTcpTask delegate = new ClientTcpTask(); + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException { + JSON json = JSONSerializer.toJSON(arg); + + List list = json.isArray() ? 
JSONArray.toList((JSONArray)json, String.class, new JsonConfig()) : null; + + //noinspection unchecked + return delegate.split(gridSize, list); + } + + /** {@inheritDoc} */ + @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return delegate.reduce(results); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + if (res.getException() != null) + return FAILOVER; + + return WAIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientNodeStartup.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientNodeStartup.java new file mode 100644 index 0000000..83f1537 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientNodeStartup.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Starts up one grid node (server) with pre-defined ports and tasks to test client-server interactions. + * <p> + * Note that different nodes cannot share the same port for rest services. If you want + * to start more than one node on the same physical machine you must provide different + * configurations for each node. Otherwise, this example would not work. + * <p> + * After this example has been started you can use pre-defined endpoints and task names in your + * client-server interactions to work with the node over un-secure protocols (binary or http). + * <p> + * Usually you cannot start secured and unsecured nodes in one grid, so started together + * secured and unsecured nodes belong to different grids. + * <p> + * Available endponts: + * <ul> + * <li>127.0.0.1:10080 - TCP unsecured endpoint.</li> + * <li>127.0.0.1:11080 - HTTP unsecured endpoint.</li> + * </ul> + * <p> + * Required credentials for remote client authentication: "s3cret". + */ +public class ClientNodeStartup { + /** + * Starts up two nodes with specified cache configuration on pre-defined endpoints. + * + * @param args Command line arguments, none required. + * @throws IgniteCheckedException In case of any exception. 
+ */ + public static void main(String[] args) throws IgniteCheckedException { + try (Ignite g = G.start("modules/clients/src/test/resources/spring-server-node.xml")) { + U.sleep(Long.MAX_VALUE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java new file mode 100644 index 0000000..ca587d7 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPortableArgumentTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.portables.*; + +import java.util.*; + +/** + * Task where argument and result are {@link ClientTestPortable}. 
+ */ +public class ClientPortableArgumentTask extends TaskSingleJobSplitAdapter { + /** {@inheritDoc} */ + @Override protected Object executeJob(int gridSize, Object arg) throws IgniteCheckedException { + Collection args = (Collection)arg; + + Iterator<Object> it = args.iterator(); + + assert args.size() == 2 : args.size(); + + boolean expPortable = (Boolean)it.next(); + + ClientTestPortable p; + + if (expPortable) { + PortableObject obj = (PortableObject)it.next(); + + p = obj.deserialize(); + } + else + p = (ClientTestPortable)it.next(); + + assert p != null; + + return new ClientTestPortable(p.i + 1, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java new file mode 100644 index 0000000..1e936f8 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientPutPortableTask.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+resources placeholder
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.client.*; +import org.apache.ignite.internal.client.impl.connection.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.nio.channels.*; +import java.util.*; + +/** + * + */ +public class ClientReconnectionSelfTest extends GridCommonAbstractTest { + /** */ + public static final String HOST = "127.0.0.1"; + + /** */ + private ClientTestRestServer[] srvs = new ClientTestRestServer[ClientTestRestServer.SERVERS_CNT]; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (int i = 0; i < srvs.length; i++) { + ClientTestRestServer srv = srvs[i]; + + if (srv != null) + srv.stop(); + + srvs[i] = null; + } + + super.afterTest(); + } + + /** + * @return Client for test. + * @throws GridClientException In case of error. + */ + private GridClient client() throws GridClientException { + return client(HOST); + } + + /** + * @param host - server host + * @return Client for test. + * @throws GridClientException In case of error. 
+ */ + private GridClient client(String host) throws GridClientException { + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setProtocol(GridClientProtocol.TCP); + + Collection<String> addrs = new ArrayList<>(); + + for (int port = ClientTestRestServer.FIRST_SERVER_PORT; port < ClientTestRestServer.FIRST_SERVER_PORT + ClientTestRestServer.SERVERS_CNT; port++) + addrs.add(host + ":" + port); + + cfg.setServers(addrs); + + cfg.setTopologyRefreshFrequency(60 * 60 * 1000); + + return GridClientFactory.start(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testNoFailedReconnection() throws Exception { + for (int i = 0; i < ClientTestRestServer.SERVERS_CNT; i++) + runServer(i, false); + + try (GridClient client = client()) { // Here client opens initial connection and fetches topology. + // Only first server in list should be contacted. + assertEquals(1, srvs[0].getConnectCount()); + + for (int i = 1; i < ClientTestRestServer.SERVERS_CNT; i++) + assertEquals(0, srvs[i].getConnectCount()); + + srvs[0].resetCounters(); + + int contactedSrv = 0; + + for (int i = 0; i < 100; i++) { + int failed = contactedSrv; + + srvs[failed].fail(); + + // Sometimes session close missing on client side. Retry few times until request succeeds. + while (true) + try { + client.compute().refreshTopology(false, false); + + break; + } + catch (GridClientConnectionResetException e) { + info("Exception caught: " + e); + } + + // Check which servers where contacted, + int connects = 0; + + for (int srv = 0; srv < ClientTestRestServer.SERVERS_CNT; srv++) { + if (srvs[srv].getSuccessfulConnectCount() > 0) { + assertTrue("Failed server was contacted: " + srv, srv != failed); + + contactedSrv = srv; + } + + connects += srvs[srv].getSuccessfulConnectCount(); + } + + assertEquals(1, connects); // Only one new connection should be opened. + + srvs[failed].repair(); + + srvs[contactedSrv].resetCounters(); // It should be the only server with non-0 counters. 
+ } + + } + } + + /** + * @throws Exception If failed. + */ + public void testCorrectInit() throws Exception { + for (int i = 0; i < ClientTestRestServer.SERVERS_CNT; i++) + runServer(i, i == 0); + + try (GridClient ignored = client()) { // Here client opens initial connection and fetches topology. + // First and second should be contacted, due to failure in initial request to the first. + for (int i = 0; i < 2; i++) + assertEquals("Iteration: " + i, 1, srvs[i].getConnectCount()); + + for (int i = 2; i < ClientTestRestServer.SERVERS_CNT; i++) + assertEquals(0, srvs[i].getConnectCount()); + } + } + + /** + * @throws Exception If failed. + */ + public void testFailedInit() throws Exception { + for (int i = 0; i < ClientTestRestServer.SERVERS_CNT; i++) + runServer(i, true); + + GridClient c = client(); + + try { + c.compute().execute("fake", "arg"); + + fail("Client operation should fail when server resets connections."); + } + catch (GridClientDisconnectedException e) { + assertTrue("Thrown exception doesn't have an expected cause: " + X.getFullStackTrace(e), + X.hasCause(e, GridClientConnectionResetException.class, ClosedChannelException.class)); + } + + for (int i = 0; i < ClientTestRestServer.SERVERS_CNT; i++) + // Connection manager does 3 attempts to get topology before failure. + assertEquals("Server: " + i, 3, srvs[i].getConnectCount()); + } + + /** + * @throws Exception If failed. + */ + // TODO Uncomment when GG-3789 fixed. +// public void testIdleConnection() throws Exception { +// for (int i = 0; i < SERVERS_CNT; i++) +// runServer(i, false); +// +// GridClient client = client(); // Here client opens initial connection and fetches topology. +// +// try { +// // Only first server in list should be contacted. +// assertEquals(1, srvs[0].getConnectCount()); +// +// Thread.sleep(35000); // Timeout as idle. 
+// +// assertEquals(1, srvs[0].getDisconnectCount()); +// +// for (int i = 1; i < SERVERS_CNT; i++) +// assertEquals(0, srvs[i].getConnectCount()); +// +// srvs[0].resetCounters(); +// +// // On new request connection should be re-opened. +// client.compute().refreshTopology(false, false); +// +// assertEquals(1, srvs[0].getConnectCount()); +// +// for (int i = 1; i < SERVERS_CNT; i++) +// assertEquals(0, srvs[i].getConnectCount()); +// } +// finally { +// GridClientFactory.stop(client.id()); +// } +// } + + /** + * Runs a new server with given index. + * + * @param idx Server index, same as in client configuration's servers property. + * @param failOnConnect If {@code true} the server should fail incoming connection immediately. + * @return Server instance. + * @throws IgniteCheckedException If failed. + */ + private ClientTestRestServer runServer(int idx, boolean failOnConnect) throws IgniteCheckedException { + ClientTestRestServer srv = new ClientTestRestServer(ClientTestRestServer.FIRST_SERVER_PORT + idx, failOnConnect, log()); + + srv.start(); + + srvs[idx] = srv; + + return srv; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java new file mode 100644 index 0000000..bdef7eb --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientSslNodeStartup.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Starts up one grid node (server) with pre-defined ports and tasks to test client-server interactions. + * <p> + * Note that different nodes cannot share the same port for rest services. If you want + * to start more than one node on the same physical machine you must provide different + * configurations for each node. Otherwise, this example would not work. + * <p> + * After this example has been started you can use pre-defined endpoints and task names in your + * client-server interactions to work with the node over secured protocols (binary over SSL or https). + * <p> + * Usually you cannot start secured and unsecured nodes in one grid, so started together + * secured and unsecured nodes belong to different grids. + * <p> + * Available endponts: + * <ul> + * <li>127.0.0.1:10443 - TCP SSL-protected endpoint.</li> + * <li>127.0.0.1:11443 - HTTP SSL-protected endpoint.</li> + * </ul> + * <p> + * Required credentials for remote client authentication: "s3cret". + */ +public class ClientSslNodeStartup { + /** + * Starts up two nodes with specified cache configuration on pre-defined endpoints. + * + * @param args Command line arguments, none required. 
+ * @throws IgniteCheckedException In case of any exception. + */ + public static void main(String[] args) throws IgniteCheckedException { + System.setProperty("CLIENTS_MODULE_PATH", U.resolveGridGainPath("modules/clients").getAbsolutePath()); + + try (Ignite g = G.start("modules/clients/src/test/resources/spring-server-ssl-node.xml")) { + U.sleep(Long.MAX_VALUE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java new file mode 100644 index 0000000..b16a338 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStartNodeTask.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.springframework.beans.factory.*;
+import org.springframework.context.support.*;
+
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+
+/**
+ * Start node task, applicable arguments:
+ * <ul>
+ *     <li>tcp</li>
+ *     <li>http</li>
+ *     <li>tcp+ssl</li>
+ *     <li>http+ssl</li>
+ * </ul>
+ */
+public class ClientStartNodeTask extends TaskSingleJobSplitAdapter<String, Integer> {
+    /** Read-only mapping from a node type to the Spring configuration file used to start such a node. */
+    private static final Map<String, String> NODE_CFG;
+
+    static {
+        // A plain map wrapped as unmodifiable: no anonymous HashMap subclass (as double-brace
+        // initialization would create) and no way to alter the shared constant afterwards.
+        Map<String, String> cfgs = new HashMap<>();
+
+        cfgs.put("tcp", "modules/clients/src/test/resources/spring-server-node.xml");
+        cfgs.put("http", "modules/clients/src/test/resources/spring-server-node.xml");
+        cfgs.put("tcp+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml");
+        cfgs.put("http+ssl", "modules/clients/src/test/resources/spring-server-ssl-node.xml");
+
+        NODE_CFG = Collections.unmodifiableMap(cfgs);
+    }
+
+    /** Logger, annotated as an Ignite logger resource. */
+    @IgniteLoggerResource
+    private transient IgniteLogger log;
+
+    /** Grid instance, annotated as an Ignite instance resource. */
+    @IgniteInstanceResource
+    private transient Ignite ignite;
+
+    /**
+     * Starts one more grid node of the requested type in the current VM under a unique grid name.
+     *
+     * @param gridSize Current grid size, used for logging only.
+     * @param type Node type, one of the keys listed in the class description.
+     * @return Always {@code true}.
+     * @throws IgniteCheckedException If the node failed to start.
+     * @throws IllegalArgumentException If {@code type} is {@code null} or not supported.
+     */
+    @Override protected Object executeJob(int gridSize, String type) throws IgniteCheckedException {
+        log.info(">>> Starting new grid node [currGridSize=" + gridSize + ", arg=" + type + "]");
+
+        if (type == null)
+            throw new IllegalArgumentException("Node type to start should be specified.");
+
+        IgniteConfiguration cfg = getConfig(type);
+
+        // Grid name must be unique within the VM, so a random UUID is appended to the configured one.
+        // ClientStopNodeTask relies on the resulting "<configured name> (" prefix to find these nodes.
+        String gridName = cfg.getGridName() + " (" + UUID.randomUUID() + ")";
+
+        cfg.setGridName(gridName);
+
+        // Start new node in current VM.
+        Ignite g = G.start(cfg);
+
+        log.info(">>> Grid started [nodeId=" + g.cluster().localNode().id() + ", name='" + g.name() + "']");
+
+        // NOTE(review): a Boolean is returned although the task is declared with an Integer result -
+        // confirm against TaskSingleJobSplitAdapter and the callers before changing it.
+        return true;
+    }
+
+    /**
+     * Fails the job over if it finished with an exception, otherwise waits for the remaining results.
+     *
+     * @param res Received job result.
+     * @param rcvd All results received so far.
+     * @return {@code FAILOVER} if the job threw an exception, {@code WAIT} otherwise.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+        throws IgniteCheckedException {
+        if (res.getException() != null)
+            return FAILOVER;
+
+        return WAIT;
+    }
+
+    /**
+     * Load grid configuration for specified node type.
+     *
+     * @param type Node type to load configuration for.
+     * @return Grid configuration for specified node type (the {@code grid.cfg} bean of its Spring file).
+     * @throws IllegalArgumentException If the node type has no configuration registered.
+     */
+    static IgniteConfiguration getConfig(String type) {
+        String path = NODE_CFG.get(type);
+
+        if (path == null)
+            throw new IllegalArgumentException("Unsupported node type: " + type);
+
+        URL url = U.resolveGridGainUrl(path);
+
+        BeanFactory ctx = new FileSystemXmlApplicationContext(url.toString());
+
+        return (IgniteConfiguration)ctx.getBean("grid.cfg");
+    }
+
+    /**
+     * Example for start/stop node tasks.
+     *
+     * @param args Not used.
+     */
+    public static void main(String[] args) {
+        String nodeType = "tcp+ssl";
+
+        // Start the initial node.
+        try (Ignite g = G.start(NODE_CFG.get(nodeType))) {
+            // Change topology.
+            changeTopology(g, 4, 1, nodeType);
+            changeTopology(g, 1, 4, nodeType);
+
+            // Ask the stop task to stop the initial node, passing its node id as the argument.
+            g.compute().execute(ClientStopNodeTask.class, g.cluster().localNode().id().toString());
+
+            assert G.allGrids().isEmpty();
+        }
+        catch (IgniteCheckedException e) {
+            System.err.println("Uncaught exception: " + e.getMessage());
+
+            e.printStackTrace(System.err);
+        }
+    }
+
+    /**
+     * Change topology: starts {@code add} nodes in parallel, then runs the stop task {@code rmv} times
+     * and logs the names of the grids left in this VM.
+     *
+     * @param parent Grid to execute tasks on.
+     * @param add New nodes count.
+     * @param rmv Remove nodes count.
+     * @param type Type of nodes to manipulate.
+     * @throws IgniteCheckedException On any exception.
+     */
+    private static void changeTopology(Ignite parent, int add, int rmv, String type) throws IgniteCheckedException {
+        Collection<ComputeTaskFuture<?>> tasks = new ArrayList<>();
+
+        IgniteCompute comp = parent.compute().withAsync();
+
+        // Start nodes in parallel.
+        while (add-- > 0) {
+            comp.execute(ClientStartNodeTask.class, type);
+
+            tasks.add(comp.future());
+        }
+
+        // Wait until every start task has completed.
+        for (ComputeTaskFuture<?> task : tasks)
+            task.get();
+
+        // Stop nodes in sequence.
+        while (rmv-- > 0)
+            parent.compute().execute(ClientStopNodeTask.class, type);
+
+        Collection<String> gridNames = new ArrayList<>();
+
+        for (Ignite g : G.allGrids())
+            gridNames.add(g.name());
+
+        parent.log().info(">>> Available grids: " + gridNames);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java
new file mode 100644
index 0000000..c930ed3
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStopNodeTask.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.*;
+
+import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+
+/**
+ * Stop node task, applicable arguments:
+ * <ul>
+ *     <li>node id (as string) to stop or</li>
+ *     <li>node type (see start nodes task).</li>
+ * </ul>
+ * <p>
+ * The argument is handed to every job, which resolves it through
+ * {@link ClientStartNodeTask#getConfig(String)}.
+ * NOTE(review): that method rejects anything but a known node type, so the "node id" form listed
+ * above does not look supported by the job in this class - confirm.
+ */
+public class ClientStopNodeTask extends ComputeTaskSplitAdapter<String, Integer> {
+    /** Logger, annotated as an Ignite logger resource. */
+    @IgniteLoggerResource
+    private transient IgniteLogger log;
+
+    /** Grid instance, annotated as an Ignite instance resource. */
+    @IgniteInstanceResource
+    private transient Ignite ignite;
+
+    /**
+     * Creates one stop job per grid node.
+     *
+     * @param gridSize Number of jobs to create.
+     * @param arg Task argument, passed to every job unchanged.
+     * @return Collection of {@code gridSize} stop jobs.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException {
+        Collection<ComputeJob> jobs = new ArrayList<>();
+
+        for (int i = 0; i < gridSize; i++)
+            jobs.add(new StopJob(arg));
+
+        return jobs;
+    }
+
+    /**
+     * Applies the default result policy, except that a job is never failed over.
+     *
+     * @param res Received job result.
+     * @param rcvd All results received so far.
+     * @return Policy computed by the super class, with {@code FAILOVER} replaced by {@code WAIT}.
+     * @throws IgniteCheckedException If thrown by the super class implementation.
+     */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
+        ComputeJobResultPolicy superRes = super.result(res, rcvd);
+
+        // Deny failover.
+        if (superRes == FAILOVER)
+            superRes = WAIT;
+
+        return superRes;
+    }
+
+    /**
+     * Sums the numbers of grids stopped by all jobs.
+     *
+     * @param results Job results.
+     * @return Total number of stopped grids reported by the jobs that were not cancelled.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+        int stoppedCnt = 0;
+
+        for (ComputeJobResult res : results) {
+            if (res.isCancelled())
+                continue;
+
+            Integer cnt = res.getData();
+
+            // Failover is denied in result(), so a job that did not complete can end up here;
+            // presumably it carries no data, and unboxing null would fail the whole task.
+            if (cnt != null)
+                stoppedCnt += cnt;
+        }
+
+        return stoppedCnt;
+    }
+
+    /**
+     * Stop node job it is executed on.
+     */
+    private static class StopJob extends ComputeJobAdapter {
+        /** Task argument; resolved as a node type when the job runs. */
+        private final String gridType;
+
+        /** Logger, annotated as an Ignite logger resource. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Grid instance the job runs on, annotated as an Ignite instance resource. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * @param gridType Task argument.
+         */
+        private StopJob(String gridType) {
+            this.gridType = gridType;
+        }
+
+        /**
+         * Stops every grid in this VM whose name starts with the configured grid name of the
+         * requested node type followed by {@code " ("} - the naming scheme used by
+         * {@link ClientStartNodeTask}. Nothing is stopped when the executing grid itself has
+         * such a name.
+         *
+         * @return Number of grids stopped by this job.
+         */
+        @Override public Object execute() {
+            log.info(">>> Stop node [nodeId=" + ignite.cluster().localNode().id() + ", name='" + ignite.name() + "']");
+
+            String prefix = ClientStartNodeTask.getConfig(gridType).getGridName() + " (";
+
+            // Only a grid that is not itself one of the started nodes performs the stop.
+            if (ignite.name().startsWith(prefix))
+                return 0;
+
+            int stoppedCnt = 0;
+
+            for (Ignite g : G.allGrids()) {
+                if (!g.name().startsWith(prefix))
+                    continue;
+
+                try {
+                    log.info(">>> Grid stopping [nodeId=" + g.cluster().localNode().id() +
+                        ", name='" + g.name() + "']");
+
+                    G.stop(g.name(), true);
+
+                    stoppedCnt++;
+                }
+                catch (IllegalStateException e) {
+                    // Best effort: the grid is not counted, the remaining ones are still processed.
+                    log.warning("Failed to stop grid.", e);
+                }
+            }
+
+            return stoppedCnt;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java
new file mode 100644
index 0000000..398c0e8
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientStringLengthTask.java
@@ -0,0 
+1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+
+import java.util.*;
+
+import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+
+/**
+ * Test task that calculates the length of the string passed as its argument.
+ * <p>
+ * The argument is split with {@code split("")}; one job is created per resulting piece, each job
+ * returns the length of its piece and the reducer sums the lengths up.
+ */
+public class ClientStringLengthTask extends ComputeTaskSplitAdapter<String, Integer> {
+    /**
+     * Creates one job per piece of the argument.
+     *
+     * @param gridSize Grid size (not used).
+     * @param arg String to calculate the length of, may be {@code null}.
+     * @return Jobs to execute; empty if the argument is {@code null}.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException {
+        Collection<ComputeJobAdapter> jobs = new ArrayList<>();
+
+        // No argument - no jobs.
+        if (arg == null)
+            return jobs;
+
+        for (final Object piece : arg.split("")) {
+            jobs.add(new ComputeJobAdapter() {
+                @Override public Object execute() {
+                    // Short pause before the result is produced.
+                    try {
+                        Thread.sleep(5);
+                    }
+                    catch (InterruptedException ignored) {
+                        // Keep the interruption visible to the caller.
+                        Thread.currentThread().interrupt();
+                    }
+
+                    if (piece == null)
+                        return 0;
+
+                    return piece.toString().length();
+                }
+            });
+        }
+
+        return jobs;
+    }
+
+    /**
+     * Sums the lengths reported by the jobs.
+     *
+     * @param results Job results, each carrying an {@code Integer} length.
+     * @return Sum of all reported lengths.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+        int total = 0;
+
+        for (ComputeJobResult jobRes : results) {
+            Integer len = jobRes.getData();
+
+            total += len;
+        }
+
+        return total;
+    }
+
+    /**
+     * Fails the job over if it finished with an exception, otherwise waits for the remaining results.
+     *
+     * @param res Received job result.
+     * @param rcvd All results received so far.
+     * @return {@code FAILOVER} if the job threw an exception, {@code WAIT} otherwise.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+        throws IgniteCheckedException {
+        return res.getException() != null ? FAILOVER : WAIT;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpMultiThreadedSelfTest.java
new file mode 100644
index 0000000..74f9a8e
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpMultiThreadedSelfTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.internal.client.ssl.*;
+
+/**
+ * Multi-threaded client tests over the TCP binary protocol without SSL.
+ */
+public class ClientTcpMultiThreadedSelfTest extends ClientAbstractMultiThreadedSelfTest {
+    /**
+     * @return {@code false}, SSL is not used by this test.
+     */
+    @Override
+    protected boolean useSsl() {
+        return false;
+    }
+
+    /**
+     * @return {@code null}, no SSL context factory is needed when SSL is off.
+     */
+    @Override
+    protected GridSslContextFactory sslContextFactory() {
+        return null;
+    }
+
+    /**
+     * @return TCP protocol.
+     */
+    @Override
+    protected GridClientProtocol protocol() {
+        return GridClientProtocol.TCP;
+    }
+
+    /**
+     * @return Server address built from the inherited host and base REST TCP port.
+     */
+    @Override
+    protected String serverAddress() {
+        return HOST + ":" + REST_TCP_PORT_BASE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslAuthenticationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslAuthenticationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslAuthenticationSelfTest.java
new file mode 100644
index 0000000..859d3d6
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslAuthenticationSelfTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.client.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.client.balancer.*;
+import org.apache.ignite.internal.client.impl.*;
+import org.apache.ignite.internal.client.ssl.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.net.ssl.*;
+import java.security.cert.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests SSL authentication over the REST TCP port. Mock trust managers are installed on the server
+ * and on the client side; they count certificate checks and can be told to reject them.
+ */
+public class ClientTcpSslAuthenticationSelfTest extends GridCommonAbstractTest {
+    /** REST TCP port. */
+    private static final int REST_TCP_PORT = 12121;
+
+    /** Test trust manager for server. */
+    private final MockX509TrustManager srvTrustMgr = new MockX509TrustManager();
+
+    /** Test trust manager for client. */
+    private final MockX509TrustManager clientTrustMgr = new MockX509TrustManager();
+
+    /** Whether server should check clients. */
+    private volatile boolean checkClient;
+
+    /**
+     * Verifies that neither trust manager was used in the opposite role: the server-side manager
+     * must not have checked a server certificate and the client-side one must not have checked
+     * a client certificate.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTest() throws Exception {
+        assertEquals(0, srvTrustMgr.serverCheckCallCount());
+        assertEquals(0, clientTrustMgr.clientCheckCallCount());
+    }
+
+    /**
+     * Resets both trust managers before every test.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTest() throws Exception {
+        srvTrustMgr.reset();
+        clientTrustMgr.reset();
+    }
+
+    /**
+     * Adds a client connection configuration with SSL enabled on {@link #REST_TCP_PORT}, client
+     * authentication switched by {@link #checkClient} and the server mock trust manager installed.
+     *
+     * @param gridName Grid name.
+     * @return Grid configuration.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setLocalHost(getTestResources().getLocalHost());
+
+        assert c.getClientConnectionConfiguration() == null;
+
+        ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration();
+
+        clientCfg.setRestTcpPort(REST_TCP_PORT);
+        clientCfg.setRestTcpSslEnabled(true);
+
+        clientCfg.setRestTcpSslClientAuth(checkClient);
+
+        GridSslBasicContextFactory factory = (GridSslBasicContextFactory)GridTestUtils.sslContextFactory();
+
+        factory.setTrustManagers(srvTrustMgr);
+
+        clientCfg.setRestTcpSslContextFactory(factory);
+
+        c.setClientConnectionConfiguration(clientCfg);
+
+        return c;
+    }
+
+    /**
+     * Creates client that will try to connect to only first node in grid.
+     *
+     * @return Client.
+     * @throws Exception If failed to create client.
+     */
+    private GridClientImpl createClient() throws Exception {
+        GridClientConfiguration cfg = new GridClientConfiguration();
+
+        cfg.setServers(Arrays.asList(U.getLocalHost().getHostAddress() + ":" + REST_TCP_PORT));
+        cfg.setBalancer(new GridClientRoundRobinBalancer());
+
+        GridSslBasicContextFactory factory = (GridSslBasicContextFactory)GridTestUtils.sslContextFactory();
+
+        factory.setTrustManagers(clientTrustMgr);
+
+        cfg.setSslContextFactory(factory);
+
+        return (GridClientImpl)GridClientFactory.start(cfg);
+    }
+
+    /**
+     * Client trust manager accepts the server certificate.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServerAuthenticated() throws Exception {
+        checkServerAuthenticatedByClient(false);
+    }
+
+    /**
+     * Client trust manager rejects the server certificate.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServerNotAuthenticatedByClient() throws Exception {
+        try {
+            checkServerAuthenticatedByClient(true);
+        }
+        catch (GridClientDisconnectedException e) {
+            assertTrue(X.hasCause(e, GridServerUnreachableException.class));
+        }
+    }
+
+    /**
+     * Server trust manager accepts the client certificate.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientAuthenticated() throws Exception {
+        checkClientAuthenticatedByServer(false);
+    }
+
+    /**
+     * Server trust manager rejects the client certificate.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientNotAuthenticated() throws Exception {
+        try {
+            // Exercise the server-side check of the client; the client-side check of the server
+            // is what testServerNotAuthenticatedByClient() covers.
+            checkClientAuthenticatedByServer(true);
+        }
+        catch (GridClientDisconnectedException e) {
+            assertTrue(X.hasCause(e, GridServerUnreachableException.class));
+        }
+    }
+
+    /**
+     * Starts a grid that does not authenticate clients, connects a client and expects exactly one
+     * server certificate check on the client side and no client certificate check on the server side.
+     *
+     * @param fail Should client trust manager fail.
+     * @throws Exception If failed.
+     */
+    private void checkServerAuthenticatedByClient(boolean fail) throws Exception {
+        checkClient = false;
+        srvTrustMgr.shouldFail(false);
+        clientTrustMgr.shouldFail(fail);
+
+        startGrid();
+
+        try {
+            try (GridClientImpl c = createClient()) {
+                c.compute().refreshTopology(false, false);
+            }
+        }
+        finally {
+            G.stopAll(false);
+        }
+
+        assertEquals(0, srvTrustMgr.clientCheckCallCount());
+        assertEquals(1, clientTrustMgr.serverCheckCallCount());
+    }
+
+    /**
+     * Starts a grid that authenticates clients, connects a client and expects exactly one certificate
+     * check on each side.
+     *
+     * @param fail Should server trust manager fail.
+     * @throws Exception If failed.
+     */
+    private void checkClientAuthenticatedByServer(boolean fail) throws Exception {
+        checkClient = true;
+        srvTrustMgr.shouldFail(fail);
+        clientTrustMgr.shouldFail(false);
+
+        startGrid();
+
+        try {
+            try (GridClientImpl c = createClient()) {
+                c.compute().refreshTopology(false, false);
+            }
+        }
+        finally {
+            G.stopAll(false);
+        }
+
+        assertEquals(1, srvTrustMgr.clientCheckCallCount());
+        assertEquals(1, clientTrustMgr.serverCheckCallCount());
+    }
+
+    /**
+     * Test trust manager to emulate certificate check failures.
+     */
+    private static class MockX509TrustManager implements X509TrustManager {
+        /** Empty array. */
+        private static final X509Certificate[] EMPTY = new X509Certificate[0];
+
+        /** Whether checks should fail. */
+        private volatile boolean shouldFail;
+
+        /** Client check call count. */
+        private final AtomicInteger clientCheckCallCnt = new AtomicInteger();
+
+        /** Server check call count. */
+        private final AtomicInteger srvCheckCallCnt = new AtomicInteger();
+
+        /**
+         * @param shouldFail Whether checks should fail.
+         */
+        private void shouldFail(boolean shouldFail) {
+            this.shouldFail = shouldFail;
+        }
+
+        /**
+         * Counts the call and rejects the certificate chain if failures are switched on.
+         *
+         * @param x509Certificates Certificate chain (not inspected).
+         * @param s Authentication type (not inspected).
+         * @throws CertificateException If checks are configured to fail.
+         */
+        @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
+            throws CertificateException {
+            clientCheckCallCnt.incrementAndGet();
+
+            if (shouldFail)
+                throw new CertificateException("Client check failed.");
+        }
+
+        /**
+         * Counts the call and rejects the certificate chain if failures are switched on.
+         *
+         * @param x509Certificates Certificate chain (not inspected).
+         * @param s Authentication type (not inspected).
+         * @throws CertificateException If checks are configured to fail.
+         */
+        @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
+            throws CertificateException {
+            srvCheckCallCnt.incrementAndGet();
+
+            if (shouldFail)
+                throw new CertificateException("Server check failed.");
+        }
+
+        /**
+         * @return Empty array, no issuers are reported.
+         */
+        @Override public X509Certificate[] getAcceptedIssuers() {
+            return EMPTY;
+        }
+
+        /**
+         * @return Call count to checkClientTrusted method.
+         */
+        public int clientCheckCallCount() {
+            return clientCheckCallCnt.get();
+        }
+
+        /**
+         * @return Call count to checkServerTrusted method.
+         */
+        public int serverCheckCallCount() {
+            return srvCheckCallCnt.get();
+        }
+
+        /**
+         * Clears should fail flag and resets call counters.
+         */
+        public void reset() {
+            shouldFail = false;
+            clientCheckCallCnt.set(0);
+            srvCheckCallCnt.set(0);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslMultiThreadedSelfTest.java
new file mode 100644
index 0000000..4131bf0
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpSslMultiThreadedSelfTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.internal.client.ssl.*;
+import org.apache.ignite.testframework.*;
+
+/**
+ * Runs multithreaded tests for client over tcp binary protocol with ssl enabled. 
+ */
+public class ClientTcpSslMultiThreadedSelfTest extends ClientAbstractMultiThreadedSelfTest {
+    /**
+     * @return {@code true}, SSL is used by this test.
+     */
+    @Override
+    protected boolean useSsl() {
+        return true;
+    }
+
+    /**
+     * @return SSL context factory provided by the test utilities.
+     */
+    @Override
+    protected GridSslContextFactory sslContextFactory() {
+        return GridTestUtils.sslContextFactory();
+    }
+
+    /**
+     * @return TCP protocol.
+     */
+    @Override
+    protected GridClientProtocol protocol() {
+        return GridClientProtocol.TCP;
+    }
+
+    /**
+     * @return Server address built from the inherited host and base REST TCP port.
+     */
+    @Override
+    protected String serverAddress() {
+        return HOST + ":" + REST_TCP_PORT_BASE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java
new file mode 100644
index 0000000..93d5b24
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+
+import java.util.*;
+
+import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+
+/**
+ * Test task that sums up the string lengths of all objects in the argument list.
+ * <p>
+ * One job is created per list element; each job returns the length of the element's string
+ * representation ({@code 0} for {@code null}) and the reducer adds the lengths together.
+ */
+public class ClientTcpTask extends ComputeTaskSplitAdapter<List<Object>, Integer> {
+    /**
+     * Creates one job per element of the argument list.
+     *
+     * @param gridSize Grid size (not used).
+     * @param list Objects to calculate the string length sum of, may be {@code null}.
+     * @return Jobs to execute; empty if the list is {@code null}.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override protected Collection<? extends ComputeJob> split(int gridSize, List<Object> list)
+        throws IgniteCheckedException {
+        Collection<ComputeJobAdapter> jobs = new ArrayList<>();
+
+        // No argument - no jobs.
+        if (list == null)
+            return jobs;
+
+        for (final Object item : list) {
+            jobs.add(new ComputeJobAdapter() {
+                @Override public Object execute() {
+                    // Short pause before the result is produced.
+                    try {
+                        Thread.sleep(5);
+                    }
+                    catch (InterruptedException ignored) {
+                        // Keep the interruption visible to the caller.
+                        Thread.currentThread().interrupt();
+                    }
+
+                    if (item == null)
+                        return 0;
+
+                    return item.toString().length();
+                }
+            });
+        }
+
+        return jobs;
+    }
+
+    /**
+     * Sums the lengths reported by the jobs.
+     *
+     * @param results Job results, each carrying an {@code Integer} length.
+     * @return Sum of all reported lengths.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+        int total = 0;
+
+        for (ComputeJobResult jobRes : results) {
+            Integer len = jobRes.getData();
+
+            total += len;
+        }
+
+        return total;
+    }
+
+    /**
+     * Fails the job over if it finished with an exception, otherwise waits for the remaining results.
+     *
+     * @param res Received job result.
+     * @param rcvd All results received so far.
+     * @return {@code FAILOVER} if the job threw an exception, {@code WAIT} otherwise.
+     * @throws IgniteCheckedException Declared by the overridden method; not thrown here.
+     */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException {
+        return res.getException() != null ? FAILOVER : WAIT;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTaskExecutionAfterTopologyRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTaskExecutionAfterTopologyRestartSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTaskExecutionAfterTopologyRestartSelfTest.java
new 
file mode 100644
index 0000000..948354f
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTcpTaskExecutionAfterTopologyRestartSelfTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.client.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ * Ensures that a TCP client which has already executed a task can execute one again after all
+ * grid nodes were stopped and a new node was started.
+ */
+public class ClientTcpTaskExecutionAfterTopologyRestartSelfTest extends GridCommonAbstractTest {
+    /** Host the grid binds to and the client connects to. */
+    private static final String HOST = "127.0.0.1";
+
+    /** Port. */
+    private static final int PORT = 11211;
+
+    /**
+     * Binds the grid to the local host and opens the REST TCP port for the client.
+     *
+     * @param gridName Grid name.
+     * @return Grid configuration.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setLocalHost(HOST);
+
+        assert cfg.getClientConnectionConfiguration() == null;
+
+        ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration();
+
+        clientCfg.setRestTcpPort(PORT);
+
+        cfg.setClientConnectionConfiguration(clientCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Stops whatever grids the test left running.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Executes a task, restarts the topology and executes the same task through the same client.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTaskAfterRestart() throws Exception {
+        startGrids(1);
+
+        GridClientConfiguration cfg = new GridClientConfiguration();
+
+        cfg.setProtocol(GridClientProtocol.TCP);
+        cfg.setServers(Collections.singleton(HOST + ":" + PORT));
+
+        // The client is closed when the block exits, on success and on failure alike, so it does
+        // not outlive the test.
+        try (GridClient cli = GridClientFactory.start(cfg)) {
+            cli.compute().execute(ClientTcpTask.class.getName(), Collections.singletonList("arg"));
+
+            // Restart the topology while the client stays open.
+            stopAllGrids();
+
+            startGrid();
+
+            cli.compute().execute(ClientTcpTask.class.getName(), Collections.singletonList("arg"));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java
new file mode 100644
index 0000000..7f9aa46
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortable.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.apache.ignite.portables.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Test portable object. + */ +@SuppressWarnings("PublicField") +public class ClientTestPortable implements PortableMarshalAware, Serializable { + /** */ + public byte b; + + /** */ + public byte bRaw; + + /** */ + public short s; + + /** */ + public short sRaw; + + /** */ + public int i; + + /** */ + public int iRaw; + + /** */ + public long l; + + /** */ + public long lRaw; + + /** */ + public float f; + + /** */ + public float fRaw; + + /** */ + public double d; + + /** */ + public double dRaw; + + /** */ + public char c; + + /** */ + public char cRaw; + + /** */ + public boolean bool; + + /** */ + public boolean boolRaw; + + /** */ + public String str; + + /** */ + public String strRaw; + + /** */ + public UUID uuid; + + /** */ + public UUID uuidRaw; + + /** */ + public Date date; + + /** */ + public Date dateRaw; + + /** */ + public TestEnum e; + + /** */ + public TestEnum eRaw; + + /** */ + public byte[] bArr; + + /** */ + public byte[] bArrRaw; + + /** */ + public short[] sArr; + + /** */ + public short[] sArrRaw; + + /** */ + public int[] iArr; + + /** */ + public int[] iArrRaw; + + /** */ + public long[] lArr; + + /** */ + public long[] lArrRaw; + + /** */ + public float[] fArr; + + 
/** */ + public float[] fArrRaw; + + /** */ + public double[] dArr; + + /** */ + public double[] dArrRaw; + + /** */ + public char[] cArr; + + /** */ + public char[] cArrRaw; + + /** */ + public boolean[] boolArr; + + /** */ + public boolean[] boolArrRaw; + + /** */ + public String[] strArr; + + /** */ + public String[] strArrRaw; + + /** */ + public UUID[] uuidArr; + + /** */ + public UUID[] uuidArrRaw; + + /** */ + public Date[] dateArr; + + /** */ + public Date[] dateArrRaw; + + /** */ + public TestEnum[] eArr; + + /** */ + public TestEnum[] eArrRaw; + + /** */ + public Object[] objArr; + + /** */ + public Object[] objArrRaw; + + /** */ + public Collection<String> col; + + /** */ + public Collection<String> colRaw; + + /** */ + public Map<Integer, String> map; + + /** */ + public Map<Integer, String> mapRaw; + + /** */ + public ClientTestPortable portable1; + + /** */ + public ClientTestPortable portable2; + + /** */ + public ClientTestPortable portableRaw1; + + /** */ + public ClientTestPortable portableRaw2; + + /** + */ + public ClientTestPortable() { + // No-op. + } + + /** + * @param val Value. + * @param createInner If {@code true} creates nested object. 
+ */ + public ClientTestPortable(int val, boolean createInner) { + b = (byte)val; + bRaw = (byte)(val + 1); + + s = (short)val; + sRaw = (short)(val + 1); + + i = val; + iRaw = i + 1; + + l = val; + lRaw = i + 1; + + f = val + 0.5f; + fRaw = f + 1; + + d = val + 0.5f; + dRaw = d + 1; + + c = (char)val; + cRaw = (char)(val + 1); + + bool = true; + boolRaw = false; + + str = String.valueOf(i); + strRaw = String.valueOf(iRaw); + + uuid = new UUID(i, i); + uuidRaw = new UUID(iRaw, iRaw); + + date = new Date(i); + dateRaw = new Date(iRaw); + + e = enumValue(i); + eRaw = enumValue(iRaw); + + bArr = new byte[]{b, (byte)(b + 1)}; + bArrRaw = new byte[]{bRaw, (byte)(bRaw + 1)}; + + sArr = new short[]{s, (short)(s + 1)}; + sArrRaw = new short[]{sRaw, (short)(sRaw + 1)}; + + iArr = new int[]{i, i + 1}; + iArrRaw = new int[]{iRaw, iRaw + 1}; + + lArr = new long[]{l, l + 1}; + lArrRaw = new long[]{lRaw, lRaw + 1}; + + fArr = new float[]{f, f + 1}; + fArrRaw = new float[]{fRaw, fRaw + 1}; + + dArr = new double[]{d, d + 1}; + dArrRaw = new double[]{dRaw, dRaw + 1}; + + cArr = new char[]{c, (char)(c + 1)}; + cArrRaw = new char[]{cRaw, (char)(cRaw + 1)}; + + boolArr = new boolean[]{true, true}; + boolArrRaw = new boolean[]{true, true}; + + strArr = new String[]{str, str + "1"}; + strArrRaw = new String[]{strRaw, strRaw + "1"}; + + uuidArr = new UUID[]{uuid, new UUID(uuid.getMostSignificantBits() + 1, uuid.getLeastSignificantBits() + 1)}; + uuidArrRaw = new UUID[]{uuidRaw, + new UUID(uuidRaw.getMostSignificantBits() + 1, uuidRaw.getLeastSignificantBits() + 1)}; + + dateArr = new Date[]{date, new Date(date.getTime() + 1)}; + dateArrRaw = new Date[]{dateRaw, new Date(dateRaw.getTime() + 1)}; + + eArr = new TestEnum[]{enumValue(i), enumValue(i + 1)}; + eArrRaw = new TestEnum[]{enumValue(iRaw), enumValue(iRaw + 1)}; + + objArr = new Object[]{uuid, new UUID(uuid.getMostSignificantBits() + 1, uuid.getLeastSignificantBits() + 1)}; + objArrRaw = new Object[]{uuidRaw, + new 
UUID(uuidRaw.getMostSignificantBits() + 1, uuidRaw.getLeastSignificantBits() + 1)}; + + col = Arrays.asList(str, str + "1"); + colRaw = Arrays.asList(strRaw, strRaw + "1"); + + map = new HashMap<>(); + map.put(1, str); + map.put(2, str + "1"); + + mapRaw = new HashMap<>(); + mapRaw.put(1, strRaw); + mapRaw.put(2, strRaw + "1"); + + if (createInner) { + portable1 = new ClientTestPortable(val + 1, false); + portable2 = portable1; + + portableRaw1 = new ClientTestPortable(val + 2, false); + portableRaw2 = portableRaw1; + } + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeByte("_b", b); + writer.writeShort("_s", s); + writer.writeInt("_i", i); + writer.writeLong("_l", l); + writer.writeFloat("_f", f); + writer.writeDouble("_d", d); + writer.writeChar("_c", c); + writer.writeBoolean("_bool", bool); + writer.writeString("_str", str); + writer.writeUuid("_uuid", uuid); + writer.writeDate("_date", date); + writer.writeEnum("_enum", e); + writer.writeByteArray("_bArr", bArr); + writer.writeShortArray("_sArr", sArr); + writer.writeIntArray("_iArr", iArr); + writer.writeLongArray("_lArr", lArr); + writer.writeFloatArray("_fArr", fArr); + writer.writeDoubleArray("_dArr", dArr); + writer.writeCharArray("_cArr", cArr); + writer.writeBooleanArray("_boolArr", boolArr); + writer.writeStringArray("_strArr", strArr); + writer.writeUuidArray("_uuidArr", uuidArr); + writer.writeDateArray("_dateArr", dateArr); + writer.writeEnumArray("_eArr", eArr); + writer.writeObjectArray("_objArr", objArr); + writer.writeCollection("_col", col); + writer.writeMap("_map", map); + writer.writeObject("_portable1", portable1); + writer.writeObject("_portable2", portable2); + + PortableRawWriter raw = writer.rawWriter(); + + raw.writeByte(bRaw); + raw.writeShort(sRaw); + raw.writeInt(iRaw); + raw.writeLong(lRaw); + raw.writeFloat(fRaw); + raw.writeDouble(dRaw); + raw.writeChar(cRaw); + raw.writeBoolean(boolRaw); + 
raw.writeString(strRaw); + raw.writeUuid(uuidRaw); + raw.writeDate(dateRaw); + raw.writeEnum(eRaw); + raw.writeByteArray(bArrRaw); + raw.writeShortArray(sArrRaw); + raw.writeIntArray(iArrRaw); + raw.writeLongArray(lArrRaw); + raw.writeFloatArray(fArrRaw); + raw.writeDoubleArray(dArrRaw); + raw.writeCharArray(cArrRaw); + raw.writeBooleanArray(boolArrRaw); + raw.writeStringArray(strArrRaw); + raw.writeUuidArray(uuidArrRaw); + raw.writeDateArray(dateArrRaw); + raw.writeEnumArray(eArrRaw); + raw.writeObjectArray(objArrRaw); + raw.writeCollection(colRaw); + raw.writeMap(mapRaw); + raw.writeObject(portableRaw1); + raw.writeObject(portableRaw2); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + b = reader.readByte("_b"); + s = reader.readShort("_s"); + i = reader.readInt("_i"); + l = reader.readLong("_l"); + f = reader.readFloat("_f"); + d = reader.readDouble("_d"); + c = reader.readChar("_c"); + bool = reader.readBoolean("_bool"); + str = reader.readString("_str"); + uuid = reader.readUuid("_uuid"); + date = reader.readDate("_date"); + e = reader.readEnum("_enum", TestEnum.class); + bArr = reader.readByteArray("_bArr"); + sArr = reader.readShortArray("_sArr"); + iArr = reader.readIntArray("_iArr"); + lArr = reader.readLongArray("_lArr"); + fArr = reader.readFloatArray("_fArr"); + dArr = reader.readDoubleArray("_dArr"); + cArr = reader.readCharArray("_cArr"); + boolArr = reader.readBooleanArray("_boolArr"); + strArr = reader.readStringArray("_strArr"); + uuidArr = reader.readUuidArray("_uuidArr"); + dateArr = reader.readDateArray("_dateArr"); + eArr = reader.readEnumArray("_eArr", TestEnum.class); + objArr = reader.readObjectArray("_objArr"); + col = reader.readCollection("_col"); + map = reader.readMap("_map"); + portable1 = (ClientTestPortable)reader.readObject("_portable1"); + portable2 = (ClientTestPortable)reader.readObject("_portable2"); + + PortableRawReader raw = reader.rawReader(); + + bRaw = 
raw.readByte(); + sRaw = raw.readShort(); + iRaw = raw.readInt(); + lRaw = raw.readLong(); + fRaw = raw.readFloat(); + dRaw = raw.readDouble(); + cRaw = raw.readChar(); + boolRaw = raw.readBoolean(); + strRaw = raw.readString(); + uuidRaw = raw.readUuid(); + dateRaw = raw.readDate(); + eRaw = raw.readEnum(TestEnum.class); + bArrRaw = raw.readByteArray(); + sArrRaw = raw.readShortArray(); + iArrRaw = raw.readIntArray(); + lArrRaw = raw.readLongArray(); + fArrRaw = raw.readFloatArray(); + dArrRaw = raw.readDoubleArray(); + cArrRaw = raw.readCharArray(); + boolArrRaw = raw.readBooleanArray(); + strArrRaw = raw.readStringArray(); + uuidArrRaw = raw.readUuidArray(); + dateArrRaw = raw.readDateArray(); + eArrRaw = raw.readEnumArray(TestEnum.class); + objArrRaw = raw.readObjectArray(); + colRaw = raw.readCollection(); + mapRaw = raw.readMap(); + portableRaw1 = (ClientTestPortable)raw.readObject(); + portableRaw2 = (ClientTestPortable)raw.readObject(); + } + + /** + * @param idx Value index. + * @return Enum value. + */ + static TestEnum enumValue(int idx) { + return TestEnum.values()[idx % TestEnum.values().length]; + } + + /** + * Test enum. 
+ */ + private enum TestEnum { + /** */ + VAL1, + + /** */ + VAL2, + + /** */ + VAl3, + + /** */ + VAL4, + + /** */ + VAL5, + + /** */ + VAL6, + + /** */ + VAL7, + + /** */ + VAL8, + + /** */ + VAL9, + + /** */ + VAL10 + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientTestPortable.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java new file mode 100644 index 0000000..e94f8d5 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientTestPortableAffinityKeyTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.ignite.internal.client;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.portables.*;
import org.apache.ignite.resources.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * Task used to test portable affinity key.
 * <p>
 * The task maps a single job onto the local node. The job checks that the affinity key
 * computed for the passed object matches the expected one and that the object maps to
 * the local node.
 */
public class ClientTestPortableAffinityKeyTask extends ComputeTaskAdapter<Object, Boolean> {
    /** Ignite instance injected through {@link IgniteInstanceResource}. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> clusterNodes,
        @Nullable final Object arg) throws IgniteCheckedException {
        // Run the single job on the local node only.
        for (ClusterNode node : clusterNodes) {
            if (node.isLocal())
                return Collections.singletonMap(new ComputeJobAdapter() {
                    @Override public Object execute() throws IgniteCheckedException {
                        return executeJob(arg);
                    }
                }, node);
        }

        throw new IgniteCheckedException("Failed to find local node in task topology: " + clusterNodes);
    }

    /** {@inheritDoc} */
    @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
        // map() produces exactly one job, so its result is the task result.
        return results.get(0).getData();
    }

    /**
     * Checks the affinity key and the affinity mapping of the passed object.
     *
     * @param arg Argument: a collection of exactly three elements, in iteration order the
     *      {@link PortableObject} to check, the cache name and the expected affinity key.
     * @return Execution result, always {@code true} when both checks pass.
     * @throws IgniteCheckedException If the affinity key differs from the expected one or the
     *      object is not mapped to the local node.
     */
    protected Boolean executeJob(Object arg) throws IgniteCheckedException {
        Collection<?> args = (Collection<?>)arg;

        assert args.size() == 3 : args.size();

        Iterator<?> it = args.iterator();

        PortableObject obj = (PortableObject)it.next();

        String cacheName = (String)it.next();

        String expAffKey = (String)it.next();

        Object affKey = ignite.cache(cacheName).affinity().affinityKey(obj);

        // Null-safe comparison: a null expected key is reported as a mismatch instead of an NPE.
        if (!Objects.equals(expAffKey, affKey))
            throw new IgniteCheckedException("Unexpected affinity key: " + affKey);

        if (!ignite.cache(cacheName).affinity().mapKeyToNode(obj).isLocal())
            throw new IgniteCheckedException("Job is not run on primary node.");

        return true;
    }
}