# IGNITE-709 Add test to check pending messages on client reconnect.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d59403b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d59403b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d59403b4 Branch: refs/heads/ignite-709_2 Commit: d59403b482a4e2273abcd17c99a3c837ea302154 Parents: e76752d Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Sat May 2 19:23:53 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Sat May 2 19:23:53 2015 +0300 ---------------------------------------------------------------------- .../discovery/tcp/TcpClientDiscoverySpi.java | 8 ++ .../tcp/TcpClientDiscoverySelfTest.java | 84 +++++++++++++++++++- 2 files changed, 91 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d59403b4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 1e6dc2f..c319f9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -551,6 +551,13 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * FOR TEST PURPOSE ONLY! + */ + public void brokeConnection() { + U.closeQuiet(msgWorker.currSock); + } + + /** * Heartbeat sender. */ private class HeartbeatSender extends TimerTask { @@ -1288,6 +1295,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp assert msg.success(); currSock = reconnector.sock; + sockWriter.setSocket(currSock); sockReader.setSocket(currSock, locNode.clientRouterNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d59403b4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java index a280c39..fd9b0f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java @@ -22,20 +22,24 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; +import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; @@ -97,7 +101,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(disco); } else if (gridName.startsWith("client")) { - TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi(); + TcpClientDiscoverySpi disco = new TestTcpClientDiscovery(); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); @@ -310,6 +314,43 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testGetMissedMessagesOnReconnect() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(2); + + checkNodes(3, 2); + + clientLeftLatch = new CountDownLatch(1); + srvLeftLatch = new CountDownLatch(2); + + attachListeners(2, 2); + + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).suspend(); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(srvLeftLatch); + + Thread.sleep(500); + + assert G.ignite("client-0").cluster().nodes().size() == 4; + assert G.ignite("client-1").cluster().nodes().size() == 5; + + clientLeftLatch = new CountDownLatch(1); + + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume(); + + await(clientLeftLatch); + + checkNodes(2, 2); + } + + /** + * @throws Exception If failed. + */ public void testClientSegmentation() throws Exception { clientsPerSrv = 1; @@ -739,4 +780,45 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { return true; } } + + /** + * + */ + private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi { + /** */ + private final Lock ioOperationsLock = new ReentrantLock(); + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { + ioOperationsLock.lock(); + ioOperationsLock.unlock(); + + super.writeToSocket(sock, msg, bout); + } + + /** {@inheritDoc} */ + @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + ioOperationsLock.lock(); + ioOperationsLock.unlock(); + + return super.openSocket(sockAddr); + } + + /** + * + */ + private void suspend() { + ioOperationsLock.lock(); + + brokeConnection(); + } + + /** + * + */ + private void resume() { + ioOperationsLock.unlock(); + } + } }