Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 bf97d042a -> b14b73e85
IGNITE-901 Extracted reconnectClient methods for testing. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b14b73e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b14b73e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b14b73e8 Branch: refs/heads/ignite-901 Commit: b14b73e85ad016cb8db7b93b17baef6cb86e9a45 Parents: bf97d04 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Jul 3 15:47:16 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 3 15:47:16 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientReconnectAbstractTest.java | 57 ++ .../IgniteClientReconnectAtomicsTest.java | 662 ++----------------- 2 files changed, 130 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index e883fb5..a9ce136 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; @@ -34,12 +36,16 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.apache.ignite.testframework.junits.common.*; import org.eclipse.jetty.util.*; +import org.jetbrains.annotations.*; import java.io.*; import java.net.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + /** * */ @@ -155,6 +161,57 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } /** + * Reconnect client node. + * + * @param client Client. + * @param srv Server. + * @param disconnectedClosure Closure which will be run when client node disconnected. + * @throws Exception If failed. + */ + protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure) + throws Exception { + final TestTcpDiscoverySpi clientSpi = spi(client); + final TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + if (disconnectedClosure != null) + disconnectedClosure.run(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + } + + /** * */ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index c4078f8..a827671 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -18,17 +18,11 @@ package org.apache.ignite.internal; import org.apache.ignite.*; -import org.apache.ignite.events.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import java.util.concurrent.*; -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.EventType.*; - /** * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect. */ @@ -53,53 +47,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true); assertEquals(1L, clientAtomicSeq.incrementAndGet()); - IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false); + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false); assertEquals(1001L, srvAtomicSeq.incrementAndGet()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(1002L, srvAtomicSeq.incrementAndGet()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - assertEquals(1002L, srvAtomicSeq.incrementAndGet()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); assertEquals(2L, clientAtomicSeq.incrementAndGet()); @@ -118,8 +78,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); clientAtomicSeq.batchSize(1); @@ -132,47 +90,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertEquals(1001L, srvAtomicSeq.incrementAndGet()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicSeq.close(); - return true; + assert srvAtomicSeq.removed(); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvAtomicSeq.close(); - - assert srvAtomicSeq.removed(); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); - - U.sleep(1000); + }); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -194,8 +118,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - BlockTpcCommunicationSpi commSpi = commSpi(srv); final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true); @@ -228,45 +150,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); // Check that after reconnect working. assert clientAtomicSeq.incrementAndGet() >= 0; @@ -285,59 +175,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true); assertEquals("1st value", clientAtomicRef.get()); assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); assertEquals("2st value", clientAtomicRef.get()); - IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false); + final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false); assertEquals("2st value", srvAtomicRef.get()); assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); assertEquals("3st value", srvAtomicRef.get()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals("3st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value")); + assertEquals("4st value", srvAtomicRef.get()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - assertEquals("3st value", srvAtomicRef.get()); - assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value")); - assertEquals("4st value", srvAtomicRef.get()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); assertEquals("4st value", clientAtomicRef.get()); assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value")); @@ -360,8 +216,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRefRemoved", "1st value", true); @@ -369,49 +223,17 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); assertEquals("2st value", clientAtomicRef.get()); - IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); + final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); assertEquals("2st value", srvAtomicRef.get()); assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); assertEquals("3st value", srvAtomicRef.get()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicRef.close(); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvAtomicRef.close(); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -432,8 +254,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRefInProg", "1st value", true); @@ -468,45 +288,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr servCommSpi.unblockMsg(); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); // Check that after reconnect working. assertEquals("3st value", clientAtomicRef.get()); @@ -530,59 +318,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true); assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); assertEquals(1, clientAtomicStamped.value()); assertEquals(1, clientAtomicStamped.stamp()); - IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false); + final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false); assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); assertEquals(2, srvAtomicStamped.value()); assertEquals(2, srvAtomicStamped.stamp()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3)); + assertEquals(3, srvAtomicStamped.value()); + assertEquals(3, srvAtomicStamped.stamp()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3)); - assertEquals(3, srvAtomicStamped.value()); - assertEquals(3, srvAtomicStamped.stamp()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4)); assertEquals(4, clientAtomicStamped.value()); @@ -605,57 +359,23 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); assertEquals(1, clientAtomicStamped.value()); assertEquals(1, clientAtomicStamped.stamp()); - IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); + final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); assertEquals(2, srvAtomicStamped.value()); assertEquals(2, srvAtomicStamped.stamp()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicStamped.close(); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvAtomicStamped.close(); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -676,8 +396,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true); assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); @@ -711,45 +429,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr servCommSpi.unblockMsg(); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); // Check that after reconnect working. assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3)); @@ -773,53 +459,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true); assertEquals(0L, clientAtomicLong.getAndAdd(1)); - IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false); + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false); assertEquals(1L, srvAtomicLong.getAndAdd(1)); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(2L, srvAtomicLong.getAndAdd(1)); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - assertEquals(2L, srvAtomicLong.getAndAdd(1)); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); assertEquals(3L, clientAtomicLong.getAndAdd(1)); @@ -838,53 +490,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); assertEquals(0L, clientAtomicLong.getAndAdd(1)); - IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); assertEquals(1L, srvAtomicLong.getAndAdd(1)); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicLong.close(); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvAtomicLong.close(); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -905,8 +523,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - BlockTpcCommunicationSpi commSpi = commSpi(srv); final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true); @@ -932,45 +548,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); // Check that after reconnect working. assertEquals(1, clientAtomicLong.addAndGet(1)); @@ -989,53 +573,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true); assertEquals(3, clientLatch.count()); - IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false); + final IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false); assertEquals(3, srvLatch.count()); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvLatch.countDown(); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvLatch.countDown(); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + }); assertEquals(2, srvLatch.count()); assertEquals(2, clientLatch.count()); @@ -1053,70 +603,4 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(srvLatch.await(1000)); assertTrue(clientLatch.await(1000)); } - - /** - * @throws Exception If failed. - */ - public void _testLatchReconnect2() throws Exception { - Ignite client = grid(serverCount()); - - assertTrue(client.cluster().localNode().isClient()); - - Ignite srv = clientRouter(client); - - TestTcpDiscoverySpi srvSpi = spi(srv); - - final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true); - - IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false); - - assertFalse(clientLatch.await(100)); - - IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - return clientLatch.await(60_000, MILLISECONDS); - } - }); - - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); - - final TestTcpDiscoverySpi clientSpi = spi(client); - - log.info("Block reconnect."); - - clientSpi.writeLatch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - reconnectLatch.countDown(); - } - - return true; - } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); - - srvLatch.countDown(); - - assertNotDone(waitFut); - - log.info("Allow reconnect."); - - clientSpi.writeLatch.countDown(); - - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); - - assertTrue(waitFut.get(5000)); - } }