http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPartitionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPartitionAffinitySelfTest.java deleted file mode 100644 index ca88bd1..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPartitionAffinitySelfTest.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.impl; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.consistenthash.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.client.impl.*; -import org.apache.ignite.internal.processors.affinity.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; - -import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; - -/** - * Client's partitioned affinity tests. - */ -public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { - /** Hash ID resolver. */ - private static final GridClientPartitionAffinity.HashIdResolver HASH_ID_RSLVR = - new GridClientPartitionAffinity.HashIdResolver() { - @Override public Object getHashId(GridClientNode node) { - return node.nodeId(); - } - }; - - /** - * Test predefined affinity - must be ported to other clients. - */ - @SuppressWarnings("UnaryPlus") - public void testPredefined() throws Exception { - // Use Md5 hasher for this test. - GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); - - getTestResources().inject(aff); - - aff.setHashIdResolver(HASH_ID_RSLVR); - - List<GridClientNode> nodes = new ArrayList<>(); - - nodes.add(createNode("000ea4cd-f449-4dcb-869a-5317c63bd619", 50)); - nodes.add(createNode("010ea4cd-f449-4dcb-869a-5317c63bd62a", 60)); - nodes.add(createNode("0209ec54-ff53-4fdb-8239-5a3ac1fb31bd", 70)); - nodes.add(createNode("0309ec54-ff53-4fdb-8239-5a3ac1fb31ef", 80)); - nodes.add(createNode("040c9b94-02ae-45a6-9d5c-a066dbdf2636", 90)); - nodes.add(createNode("050c9b94-02ae-45a6-9d5c-a066dbdf2747", 100)); - nodes.add(createNode("0601f916-4357-4cfe-a7df-49d4721690bf", 110)); - nodes.add(createNode("0702f916-4357-4cfe-a7df-49d4721691c0", 120)); - - Map<Object, Integer> data = new LinkedHashMap<>(); - - data.put("", 4); - data.put("asdf", 4); - data.put("224ea4cd-f449-4dcb-869a-5317c63bd619", 5); - data.put("fdc9ec54-ff53-4fdb-8239-5a3ac1fb31bd", 2); - data.put("0f9c9b94-02ae-45a6-9d5c-a066dbdf2636", 2); - data.put("d8f1f916-4357-4cfe-a7df-49d4721690bf", 7); - data.put("c77ffeae-78a1-4ee6-a0fd-8d197a794412", 3); - data.put("35de9f21-3c9b-4f4a-a7d5-3e2c6cb01564", 1); - data.put("d67eb652-4e76-47fb-ad4e-cd902d9b868a", 7); - - data.put(0, 4); - data.put(1, 7); - data.put(12, 5); - data.put(123, 6); - data.put(1234, 4); - data.put(12345, 6); - data.put(123456, 6); - data.put(1234567, 6); - data.put(12345678, 0); - data.put(123456789, 7); - data.put(1234567890, 7); - data.put(1234567890L, 7); - data.put(12345678901L, 2); - data.put(123456789012L, 1); - data.put(1234567890123L, 0); - data.put(12345678901234L, 1); - data.put(123456789012345L, 6); - data.put(1234567890123456L, 7); - data.put(-23456789012345L, 4); - data.put(-2345678901234L, 1); - data.put(-234567890123L, 5); - data.put(-23456789012L, 5); - data.put(-2345678901L, 7); - data.put(-234567890L, 4); - data.put(-234567890, 7); - data.put(-23456789, 7); - data.put(-2345678, 0); - data.put(-234567, 6); - data.put(-23456, 6); - data.put(-2345, 6); - data.put(-234, 7); - data.put(-23, 5); - data.put(-2, 4); - - data.put(0x80000000, 4); - data.put(0x7fffffff, 7); - data.put(0x8000000000000000L, 4); - data.put(0x7fffffffffffffffL, 4); - - data.put(+1.1, 3); - data.put(-10.01, 4); - data.put(+100.001, 4); - data.put(-1000.0001, 4); - data.put(+1.7976931348623157E+308, 6); - data.put(-1.7976931348623157E+308, 6); - data.put(+4.9E-324, 7); - data.put(-4.9E-324, 7); - - boolean ok = true; - - for (Map.Entry<Object, Integer> entry : data.entrySet()) { - UUID exp = nodes.get(entry.getValue()).nodeId(); - UUID act = aff.node(entry.getKey(), nodes).nodeId(); - - if (exp.equals(act)) - continue; - - ok = false; - - info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + - ", actual=" + act + "."); - } - - if (ok) - return; - - fail("Client partitioned affinity validation fails."); - } - - /** - * Test predefined affinity - must be ported to other clients. - */ - @SuppressWarnings("UnaryPlus") - public void testPredefinedHashIdResolver() throws Exception { - // Use Md5 hasher for this test. - GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); - - getTestResources().inject(aff); - - aff.setHashIdResolver(new GridClientPartitionAffinity.HashIdResolver() { - @Override public Object getHashId(GridClientNode node) { - return node.replicaCount(); - } - }); - - List<GridClientNode> nodes = new ArrayList<>(); - - nodes.add(createNode("000ea4cd-f449-4dcb-869a-5317c63bd619", 50)); - nodes.add(createNode("010ea4cd-f449-4dcb-869a-5317c63bd62a", 60)); - nodes.add(createNode("0209ec54-ff53-4fdb-8239-5a3ac1fb31bd", 70)); - nodes.add(createNode("0309ec54-ff53-4fdb-8239-5a3ac1fb31ef", 80)); - nodes.add(createNode("040c9b94-02ae-45a6-9d5c-a066dbdf2636", 90)); - nodes.add(createNode("050c9b94-02ae-45a6-9d5c-a066dbdf2747", 100)); - nodes.add(createNode("0601f916-4357-4cfe-a7df-49d4721690bf", 110)); - nodes.add(createNode("0702f916-4357-4cfe-a7df-49d4721691c0", 120)); - - Map<Object, Integer> data = new LinkedHashMap<>(); - - data.put("", 4); - data.put("asdf", 3); - data.put("224ea4cd-f449-4dcb-869a-5317c63bd619", 5); - data.put("fdc9ec54-ff53-4fdb-8239-5a3ac1fb31bd", 2); - data.put("0f9c9b94-02ae-45a6-9d5c-a066dbdf2636", 2); - data.put("d8f1f916-4357-4cfe-a7df-49d4721690bf", 4); - data.put("c77ffeae-78a1-4ee6-a0fd-8d197a794412", 3); - data.put("35de9f21-3c9b-4f4a-a7d5-3e2c6cb01564", 4); - data.put("d67eb652-4e76-47fb-ad4e-cd902d9b868a", 2); - - data.put(0, 4); - data.put(1, 1); - data.put(12, 7); - data.put(123, 1); - data.put(1234, 6); - data.put(12345, 2); - data.put(123456, 5); - data.put(1234567, 4); - data.put(12345678, 6); - data.put(123456789, 3); - data.put(1234567890, 3); - data.put(1234567890L, 3); - data.put(12345678901L, 0); - data.put(123456789012L, 1); - data.put(1234567890123L, 3); - data.put(12345678901234L, 5); - data.put(123456789012345L, 5); - data.put(1234567890123456L, 7); - data.put(-23456789012345L, 6); - data.put(-2345678901234L, 4); - data.put(-234567890123L, 3); - data.put(-23456789012L, 0); - data.put(-2345678901L, 4); - data.put(-234567890L, 5); - data.put(-234567890, 3); - data.put(-23456789, 3); - data.put(-2345678, 6); - data.put(-234567, 4); - data.put(-23456, 5); - data.put(-2345, 2); - data.put(-234, 7); - data.put(-23, 6); - data.put(-2, 6); - - data.put(0x80000000, 7); - data.put(0x7fffffff, 1); - data.put(0x8000000000000000L, 7); - data.put(0x7fffffffffffffffL, 7); - - data.put(+1.1, 2); - data.put(-10.01, 0); - data.put(+100.001, 2); - data.put(-1000.0001, 0); - data.put(+1.7976931348623157E+308, 6); - data.put(-1.7976931348623157E+308, 1); - data.put(+4.9E-324, 1); - data.put(-4.9E-324, 1); - - boolean ok = true; - - for (Map.Entry<Object, Integer> entry : data.entrySet()) { - UUID exp = nodes.get(entry.getValue()).nodeId(); - UUID act = aff.node(entry.getKey(), nodes).nodeId(); - - if (exp.equals(act)) - continue; - - ok = false; - - info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + - ", actual=" + act + "."); - } - - if (ok) - return; - - fail("Client partitioned affinity validation fails."); - } - - /** - * Create node with specified node id and replica count. - * - * @param nodeId Node id. - * @param replicaCnt Node partitioned affinity replica count. - * @return New node with specified node id and replica count. - */ - private GridClientNode createNode(String nodeId, int replicaCnt) { - return GridClientNodeImpl.builder() - .nodeId(UUID.fromString(nodeId)) - .replicaCount(replicaCnt) - .build(); - } - - /** - * Validate client partitioned affinity and cache partitioned affinity produce the same result. - * - * @throws Exception On any exception. - */ - public void testReplicas() throws Exception { - // Emulate nodes in topology. - Collection<GridClientNode> nodes = new ArrayList<>(); - Collection<ClusterNode> srvNodes = new ArrayList<>(); - - // Define affinities to test. - GridClientPartitionAffinity aff = new GridClientPartitionAffinity(); - - getTestResources().inject(aff); - - aff.setHashIdResolver(HASH_ID_RSLVR); - - CacheConsistentHashAffinityFunction srvAff = new CacheConsistentHashAffinityFunction(); - - getTestResources().inject(srvAff); - - srvAff.setHashIdResolver(new CacheAffinityNodeIdHashResolver()); - - // Define keys to test affinity for. - Collection<String> keys = new ArrayList<>( - Arrays.asList("", "1", "12", "asdf", "Hadoop\u3092\u6bba\u3059")); - - for (int i = 0; i < 10; i++) - keys.add(UUID.randomUUID().toString()); - - // Test affinity behaviour on different topologies. - for (int i = 0; i < 20; i++) { - addNodes(1 + (int)Math.round(Math.random() * 50), nodes, srvNodes); - - for (String key : keys) - assertSameAffinity(key, aff, srvAff, nodes, srvNodes); - } - } - - /** - * Add {@code cnt} nodes into emulated topology. - * - * @param cnt Number of nodes to add into emulated topology. - * @param nodes Client topology. - * @param srvNodes Server topology. - */ - private void addNodes(int cnt, Collection<GridClientNode> nodes, Collection<ClusterNode> srvNodes) { - while (cnt-- > 0) { - UUID nodeId = UUID.randomUUID(); - int replicaCnt = (int)Math.round(Math.random() * 500) + 1; - - nodes.add(GridClientNodeImpl.builder() - .nodeId(nodeId) - .replicaCount(replicaCnt) - .build()); - - ClusterNode srvNode = new TestRichNode(nodeId, replicaCnt); - - srvNodes.add(srvNode); - } - } - - /** - * Compare server and client affinity for specified key in current topology. - * - * @param key Key to validate affinity for. - * @param aff Client affinity. - * @param srvAff Server affinity. - * @param nodes Client topology. - * @param srvNodes Server topology. - */ - private void assertSameAffinity(Object key, GridClientDataAffinity aff, CacheAffinityFunction srvAff, - Collection<? extends GridClientNode> nodes, Collection<ClusterNode> srvNodes) { - GridClientNode node = aff.node(key, nodes); - int part = srvAff.partition(key); - - CacheAffinityFunctionContext ctx = new GridCacheAffinityFunctionContextImpl(new ArrayList<>(srvNodes), - null, null, 1, 0); - - ClusterNode srvNode = F.first(srvAff.assignPartitions(ctx).get(part)); - - if (node == null) - assertNull(srvNode); - else { - assertNotNull(srvNode); - assertEquals(node.nodeId(), srvNode.id()); - } - } - - /** - * Rich node stub to use in emulated server topology. - */ - private static class TestRichNode extends GridTestNode { - /** - * Node id. - */ - private final UUID nodeId; - - /** - * Partitioned affinity replicas count. - */ - private final Integer replicaCnt; - - /** - * Externalizable class requires public no-arg constructor. - */ - @SuppressWarnings("UnusedDeclaration") - public TestRichNode() { - this(UUID.randomUUID(), DFLT_REPLICA_COUNT); - } - - /** - * Constructs rich node stub to use in emulated server topology. - * - * @param nodeId Node id. - * @param replicaCnt Partitioned affinity replicas count. - */ - private TestRichNode(UUID nodeId, int replicaCnt) { - this.nodeId = nodeId; - this.replicaCnt = replicaCnt; - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public <T> T attribute(String name) { - if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) - return (T)replicaCnt; - - return super.attribute(name); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPropertiesConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPropertiesConfigurationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPropertiesConfigurationSelfTest.java deleted file mode 100644 index 247fe34..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/client/impl/ClientPropertiesConfigurationSelfTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.impl; - -import org.apache.commons.io.*; -import org.apache.ignite.client.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.client.balancer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.springframework.context.support.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.client.GridClientConfiguration.*; - -/** - * Properties-based configuration self test. - */ -public class ClientPropertiesConfigurationSelfTest extends GridCommonAbstractTest { - /** - * Grid client spring configuration. - */ - private static final URL GRID_CLIENT_SPRING_CONFIG; - - /** - * Grid client properties-based configuration. - */ - private static final URL GRID_CLIENT_CONFIG; - - /** - * - */ - static { - GRID_CLIENT_SPRING_CONFIG = - U.resolveGridGainUrl("/modules/clients/config/grid-client-spring-config.xml"); - - GRID_CLIENT_CONFIG = U.resolveGridGainUrl("/modules/clients/config/grid-client-config.properties"); - } - - /** - * Test client configuration loaded from the properties. - * - * @throws Exception In case of exception. - */ - public void testCreation() throws Exception { - // Validate default configuration. - GridClientConfiguration cfg = new GridClientConfiguration(); - - cfg.setServers(Arrays.asList("localhost:11211")); - - validateConfig(0, cfg); - - // Validate default properties-based configuration. - cfg = new GridClientConfiguration(); - - cfg.setServers(Arrays.asList("localhost:11211")); - - validateConfig(0, cfg); - - // Validate loaded configuration. - Properties props = loadProperties(1, GRID_CLIENT_CONFIG); - validateConfig(0, new GridClientConfiguration(props)); - - // Validate loaded configuration with changed key prefixes. - Properties props2 = new Properties(); - - for (Map.Entry<Object, Object> e : props.entrySet()) - props2.put("new." + e.getKey(), e.getValue()); - - validateConfig(0, new GridClientConfiguration("new.gg.client", props2)); - validateConfig(0, new GridClientConfiguration("new.gg.client.", props2)); - - // Validate loaded test configuration. - File tmp = uncommentProperties(GRID_CLIENT_CONFIG); - - props = loadProperties(25, tmp.toURI().toURL()); - validateConfig(2, new GridClientConfiguration(props)); - - // Validate loaded test configuration with changed key prefixes. - props2 = new Properties(); - - for (Map.Entry<Object, Object> e : props.entrySet()) - props2.put("new." + e.getKey(), e.getValue()); - - validateConfig(2, new GridClientConfiguration("new.gg.client", props2)); - validateConfig(2, new GridClientConfiguration("new.gg.client.", props2)); - - // Validate loaded test configuration with empty key prefixes. - props2 = new Properties(); - - for (Map.Entry<Object, Object> e : props.entrySet()) - props2.put(e.getKey().toString().replace("gg.client.", ""), e.getValue()); - - validateConfig(2, new GridClientConfiguration("", props2)); - validateConfig(2, new GridClientConfiguration(".", props2)); - } - - /** - * Validate spring client configuration. - * - * @throws Exception In case of any exception. - */ - public void testSpringConfig() throws Exception { - GridClientConfiguration cfg = new FileSystemXmlApplicationContext( - GRID_CLIENT_SPRING_CONFIG.toString()).getBean(GridClientConfiguration.class); - - assertEquals(Arrays.asList("127.0.0.1:11211"), new ArrayList<>(cfg.getServers())); - assertNull(cfg.getSecurityCredentialsProvider()); - - Collection<GridClientDataConfiguration> dataCfgs = cfg.getDataConfigurations(); - - assertEquals(1, dataCfgs.size()); - - GridClientDataConfiguration dataCfg = dataCfgs.iterator().next(); - - assertEquals("partitioned", dataCfg.getName()); - - assertNotNull(dataCfg.getPinnedBalancer()); - assertEquals(GridClientRandomBalancer.class, dataCfg.getPinnedBalancer().getClass()); - - assertNotNull(dataCfg.getAffinity()); - assertEquals(GridClientPartitionAffinity.class, dataCfg.getAffinity().getClass()); - } - - /** - * Uncomment properties. - * - * @param url Source to uncomment client properties for. - * @return Temporary file with uncommented client properties. - * @throws IOException In case of IO exception. - */ - private File uncommentProperties(URL url) throws IOException { - InputStream in = url.openStream(); - - assertNotNull(in); - - LineIterator it = IOUtils.lineIterator(in, "UTF-8"); - Collection<String> lines = new ArrayList<>(); - - while (it.hasNext()) - lines.add(it.nextLine().replace("#gg.client.", "gg.client.")); - - IgniteUtils.closeQuiet(in); - - File tmp = File.createTempFile(UUID.randomUUID().toString(), "properties"); - - tmp.deleteOnExit(); - - FileUtils.writeLines(tmp, lines); - - return tmp; - } - - /** - * Load properties from the url. - * - * @param expLoaded Expected number of loaded properties. - * @param url URL to load properties from. - * @return Loaded properties. - * @throws IOException In case of IO exception. - */ - private Properties loadProperties(int expLoaded, URL url) throws IOException { - InputStream in = url.openStream(); - - Properties props = new Properties(); - - assertEquals(0, props.size()); - - props.load(in); - - assertEquals(expLoaded, props.size()); - - IgniteUtils.closeQuiet(in); - - return props; - } - - /** - * Validate loaded configuration. - * - * @param expDataCfgs Expected data configurations count. - * @param cfg Client configuration to validate. - */ - private void validateConfig(int expDataCfgs, GridClientConfiguration cfg) { - assertEquals(GridClientRandomBalancer.class, cfg.getBalancer().getClass()); - assertEquals(10000, cfg.getConnectTimeout()); - assertEquals(null, cfg.getSecurityCredentialsProvider()); - - assertEquals(expDataCfgs, cfg.getDataConfigurations().size()); - - if (expDataCfgs == 2) { - GridClientDataConfiguration nullCfg = cfg.getDataConfiguration(null); - - assertEquals(null, nullCfg.getName()); - assertEquals(null, nullCfg.getAffinity()); - assertEquals(GridClientRandomBalancer.class, nullCfg.getPinnedBalancer().getClass()); - - GridClientDataConfiguration partCfg = cfg.getDataConfiguration("partitioned"); - - assertEquals("partitioned", partCfg.getName()); - assertEquals(GridClientPartitionAffinity.class, partCfg.getAffinity().getClass()); - assertEquals(GridClientRoundRobinBalancer.class, partCfg.getPinnedBalancer().getClass()); - } - - assertEquals(DFLT_MAX_CONN_IDLE_TIME, cfg.getMaxConnectionIdleTime()); - assertEquals(GridClientProtocol.TCP, cfg.getProtocol()); - assertEquals(Arrays.asList("localhost:11211"), new ArrayList<>(cfg.getServers())); - assertEquals(true, cfg.isEnableAttributesCache()); - assertEquals(true, cfg.isEnableMetricsCache()); - assertEquals(true, cfg.isTcpNoDelay()); - assertEquals(null, cfg.getSslContextFactory(), null); - assertEquals(DFLT_TOP_REFRESH_FREQ, cfg.getTopologyRefreshFrequency()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractConnectivitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractConnectivitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractConnectivitySelfTest.java deleted file mode 100644 index ec2c271..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractConnectivitySelfTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.integration; - -import org.apache.ignite.*; -import org.apache.ignite.client.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.net.*; -import java.util.*; - -/** - * Tests the REST client-server connectivity with various configurations. - */ -public abstract class ClientAbstractConnectivitySelfTest extends GridCommonAbstractTest { - /** */ - private static final String WILDCARD_IP = "0.0.0.0"; - - /** */ - private static final String LOOPBACK_IP = "127.0.0.1"; - - /** - * @return IP addresses. - * @throws Exception If failed. - */ - private static IgniteBiTuple<Collection<String>, Collection<String>> getAllIps() throws Exception { - return U.resolveLocalAddresses(InetAddress.getByName("0.0.0.0")); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - GridClientFactory.stopAll(); - - G.stopAll(true); - } - - /** - * Starts a REST-enabled node. - * - * @param name Node name. - * @param addr REST address (default if null). - * @param port REST port (default if null). - * @return Started node. - * @throws Exception If case of configuration or startup error. - */ - protected abstract Ignite startRestNode(String name, @Nullable String addr, @Nullable Integer port) throws Exception; - - /** - * @return Default REST port. - */ - protected abstract int defaultRestPort(); - - /** - * @return REST address attribute name. - */ - protected abstract String restAddressAttributeName(); - - /** - * @return REST host name attribute name. - */ - protected abstract String restHostNameAttributeName(); - - /** - * @return REST port attribute name. - */ - protected abstract String restPortAttributeName(); - - /** - * @return REST protocol. - */ - protected abstract GridClientProtocol protocol(); - - /** - * Starts a REST client. - * - * @param addr REST server address. - * @param port REST server port. - * @return A successfully started REST client. - * @throws GridClientException If failed to start REST client. - */ - protected GridClient startClient(String addr, int port) throws GridClientException { - GridClientConfiguration cliCfg = new GridClientConfiguration(); - cliCfg.setServers(Collections.singleton(addr + ":" + port)); - cliCfg.setProtocol(protocol()); - - return GridClientFactory.start(cliCfg); - } - - /** - * Tests correct behavior in case of 1 REST-enabled node - * with default settings. - * - * @throws Exception If failed. - */ - public void testOneNodeDefaultHostAndPort() throws Exception { - startRestNode("grid1", null, null); - - checkConnectivityByIp(LOOPBACK_IP, getAllIps()); - - String extIp = F.find(U.allLocalIps(), null, new IpV4AddressPredicate()); - - checkConnectivityByIp(extIp, getAllIps()); - } - - /** - * Tests correct behavior in case of 1 REST-enabled node - * with explicitly specified loopback address setting. - * - * @throws Exception If error occurs. - */ - public void testOneNodeLoopbackHost() throws Exception { - startRestNode("grid1", LOOPBACK_IP, defaultRestPort()); - - checkConnectivityByIp(LOOPBACK_IP, F.t((Collection<String>)Collections.singleton(LOOPBACK_IP), - (Collection<String>)Collections.singleton(""))); - } - - /** - * Tests correct behavior in case of 1 REST-enabled node - * with explicitly specified 0.0.0.0 address. - * - * @throws Exception If error occurs. - */ - public void testOneNodeZeroIpv4Address() throws Exception { - startRestNode("grid1", WILDCARD_IP, defaultRestPort()); - - Collection<String> addrs = new LinkedList<>(); - - addrs.add(LOOPBACK_IP); - - Collection<String> nonLoopbackAddrs = U.allLocalIps(); - - assertNotNull(nonLoopbackAddrs); - - addrs.addAll(F.view(nonLoopbackAddrs, new IpV4AddressPredicate())); - - // The client should be able to connect through all IPv4 addresses. - for (String addr : addrs) { - log.info("Trying address: " + addr); - - GridClient cli = startClient(addr, defaultRestPort()); - - List<GridClientNode> nodes = cli.compute().refreshTopology(true, false); - - assertEquals(1, nodes.size()); - - GridClientNode node = F.first(nodes); - - assertNotNull(node); - - assertEquals(getAllIps().get1(), node.attribute(restAddressAttributeName())); - assertEquals(getAllIps().get2(), node.attribute(restHostNameAttributeName())); - - List<String> nodeAddrs = node.tcpAddresses(); - - assertTrue(nodeAddrs.contains(LOOPBACK_IP)); - - assertTrue(F.containsAll(nodeAddrs, addrs)); - } - } - - /** - * Tests correct behavior in case of 2 REST-enabled nodes with default - * settings. - * - * @throws Exception If error occurs. - */ - public void testTwoNodesDefaultHostAndPort() throws Exception { - startRestNode("grid1", null, null); - startRestNode("grid2", null, null); - - GridClient cli = startClient(LOOPBACK_IP, defaultRestPort()); - - List<GridClientNode> nodes = cli.compute().refreshTopology(true, false); - - assertEquals(2, nodes.size()); - - assertTrue(F.forAll(nodes, new P1<GridClientNode>() { - @Override public boolean apply(GridClientNode node) { - return node.tcpAddresses().contains(LOOPBACK_IP); - } - })); - - GridTestUtils.assertOneToOne( - nodes, - new P1<GridClientNode>() { - @Override public boolean apply(GridClientNode node) { - try { - return eqAddresses(getAllIps(), node) && - Integer.valueOf(defaultRestPort()).equals(node.attribute(restPortAttributeName())); - } - catch (Exception ignored) { - return false; - } - } - }, - new P1<GridClientNode>() { - @Override public boolean apply(GridClientNode node) { - try { - return eqAddresses(getAllIps(), node) && - Integer.valueOf(defaultRestPort() + 1).equals(node.attribute(restPortAttributeName())); - } - catch (Exception ignored) { - return false; - } - } - } - ); - } - - /** - * Tests correct behavior in case of shutdown node used to refresh topology state. - * - * @throws Exception If error occurs. - */ - public void testRefreshTopologyOnNodeLeft() throws Exception { - startRestNode("grid1", null, null); - startRestNode("grid2", null, null); - - GridClient cli = startClient(LOOPBACK_IP, defaultRestPort()); - - List<GridClientNode> nodes = cli.compute().refreshTopology(true, false); - - assertEquals(2, nodes.size()); - - stopGrid("grid1"); - - nodes = cli.compute().refreshTopology(true, false); - - assertEquals(1, nodes.size()); - - startRestNode("grid3", null, null); - - nodes = cli.compute().refreshTopology(true, false); - - assertEquals(2, nodes.size()); - - stopGrid("grid2"); - - nodes = cli.compute().refreshTopology(true, false); - - assertEquals(1, nodes.size()); - } - - /** - * @param connectIp IP to test. - * @param nodeIp Expected IP reported to client. - * @throws GridClientException If failed. - */ - private void checkConnectivityByIp(String connectIp, IgniteBiTuple<Collection<String>, Collection<String>> nodeIp) - throws GridClientException { - GridClient cli = startClient(connectIp, defaultRestPort()); - - List<GridClientNode> nodes = cli.compute().refreshTopology(true, false); - - assertEquals(1, nodes.size()); - - GridClientNode node = F.first(nodes); - - assertNotNull(node); - assertTrue(eqAddresses(nodeIp, node)); - } - - /** - * @param nodeIp Node ip. - * @param node Node. - * @return {@code True} if addresses are equal, {@code false} otherwise. - */ - private boolean eqAddresses(IgniteBiTuple<Collection<String>, Collection<String>> nodeIp, GridClientNode node) { - return F.eqOrdered(nodeIp.get1(), (Collection<String>)(node.attribute(restAddressAttributeName()))) && - F.eqOrdered(nodeIp.get2(), (Collection<String>)(node.attribute(restHostNameAttributeName()))); - } - - /** - * Predicate that returns IPv4 address strings. - */ - private static class IpV4AddressPredicate implements P1<String> { - /** {@inheritDoc} */ - @Override public boolean apply(String s) { - return s.matches("\\d+\\.\\d+\\.\\d+\\.\\d+"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractMultiNodeSelfTest.java deleted file mode 100644 index bf3250b..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/client/integration/ClientAbstractMultiNodeSelfTest.java +++ /dev/null @@ -1,858 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.integration; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.client.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.client.balancer.*; -import org.apache.ignite.internal.client.ssl.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Tests basic client behavior with multiple nodes. - */ -@SuppressWarnings("ThrowableResultOfMethodCallIgnored") -public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Partitioned cache name. */ - private static final String PARTITIONED_CACHE_NAME = "partitioned"; - - /** Replicated cache name. */ - private static final String REPLICATED_CACHE_NAME = "replicated"; - - /** Replicated async cache name. */ - private static final String REPLICATED_ASYNC_CACHE_NAME = "replicated_async"; - - /** Nodes count. */ - public static final int NODES_CNT = 5; - - /** - * Topology update frequency. - * Set it longer than router's, so we wouldn't receive failures - * caused only by obsolete topology on router. - */ - static final int TOP_REFRESH_FREQ = 2500; - - /** Path to jetty config. */ - public static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml"; - - /** Path to jetty config with SSl enabled. */ - public static final String REST_JETTY_SSL_CFG = "modules/clients/src/test/resources/jetty/rest-jetty-ssl.xml"; - - /** Host. */ - public static final String HOST = "127.0.0.1"; - - /** Base for tcp rest ports. */ - public static final int REST_TCP_PORT_BASE = 12345; - - /** Base for http rest ports, defined in {@link #REST_JETTY_CFG}. */ - public static final int REST_HTTP_PORT_BASE = 11080; - - /** Base for https rest ports, defined in {@link #REST_JETTY_SSL_CFG}. */ - public static final int REST_HTTPS_PORT_BASE = 11443; - - /** */ - private static volatile boolean commSpiEnabled; - - /** Flag to enable REST in node configuration. */ - private boolean restEnabled = true; - - /** Client instance for each test. */ - private GridClient client; - - /** - * @return Client protocol that should be used. - */ - protected abstract GridClientProtocol protocol(); - - /** - * @return Server address to create first connection. - */ - protected abstract String serverAddress(); - - /** - * @return SSL context factory to use if SSL or {@code null} to disable SSL usage. - */ - @Nullable protected GridSslContextFactory sslContextFactory() { - return null; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setLocalHost(HOST); - - assert c.getClientConnectionConfiguration() == null; - - if (restEnabled) { - ClientConnectionConfiguration clientCfg = new ClientConnectionConfiguration(); - - clientCfg.setRestTcpPort(REST_TCP_PORT_BASE); - - GridSslContextFactory sslCtxFactory = sslContextFactory(); - - if (sslCtxFactory != null) { - clientCfg.setRestTcpSslEnabled(true); - clientCfg.setRestTcpSslContextFactory(sslCtxFactory); - } - - c.setClientConnectionConfiguration(clientCfg); - } - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - c.setDiscoverySpi(disco); - - TestCommunicationSpi spi = new TestCommunicationSpi(); - - spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); - - c.setCommunicationSpi(spi); - - c.setCacheConfiguration(cacheConfiguration(null), cacheConfiguration(PARTITIONED_CACHE_NAME), - cacheConfiguration(REPLICATED_CACHE_NAME), cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME)); - - ThreadPoolExecutor exec = new ThreadPoolExecutor( - 40, - 40, - 0, - MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); - - exec.prestartAllCoreThreads(); - - c.setExecutorService(exec); - - c.setExecutorServiceShutdown(true); - - ThreadPoolExecutor sysExec = new ThreadPoolExecutor( - 40, - 40, - 0, - MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); - - sysExec.prestartAllCoreThreads(); - - c.setSystemExecutorService(sysExec); - - c.setSystemExecutorServiceShutdown(true); - - return c; - } - - /** - * @param cacheName Cache name. - * @return Cache configuration. - * @throws Exception In case of error. - */ - private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { - CacheConfiguration cfg = defaultCacheConfiguration(); - - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setDistributionMode(NEAR_PARTITIONED); - - if (cacheName == null) - cfg.setCacheMode(LOCAL); - else if (PARTITIONED_CACHE_NAME.equals(cacheName)) { - cfg.setCacheMode(PARTITIONED); - - cfg.setBackups(0); - } - else - cfg.setCacheMode(REPLICATED); - - cfg.setName(cacheName); - - cfg.setWriteSynchronizationMode(REPLICATED_ASYNC_CACHE_NAME.equals(cacheName) ? FULL_ASYNC : FULL_SYNC); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(NODES_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - info("Stopping grids."); - - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - client = GridClientFactory.start(clientConfiguration()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - if (client != null) { - GridClientFactory.stop(client.id(), false); - - client = null; - } - } - - /** - * @throws Exception If failed. - */ - public void testSyncCommitRollbackFlags() throws Exception { - commSpiEnabled = true; - - try { - GridClientData data = client.data(REPLICATED_ASYNC_CACHE_NAME); - - info("Before put x1"); - - data.put("x1", "y1"); - - info("Before put x2"); - - data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x2", "y2"); - - info("Before put x3"); - - data.put("x3", "y3"); - - info("Before put x4"); - - data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x4", "y4"); - } - finally { - commSpiEnabled = false; - } - } - - /** - * @throws Exception If failed. - */ - public void testEmptyProjections() throws Exception { - final GridClientCompute dflt = client.compute(); - - Collection<? extends GridClientNode> nodes = dflt.nodes(); - - assertEquals(NODES_CNT, nodes.size()); - - Iterator<? extends GridClientNode> iter = nodes.iterator(); - - final GridClientCompute singleNodePrj = dflt.projection(Collections.singletonList(iter.next())); - - final GridClientNode second = iter.next(); - - final GridClientPredicate<GridClientNode> noneFilter = new GridClientPredicate<GridClientNode>() { - @Override public boolean apply(GridClientNode node) { - return false; - } - }; - - final GridClientPredicate<GridClientNode> targetFilter = new GridClientPredicate<GridClientNode>() { - @Override public boolean apply(GridClientNode node) { - return node.nodeId().equals(second.nodeId()); - } - }; - - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - return dflt.projection(noneFilter).log(-1, -1); - } - }, GridServerUnreachableException.class, null); - - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - return singleNodePrj.projection(second); - } - }, GridClientException.class, null); - - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override - public Object call() throws Exception { - return singleNodePrj.projection(targetFilter); - } - }, GridClientException.class, null); - } - - /** - * @throws Exception If failed. - */ - public void testProjectionRun() throws Exception { - GridClientCompute dflt = client.compute(); - - Collection<? extends GridClientNode> nodes = dflt.nodes(); - - assertEquals(NODES_CNT, nodes.size()); - - for (int i = 0; i < NODES_CNT; i++) { - Ignite g = grid(i); - - assert g != null; - - GridClientNode clientNode = dflt.node(g.cluster().localNode().id()); - - assertNotNull("Client node for " + g.cluster().localNode().id() + " was not found", clientNode); - - GridClientCompute prj = dflt.projection(clientNode); - - String res = prj.execute(TestTask.class.getName(), null); - - assertNotNull(res); - - assertEquals(g.cluster().localNode().id().toString(), res); - } - } - - /** - * @throws Exception If failed. - */ - public void testAffinityExecute() throws Exception { - GridClientCompute dflt = client.compute(); - - GridClientData data = client.data(PARTITIONED_CACHE_NAME); - - Collection<? extends GridClientNode> nodes = dflt.nodes(); - - assertEquals(NODES_CNT, nodes.size()); - - for (int i = 0; i < NODES_CNT; i++) { - Ignite g = grid(i); - - assert g != null; - - int affinityKey = -1; - - for (int key = 0; key < 10000; key++) { - if (g.cluster().localNode().id().equals(data.affinity(key))) { - affinityKey = key; - - break; - } - } - - if (affinityKey == -1) - throw new Exception("Unable to found key for which node is primary: " + g.cluster().localNode().id()); - - GridClientNode clientNode = dflt.node(g.cluster().localNode().id()); - - assertNotNull("Client node for " + g.cluster().localNode().id() + " was not found", clientNode); - - String res = dflt.affinityExecute(TestTask.class.getName(), PARTITIONED_CACHE_NAME, affinityKey, null); - - assertNotNull(res); - - assertEquals(g.cluster().localNode().id().toString(), res); - } - } - - /** - * @throws Exception If failed. - */ - public void testInvalidateFlag() throws Exception { - IgniteEx g0 = grid(0); - - GridCache<String, String> cache = g0.cache(PARTITIONED_CACHE_NAME); - - String key = null; - - for (int i = 0; i < 10_000; i++) { - if (!cache.affinity().isPrimaryOrBackup(g0.localNode(), String.valueOf(i))) { - key = String.valueOf(i); - - break; - } - } - - assertNotNull(key); - - cache.put(key, key); // Create entry in near cache, it is invalidated if INVALIDATE flag is set. - - assertNotNull(cache.peek(key)); - - GridClientData d = client.data(PARTITIONED_CACHE_NAME); - - d.flagsOn(GridClientCacheFlag.INVALIDATE).put(key, "zzz"); - - for (Ignite g : G.allGrids()) { - cache = g.cache(PARTITIONED_CACHE_NAME); - - if (cache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key)) - assertEquals("zzz", cache.peek(key)); - else - assertNull(cache.peek(key)); - } - } - - /** - * @throws Exception If failed. - */ - public void testClientAffinity() throws Exception { - GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME); - - Collection<Object> keys = new ArrayList<>(); - - keys.addAll(Arrays.asList( - Boolean.TRUE, - Boolean.FALSE, - 1, - Integer.MAX_VALUE - )); - - Random rnd = new Random(); - StringBuilder sb = new StringBuilder(); - - // Generate some random strings. - for (int i = 0; i < 100; i++) { - sb.setLength(0); - - for (int j = 0; j < 255; j++) - // Only printable ASCII symbols for test. - sb.append((char)(rnd.nextInt(0x7f - 0x20) + 0x20)); - - keys.add(sb.toString()); - } - - // Generate some more keys to achieve better coverage. - for (int i = 0; i < 100; i++) - keys.add(UUID.randomUUID()); - - for (Object key : keys) { - UUID nodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); - - UUID clientNodeId = partitioned.affinity(key); - - assertEquals("Invalid affinity mapping for REST response for key: " + key, nodeId, clientNodeId); - } - } - - /** - * @throws Exception If failed. - */ - public void testTopologyListener() throws Exception { - final Collection<UUID> added = new ArrayList<>(1); - final Collection<UUID> rmvd = new ArrayList<>(1); - - final CountDownLatch addedLatch = new CountDownLatch(1); - final CountDownLatch rmvLatch = new CountDownLatch(1); - - assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); - - GridClientTopologyListener lsnr = new GridClientTopologyListener() { - @Override public void onNodeAdded(GridClientNode node) { - added.add(node.nodeId()); - - addedLatch.countDown(); - } - - @Override public void onNodeRemoved(GridClientNode node) { - rmvd.add(node.nodeId()); - - rmvLatch.countDown(); - } - }; - - client.addTopologyListener(lsnr); - - try { - Ignite g = startGrid(NODES_CNT + 1); - - UUID id = g.cluster().localNode().id(); - - assertTrue(addedLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS)); - - assertEquals(1, added.size()); - assertEquals(id, F.first(added)); - - stopGrid(NODES_CNT + 1); - - assertTrue(rmvLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS)); - - assertEquals(1, rmvd.size()); - assertEquals(id, F.first(rmvd)); - } - finally { - client.removeTopologyListener(lsnr); - - stopGrid(NODES_CNT + 1); - } - } - - /** - * @throws Exception If failed. - */ - public void testDisabledRest() throws Exception { - restEnabled = false; - - final Ignite g = startGrid("disabled-rest"); - - try { - Thread.sleep(2 * TOP_REFRESH_FREQ); - - // As long as we have round robin load balancer this will cause every node to be queried. - for (int i = 0; i < NODES_CNT + 1; i++) - assertEquals(NODES_CNT + 1, client.compute().refreshTopology(false, false).size()); - - final GridClientData data = client.data(PARTITIONED_CACHE_NAME); - - // Check rest-disabled node is unavailable. - try { - String affKey; - - do { - affKey = UUID.randomUUID().toString(); - } while (!data.affinity(affKey).equals(g.cluster().localNode().id())); - - data.put(affKey, "asdf"); - - assertEquals("asdf", cache(0, PARTITIONED_CACHE_NAME).get(affKey)); - } - catch (GridServerUnreachableException e) { - // Thrown for direct client-node connections. - assertTrue("Unexpected exception message: " + e.getMessage(), - e.getMessage().startsWith("No available endpoints to connect (is rest enabled for this node?)")); - } - catch (GridClientException e) { - // Thrown for routed client-router-node connections. - String msg = e.getMessage(); - - assertTrue("Unexpected exception message: " + msg, protocol() == GridClientProtocol.TCP ? - msg.contains("No available endpoints to connect (is rest enabled for this node?)") : // TCP router. - msg.startsWith("No available nodes on the router for destination node ID")); // HTTP router. - } - - // Check rest-enabled nodes are available. - String affKey; - - do { - affKey = UUID.randomUUID().toString(); - } while (data.affinity(affKey).equals(g.cluster().localNode().id())); - - data.put(affKey, "fdsa"); - - assertEquals("fdsa", cache(0, PARTITIONED_CACHE_NAME).get(affKey)); - } - finally { - restEnabled = true; - - G.stop(g.name(), true); - } - } - - /** - * @throws Exception If failed. - */ - public void testAffinityPut() throws Exception { - Thread.sleep(2 * TOP_REFRESH_FREQ); - - assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size()); - - Map<UUID, Ignite> gridsByLocNode = new HashMap<>(NODES_CNT); - - GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME); - - GridClientCompute compute = client.compute(); - - for (int i = 0; i < NODES_CNT; i++) - gridsByLocNode.put(grid(i).localNode().id(), grid(i)); - - for (int i = 0; i < 100; i++) { - String key = "key" + i; - - UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id(); - - assertEquals("Affinity mismatch for key: " + key, primaryNodeId, partitioned.affinity(key)); - - assertEquals(primaryNodeId, partitioned.affinity(key)); - - // Must go to primary node only. Since backup count is 0, value must present on - // primary node only. - partitioned.put(key, "val" + key); - - for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) { - Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(key); - - if (primaryNodeId.equals(entry.getKey())) - assertEquals("val" + key, val); - else - assertNull(val); - } - } - - // Now check that we will see value in near cache in pinned mode. - for (int i = 100; i < 200; i++) { - String pinnedKey = "key" + i; - - UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, pinnedKey).id(); - - UUID pinnedNodeId = F.first(F.view(gridsByLocNode.keySet(), F.notEqualTo(primaryNodeId))); - - GridClientNode node = compute.node(pinnedNodeId); - - partitioned.pinNodes(node).put(pinnedKey, "val" + pinnedKey); - - for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) { - Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(pinnedKey); - - if (primaryNodeId.equals(entry.getKey()) || pinnedNodeId.equals(entry.getKey())) - assertEquals("val" + pinnedKey, val); - else - assertNull(val); - } - } - } - - /** - * @return Client configuration for the test. - */ - protected GridClientConfiguration clientConfiguration() throws GridClientException { - GridClientConfiguration cfg = new GridClientConfiguration(); - - cfg.setBalancer(getBalancer()); - - cfg.setTopologyRefreshFrequency(TOP_REFRESH_FREQ); - - cfg.setProtocol(protocol()); - cfg.setServers(Arrays.asList(serverAddress())); - cfg.setSslContextFactory(sslContextFactory()); - - GridClientDataConfiguration loc = new GridClientDataConfiguration(); - - GridClientDataConfiguration partitioned = new GridClientDataConfiguration(); - - partitioned.setName(PARTITIONED_CACHE_NAME); - partitioned.setAffinity(new GridClientPartitionAffinity()); - - GridClientDataConfiguration replicated = new GridClientDataConfiguration(); - replicated.setName(REPLICATED_CACHE_NAME); - - GridClientDataConfiguration replicatedAsync = new GridClientDataConfiguration(); - replicatedAsync.setName(REPLICATED_ASYNC_CACHE_NAME); - - cfg.setDataConfigurations(Arrays.asList(loc, partitioned, replicated, replicatedAsync)); - - return cfg; - } - - /** - * Gets client load balancer. - * - * @return Load balancer. - */ - protected GridClientLoadBalancer getBalancer() { - return new GridClientRoundRobinBalancer(); - } - - /** - * Test task. Returns a tuple in which first component is id of node that has split the task, - * and second component is count of nodes that executed jobs. - */ - private static class TestTask extends ComputeTaskSplitAdapter<Object, String> { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** Count of tasks this job was split to. */ - private int gridSize; - - /** {@inheritDoc} */ - @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) - throws IgniteCheckedException { - Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize); - - this.gridSize = gridSize; - - final String locNodeId = ignite.cluster().localNode().id().toString(); - - for (int i = 0; i < gridSize; i++) { - jobs.add(new ComputeJobAdapter() { - @SuppressWarnings("OverlyStrongTypeCast") - @Override public Object execute() { - try { - Thread.sleep(1000); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - - return new IgniteBiTuple<>(locNodeId, 1); - } - }); - } - - return jobs; - } - - /** {@inheritDoc} */ - @Override public String reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - int sum = 0; - - String locNodeId = null; - - for (ComputeJobResult res : results) { - IgniteBiTuple<String, Integer> part = res.getData(); - - if (locNodeId == null) - locNodeId = part.get1(); - - Integer i = part.get2(); - - if (i != null) - sum += i; - } - - assert gridSize == sum; - - return locNodeId; - } - } - - /** - * Communication SPI which checks cache flags. - */ - @SuppressWarnings("unchecked") - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) - throws IgniteSpiException { - checkSyncFlags((GridIoMessage)msg); - - super.sendMessage(node, msg); - } - - /** - * Check if flags in correct state. - * - * @param msg Message. - */ - private void checkSyncFlags(GridIoMessage msg) { - if (!commSpiEnabled) - return; - - Object o = msg.message(); - - if (!(o instanceof GridDistributedLockRequest)) - return; - - IgniteKernal g = (IgniteKernal)G.ignite(ignite.configuration().getNodeId()); - - GridCacheContext<Object, Object> cacheCtx = g.internalCache(REPLICATED_ASYNC_CACHE_NAME).context(); - - IgniteTxManager<Object, Object> tm = cacheCtx.tm(); - - GridCacheVersion v = ((GridCacheVersionable)o).version(); - - IgniteTxEx t = tm.tx(v); - - if (t.hasWriteKey(cacheCtx.txKey("x1"))) - assertFalse("Invalid tx flags: " + t, t.syncCommit()); - else if (t.hasWriteKey(cacheCtx.txKey("x2"))) - assertTrue("Invalid tx flags: " + t, t.syncCommit()); - else if (t.hasWriteKey(cacheCtx.txKey("x3"))) - assertFalse("Invalid tx flags: " + t, t.syncCommit()); - else if (t.hasWriteKey(cacheCtx.txKey("x4"))) - assertTrue("Invalid tx flags: " + t, t.syncCommit()); - } - } - - /** - * @throws Exception If failed. - */ - public void testMultithreadedCommand() throws Exception { - final GridClientData data = client.data(PARTITIONED_CACHE_NAME); - final GridClientCompute compute = client.compute(); - final AtomicInteger cnt = new AtomicInteger(0); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < 20; i++) { - String key = UUID.randomUUID().toString(); - String val = UUID.randomUUID().toString(); - - switch (cnt.incrementAndGet() % 4) { - case 0: { - assertTrue(data.put(key, val)); - assertEquals(val, data.get(key)); - assertTrue(data.remove(key)); - - break; - } - - case 1: { - assertNotNull(data.metrics()); - - break; - } - - case 2: { - String nodeId = compute.execute(TestTask.class.getName(), null); - - assertNotNull(nodeId); - assertNotNull(compute.refreshNode(UUID.fromString(nodeId), true, true)); - - break; - } - - case 3: { - assertEquals(NODES_CNT, compute.refreshTopology(true, true).size()); - - break; - } - } - } - - return null; - } - }, 50, "multithreaded-client-access"); - } -}