http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java new file mode 100644 index 0000000..37773cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java @@ -0,0 +1,846 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testErrorOnDisconnect() throws Exception { + // Check cache operations. + cacheOperationsTest(); + + // Check cache operations. + beforeTestsStarted(); + dataStructureOperationsTest(); + + // Check ignite operations. + beforeTestsStarted(); + igniteOperationsTest(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void dataStructureOperationsTest() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check atomic long. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.atomicLong("testAtomic", 41, true); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.atomicLong("testAtomic", 41, true); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteAtomicLong atomicLong = (IgniteAtomicLong)o; + + assertEquals(42, atomicLong.incrementAndGet()); + + return true; + } + } + ), + // Check set. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.set("testSet", new CollectionConfiguration()); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.set("testSet", new CollectionConfiguration()); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteSet set = (IgniteSet)o; + + String val = "testVal"; + + set.add(val); + + assertEquals(1, set.size()); + assertTrue(set.contains(val)); + + return true; + } + } + ), + // Check ignite queue. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.queue("TestQueue", 10, new CollectionConfiguration()); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.queue("TestQueue", 10, new CollectionConfiguration()); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteQueue queue = (IgniteQueue)o; + + String val = "Test"; + + queue.add(val); + + assertEquals(val, queue.poll()); + + return true; + } + } + ) + )); + + clientMode = false; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void cacheOperationsTest() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + final IgniteCache<Object, Object> dfltCache = client.cache(null); + + assertNotNull(dfltCache); + + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check put and get operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + dfltCache.getAndPut(9999, 9999); + } + catch (CacheException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return dfltCache.getAndPut(9999, 9999); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + assertEquals(9999, dfltCache.get(9999)); + + return true; + } + } + ), + // Check put operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + dfltCache.put(10000, 10000); + } + catch (CacheException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + dfltCache.put(10000, 10000); + + return true; + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertTrue((Boolean)o); + + assertEquals(10000, dfltCache.get(10000)); + + return true; + } + } + ), + // Check get operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + dfltCache.get(10001); + } + catch (CacheException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return dfltCache.get(10001); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + return true; + } + } + ), + // Check invoke operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + dfltCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> entry, + Object... arguments) throws EntryProcessorException { + assertTrue(entry.exists()); + + return (int)entry.getValue() * 2; + } + }); + } + catch (CacheException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return dfltCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> entry, + Object... arguments) throws EntryProcessorException { + assertTrue(entry.exists()); + + return (int)entry.getValue() * 2; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + assertEquals(20000, (int)o); + + return true; + } + } + ), + // Check put async operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + IgniteCache<Object, Object> async = dfltCache.withAsync(); + + boolean failed = false; + + try { + async.put(10002, 10002); + + async.future().get(); + } + catch (CacheException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + async.put(10002, 10002); + + return async.future().get(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + assertEquals(10002, dfltCache.get(10002)); + + return true; + } + } + ), + // Check transaction. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.transactions(); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.transactions(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteTransactions txs = (IgniteTransactions)o; + + assertNotNull(txs); + + return true; + } + } + ), + // Check get cache. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.cache(null); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.cache(null); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o; + + assertNotNull(cache0); + + cache0.put(1, 1); + + assertEquals(1, cache0.get(1)); + + return true; + } + } + ), + // Check streamer. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.dataStreamer(null); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.dataStreamer(null); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o; + + streamer.addData(2, 2); + + streamer.close(); + + assertEquals(2, client.cache(null).get(2)); + + return true; + } + } + ), + // Check create cache. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.createCache("test_cache"); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.createCache("test_cache"); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o; + + assertNotNull(cache); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + return true; + } + } + ) + + )); + + clientMode = false; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void igniteOperationsTest() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + final IgniteCache<Object, Object> dfltCache = client.cache(null); + + final CountDownLatch recvLatch = new CountDownLatch(1); + + assertNotNull(dfltCache); + + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check compute. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.compute(); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.compute(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCompute comp = (IgniteCompute)o; + + Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public UUID call() throws Exception { + return ignite.cluster().localNode().id(); + } + }); + + assertFalse(uuids.isEmpty()); + + for (UUID uuid : uuids) + assertNotNull(uuid); + + return true; + } + } + ), + + // Check ping node. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.cluster().pingNode(new UUID(0, 0)); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.cluster().pingNode(new UUID(0, 0)); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + Boolean pingNode = (Boolean)o; + + assertFalse(pingNode); + + return true; + } + } + ), + // Check register remote listener. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.events().remoteListen(null, new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + return true; + } + }); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.events().remoteListen(null, new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + return true; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + UUID remoteId = (UUID)o; + + assertNotNull(remoteId); + + client.events().stopRemoteListen(remoteId); + + return true; + } + } + ), + // Check message operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID uuid, Object o) { + if (o.equals("Test message.")) + recvLatch.countDown(); + + return true; + } + }); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID uuid, Object o) { + if (o.equals("Test message.")) + recvLatch.countDown(); + + return true; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteMessaging msg = client.message(); + + msg.send(null, "Test message."); + + try { + assertTrue(recvLatch.await(2, SECONDS)); + } + catch (InterruptedException e) { + fail("Message wasn't received."); + } + + return true; + } + } + ), + // Check executor. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + boolean failed = false; + + try { + client.executorService().submit(new Callable<Integer>() { + @Override public Integer call() throws Exception { + return 42; + } + }); + } + catch (IgniteClientDisconnectedException e) { + failed = true; + + checkAndWait(e); + } + + assertTrue(failed); + + return client.executorService().submit(new Callable<Integer>() { + @Override public Integer call() throws Exception { + return 42; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + Future<Integer> fut = (Future<Integer>)o; + + try { + assertEquals(42, (int)fut.get()); + } + catch (Exception e) { + fail("Failed submit task."); + } + + return true; + } + } + ) + )); + + clientMode = false; + } + + /** + * @param client Client. + * @param ops Operations closures. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops) + throws Exception { + assertNotNull(client.cache(null)); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + final List<IgniteInternalFuture> futs = new ArrayList<>(); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + assertEquals(1, reconnectLatch.getCount()); + + for (T2<Callable, C1<Object, Boolean>> op : ops) + futs.add(GridTestUtils.runAsync(op.get1())); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + try { + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(disconnectLatch); + + assertEquals(ops.size(), futs.size()); + + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); + + U.sleep(2000); + + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + waitReconnectEvent(reconnectLatch); + + // Check operation after reconnect working. + for (int i = 0; i < futs.size(); i++) { + final int i0 = i; + + try { + final Object futRes = futs.get(i0).get(2, SECONDS); + + assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return ops.get(i0).get2().apply(futRes); + } + }).get(2, SECONDS)); + } + catch (IgniteFutureTimeoutCheckedException e) { + e.printStackTrace(); + + fail("Operation timeout. Iteration: " + i + "."); + } + } + } + finally { + clientSpi.writeLatch.countDown(); + + for (IgniteInternalFuture fut : futs) + fut.cancel(); + + stopAllGrids(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java new file mode 100644 index 0000000..bb568ab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +/** + * + */ +public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSeqReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true); + + assertEquals(1L, clientAtomicSeq.incrementAndGet()); + + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false); + + assertEquals(1001L, srvAtomicSeq.incrementAndGet()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(1002L, srvAtomicSeq.incrementAndGet()); + } + }); + + assertEquals(2L, clientAtomicSeq.incrementAndGet()); + + assertEquals(1003L, srvAtomicSeq.incrementAndGet()); + + assertEquals(3L, clientAtomicSeq.incrementAndGet()); + + clientAtomicSeq.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSeqReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); + + clientAtomicSeq.batchSize(1); + + assertEquals(1L, clientAtomicSeq.incrementAndGet()); + + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqRmv", 0, false); + + srvAtomicSeq.batchSize(1); + + assertEquals(1001L, srvAtomicSeq.incrementAndGet()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicSeq.close(); + + assert srvAtomicSeq.removed(); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 2000; i++) + clientAtomicSeq.incrementAndGet(); + + return null; + } + }, IllegalStateException.class, null); + + IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); + + assertEquals(0, newClientAtomicSeq.get()); + + assertEquals(1, newClientAtomicSeq.incrementAndGet()); + + newClientAtomicSeq.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicSeqReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true); + + clientAtomicSeq.batchSize(1); + + final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqInProg", 0, false); + + srvAtomicSeq.batchSize(1); + + commSpi.blockMessage(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 3000; i++) { + try { + clientAtomicSeq.incrementAndGet(); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + // Check that after reconnect working. + assert clientAtomicSeq.incrementAndGet() >= 0; + assert srvAtomicSeq.incrementAndGet() >= 0; + + clientAtomicSeq.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReferenceReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals("3st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value")); + assertEquals("4st value", srvAtomicRef.get()); + } + }); + + assertEquals("4st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value")); + assertEquals("5st value", clientAtomicRef.get()); + + assertEquals("5st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("5st value", "6st value")); + assertEquals("6st value", srvAtomicRef.get()); + + srvAtomicRef.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReferenceReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicReference<String> clientAtomicRef = + client.atomicReference("atomicRefRemoved", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicRef.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicRef.compareAndSet("3st value", "4st value"); + + return null; + } + }, IllegalStateException.class, null); + + IgniteAtomicReference<String> newClientAtomicRef = + client.atomicReference("atomicRefRemoved", "1st value", true); + + IgniteAtomicReference<String> newSrvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); + + assertEquals("1st value", newClientAtomicRef.get()); + assertTrue(newClientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", newClientAtomicRef.get()); + + assertEquals("2st value", newSrvAtomicRef.get()); + assertTrue(newSrvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", newSrvAtomicRef.get()); + + newClientAtomicRef.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReferenceReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicReference<String> clientAtomicRef = + client.atomicReference("atomicRefInProg", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefInProg", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + + servCommSpi.blockMessage(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientAtomicRef.compareAndSet("3st value", "4st value"); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + servCommSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + // Check that after reconnect working. + assertEquals("3st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("3st value", "4st value")); + assertEquals("4st value", clientAtomicRef.get()); + + assertEquals("4st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("4st value", "5st value")); + assertEquals("5st value", srvAtomicRef.get()); + + srvAtomicRef.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3)); + assertEquals(3, srvAtomicStamped.value()); + assertEquals(3, srvAtomicStamped.stamp()); + } + }); + + assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4)); + assertEquals(4, clientAtomicStamped.value()); + assertEquals(4, clientAtomicStamped.stamp()); + + assertEquals(true, srvAtomicStamped.compareAndSet(4, 5, 4, 5)); + assertEquals(5, srvAtomicStamped.value()); + assertEquals(5, srvAtomicStamped.stamp()); + + srvAtomicStamped.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicStamped.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicStamped.compareAndSet(2, 3, 2, 3); + + return null; + } + }, IllegalStateException.class, null); + + IgniteAtomicStamped newClientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); + + assertEquals(true, newClientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, newClientAtomicStamped.value()); + assertEquals(1, newClientAtomicStamped.stamp()); + + IgniteAtomicStamped newSrvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); + + assertEquals(true, newSrvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, newSrvAtomicStamped.value()); + assertEquals(2, newSrvAtomicStamped.stamp()); + + newClientAtomicStamped.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedInProgress", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + + servCommSpi.blockMessage(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientAtomicStamped.compareAndSet(2, 3, 2, 3); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + servCommSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + // Check that after reconnect working. + assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3)); + assertEquals(3, clientAtomicStamped.value()); + assertEquals(3, clientAtomicStamped.stamp()); + + assertEquals(true, srvAtomicStamped.compareAndSet(3, 4, 3, 4)); + assertEquals(4, srvAtomicStamped.value()); + assertEquals(4, srvAtomicStamped.stamp()); + + srvAtomicStamped.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true); + + assertEquals(0L, clientAtomicLong.getAndAdd(1)); + + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false); + + assertEquals(1L, srvAtomicLong.getAndAdd(1)); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertEquals(2L, srvAtomicLong.getAndAdd(1)); + } + }); + + assertEquals(3L, clientAtomicLong.getAndAdd(1)); + + assertEquals(4L, srvAtomicLong.getAndAdd(1)); + + assertEquals(5L, clientAtomicLong.getAndAdd(1)); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); + + assertEquals(0L, clientAtomicLong.getAndAdd(1)); + + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); + + assertEquals(1L, srvAtomicLong.getAndAdd(1)); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvAtomicLong.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicLong.getAndAdd(1); + + return null; + } + }, IllegalStateException.class, null); + + IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); + + assertEquals(0L, newClientAtomicLong.getAndAdd(1)); + + IgniteAtomicLong newSrvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); + + assertEquals(1L, newSrvAtomicLong.getAndAdd(1)); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true); + + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false); + + commSpi.blockMessage(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + clientAtomicLong.getAndAdd(1); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + // Check that after reconnect working. + assertEquals(1, clientAtomicLong.addAndGet(1)); + assertEquals(2, srvAtomicLong.addAndGet(1)); + + clientAtomicLong.close(); + } + + /** + * @throws Exception If failed. + */ + public void testLatchReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true); + + assertEquals(3, clientLatch.count()); + + final IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false); + + assertEquals(3, srvLatch.count()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvLatch.countDown(); + } + }); + + assertEquals(2, srvLatch.count()); + assertEquals(2, clientLatch.count()); + + srvLatch.countDown(); + + assertEquals(1, srvLatch.count()); + assertEquals(1, clientLatch.count()); + + clientLatch.countDown(); + + assertEquals(0, srvLatch.count()); + assertEquals(0, clientLatch.count()); + + assertTrue(srvLatch.await(1000)); + assertTrue(clientLatch.await(1000)); + } +}