http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
new file mode 100644
index 0000000..a7b7d8b
--- /dev/null
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.integration;
+
+import org.apache.ignite.*;
+import org.apache.ignite.client.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+
+/**
+ * Tests the REST client-server connectivity with various configurations.
+ */
+public abstract class ClientAbstractConnectivitySelfTest extends 
GridCommonAbstractTest {
+    /** Wildcard IPv4 address. */
+    private static final String WILDCARD_IP = "0.0.0.0";
+
+    /** IPv4 loopback address. */
+    private static final String LOOPBACK_IP = "127.0.0.1";
+
+    /**
+     * Resolves local addresses for the wildcard address via {@code U.resolveLocalAddresses}.
+     *
+     * @return Tuple whose first element the tests compare with the REST address attribute of a node
+     *      and whose second element they compare with the REST host name attribute.
+     * @throws Exception If failed.
+     */
+    private static IgniteBiTuple<Collection<String>, Collection<String>> getAllIps() throws Exception {
+        // Use the shared constant instead of repeating the address literal.
+        return U.resolveLocalAddresses(InetAddress.getByName(WILDCARD_IP));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Stop all clients first, then the nodes they were connected to.
+        GridClientFactory.stopAll();
+
+        // NOTE(review): the 'true' flag is presumably "cancel" - confirm against G.stopAll().
+        G.stopAll(true);
+    }
+
+    /**
+     * Starts a REST-enabled node.
+     *
+     * @param name Node name.
+     * @param addr REST address (default if null).
+     * @param port REST port (default if null).
+     * @return Started node.
+     * @throws Exception In case of configuration or startup error.
+     */
+    protected abstract Ignite startRestNode(String name, @Nullable String 
addr, @Nullable Integer port) throws Exception;
+
+    /**
+     * @return Default REST port. The tests connect clients to this port and expect a second node
+     *      started with default settings to report this value plus one.
+     */
+    protected abstract int defaultRestPort();
+
+    /**
+     * @return Name of the node attribute holding the REST addresses (read as a collection of strings).
+     */
+    protected abstract String restAddressAttributeName();
+
+    /**
+     * @return Name of the node attribute holding the REST host names (read as a collection of strings).
+     */
+    protected abstract String restHostNameAttributeName();
+
+    /**
+     * @return Name of the node attribute holding the REST port (compared against an {@code Integer}).
+     */
+    protected abstract String restPortAttributeName();
+
+    /**
+     * @return REST protocol that {@link #startClient(String, int)} sets on the client configuration.
+     */
+    protected abstract GridClientProtocol protocol();
+
+    /**
+     * Creates and starts a REST client pointed at a single server endpoint, using the
+     * protocol returned by {@link #protocol()}.
+     *
+     * @param addr REST server address.
+     * @param port REST server port.
+     * @return A successfully started REST client.
+     * @throws GridClientException If failed to start REST client.
+     */
+    protected GridClient startClient(String addr, int port) throws GridClientException {
+        String srv = addr + ":" + port;
+
+        GridClientConfiguration cfg = new GridClientConfiguration();
+
+        cfg.setServers(Collections.singleton(srv));
+        cfg.setProtocol(protocol());
+
+        return GridClientFactory.start(cfg);
+    }
+
+    /**
+     * Tests correct behavior in case of 1 REST-enabled node
+     * with default settings: the node must be reachable both through the
+     * loopback address and through a local IPv4 address.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneNodeDefaultHostAndPort() throws Exception {
+        startRestNode("grid1", null, null);
+
+        checkConnectivityByIp(LOOPBACK_IP, getAllIps());
+
+        // 'null' is passed as the fallback value, so it comes back when no local IP looks like IPv4.
+        String extIp = F.find(U.allLocalIps(), null, new IpV4AddressPredicate());
+
+        // Fail with a clear message instead of trying to connect to the host "null".
+        assertNotNull("No local IPv4 address found to connect through.", extIp);
+
+        checkConnectivityByIp(extIp, getAllIps());
+    }
+
+    /**
+     * Tests correct behavior in case of 1 REST-enabled node
+     * with explicitly specified loopback address setting.
+     *
+     * @throws Exception If error occurs.
+     */
+    public void testOneNodeLoopbackHost() throws Exception {
+        startRestNode("grid1", LOOPBACK_IP, defaultRestPort());
+
+        // Expected node attributes: the loopback address and an empty host name.
+        Collection<String> expAddrs = Collections.singleton(LOOPBACK_IP);
+        Collection<String> expHostNames = Collections.singleton("");
+
+        checkConnectivityByIp(LOOPBACK_IP, F.t(expAddrs, expHostNames));
+    }
+
+    /**
+     * Tests correct behavior in case of 1 REST-enabled node
+     * with explicitly specified 0.0.0.0 address: the node must be reachable
+     * through the loopback address and through every local IPv4 address.
+     *
+     * @throws Exception If error occurs.
+     */
+    public void testOneNodeZeroIpv4Address() throws Exception {
+        startRestNode("grid1", WILDCARD_IP, defaultRestPort());
+
+        Collection<String> nonLoopbackAddrs = U.allLocalIps();
+
+        assertNotNull(nonLoopbackAddrs);
+
+        Collection<String> addrs = new ArrayList<>();
+
+        addrs.add(LOOPBACK_IP);
+        addrs.addAll(F.view(nonLoopbackAddrs, new IpV4AddressPredicate()));
+
+        // Expected attribute values do not depend on the address used to connect, so resolve them once.
+        IgniteBiTuple<Collection<String>, Collection<String>> allIps = getAllIps();
+
+        // The client should be able to connect through all IPv4 addresses.
+        for (String addr : addrs) {
+            log.info("Trying address: " + addr);
+
+            GridClient cli = startClient(addr, defaultRestPort());
+
+            List<GridClientNode> nodes = cli.compute().refreshTopology(true, false);
+
+            assertEquals(1, nodes.size());
+
+            GridClientNode node = F.first(nodes);
+
+            assertNotNull(node);
+
+            assertEquals(allIps.get1(), node.attribute(restAddressAttributeName()));
+            assertEquals(allIps.get2(), node.attribute(restHostNameAttributeName()));
+
+            List<String> nodeAddrs = node.tcpAddresses();
+
+            assertTrue(nodeAddrs.contains(LOOPBACK_IP));
+
+            assertTrue(F.containsAll(nodeAddrs, addrs));
+        }
+    }
+
+    /**
+     * Tests correct behavior in case of 2 REST-enabled nodes with default
+     * settings.
+     *
+     * @throws Exception If error occurs.
+     */
+    public void testTwoNodesDefaultHostAndPort() throws Exception {
+        startRestNode("grid1", null, null);
+        startRestNode("grid2", null, null);
+
+        GridClient cli = startClient(LOOPBACK_IP, defaultRestPort());
+
+        List<GridClientNode> nodes = cli.compute().refreshTopology(true, 
false);
+
+        assertEquals(2, nodes.size());
+
+        assertTrue(F.forAll(nodes, new P1<GridClientNode>() {
+            @Override public boolean apply(GridClientNode node) {
+                return node.tcpAddresses().contains(LOOPBACK_IP);
+            }
+        }));
+
+        GridTestUtils.assertOneToOne(
+            nodes,
+            new P1<GridClientNode>() {
+                @Override public boolean apply(GridClientNode node) {
+                    try {
+                        return eqAddresses(getAllIps(), node) &&
+                            
Integer.valueOf(defaultRestPort()).equals(node.attribute(restPortAttributeName()));
+                    }
+                    catch (Exception ignored) {
+                        return false;
+                    }
+                }
+            },
+            new P1<GridClientNode>() {
+                @Override public boolean apply(GridClientNode node) {
+                    try {
+                        return eqAddresses(getAllIps(), node) &&
+                            Integer.valueOf(defaultRestPort() + 
1).equals(node.attribute(restPortAttributeName()));
+                    }
+                    catch (Exception ignored) {
+                        return false;
+                    }
+                }
+            }
+        );
+    }
+
+    /**
+     * Tests correct behavior in case of shutdown node used to refresh topology state.
+     *
+     * @throws Exception If error occurs.
+     */
+    public void testRefreshTopologyOnNodeLeft() throws Exception {
+        startRestNode("grid1", null, null);
+        startRestNode("grid2", null, null);
+
+        GridClient cli = startClient(LOOPBACK_IP, defaultRestPort());
+
+        // Both nodes are visible initially.
+        assertEquals(2, cli.compute().refreshTopology(true, false).size());
+
+        stopGrid("grid1");
+
+        assertEquals(1, cli.compute().refreshTopology(true, false).size());
+
+        startRestNode("grid3", null, null);
+
+        assertEquals(2, cli.compute().refreshTopology(true, false).size());
+
+        stopGrid("grid2");
+
+        assertEquals(1, cli.compute().refreshTopology(true, false).size());
+    }
+
+    /**
+     * Connects a client through the given IP and verifies that exactly one node is visible
+     * and that it reports the expected addresses.
+     *
+     * @param connectIp IP to test.
+     * @param nodeIp Expected IP reported to client.
+     * @throws GridClientException If failed.
+     */
+    private void checkConnectivityByIp(String connectIp, IgniteBiTuple<Collection<String>, Collection<String>> nodeIp)
+        throws GridClientException {
+        List<GridClientNode> top = startClient(connectIp, defaultRestPort()).compute().refreshTopology(true, false);
+
+        assertEquals(1, top.size());
+
+        GridClientNode first = F.first(top);
+
+        assertNotNull(first);
+        assertTrue(eqAddresses(nodeIp, first));
+    }
+
+    /**
+     * Compares expected addresses and host names with the REST attributes of a node.
+     *
+     * @param nodeIp Node ip.
+     * @param node Node.
+     * @return {@code True} if addresses are equal, {@code false} otherwise.
+     */
+    private boolean eqAddresses(IgniteBiTuple<Collection<String>, Collection<String>> nodeIp, GridClientNode node) {
+        Collection<String> addrs = (Collection<String>)(node.attribute(restAddressAttributeName()));
+
+        if (!F.eqOrdered(nodeIp.get1(), addrs))
+            return false;
+
+        // Host names are looked up only when the addresses already match.
+        Collection<String> hostNames = (Collection<String>)(node.attribute(restHostNameAttributeName()));
+
+        return F.eqOrdered(nodeIp.get2(), hostNames);
+    }
+
+    /**
+     * Predicate that accepts strings shaped like a dotted-decimal IPv4 address:
+     * four dot-separated groups of digits. Octet ranges are not validated.
+     */
+    private static class IpV4AddressPredicate implements P1<String> {
+        /** Pattern compiled once instead of on every {@link #apply(String)} call. */
+        private static final java.util.regex.Pattern IPV4_PTRN =
+            java.util.regex.Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(String s) {
+            return IPV4_PTRN.matcher(s).matches();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/756034f3/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
new file mode 100644
index 0000000..c6cedaa
--- /dev/null
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
@@ -0,0 +1,858 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.integration;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.client.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.client.balancer.*;
+import org.apache.ignite.internal.client.ssl.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests basic client behavior with multiple nodes.
+ */
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public abstract class ClientAbstractMultiNodeSelfTest extends 
GridCommonAbstractTest {
+    /** Discovery IP finder shared by all nodes configured by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Partitioned cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Replicated cache name. */
+    private static final String REPLICATED_CACHE_NAME = "replicated";
+
+    /** Replicated async cache name. */
+    private static final String REPLICATED_ASYNC_CACHE_NAME = 
"replicated_async";
+
+    /** Nodes count. */
+    public static final int NODES_CNT = 5;
+
+    /**
+     * Topology update frequency (the tests use it as a number of milliseconds).
+     * Set it longer than router's, so we wouldn't receive failures
+     * caused only by obsolete topology on router.
+     */
+    static final int TOP_REFRESH_FREQ = 2500;
+
+    /** Path to jetty config. */
+    public static final String REST_JETTY_CFG = 
"modules/clients/src/test/resources/jetty/rest-jetty.xml";
+
+    /** Path to jetty config with SSL enabled. */
+    public static final String REST_JETTY_SSL_CFG = 
"modules/clients/src/test/resources/jetty/rest-jetty-ssl.xml";
+
+    /** Host. */
+    public static final String HOST = "127.0.0.1";
+
+    /** Base for tcp rest ports. */
+    public static final int REST_TCP_PORT_BASE = 12345;
+
+    /** Base for http rest ports, defined in {@link #REST_JETTY_CFG}. */
+    public static final int REST_HTTP_PORT_BASE = 11080;
+
+    /** Base for https rest ports, defined in {@link #REST_JETTY_SSL_CFG}. */
+    public static final int REST_HTTPS_PORT_BASE = 11443;
+
+    /**
+     * Flag raised by {@link #testSyncCommitRollbackFlags()} for the duration of that test.
+     * NOTE(review): presumably consulted by the test communication SPI, which is not visible here - confirm.
+     */
+    private static volatile boolean commSpiEnabled;
+
+    /** Flag to enable REST in node configuration. */
+    private boolean restEnabled = true;
+
+    /** Client instance for each test. */
+    private GridClient client;
+
+    /**
+     * Supplied by the concrete, protocol-specific subclass.
+     *
+     * @return Client protocol that should be used.
+     */
+    protected abstract GridClientProtocol protocol();
+
+    /**
+     * Supplied by the concrete, protocol-specific subclass.
+     *
+     * @return Server address to create first connection.
+     */
+    protected abstract String serverAddress();
+
+    /**
+     * Returns {@code null} by default, in which case the node configuration leaves REST TCP SSL
+     * switched off; subclasses may override to enable it.
+     *
+     * @return SSL context factory to use if SSL or {@code null} to disable 
SSL usage.
+     */
+    @Nullable protected GridSslContextFactory sslContextFactory() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setLocalHost(HOST);
+
+        assert cfg.getClientConnectionConfiguration() == null;
+
+        // REST is configured only while the flag is set (see testDisabledRest()).
+        if (restEnabled) {
+            ClientConnectionConfiguration restCfg = new ClientConnectionConfiguration();
+
+            restCfg.setRestTcpPort(REST_TCP_PORT_BASE);
+
+            GridSslContextFactory sslFactory = sslContextFactory();
+
+            if (sslFactory != null) {
+                restCfg.setRestTcpSslEnabled(true);
+                restCfg.setRestTcpSslContextFactory(sslFactory);
+            }
+
+            cfg.setClientConnectionConfiguration(restCfg);
+        }
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setCacheConfiguration(
+            cacheConfiguration(null),
+            cacheConfiguration(PARTITIONED_CACHE_NAME),
+            cacheConfiguration(REPLICATED_CACHE_NAME),
+            cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME));
+
+        cfg.setExecutorService(newPrestartedPool());
+        cfg.setExecutorServiceShutdown(true);
+
+        cfg.setSystemExecutorService(newPrestartedPool());
+        cfg.setSystemExecutorServiceShutdown(true);
+
+        return cfg;
+    }
+
+    /**
+     * Creates a pool of 40 threads backed by an unbounded queue and starts all of its core threads.
+     *
+     * @return New thread pool.
+     */
+    private static ThreadPoolExecutor newPrestartedPool() {
+        ThreadPoolExecutor pool = new ThreadPoolExecutor(40, 40, 0, MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
+        pool.prestartAllCoreThreads();
+
+        return pool;
+    }
+
+    /**
+     * Builds a transactional cache configuration: local cache for a {@code null} name,
+     * partitioned cache without backups for the partitioned cache name, replicated cache otherwise.
+     * Only the replicated async cache uses fully asynchronous write synchronization.
+     *
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     * @throws Exception In case of error.
+     */
+    private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setDistributionMode(NEAR_PARTITIONED);
+
+        boolean partitioned = PARTITIONED_CACHE_NAME.equals(cacheName);
+
+        if (cacheName == null)
+            ccfg.setCacheMode(LOCAL);
+        else
+            ccfg.setCacheMode(partitioned ? PARTITIONED : REPLICATED);
+
+        if (partitioned)
+            ccfg.setBackups(0);
+
+        ccfg.setName(cacheName);
+
+        boolean async = REPLICATED_ASYNC_CACHE_NAME.equals(cacheName);
+
+        ccfg.setWriteSynchronizationMode(async ? FULL_ASYNC : FULL_SYNC);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // The same NODES_CNT nodes serve every test method of the class.
+        startGridsMultiThreaded(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        info("Stopping grids.");
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Every test method gets a freshly started client.
+        client = GridClientFactory.start(clientConfiguration());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Nothing to stop if the client was never started.
+        if (client == null)
+            return;
+
+        GridClientFactory.stop(client.id(), false);
+
+        client = null;
+    }
+
+    /**
+     * Performs a sequence of puts into the replicated async cache, alternating between plain puts
+     * and puts with the {@code SYNC_COMMIT} flag switched on.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSyncCommitRollbackFlags() throws Exception {
+        // Raised only for the duration of this test and always reset in the finally block.
+        // NOTE(review): presumably read by TestCommunicationSpi (defined outside this view) - confirm.
+        commSpiEnabled = true;
+
+        try {
+            GridClientData data = client.data(REPLICATED_ASYNC_CACHE_NAME);
+
+            info("Before put x1");
+
+            data.put("x1", "y1");
+
+            info("Before put x2");
+
+            data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x2", "y2");
+
+            info("Before put x3");
+
+            data.put("x3", "y3");
+
+            info("Before put x4");
+
+            data.flagsOn(GridClientCacheFlag.SYNC_COMMIT).put("x4", "y4");
+        }
+        finally {
+            commSpiEnabled = false;
+        }
+    }
+
+    /**
+     * Checks that operations on, and narrowing to, projections that contain no nodes fail
+     * with the expected client exceptions.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEmptyProjections() throws Exception {
+        final GridClientCompute dflt = client.compute();
+
+        Collection<? extends GridClientNode> nodes = dflt.nodes();
+
+        assertEquals(NODES_CNT, nodes.size());
+
+        Iterator<? extends GridClientNode> iter = nodes.iterator();
+
+        // Projection over the first node only.
+        final GridClientCompute singleNodePrj = 
dflt.projection(Collections.singletonList(iter.next()));
+
+        // A different node that is not part of the single-node projection.
+        final GridClientNode second = iter.next();
+
+        // Rejects every node.
+        final GridClientPredicate<GridClientNode> noneFilter = new 
GridClientPredicate<GridClientNode>() {
+            @Override public boolean apply(GridClientNode node) {
+                return false;
+            }
+        };
+
+        // Accepts the second node only.
+        final GridClientPredicate<GridClientNode> targetFilter = new 
GridClientPredicate<GridClientNode>() {
+            @Override public boolean apply(GridClientNode node) {
+                return node.nodeId().equals(second.nodeId());
+            }
+        };
+
+        // A request on a projection filtered down to no nodes must fail as unreachable.
+        GridTestUtils.assertThrows(log(), new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return dflt.projection(noneFilter).log(-1, -1);
+            }
+        }, GridServerUnreachableException.class, null);
+
+        // Narrowing the single-node projection to a node it does not contain must fail.
+        GridTestUtils.assertThrows(log(), new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return singleNodePrj.projection(second);
+            }
+        }, GridClientException.class, null);
+
+        // Same as above, but narrowing through a filter.
+        GridTestUtils.assertThrows(log(), new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return singleNodePrj.projection(targetFilter);
+            }
+        }, GridClientException.class, null);
+    }
+
+    /**
+     * Executes the test task on a single-node projection for every grid node and checks that
+     * the task result equals the ID of that node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testProjectionRun() throws Exception {
+        GridClientCompute compute = client.compute();
+
+        assertEquals(NODES_CNT, compute.nodes().size());
+
+        for (int idx = 0; idx < NODES_CNT; idx++) {
+            Ignite ignite = grid(idx);
+
+            assert ignite != null;
+
+            UUID nodeId = ignite.cluster().localNode().id();
+
+            GridClientNode clientNode = compute.node(nodeId);
+
+            assertNotNull("Client node for " + nodeId + " was not found", clientNode);
+
+            String res = compute.projection(clientNode).execute(TestTask.class.getName(), null);
+
+            assertNotNull(res);
+
+            assertEquals(nodeId.toString(), res);
+        }
+    }
+
+    /**
+     * For every grid node finds a key the client maps to that node, runs the test task with
+     * affinity to this key and checks that the task result equals the ID of the node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinityExecute() throws Exception {
+        GridClientCompute dflt = client.compute();
+
+        GridClientData data = client.data(PARTITIONED_CACHE_NAME);
+
+        Collection<? extends GridClientNode> nodes = dflt.nodes();
+
+        assertEquals(NODES_CNT, nodes.size());
+
+        for (int i = 0; i < NODES_CNT; i++) {
+            Ignite g = grid(i);
+
+            assert g != null;
+
+            UUID nodeId = g.cluster().localNode().id();
+
+            // -1 means "not found": only non-negative keys are probed.
+            int affinityKey = -1;
+
+            for (int key = 0; key < 10000; key++) {
+                if (nodeId.equals(data.affinity(key))) {
+                    affinityKey = key;
+
+                    break;
+                }
+            }
+
+            if (affinityKey == -1)
+                throw new Exception("Unable to find key for which node is primary: " + nodeId);
+
+            GridClientNode clientNode = dflt.node(nodeId);
+
+            assertNotNull("Client node for " + nodeId + " was not found", clientNode);
+
+            String res = dflt.affinityExecute(TestTask.class.getName(), PARTITIONED_CACHE_NAME, affinityKey, null);
+
+            assertNotNull(res);
+
+            assertEquals(nodeId.toString(), res);
+        }
+    }
+
+    /**
+     * Puts a value through the client with the {@code INVALIDATE} flag and checks which nodes
+     * can still peek the entry afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvalidateFlag() throws Exception {
+        IgniteEx g0 = grid(0);
+
+        GridCache<String, String> cache0 = g0.cache(PARTITIONED_CACHE_NAME);
+
+        String key = null;
+
+        // Pick the first key for which grid 0 is neither primary nor backup.
+        for (int i = 0; i < 10_000 && key == null; i++) {
+            String cand = String.valueOf(i);
+
+            if (!cache0.affinity().isPrimaryOrBackup(g0.localNode(), cand))
+                key = cand;
+        }
+
+        assertNotNull(key);
+
+        cache0.put(key, key); // Create entry in near cache, it is invalidated if INVALIDATE flag is set.
+
+        assertNotNull(cache0.peek(key));
+
+        client.data(PARTITIONED_CACHE_NAME).flagsOn(GridClientCacheFlag.INVALIDATE).put(key, "zzz");
+
+        for (Ignite g : G.allGrids()) {
+            GridCache<String, String> nodeCache = g.cache(PARTITIONED_CACHE_NAME);
+
+            boolean owner = nodeCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key);
+
+            if (owner)
+                assertEquals("zzz", nodeCache.peek(key));
+            else
+                assertNull(nodeCache.peek(key));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientAffinity() throws Exception {
+        GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME);
+
+        Collection<Object> keys = new ArrayList<>();
+
+        keys.addAll(Arrays.asList(
+            Boolean.TRUE,
+            Boolean.FALSE,
+            1,
+            Integer.MAX_VALUE
+        ));
+
+        Random rnd = new Random();
+        StringBuilder sb = new StringBuilder();
+
+        // Generate some random strings.
+        for (int i = 0; i < 100; i++) {
+            sb.setLength(0);
+
+            for (int j = 0; j < 255; j++)
+                // Only printable ASCII symbols for test.
+                sb.append((char)(rnd.nextInt(0x7f - 0x20) + 0x20));
+
+            keys.add(sb.toString());
+        }
+
+        // Generate some more keys to achieve better coverage.
+        for (int i = 0; i < 100; i++)
+            keys.add(UUID.randomUUID());
+
+        for (Object key : keys) {
+            UUID nodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, 
key).id();
+
+            UUID clientNodeId = partitioned.affinity(key);
+
+            assertEquals("Invalid affinity mapping for REST response for key: 
" + key, nodeId, clientNodeId);
+        }
+    }
+
+    /**
+     * Checks that a client topology listener is notified when a node joins and when it leaves.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTopologyListener() throws Exception {
+        final Collection<UUID> added = new ArrayList<>(1);
+        final Collection<UUID> rmvd = new ArrayList<>(1);
+
+        final CountDownLatch addedLatch = new CountDownLatch(1);
+        final CountDownLatch rmvLatch = new CountDownLatch(1);
+
+        // The client must see the initial topology before the listener is registered.
+        assertEquals(NODES_CNT, client.compute().refreshTopology(false, 
false).size());
+
+        // Records the IDs of reported nodes and releases the corresponding latch.
+        GridClientTopologyListener lsnr = new GridClientTopologyListener() {
+            @Override public void onNodeAdded(GridClientNode node) {
+                added.add(node.nodeId());
+
+                addedLatch.countDown();
+            }
+
+            @Override public void onNodeRemoved(GridClientNode node) {
+                rmvd.add(node.nodeId());
+
+                rmvLatch.countDown();
+            }
+        };
+
+        client.addTopologyListener(lsnr);
+
+        try {
+            Ignite g = startGrid(NODES_CNT + 1);
+
+            UUID id = g.cluster().localNode().id();
+
+            // Wait at most two topology refresh periods for the notification.
+            assertTrue(addedLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS));
+
+            assertEquals(1, added.size());
+            assertEquals(id, F.first(added));
+
+            stopGrid(NODES_CNT + 1);
+
+            assertTrue(rmvLatch.await(2 * TOP_REFRESH_FREQ, MILLISECONDS));
+
+            assertEquals(1, rmvd.size());
+            assertEquals(id, F.first(rmvd));
+        }
+        finally {
+            client.removeTopologyListener(lsnr);
+
+            // Presumably covers the case when an assertion failed before the node was stopped above.
+            stopGrid(NODES_CNT + 1);
+        }
+    }
+
+    /**
+     * Starts an extra node without REST configuration and checks that a put for a key mapped to
+     * that node either succeeds or fails with one of the expected "no endpoints" errors, while
+     * puts for keys mapped to the other nodes still work.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDisabledRest() throws Exception {
+        Ignite g = null;
+
+        restEnabled = false;
+
+        try {
+            // Started inside the try block so that the flag is restored even if startup fails.
+            g = startGrid("disabled-rest");
+
+            UUID disabledNodeId = g.cluster().localNode().id();
+
+            Thread.sleep(2 * TOP_REFRESH_FREQ);
+
+            // As long as we have round robin load balancer this will cause every node to be queried.
+            for (int i = 0; i < NODES_CNT + 1; i++)
+                assertEquals(NODES_CNT + 1, client.compute().refreshTopology(false, false).size());
+
+            final GridClientData data = client.data(PARTITIONED_CACHE_NAME);
+
+            // Check rest-disabled node is unavailable.
+            try {
+                String affKey;
+
+                do {
+                    affKey = UUID.randomUUID().toString();
+                } while (!data.affinity(affKey).equals(disabledNodeId));
+
+                data.put(affKey, "asdf");
+
+                assertEquals("asdf", cache(0, PARTITIONED_CACHE_NAME).get(affKey));
+            }
+            catch (GridServerUnreachableException e) {
+                // Thrown for direct client-node connections.
+                assertTrue("Unexpected exception message: " + e.getMessage(),
+                    e.getMessage().startsWith("No available endpoints to connect (is rest enabled for this node?)"));
+            }
+            catch (GridClientException e) {
+                // Thrown for routed client-router-node connections.
+                String msg = e.getMessage();
+
+                assertTrue("Unexpected exception message: " + msg, protocol() == GridClientProtocol.TCP ?
+                    msg.contains("No available endpoints to connect (is rest enabled for this node?)") : // TCP router.
+                    msg.startsWith("No available nodes on the router for destination node ID"));         // HTTP router.
+            }
+
+            // Check rest-enabled nodes are available.
+            String affKey;
+
+            do {
+                affKey = UUID.randomUUID().toString();
+            } while (data.affinity(affKey).equals(disabledNodeId));
+
+            data.put(affKey, "fdsa");
+
+            assertEquals("fdsa", cache(0, PARTITIONED_CACHE_NAME).get(affKey));
+        }
+        finally {
+            restEnabled = true;
+
+            // The node may be missing if startup itself failed.
+            if (g != null)
+                G.stop(g.name(), true);
+        }
+    }
+
+    /**
+     * Checks that a put made through the client lands only on the node the key is mapped to and,
+     * when the client data projection is pinned to another node, additionally on that pinned node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinityPut() throws Exception {
+        // Wait two topology refresh periods before checking the node count.
+        Thread.sleep(2 * TOP_REFRESH_FREQ);
+
+        assertEquals(NODES_CNT, client.compute().refreshTopology(false, false).size());
+
+        Map<UUID, Ignite> gridsByLocNode = new HashMap<>(NODES_CNT);
+
+        GridClientData partitioned = client.data(PARTITIONED_CACHE_NAME);
+
+        GridClientCompute compute = client.compute();
+
+        for (int i = 0; i < NODES_CNT; i++)
+            gridsByLocNode.put(grid(i).localNode().id(), grid(i));
+
+        for (int i = 0; i < 100; i++) {
+            String key = "key" + i;
+
+            UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, key).id();
+
+            // Client-side affinity must agree with the grid-side mapping.
+            assertEquals("Affinity mismatch for key: " + key, primaryNodeId, partitioned.affinity(key));
+
+            // Must go to primary node only. Since backup count is 0, value must be present on
+            // primary node only.
+            partitioned.put(key, "val" + key);
+
+            for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) {
+                Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(key);
+
+                if (primaryNodeId.equals(entry.getKey()))
+                    assertEquals("val" + key, val);
+                else
+                    assertNull(val);
+            }
+        }
+
+        // Now check that we will see value in near cache in pinned mode.
+        for (int i = 100; i < 200; i++) {
+            String pinnedKey = "key" + i;
+
+            UUID primaryNodeId = grid(0).mapKeyToNode(PARTITIONED_CACHE_NAME, pinnedKey).id();
+
+            // Any node other than the primary one.
+            UUID pinnedNodeId = F.first(F.view(gridsByLocNode.keySet(), F.notEqualTo(primaryNodeId)));
+
+            GridClientNode node = compute.node(pinnedNodeId);
+
+            partitioned.pinNodes(node).put(pinnedKey, "val" + pinnedKey);
+
+            for (Map.Entry<UUID, Ignite> entry : gridsByLocNode.entrySet()) {
+                Object val = entry.getValue().cache(PARTITIONED_CACHE_NAME).peek(pinnedKey);
+
+                if (primaryNodeId.equals(entry.getKey()) || pinnedNodeId.equals(entry.getKey()))
+                    assertEquals("val" + pinnedKey, val);
+                else
+                    assertNull(val);
+            }
+        }
+    }
+
+    /**
+     * Builds the client configuration for the test: balancer from {@link #getBalancer()}, topology refresh
+     * frequency, protocol, server address and SSL context factory, plus four data configurations (one
+     * unnamed, and one each for the partitioned, replicated and replicated-async caches).
+     *
+     * @return Client configuration for the test.
+     * @throws GridClientException If the configuration could not be created.
+     */
+    protected GridClientConfiguration clientConfiguration() throws GridClientException {
+        GridClientConfiguration clientCfg = new GridClientConfiguration();
+
+        clientCfg.setBalancer(getBalancer());
+        clientCfg.setTopologyRefreshFrequency(TOP_REFRESH_FREQ);
+        clientCfg.setProtocol(protocol());
+        clientCfg.setServers(Arrays.asList(serverAddress()));
+        clientCfg.setSslContextFactory(sslContextFactory());
+
+        // Data configuration whose name is left unset.
+        GridClientDataConfiguration unnamedData = new GridClientDataConfiguration();
+
+        // Only the partitioned cache gets a client-side affinity function.
+        GridClientDataConfiguration partitionedData = new GridClientDataConfiguration();
+
+        partitionedData.setName(PARTITIONED_CACHE_NAME);
+        partitionedData.setAffinity(new GridClientPartitionAffinity());
+
+        GridClientDataConfiguration replicatedData = new GridClientDataConfiguration();
+
+        replicatedData.setName(REPLICATED_CACHE_NAME);
+
+        GridClientDataConfiguration replicatedAsyncData = new GridClientDataConfiguration();
+
+        replicatedAsyncData.setName(REPLICATED_ASYNC_CACHE_NAME);
+
+        clientCfg.setDataConfigurations(
+            Arrays.asList(unnamedData, partitionedData, replicatedData, replicatedAsyncData));
+
+        return clientCfg;
+    }
+
+    /**
+     * Gets client load balancer. This implementation creates a new {@link GridClientRoundRobinBalancer}
+     * on every call; {@code clientConfiguration()} installs the returned balancer into the client
+     * configuration.
+     *
+     * @return Load balancer.
+     */
+    protected GridClientLoadBalancer getBalancer() {
+        return new GridClientRoundRobinBalancer();
+    }
+
+    /**
+     * Test task. It is split into {@code gridSize} jobs; every job sleeps for one second and returns a
+     * tuple whose first component is the id of the node that split the task and whose second component
+     * is {@code 1}. The task result is that node id (as a string).
+     */
+    private static class TestTask extends ComputeTaskSplitAdapter<Object, String> {
+        /** Ignite instance (injected resource). */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Number of jobs this task was split into. */
+        private int gridSize;
+
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg)
+            throws IgniteCheckedException {
+            Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize);
+
+            // Remembered so that reduce() can compare it with the number of results.
+            this.gridSize = gridSize;
+
+            // Captured by every job, so each result names the node that performed the split.
+            final String splitNodeId = ignite.cluster().localNode().id().toString();
+
+            while (jobs.size() < gridSize) {
+                jobs.add(new ComputeJobAdapter() {
+                    @SuppressWarnings("OverlyStrongTypeCast")
+                    @Override public Object execute() {
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException ignored) {
+                            // Restore the interrupt flag and still return the result.
+                            Thread.currentThread().interrupt();
+                        }
+
+                        return new IgniteBiTuple<>(splitNodeId, 1);
+                    }
+                });
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+            String splitNodeId = null;
+
+            int jobCnt = 0;
+
+            for (ComputeJobResult res : results) {
+                IgniteBiTuple<String, Integer> tuple = res.getData();
+
+                Integer cnt = tuple.get2();
+
+                if (cnt != null)
+                    jobCnt += cnt;
+
+                // The node id is taken from the first result that carries one.
+                if (splitNodeId == null)
+                    splitNodeId = tuple.get1();
+            }
+
+            // Every job contributes 1, so the total must match the split size.
+            assert jobCnt == gridSize;
+
+            return splitNodeId;
+        }
+    }
+
+    /**
+     * Communication SPI which checks cache flags.
+     */
+    @SuppressWarnings("unchecked")
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, 
GridTcpCommunicationMessageAdapter msg)
+            throws IgniteSpiException {
+            checkSyncFlags((GridIoMessage)msg);
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * Check if flags in correct state.
+         *
+         * @param msg Message.
+         */
+        private void checkSyncFlags(GridIoMessage msg) {
+            if (!commSpiEnabled)
+                return;
+
+            Object o = msg.message();
+
+            if (!(o instanceof GridDistributedLockRequest))
+                return;
+
+            IgniteKernal g = 
(IgniteKernal)G.ignite(ignite.configuration().getNodeId());
+
+            GridCacheContext<Object, Object> cacheCtx = 
g.internalCache(REPLICATED_ASYNC_CACHE_NAME).context();
+
+            IgniteTxManager<Object, Object> tm = cacheCtx.tm();
+
+            GridCacheVersion v = ((GridCacheVersionable)o).version();
+
+            IgniteTxEx t = tm.tx(v);
+
+            if (t.hasWriteKey(cacheCtx.txKey("x1")))
+                assertFalse("Invalid tx flags: " + t, t.syncCommit());
+            else if (t.hasWriteKey(cacheCtx.txKey("x2")))
+                assertTrue("Invalid tx flags: " + t, t.syncCommit());
+            else if (t.hasWriteKey(cacheCtx.txKey("x3")))
+                assertFalse("Invalid tx flags: " + t, t.syncCommit());
+            else if (t.hasWriteKey(cacheCtx.txKey("x4")))
+                assertTrue("Invalid tx flags: " + t, t.syncCommit());
+        }
+    }
+
+    /**
+     * Runs 50 threads against one shared client. Each thread performs 20 operations, and a shared counter
+     * rotates the operation kind between cache put/get/remove, cache metrics, task execution followed by
+     * a node refresh, and a topology refresh.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreadedCommand() throws Exception {
+        final GridClientData data = client.data(PARTITIONED_CACHE_NAME);
+        final GridClientCompute compute = client.compute();
+        final AtomicInteger cnt = new AtomicInteger(0);
+
+        multithreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int iter = 0; iter < 20; iter++) {
+                    String key = UUID.randomUUID().toString();
+                    String val = UUID.randomUUID().toString();
+
+                    // The counter is shared by all threads, so operation kinds interleave across them.
+                    int op = cnt.incrementAndGet() % 4;
+
+                    if (op == 0) {
+                        assertTrue(data.put(key, val));
+                        assertEquals(val, data.get(key));
+                        assertTrue(data.remove(key));
+                    }
+                    else if (op == 1)
+                        assertNotNull(data.metrics());
+                    else if (op == 2) {
+                        String nodeId = compute.execute(TestTask.class.getName(), null);
+
+                        assertNotNull(nodeId);
+                        assertNotNull(compute.refreshNode(UUID.fromString(nodeId), true, true));
+                    }
+                    else if (op == 3)
+                        assertEquals(NODES_CNT, compute.refreshTopology(true, true).size());
+                }
+
+                return null;
+            }
+        }, 50, "multithreaded-client-access");
+    }
+}

Reply via email to