# IGNITE-709 Add test to check pending messages on client reconnect.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d59403b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d59403b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d59403b4

Branch: refs/heads/ignite-709_2
Commit: d59403b482a4e2273abcd17c99a3c837ea302154
Parents: e76752d
Author: sevdokimov <sergey.evdoki...@jetbrains.com>
Authored: Sat May 2 19:23:53 2015 +0300
Committer: sevdokimov <sergey.evdoki...@jetbrains.com>
Committed: Sat May 2 19:23:53 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  8 ++
 .../tcp/TcpClientDiscoverySelfTest.java         | 84 +++++++++++++++++++-
 2 files changed, 91 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d59403b4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 1e6dc2f..c319f9e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -551,6 +551,13 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
     }
 
     /**
+     * FOR TEST PURPOSE ONLY! Hands the message worker's current socket to {@code U.closeQuiet}.
+     */
+    public void brokeConnection() {
+        U.closeQuiet(msgWorker.currSock);
+    }
+
+    /**
      * Heartbeat sender.
      */
     private class HeartbeatSender extends TimerTask {
@@ -1288,6 +1295,7 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
                 assert msg.success();
 
                 currSock = reconnector.sock;
+
                 sockWriter.setSocket(currSock);
                 sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d59403b4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index a280c39..fd9b0f7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -22,20 +22,24 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
+import java.io.*;
 import java.net.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
@@ -97,7 +101,7 @@ public class TcpClientDiscoverySelfTest extends 
GridCommonAbstractTest {
             cfg.setDiscoverySpi(disco);
         }
         else if (gridName.startsWith("client")) {
-            TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi();
+            TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
 
             TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
 
@@ -310,6 +314,43 @@ public class TcpClientDiscoverySelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testGetMissedMessagesOnReconnect() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(3);
+        startClientNodes(2);
+
+        checkNodes(3, 2);
+
+        clientLeftLatch = new CountDownLatch(1); // NOTE(review): re-assigned below before this method awaits it - confirm it is needed here.
+        srvLeftLatch = new CountDownLatch(2);
+
+        attachListeners(2, 2);
+
        
((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).suspend(); // Gate client-1's socket I/O and drop its connection before the topology changes.
+
+        stopGrid("server-2"); // Stopped while client-1 is suspended.
+
+        await(srvLeftLatch);
+        await(srvLeftLatch); // NOTE(review): same latch awaited twice; the second call looks redundant - confirm.
+
+        Thread.sleep(500);
+
+        assert G.ignite("client-0").cluster().nodes().size() == 4; // 2 servers + 2 clients remain after server-2 stopped.
+        assert G.ignite("client-1").cluster().nodes().size() == 5; // Suspended client-1 is expected to still report all 5 original nodes.
+
+        clientLeftLatch = new CountDownLatch(1);
+
        
((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume(); // Release the gate so client-1's blocked I/O can proceed.
+
+        await(clientLeftLatch);
+
+        checkNodes(2, 2); // Final topology check: 2 servers, 2 clients.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientSegmentation() throws Exception {
         clientsPerSrv = 1;
 
@@ -739,4 +780,45 @@ public class TcpClientDiscoverySelfTest extends 
GridCommonAbstractTest {
             return true;
         }
     }
+
+    /**
+     * Client discovery SPI for tests whose socket writes and socket opens can be paused via {@link #suspend()} and {@link #resume()}.
+     */
+    private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
+        /** Gate lock: held between suspend() and resume(); the I/O overrides acquire and release it before delegating. */
+        private final Lock ioOperationsLock = new ReentrantLock();
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout) throws IOException, 
IgniteCheckedException {
+            ioOperationsLock.lock(); // Blocks here while another thread holds the lock (i.e. between suspend() and resume()).
+            ioOperationsLock.unlock(); // Released immediately: the lock is only a gate, not held during the write.
+
+            super.writeToSocket(sock, msg, bout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr) 
throws IOException {
+            ioOperationsLock.lock(); // Same gate as in writeToSocket: wait until the lock is free.
+            ioOperationsLock.unlock();
+
+            return super.openSocket(sockAddr);
+        }
+
+        /**
+         * Acquires the gate lock and then calls {@code brokeConnection()}; must be paired with {@link #resume()} on the same thread.
+         */
+        private void suspend() {
+            ioOperationsLock.lock();
+
+            brokeConnection();
+        }
+
+        /**
+         * Releases the gate lock taken by {@link #suspend()}, letting blocked socket operations continue.
+         */
+        private void resume() {
+            ioOperationsLock.unlock();
+        }
+    }
 }

Reply via email to