Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 c221e773a -> 97a62245f
IGNITE-901 Added tests for Sequence. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/97a62245 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/97a62245 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/97a62245 Branch: refs/heads/ignite-901 Commit: 97a62245f17af1f83070e1390f4af8525d698960 Parents: c221e77 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Jul 3 14:03:21 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 3 14:03:21 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientReconnectAtomicsTest.java | 232 +++++++++++++++++++ 1 file changed, 232 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a62245/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 1a5b795..d7f6170 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -46,6 +46,238 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr /** * @throws Exception If failed. */ + public void testAtomicSeqReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true); + + assertEquals(1L, clientAtomicSeq.incrementAndGet()); + + IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false); + + assertEquals(1001L, srvAtomicSeq.incrementAndGet()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(1002L, srvAtomicSeq.incrementAndGet()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(2L, clientAtomicSeq.incrementAndGet()); + + assertEquals(1003L, srvAtomicSeq.incrementAndGet()); + + assertEquals(3L, clientAtomicSeq.incrementAndGet()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSeqReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); + + clientAtomicSeq.batchSize(1); + + assertEquals(1L, clientAtomicSeq.incrementAndGet()); + + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqRmv", 0, false); + + srvAtomicSeq.batchSize(1); + + assertEquals(1001L, srvAtomicSeq.incrementAndGet()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvAtomicSeq.close(); + + assert srvAtomicSeq.removed(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + U.sleep(1000); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 2000; i++) + clientAtomicSeq.incrementAndGet(); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSeqReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true); + + clientAtomicSeq.batchSize(1); + + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqInProg", 0, false); + + srvAtomicSeq.batchSize(1); + + commSpi.msgClass = GridNearLockResponse.class; + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 3000; i++) + clientAtomicSeq.incrementAndGet(); + + return null; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + // Check that after reconnect working. + assert clientAtomicSeq.incrementAndGet() >= 0; + assert srvAtomicSeq.incrementAndGet() >= 0; + + clientAtomicSeq.close(); + } + + /** + * @throws Exception If failed. + */ public void testAtomicReferenceReconnect() throws Exception { Ignite client = grid(serverCount());