Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 ce2caffdd -> 07558de4b
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/07558de4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/07558de4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/07558de4 Branch: refs/heads/ignite-901 Commit: 07558de4bc02a86f1854e7bc7b152203285d5602 Parents: ce2caff Author: sboikov <sboi...@gridgain.com> Authored: Fri Jul 10 17:49:56 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jul 10 17:56:26 2015 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../IgniteClientReconnectCacheTest.java | 2 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 78 ++++++++++++++++++++ ...ClientReconnectCacheQueriesFailoverTest.java | 65 +++++++++++++++- 4 files changed, 142 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4fce6f8..4bace29 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1717,7 +1717,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - if (node.equals(getLocalNode())) + if (node.id().equals(getLocalNode().id())) notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index aae7162..d79a43e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -936,7 +936,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac for (int i = 0; i < SRV_CNT; i++) stopGrid(i); - assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(disconnectLatch.await(30_000, MILLISECONDS)); clientMode = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 4d19f3e..92c1f13 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -1335,6 +1335,84 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientNodeIds.add(client.cluster().localNode().id()); checkNodes(changeTop ? 2 : 1, 1); + + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + + checkNodes(changeTop ? 3 : 2, 1); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectAfterFailConcurrentJoin() throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final ClusterNode clientNode = client.cluster().localNode(); + + assertEquals(2L, clientNode.order()); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Client event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + assertEquals(1, reconnectLatch.getCount()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + final int CLIENTS = 20; + + clientsPerSrv = CLIENTS + 1; + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + latch.await(); + + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + + return null; + } + }, CLIENTS, "start-client"); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + latch.countDown(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + clientNodeIds.add(client.cluster().localNode().id()); + + fut.get(); + + checkNodes(1, CLIENTS + 1); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java index 127745b..23320ae 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import javax.cache.*; import java.util.*; @@ -49,6 +51,18 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR return cfg; } + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + final IgniteCache<Integer, Person> cache = grid(serverCount()).cache(null); + + assertNotNull(cache); + + for (int i = 0; i <= 10_000; i++) + cache.put(i, new Person(i, "name-" + i)); + } + /** * @throws Exception If failed. */ @@ -59,9 +73,6 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR assertNotNull(cache); - for (int i = 0; i <= 10_000; i++) - cache.put(i, new Person(i, "name-" + i)); - reconnectFailover(new Callable<Void>() { @Override public Void call() throws Exception { SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1"); @@ -100,6 +111,54 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR } /** + * @throws Exception If failed. + */ + public void testReconnectScanQuery() throws Exception { + final Ignite client = grid(serverCount()); + + final IgniteCache<Integer, Person> cache = client.cache(null); + + assertNotNull(cache); + + final Affinity<Integer> aff = client.affinity(null); + + final Map<Integer, Integer> partMap = new HashMap<>(); + + for (int i = 0; i < aff.partitions(); i++) + partMap.put(i, 0); + + for (int i = 0; i <= 10_000; i++) { + Integer part = aff.partition(i); + + Integer size = partMap.get(part); + + partMap.put(part, size + 1); + } + + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + ScanQuery<Integer, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() { + @Override public boolean apply(Integer key, Person val) { + return val.getId() % 2 == 1; + } + }); + + assertEquals(5000, cache.query(qry).getAll().size()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Integer part = rnd.nextInt(0, aff.partitions()); + + qry = new ScanQuery<>(part); + + assertEquals((int)partMap.get(part), cache.query(qry).getAll().size()); + + return null; + } + }); + } + + /** * */ public static class Person {