http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java new file mode 100644 index 0000000..7cfc329 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest { + /** */ + protected static final String ATOMIC_CACHE = "ATOMIC_CACHE"; + + /** */ + protected static final String TX_CACHE = "TX_CACHE"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(ATOMIC_CACHE); + ccfg1.setBackups(1); + ccfg1.setAtomicityMode(ATOMIC); + + CacheConfiguration ccfg2 = new CacheConfiguration(); + + ccfg2.setName(TX_CACHE); + ccfg2.setBackups(1); + ccfg2.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(ccfg1, ccfg2); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testReconnectAtomicCache() throws Exception { + final Ignite client = grid(serverCount()); + + final IgniteCache<Integer, Integer> cache = client.cache(ATOMIC_CACHE); + + assertNotNull(cache); + + assertEquals(ATOMIC, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + TreeMap<Integer, Integer> map = new TreeMap<>(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(0, 100_000); + + cache.put(key, key); + + assertEquals(key, cache.get(key)); + + map.put(key, key); + } + + cache.putAll(map); + + Map<Integer, Integer> res = cache.getAll(map.keySet()); + + assertEquals(map, res); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTxCache() throws Exception { + final Ignite client = grid(serverCount()); + + final IgniteCache<Integer, Integer> cache = client.cache(TX_CACHE); + + assertNotNull(cache); + + assertEquals(TRANSACTIONAL, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + final IgniteTransactions txs = client.transactions(); + + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + TreeMap<Integer, Integer> map = new TreeMap<>(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 5; i++) { + Integer key = rnd.nextInt(0, 100_000); + + cache.put(key, key); + + assertEquals(key, cache.get(key)); + + map.put(key, key); + } + + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) { + cache.put(e.getKey(), e.getValue()); + + assertNotNull(cache.get(e.getKey())); + } + + tx.commit(); + } + } + + cache.putAll(map); + + Map<Integer, Integer> res = cache.getAll(map.keySet()); + 
+ assertEquals(map, res); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectComputeApi() throws Exception { + final Ignite client = grid(serverCount()); + + final IgniteCompute comp = client.compute(); + + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + comp.call(new DummyClosure()); + + comp.broadcast(new DummyClosure()); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectStreamerApi() throws Exception { + final Ignite client = grid(serverCount()); + + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + stream(ATOMIC_CACHE); + + stream(TX_CACHE); + + return null; + } + + private void stream(String cacheName) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) { + streamer.allowOverwrite(true); + + streamer.perNodeBufferSize(10); + + for (int i = 0; i < 100; i++) + streamer.addData(rnd.nextInt(100_000), 0); + } + } + }); + } + + /** + * + */ + public static class DummyClosure implements IgniteCallable<Object> { + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return 1; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java new file mode 100644 index 0000000..31b4192 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.service.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.services.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +/** + * + */ +public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + IgniteServices services = client.services(); + + services.deployClusterSingleton("testReconnect", new TestServiceImpl()); + + TestService srvc = services.serviceProxy("testReconnect", TestService.class, false); + + assertNotNull(srvc); + + long topVer = grid(0).cluster().topologyVersion(); + + assertEquals((Object)topVer, srvc.test()); + + Ignite srv = clientRouter(client); + + reconnectClientNode(client, srv, null); + + CountDownLatch latch = new CountDownLatch(1); + + DummyService.exeLatch("testReconnect2", latch); + + services.deployClusterSingleton("testReconnect2", new DummyService()); + + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + + assertEquals((Object)(topVer + 2), srvc.test()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testServiceRemove() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteServices clnServices = client.services(); + + final IgniteServices srvServices = srv.services(); + + srvServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl()); + + final TestService srvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false); + + assertNotNull(srvc); + + assertNotNull(srvc.test()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvServices.cancel("testServiceRemove"); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return srvc.test(); + } + }, IgniteException.class, null); + + clnServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl()); + + TestService newSrvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false); + + assertNotNull(newSrvc); + assertNotNull(newSrvc.test()); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectInDeploying() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final IgniteServices services = client.services(); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(GridNearTxPrepareResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl()); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. 
+ GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final IgniteServices services = client.services(); + + final Ignite srv = clientRouter(client); + + services.deployClusterSingleton("testReconnectInProgress", new TestServiceImpl()); + + final TestService srvc = services.serviceProxy("testReconnectInProgress", TestService.class, false); + + assertNotNull(srvc); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(GridJobExecuteResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + srvc.test(); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + + /** + * + */ + public static interface TestService { + /** + * @return Topology version. 
+ */ + public Long test(); + } + + /** + * + */ + public static class TestServiceImpl implements Service, TestService { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Long test() { + assertFalse(ignite.cluster().localNode().isClient()); + + return ignite.cluster().topologyVersion(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java new file mode 100644 index 0000000..98c3d0f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectStopTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testStopWhenDisconnected() throws Exception { + clientMode = true; + + Ignite client = startGrid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(disconnectLatch); + + IgniteFuture<?> reconnectFut = null; + + try { + client.getOrCreateCache(new CacheConfiguration<>()); + + fail(); + } + catch (IgniteClientDisconnectedException e) { + log.info("Expected operation exception: " + e); + + reconnectFut = e.reconnectFuture(); + } + + assertNotNull(reconnectFut); + + 
client.close(); + + try { + reconnectFut.get(); + + fail(); + } + catch (IgniteException e) { + log.info("Expected reconnect exception: " + e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java new file mode 100644 index 0000000..a4cf77f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.datastreamer.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; + +import javax.cache.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest { + /** */ + public static final String CACHE_NAME = "streamer"; + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME) + .setAtomicityMode(ATOMIC) + .setCacheMode(PARTITIONED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testStreamerReconnect() throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME); + + IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME); + + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvCache.localSize() == 50; + } + }, 2000L); + + assertEquals(50, srvCache.localSize()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + try { + client.dataStreamer(CACHE_NAME); + + fail(); + } + catch (IgniteClientDisconnectedException e) { + assertNotNull(e.reconnectFuture()); + } + } + }); + + checkStreamerClosed(streamer); + + streamer = client.dataStreamer(CACHE_NAME); + + for (int i = 50; i < 100; i++) + streamer.addData(i, i); + + streamer.flush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvCache.localSize() == 100; + } + }, 2000L); + + assertEquals(100, srvCache.localSize()); + + streamer.close(); + + streamer.future().get(2, TimeUnit.SECONDS); + + srvCache.removeAll(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testStreamerReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME); + + final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(DataStreamerResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + } + catch (CacheException e) { + checkAndWait(e); + + return true; + } + finally { + streamer.close(); + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + checkStreamerClosed(streamer); + + IgniteDataStreamer<Integer, Integer> streamer2 = client.dataStreamer(CACHE_NAME); + + for (int i = 0; i < 50; i++) + streamer2.addData(i, i); + + streamer2.close(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvCache.localSize() == 50; + } + }, 2000L); + + assertEquals(50, srvCache.localSize()); + } + + /** + * @param streamer Streamer. 
+ */ + private void checkStreamerClosed(IgniteDataStreamer<Integer, Integer> streamer) { + try { + streamer.addData(100, 100); + + fail(); + } + catch (CacheException e) { + checkAndWait(e); + } + + try { + streamer.flush(); + + fail(); + } + catch (CacheException e) { + checkAndWait(e); + } + + try { + streamer.future().get(); + + fail(); + } + catch (CacheException e) { + checkAndWait(e); + } + + streamer.tryFlush(); + + streamer.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java index 27c2a61..a392245 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java @@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true); if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName)) cfg.setClientMode(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java index 9780080..62f5d41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.resource.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.deployment.*; @@ -95,5 +96,11 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public boolean unregister(String rsrcName) { return false; } + + /** {@inheritDoc} */ + @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ } + + /** {@inheritDoc} */ + @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. 
*/ } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java index 074f6ff..9c30f23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java @@ -263,7 +263,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst e.printStackTrace(pw); - assertTrue(sw.toString().contains("grid is stopping")); + assertTrue(sw.toString().contains("node is stopping")); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java index 071341e..8703d32 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import 
org.apache.ignite.internal.IgniteInternalFuture; @@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; -import javax.cache.Cache; +import javax.cache.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest { @Override public void apply(IgniteFuture<?> f) { try { f.get(); - } catch (IgniteException ignore) { + } + catch (CacheException ignore) { // This may be debugged. } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java index af3ea9d..30bf5dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.transactions.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java index d78add6..53404cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java @@ -75,7 +75,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf jcache.get("1"); jcache.put("1", "11"); - GridCacheAdapter<Object, Object> utilityCache = ignite.context().cache().utilityCache(); + IgniteInternalCache<Object, Object> utilityCache = ignite.context().cache().utilityCache(); utilityCache.getAndPutIfAbsent("2", "2"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java index 19e40bf..7a2e8b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java @@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes Object msg0 = ((GridIoMessage)msg).message(); if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) 
{ - info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id() + info("Sending message [locNodeId=" + ignite.cluster().localNode().id() + + ", destNodeId= " + destNode.id() + ", msg=" + msg + ']'); synchronized (msgCntMap) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index e9d7a45..9a883b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), new GridCacheIoManager(), - null, - new CacheNoopJtaManager() + new CacheNoopJtaManager(), + null ), defaultCacheConfiguration(), CacheType.USER, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index ec6a526..63db0c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest { /** */ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite; + /** */ + private boolean reconnectDisabled; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setJoinTimeout(joinTimeout); disco.setNetworkTimeout(netTimeout); + disco.setClientReconnectDisabled(reconnectDisabled); + disco.afterWrite(afterWrite); cfg.setDiscoverySpi(disco); @@ -524,7 +529,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); - spi.pauseAll(); + spi.pauseAll(false); try { spi.brakeConnection(); @@ -568,7 +573,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); - spi.pauseAll(); + spi.pauseAll(false); try { spi.brakeConnection(); @@ -606,7 +611,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { attachListeners(2, 2); - ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(true); stopGrid("server-2"); @@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { public void testClientSegmentation() throws Exception { clientsPerSrv = 1; + reconnectDisabled = true; + startServerNodes(3); startClientNodes(3); @@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); try { + log.info("Fail server: " + 2); + failServer(2); await(srvFailedLatch); @@ -886,8 +895,8 @@ 
public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { try { startClientNodes(1); - assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0") - .cluster().localNode()).clientRouterNodeId()); + assertEquals(G.ignite("server-0").cluster().localNode().id(), + ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); checkNodes(2, 1); @@ -1206,6 +1215,528 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testReconnectAfterFail() throws Exception { + reconnectAfterFail(false); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectAfterFailTopologyChanged() throws Exception { + reconnectAfterFail(true); + } + + /** + * @param changeTop If {@code true} topology is changed after client disconnects. + * @throws Exception If failed. + */ + private void reconnectAfterFail(final boolean changeTop) throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final ClusterNode clientNode = client.cluster().localNode(); + + final UUID clientId = clientNode.id(); + + final TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + assertEquals(2L, clientNode.order()); + + final CountDownLatch failLatch = new CountDownLatch(1); + + final CountDownLatch joinLatch = new CountDownLatch(1); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Server event: " + evt); + + DiscoveryEvent evt0 = (DiscoveryEvent)evt; + + if (evt0.eventNode().id().equals(clientId) && (evt.type() == EVT_NODE_FAILED)) { + if (evt.type() == EVT_NODE_FAILED) + failLatch.countDown(); + } + else if (evt.type() == EVT_NODE_JOINED) 
{ + TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode(); + + if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) { + assertEquals(changeTop ? 5L : 4L, node.order()); + + joinLatch.countDown(); + } + } + + return true; + } + }, EVT_NODE_FAILED, EVT_NODE_JOINED); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Client event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + assertEquals(1, reconnectLatch.getCount()); + + disconnectLatch.countDown(); + + if (changeTop) + clientSpi.pauseAll(false); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + if (changeTop) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + + clientSpi.resumeAll(); + } + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + assertTrue(failLatch.await(5000, MILLISECONDS)); + assertTrue(joinLatch.await(5000, MILLISECONDS)); + + long topVer = changeTop ? 5L : 4L; + + assertEquals(topVer, client.cluster().localNode().order()); + + assertEquals(topVer, client.cluster().topologyVersion()); + + Collection<ClusterNode> clientTop = client.cluster().topology(topVer); + + assertEquals(changeTop ? 3 : 2, clientTop.size()); + + clientNodeIds.remove(clientId); + + clientNodeIds.add(client.cluster().localNode().id()); + + checkNodes(changeTop ? 2 : 1, 1); + + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + + checkNodes(changeTop ? 
3 : 2, 1); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectAfterFailConcurrentJoin() throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final ClusterNode clientNode = client.cluster().localNode(); + + assertEquals(2L, clientNode.order()); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Client event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + assertEquals(1, reconnectLatch.getCount()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + final int CLIENTS = 20; + + clientsPerSrv = CLIENTS + 1; + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + latch.await(); + + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + + return null; + } + }, CLIENTS, "start-client"); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + latch.countDown(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + clientNodeIds.add(client.cluster().localNode().id()); + + fut.get(); + + checkNodes(1, CLIENTS + 1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientFailReconnectDisabled() throws Exception { + reconnectDisabled = true; + + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + Ignite client = G.ignite("client-0"); + + final CountDownLatch segmentedLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_SEGMENTED) + segmentedLatch.countDown(); + + return false; + } + }, EVT_NODE_SEGMENTED); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + log.info("Fail client node."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(srvFailedLatch.await(5000, MILLISECONDS)); + assertTrue(segmentedLatch.await(5000, MILLISECONDS)); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception { + reconnectSegmentedAfterJoinTimeout(true); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception { + reconnectSegmentedAfterJoinTimeout(false); + } + + /** + * @param failSrv If {@code true} fails server, otherwise server does not send join message. + * @throws Exception If failed. 
+ */ + private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception { + netTimeout = 4000; + joinTimeout = 5000; + + startServerNodes(1); + + startClientNodes(1); + + final Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch segmentedLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + if (!failSrv) { + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + } + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, segmentedLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + assertFalse(err.get()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_NODE_SEGMENTED) { + log.info("Segmented event."); + + assertEquals(1, segmentedLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + segmentedLatch.countDown(); + } + else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + if (failSrv) { + log.info("Fail server."); + + failServer(0); + } + else { + log.info("Fail client connection."); + + srvSpi.failClientReconnect.set(1_000_000); + srvSpi.skipNodeAdded = true; + + clientSpi.brakeConnection(); + } + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + assertTrue(segmentedLatch.await(10_000, MILLISECONDS)); + + waitSegmented(client); + + assertFalse(err.get()); + + if (!failSrv) { + await(srvFailedLatch); + + 
GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv.cluster().nodes().size() == 1; + } + }, 10_000); + + checkNodes(1, 0); + } + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClusterRestart() throws Exception { + netTimeout = 3000; + joinTimeout = 60_000; + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + reconnectLatch.countDown(); + } + else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + log.info("Stop server."); + + srv.close(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + srvNodeIds.clear(); + srvIdx.set(0); + + Thread.sleep(3000); + + log.info("Restart server."); + + startServerNodes(1); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + clientNodeIds.clear(); + clientNodeIds.add(client.cluster().localNode().id()); + + checkNodes(1, 1); + + assertFalse(err.get()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDisconnectAfterNetworkTimeout() throws Exception { + netTimeout = 5000; + joinTimeout = 60_000; + maxMissedClientHbs = 2; + + startServerNodes(1); + + startClientNodes(1); + + final Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + assertFalse(err.get()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + reconnectLatch.countDown(); + } + else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + log.info("Fail client connection1."); + + srvSpi.failClientReconnect.set(1_000_000); + srvSpi.skipNodeAdded = true; + + clientSpi.brakeConnection(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + log.info("Fail client connection2."); + + srvSpi.failClientReconnect.set(0); + srvSpi.skipNodeAdded = false; + + clientSpi.brakeConnection(); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + clientNodeIds.clear(); + + clientNodeIds.add(client.cluster().localNode().id()); + + GridTestUtils.waitForCondition(new 
GridAbsPredicate() { + @Override + public boolean apply() { + return srv.cluster().nodes().size() == 2; + } + }, 10_000); + + checkNodes(1, 1); + + assertFalse(err.get()); + } + + /** + * @param ignite Ignite. + * @throws Exception If failed. + */ + private void waitSegmented(final Ignite ignite) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return IgniteState.STOPPED_ON_SEGMENTATION == Ignition.state(ignite.name()); + } + }, 5000); + + assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, Ignition.state(ignite.name())); + } + + /** * @param clientIdx Client index. * @param srvIdx Server index. * @throws Exception In case of error. @@ -1401,7 +1932,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private void checkRemoteNodes(Ignite ignite, int expCnt) { Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); - assertEquals(expCnt, nodes.size()); + assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size()); for (ClusterNode node : nodes) { UUID id = node.id(); @@ -1420,7 +1951,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws InterruptedException If interrupted. */ private void await(CountDownLatch latch) throws InterruptedException { - assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS)); + assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS)); } /** @@ -1471,6 +2002,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private volatile String delayJoinAckFor; + /** */ + private volatile boolean skipNodeAdded; + /** * @param lock Lock. 
*/ @@ -1543,6 +2077,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { boolean fail = false; + if (skipNodeAdded && + (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) { + log.info("Skip message: " + msg); + + return; + } + if (msg instanceof TcpDiscoveryNodeAddedMessage) fail = failNodeAdded.getAndDecrement() > 0; else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) @@ -1577,12 +2118,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * + * @param suspend If {@code true} suspends worker threads. */ - public void pauseAll() { + public void pauseAll(boolean suspend) { pauseResumeOperation(true, openSockLock, writeLock); - impl.workerThread().suspend(); + if (suspend) + impl.workerThread().suspend(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java index 159c451..dacbf55 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java @@ -317,4 +317,9 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { @Override public ClusterMetrics metrics() throws IgniteException { throw new UnsupportedOperationException("Operation is not supported yet."); } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFuture<?> clientReconnectFuture() { + throw new UnsupportedOperationException("Operation is not supported yet."); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java new file mode 100644 index 0000000..66c9835 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.*; +import org.apache.ignite.internal.*; + +/** + * + */ +public class IgniteClientReconnectTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception In case of error. 
+ */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite"); + + suite.addTestSuite(IgniteClientReconnectStopTest.class); + suite.addTestSuite(IgniteClientReconnectApiExceptionTest.class); + suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class); + suite.addTestSuite(IgniteClientReconnectCacheTest.class); + suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class); + suite.addTestSuite(IgniteClientReconnectComputeTest.class); + suite.addTestSuite(IgniteClientReconnectAtomicsTest.class); + suite.addTestSuite(IgniteClientReconnectCollectionsTest.class); + suite.addTestSuite(IgniteClientReconnectServicesTest.class); + suite.addTestSuite(IgniteClientReconnectStreamerTest.class); + suite.addTestSuite(IgniteClientReconnectFailoverTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 06c0961..c76dbe7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1439,6 +1439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { fut.get(); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { + rdcQryExec.onDisconnected(reconnectFut); + } + /** * Wrapper to store connection and flag is schema set or not. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index af29647..2b2996d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -26,6 +26,7 @@ import org.h2.table.*; import org.jetbrains.annotations.*; import org.jsr166.*; +import javax.cache.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -40,7 +41,7 @@ public abstract class GridMergeIndex extends BaseIndex { private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000); /** All rows number. */ - private final AtomicInteger expectedRowsCnt = new AtomicInteger(0); + private final AtomicInteger expRowsCnt = new AtomicInteger(0); /** Remaining rows per source node ID. */ private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>(); @@ -75,8 +76,8 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public long getRowCount(Session session) { - return expectedRowsCnt.get(); + @Override public long getRowCount(Session ses) { + return expRowsCnt.get(); } /** {@inheritDoc} */ @@ -93,6 +94,23 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @param e Error. 
+ */ + public void fail(final CacheException e) { + for (UUID nodeId0 : remainingRows.keySet()) { + addPage0(new GridResultPage(null, nodeId0, null) { + @Override public boolean isFail() { + return true; + } + + @Override public void fetchNextPage() { + throw e; + } + }); + } + } + + /** * @param nodeId Node ID. */ public void fail(UUID nodeId) { @@ -120,7 +138,7 @@ public abstract class GridMergeIndex extends BaseIndex { assert !cnt.initialized : "Counter is already initialized."; cnt.addAndGet(allRows); - expectedRowsCnt.addAndGet(allRows); + expRowsCnt.addAndGet(allRows); // We need this separate flag to handle case when the first source contains only one page // and it will signal that all remaining counters are zero and fetch is finished. @@ -162,7 +180,7 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public Cursor find(Session session, SearchRow first, SearchRow last) { + @Override public Cursor find(Session ses, SearchRow first, SearchRow last) { if (fetched == null) throw new IgniteException("Fetched result set was too large."); @@ -176,7 +194,7 @@ public abstract class GridMergeIndex extends BaseIndex { * @return {@code true} If we have fetched all the remote rows. */ public boolean fetchedAll() { - return fetchedCnt == expectedRowsCnt.get(); + return fetchedCnt == expRowsCnt.get(); } /** @@ -200,32 +218,32 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public void close(Session session) { + @Override public void close(Session ses) { // No-op. 
} /** {@inheritDoc} */ - @Override public void add(Session session, Row row) { + @Override public void add(Session ses, Row row) { throw DbException.getUnsupportedException("add"); } /** {@inheritDoc} */ - @Override public void remove(Session session, Row row) { + @Override public void remove(Session ses, Row row) { throw DbException.getUnsupportedException("remove row"); } /** {@inheritDoc} */ - @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) { + @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) { return getRowCountApproximation() + Constants.COST_ROW_OFFSET; } /** {@inheritDoc} */ - @Override public void remove(Session session) { + @Override public void remove(Session ses) { throw DbException.getUnsupportedException("remove index"); } /** {@inheritDoc} */ - @Override public void truncate(Session session) { + @Override public void truncate(Session ses) { throw DbException.getUnsupportedException("truncate"); } @@ -235,7 +253,7 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public Cursor findFirstOrLast(Session session, boolean first) { + @Override public Cursor findFirstOrLast(Session ses, boolean first) { throw DbException.getUnsupportedException("findFirstOrLast"); } @@ -299,6 +317,7 @@ public abstract class GridMergeIndex extends BaseIndex { private Iterator<Row> stream; /** + * @param stream Iterator. 
*/ public FetchingCursor(Iterator<Row> stream) { super(new FetchedIterator()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 32d1c95..cde3288 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; import org.h2.command.*; @@ -47,6 +48,7 @@ import org.h2.table.*; import org.h2.tools.*; import org.h2.util.*; import org.h2.value.*; +import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; @@ -234,10 +236,15 @@ public class GridReduceQueryExecutor { Object errState = r.state.get(); if (errState != null) { + CacheException err0 = errState instanceof CacheException ? 
(CacheException)errState : null; + + if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException) + throw err0; + CacheException e = new CacheException("Failed to fetch data from node: " + node.id()); - if (errState instanceof CacheException) - e.addSuppressed((Throwable)errState); + if (err0 != null) + e.addSuppressed(err0); throw e; } @@ -301,6 +308,7 @@ public class GridReduceQueryExecutor { } /** + * @param cctx Cache context. * @return {@code true} If cache context */ private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) { @@ -481,6 +489,12 @@ public class GridReduceQueryExecutor { runs.put(qryReqId, r); try { + if (ctx.clientDisconnected()) { + throw new CacheException("Query was cancelled, client node disconnected.", + new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(), + "Client node disconnected.")); + } + Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); if (qry.explain()) { @@ -506,8 +520,14 @@ public class GridReduceQueryExecutor { Object state = r.state.get(); if (state != null) { - if (state instanceof CacheException) - throw new CacheException("Failed to run map query remotely.", (CacheException)state); + if (state instanceof CacheException) { + CacheException err = (CacheException)state; + + if (err.getCause() instanceof IgniteClientDisconnectedException) + throw err; + + throw new CacheException("Failed to run map query remotely.", err); + } if (state instanceof AffinityTopologyVersion) { retry = true; @@ -550,7 +570,20 @@ public class GridReduceQueryExecutor { catch (IgniteCheckedException | RuntimeException e) { U.closeQuiet(r.conn); - throw new CacheException("Failed to run reduce query locally.", e); + if (e instanceof CacheException) + throw (CacheException)e; + + Throwable cause = e; + + if (e instanceof IgniteCheckedException) { + Throwable disconnectedErr = + ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class); + + if (disconnectedErr != null) + cause 
= disconnectedErr; + } + + throw new CacheException("Failed to run reduce query locally.", cause); } finally { if (!runs.remove(qryReqId, r)) @@ -1082,6 +1115,17 @@ public class GridReduceQueryExecutor { } /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + CacheException err = new CacheException("Query was cancelled, client node disconnected.", + new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.")); + + for (Map.Entry<Long, QueryRun> e : runs.entrySet()) + e.getValue().disconnected(err); + } + + /** * */ private static class QueryRun { @@ -1104,7 +1148,7 @@ public class GridReduceQueryExecutor { * @param o Fail state object. * @param nodeId Node ID. */ - void state(Object o, UUID nodeId) { + void state(Object o, @Nullable UUID nodeId) { assert o != null; assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass(); @@ -1117,6 +1161,20 @@ public class GridReduceQueryExecutor { for (GridMergeTable tbl : tbls) // Fail all merge indexes. tbl.getScanIndex(null).fail(nodeId); } + + /** + * @param e Error. + */ + void disconnected(CacheException e) { + if (!state.compareAndSet(null, e)) + return; + + while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. + latch.countDown(); + + for (GridMergeTable tbl : tbls) // Fail all merge indexes. 
+ tbl.getScanIndex(null).fail(e); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java new file mode 100644 index 0000000..23320ae --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Checks SQL, SQL fields and scan queries by passing query closures to the {@code reconnectFailover(...)}
+ * routine inherited from {@link IgniteClientReconnectFailoverAbstractTest}.
+ * <p>
+ * All queries run against the unnamed cache, which is preloaded with {@link Person} values under the
+ * keys {@code 0..10_000} (inclusive) before the tests start.
+ */
+public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+        // No cache name is set: partitioned cache with one backup and Person values indexed by Integer keys.
+        CacheConfiguration<Object, Object> personCacheCfg = new CacheConfiguration<>();
+
+        personCacheCfg.setCacheMode(PARTITIONED);
+        personCacheCfg.setBackups(1);
+        personCacheCfg.setIndexedTypes(Integer.class, Person.class);
+
+        nodeCfg.setCacheConfiguration(personCacheCfg);
+
+        return nodeCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final IgniteCache<Integer, Person> personCache = grid(serverCount()).cache(null);
+
+        assertNotNull(personCache);
+
+        // Keys 0..10_000 inclusive (10_001 entries); the counts asserted by the tests rely on this range.
+        for (int id = 0; id <= 10_000; id++)
+            personCache.put(id, new Person(id, "name-" + id));
+    }
+
+    /**
+     * Runs an SQL query and an SQL fields query over the preloaded persons inside the closure
+     * handed to {@code reconnectFailover(...)}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheQueries() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                runPersonSqlQuery(cache);
+
+                runAvgFieldsQuery(cache);
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Selects persons with {@code id > 1} and expects 9999 rows (ids {@code 2..10_000}).
+     *
+     * @param cache Cache to query.
+     */
+    private void runPersonSqlQuery(IgniteCache<Integer, Person> cache) {
+        SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "where id > 1");
+
+        try {
+            assertEquals(9999, cache.query(qry).getAll().size());
+        }
+        catch (CacheException e) {
+            rethrowIfClientDisconnected(e);
+        }
+    }
+
+    /**
+     * Selects the average person id and expects a single row whose value truncates to 5000.
+     *
+     * @param cache Cache to query.
+     */
+    private void runAvgFieldsQuery(IgniteCache<Integer, Person> cache) {
+        try {
+            SqlFieldsQuery qry = new SqlFieldsQuery("select avg(p.id) from Person p");
+
+            List<List<?>> rows = cache.query(qry).getAll();
+
+            assertEquals(1, rows.size());
+
+            Double avgId = (Double)rows.get(0).get(0);
+
+            assertEquals(5_000, avgId.intValue());
+        }
+        catch (CacheException e) {
+            rethrowIfClientDisconnected(e);
+        }
+    }
+
+    /**
+     * Propagates the error when it was caused by {@link IgniteClientDisconnectedException};
+     * any other cache error is only logged.
+     *
+     * @param e Error thrown by a query.
+     */
+    private void rethrowIfClientDisconnected(CacheException e) {
+        if (e.getCause() instanceof IgniteClientDisconnectedException)
+            throw e;
+
+        log.info("Ignore error: " + e);
+    }
+
+    /**
+     * Runs a filtered scan query and a single-partition scan query inside the closure
+     * handed to {@code reconnectFailover(...)}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReconnectScanQuery() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        final Affinity<Integer> aff = client.affinity(null);
+
+        final Map<Integer, Integer> expSizes = preloadedKeysPerPartition(aff);
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                // Only persons with an odd id pass the filter: ids 1, 3, ..., 9999.
+                ScanQuery<Integer, Person> oddIdsQry = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() {
+                    @Override public boolean apply(Integer key, Person val) {
+                        return val.getId() % 2 == 1;
+                    }
+                });
+
+                assertEquals(5000, cache.query(oddIdsQry).getAll().size());
+
+                // Scan one randomly chosen partition and compare with the precomputed key count.
+                Integer part = ThreadLocalRandom.current().nextInt(0, aff.partitions());
+
+                ScanQuery<Integer, Person> partQry = new ScanQuery<>(part);
+
+                assertEquals((int)expSizes.get(part), cache.query(partQry).getAll().size());
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Counts, for every partition of the given affinity, how many of the keys {@code 0..10_000} map to it.
+     *
+     * @param aff Affinity used to map keys to partitions.
+     * @return Map from partition number to the number of keys in that partition.
+     */
+    private static Map<Integer, Integer> preloadedKeysPerPartition(Affinity<Integer> aff) {
+        Map<Integer, Integer> sizes = new HashMap<>();
+
+        // Every partition gets an entry, including partitions that own none of the keys.
+        for (int part = 0; part < aff.partitions(); part++)
+            sizes.put(part, 0);
+
+        for (int key = 0; key <= 10_000; key++) {
+            Integer part = aff.partition(key);
+
+            sizes.put(part, sizes.get(part) + 1);
+        }
+
+        return sizes;
+    }
+
+    /**
+     * Cache value type; both fields are annotated with {@link QuerySqlField}.
+     */
+    public static class Person {
+        /** Person id. */
+        @QuerySqlField
+        public int id;
+
+        /** Person name. */
+        @QuerySqlField
+        public String name;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         */
+        public Person(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id New id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name New name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Persons are equal when they are of exactly the same class and have the same id;
+         * the name is not compared.
+         *
+         * {@inheritDoc}
+         */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            return id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}