Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 ce2caffdd -> 07558de4b


# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/07558de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/07558de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/07558de4

Branch: refs/heads/ignite-901
Commit: 07558de4bc02a86f1854e7bc7b152203285d5602
Parents: ce2caff
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Jul 10 17:49:56 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Jul 10 17:56:26 2015 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  |  2 +-
 .../IgniteClientReconnectCacheTest.java         |  2 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 78 ++++++++++++++++++++
 ...ClientReconnectCacheQueriesFailoverTest.java | 65 +++++++++++++++-
 4 files changed, 142 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4fce6f8..4bace29 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1717,7 +1717,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
 
-        if (node.equals(getLocalNode()))
+        if (node.id().equals(getLocalNode().id()))
             notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index aae7162..d79a43e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -936,7 +936,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         for (int i = 0; i < SRV_CNT; i++)
             stopGrid(i);
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
 
         clientMode = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 4d19f3e..92c1f13 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1335,6 +1335,84 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientNodeIds.add(client.cluster().localNode().id());
 
         checkNodes(changeTop ? 2 : 1, 1);
+
+        Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+        srvNodeIds.add(g.cluster().localNode().id());
+
+        checkNodes(changeTop ? 3 : 2, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFailConcurrentJoin() throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        Ignite client = G.ignite("client-0");
+
+        final ClusterNode clientNode = client.cluster().localNode();
+
+        assertEquals(2L, clientNode.order());
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Client event: " + evt);
+
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        final int CLIENTS = 20;
+
+        clientsPerSrv = CLIENTS + 1;
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                latch.await();
+
+                Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+                clientNodeIds.add(g.cluster().localNode().id());
+
+                return null;
+            }
+        }, CLIENTS, "start-client");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        latch.countDown();
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        fut.get();
+
+        checkNodes(1, CLIENTS + 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07558de4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
index 127745b..23320ae 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 
 import javax.cache.*;
 import java.util.*;
@@ -49,6 +51,18 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final IgniteCache<Integer, Person> cache = grid(serverCount()).cache(null);
+
+        assertNotNull(cache);
+
+        for (int i = 0; i <= 10_000; i++)
+            cache.put(i, new Person(i, "name-" + i));
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -59,9 +73,6 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR
 
         assertNotNull(cache);
 
-        for (int i = 0; i <= 10_000; i++)
-            cache.put(i, new Person(i, "name-" + i));
-
         reconnectFailover(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1");
@@ -100,6 +111,54 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectScanQuery() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        final Affinity<Integer> aff = client.affinity(null);
+
+        final Map<Integer, Integer> partMap = new HashMap<>();
+
+        for (int i = 0; i < aff.partitions(); i++)
+            partMap.put(i, 0);
+
+        for (int i = 0; i <= 10_000; i++) {
+            Integer part = aff.partition(i);
+
+            Integer size = partMap.get(part);
+
+            partMap.put(part, size + 1);
+        }
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ScanQuery<Integer, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() {
+                    @Override public boolean apply(Integer key, Person val) {
+                        return val.getId() % 2 == 1;
+                    }
+                });
+
+                assertEquals(5000, cache.query(qry).getAll().size());
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                Integer part = rnd.nextInt(0, aff.partitions());
+
+                qry = new ScanQuery<>(part);
+
+                assertEquals((int)partMap.get(part), cache.query(qry).getAll().size());
+
+                return null;
+            }
+        });
+    }
+
+    /**
      *
      */
     public static class Person {

Reply via email to