# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/86d963f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/86d963f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/86d963f9 Branch: refs/heads/ignite-901 Commit: 86d963f98f3d3db33effdc482654e86d5b02bc52 Parents: a3318e3 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 8 10:08:02 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 8 18:19:00 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientDisconnectedException.java | 10 +- .../ignite/internal/GridJobSiblingImpl.java | 2 +- .../ignite/internal/GridKernalGatewayImpl.java | 4 +- .../apache/ignite/internal/IgniteKernal.java | 3 + .../internal/cluster/IgniteClusterImpl.java | 3 + .../internal/managers/GridManagerAdapter.java | 7 +- .../deployment/GridDeploymentCommunication.java | 2 +- .../deployment/GridDeploymentManager.java | 11 +- .../discovery/GridDiscoveryManager.java | 64 ++- .../processors/cache/GridCacheGateway.java | 3 +- .../processors/cache/GridCacheUtils.java | 5 +- .../processors/cache/IgniteCacheFutureImpl.java | 5 + .../dht/preloader/GridDhtPreloader.java | 2 +- .../clock/GridClockSyncProcessor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 17 +- .../GridCacheCountDownLatchImpl.java | 2 +- .../processors/job/GridJobProcessor.java | 2 +- .../internal/processors/job/GridJobWorker.java | 2 +- .../processors/task/GridTaskProcessor.java | 41 +- .../processors/task/GridTaskWorker.java | 59 ++- .../ignite/internal/util/IgniteUtils.java | 19 + .../communication/tcp/TcpCommunicationSpi.java | 16 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 90 ++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 12 + .../IgniteClientReconnectAbstractTest.java | 3 + ...niteClientReconnectFailoverAbstractTest.java | 228 ++++++++++ .../IgniteClientReconnectFailoverTest.java | 
167 +------- .../IgniteSlowClientDetectionSelfTest.java | 1 + .../cache/IgniteCacheDynamicStopSelfTest.java | 6 +- .../IgniteTxExceptionAbstractSelfTest.java | 1 + .../tcp/TcpClientDiscoverySpiSelfTest.java | 21 + .../h2/twostep/GridReduceQueryExecutor.java | 16 +- .../IgniteClientReconnectQueriesTest.java | 428 ------------------- ...ClientReconnectCacheQueriesFailoverTest.java | 149 +++++++ .../cache/IgniteClientReconnectQueriesTest.java | 428 +++++++++++++++++++ ...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 + .../IgniteCacheQuerySelfTestSuite.java | 3 - .../IgniteCacheWithIndexingTestSuite.java | 1 + 38 files changed, 1147 insertions(+), 692 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java index 726091f..c40dd9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java @@ -33,6 +33,14 @@ public class IgniteClientDisconnectedException extends IgniteException { /** * @param reconnectFut Reconnect future. * @param msg Error message. + */ + public IgniteClientDisconnectedException(IgniteFuture<?> reconnectFut, String msg) { + this(reconnectFut, msg, null); + } + + /** + * @param reconnectFut Reconnect future. + * @param msg Error message. * @param cause Optional nested exception (can be {@code null}). 
*/ public IgniteClientDisconnectedException( @@ -41,8 +49,6 @@ public class IgniteClientDisconnectedException extends IgniteException { @Nullable Throwable cause) { super(msg, cause); - assert reconnectFut != null; - this.reconnectFut = reconnectFut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java index 62adf52..b4e0f01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java @@ -167,7 +167,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable { } catch (IgniteCheckedException e) { // Avoid stack trace for left nodes. 
- if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNode(node.id())) + if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id())) U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " + "[nodeId=" + node.id() + ", ses=" + ses + ']', e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index b1f4df8..fa395e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -78,7 +78,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readUnlock(); if (state == GridKernalState.DISCONNECTED) - throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null); + throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); throw illegalState(); } @@ -92,7 +92,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readLock(); if (state == GridKernalState.DISCONNECTED) - throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null); + throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 0dd3c29..0a9d093 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2186,6 +2186,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { return false; } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { unguard(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index c4de2da..246eab5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -123,6 +123,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus try { return ctx.discovery().pingNode(nodeId); } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { unguard(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 1cbe68d..9faa056 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -328,7 +328,12 @@ public 
abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan @Override public boolean pingNode(UUID nodeId) { A.notNull(nodeId, "nodeId"); - return ctx.discovery().pingNode(nodeId); + try { + return ctx.discovery().pingNode(nodeId); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } @Override public void send(ClusterNode node, Serializable msg, String topic) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 443b221..3b886a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -293,7 +293,7 @@ class GridDeploymentCommunication { log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']'); } catch (IgniteCheckedException e) { - if (ctx.discovery().pingNode(nodeId)) + if (ctx.discovery().pingNodeNoError(nodeId)) U.error(log, "Failed to send peer class loading response to node: " + nodeId, e); else if (log.isDebugEnabled()) log.debug("Failed to send peer class loading response to node " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 9e418a5..75fb41e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -94,13 +94,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { comm.start(); - locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm); - ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm); - verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm); - - locStore.start(); - ldrStore.start(); - verStore.start(); + startStores(); if (log.isDebugEnabled()) { log.debug("Local deployment: " + locDep); @@ -129,6 +123,9 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { storesStop(); + if (comm != null) + comm.stop(); + getSpi().setListener(null); stopSpi(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 4a064d1..096f0e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -477,6 +477,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } + else if (type == EVT_CLIENT_NODE_DISCONNECTED) { + /* + * Notify all components from discovery thread to avoid concurrent + * reconnect while disconnect handling is in progress. 
+ */ + + assert locNode.isClient() : locNode; + assert node.isClient() : node; + + ((IgniteKernal)ctx.grid()).onDisconnected(); + + DiscoveryEvent evt = new DiscoveryEvent(); + + evt.node(ctx.discovery().localNode()); + evt.eventNode(node); + evt.type(type); + + ctx.event().record(evt); + + return; + } discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } @@ -1106,8 +1127,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param nodeId ID of the node. * @return {@code True} if ping succeeded. + * @throws IgniteClientDisconnectedCheckedException If ping failed. */ - public boolean pingNode(UUID nodeId) { + public boolean pingNode(UUID nodeId) throws IgniteClientDisconnectedCheckedException { + assert nodeId != null; + + if (!busyLock.enterBusy()) + return false; + + try { + return getSpi().pingNode(nodeId); + } + catch (IgniteException e) { + if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) { + IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture(); + + throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage()); + } + + throw e; + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId ID of the node. + * @return {@code True} if ping succeeded. + */ + public boolean pingNodeNoError(UUID nodeId) { assert nodeId != null; if (!busyLock.enterBusy()) @@ -1897,20 +1946,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } - case EVT_CLIENT_NODE_DISCONNECTED: { - assert localNode().isClient() : evt; - - ((IgniteKernal)ctx.grid()).onDisconnected(); - - break; - } - case EVT_CLIENT_NODE_RECONNECTED: { assert localNode().isClient() : evt; // TODO IGNITE-901. 
((IgniteKernal)ctx.grid()).reconnected(false); + if (log.isInfoEnabled()) + log.info("Client node reconnected to cluster: " + node); + + ackTopology(topVer.topologyVersion(), true); + break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index a9a73eb..da409a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -68,6 +68,7 @@ public class GridCacheGateway<K, V> { /** * @param lock {@code True} if lock is held. * @param stopErr {@code True} if throw exception if stopped. + * @return {@code True} if cache is in started state. 
*/ private boolean checkState(boolean lock, boolean stopErr) { State state = this.state; @@ -86,7 +87,7 @@ public class GridCacheGateway<K, V> { assert reconnectFut != null; throw new CacheException( - new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null)); + new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + ctx.gridName())); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 6faf6e4..bd2623d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1565,8 +1565,11 @@ public class GridCacheUtils { (IgniteClientDisconnectedCheckedException)e : e.getCause(IgniteClientDisconnectedCheckedException.class); - if (disconnectedErr != null) + if (disconnectedErr != null) { + assert disconnectedErr.reconnectFuture() != null : disconnectedErr; + e = disconnectedErr; + } if (e.hasCause(CacheWriterException.class)) return new CacheWriterException(U.convertExceptionNoWrap(e)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java index 06c28e6..13af004 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java @@ -37,6 +37,11 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> { /** {@inheritDoc} */ @Override protected RuntimeException convertException(IgniteCheckedException e) { + if (e instanceof IgniteFutureCancelledCheckedException || + e instanceof IgniteInterruptedCheckedException || + e instanceof IgniteFutureTimeoutCheckedException) + return U.convertException(e); + return CU.convertToCacheException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 969d7a2..f33f791 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -223,7 +223,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { final CacheConfiguration cfg = cctx.config(); - if (cfg.getRebalanceDelay() >= 0) { + if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); demandPool.syncFuture().listen(new CI1<Object>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 2920176..478426f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -295,7 +295,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); } catch (IgniteCheckedException e) { - if (ctx.discovery().pingNode(n.id())) + if (ctx.discovery().pingNodeNoError(n.id())) U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " + "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); else if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 55915f3..605f478 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1275,11 +1275,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); } catch (IgniteCheckedException e) { - if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) - 
((GridFutureAdapter<Object>)fut).onDone(e); - else - ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " + - "request (node has left): " + node.id())); + GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); + + try { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + fut0.onDone(e); + else + fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " + + node.id())); + } + catch (IgniteClientDisconnectedCheckedException e0) { + fut0.onDone(e0); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 2d3cf13..cfc051c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -339,7 +339,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc GridCacheCountDownLatchValue latchVal = latchView.get(key); if (latchVal == null) - throw new IgniteCheckedException("Failed to find count down latch with given name: " + name); + return 0; val = latchVal.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 48e9686..350068a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter { * @return {@code true} if node is dead, {@code false} is node is alive. */ private boolean isDeadNode(UUID uid) { - return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index d1ee5ad..3a309f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { * @return {@code true} if node is dead, {@code false} is node is alive. 
*/ private boolean isDeadNode(UUID uid) { - return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 65ce557..d3caf5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -121,11 +121,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); - for (GridTaskWorker<?, ?> worker : tasks.values()) { + for (GridTaskWorker<?, ?> worker : tasks.values()) worker.finishTask(null, err); - - worker.cancel(); - } } /** @@ -617,31 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter { assert taskWorker0 == null : "Session ID is not unique: " + sesId; - if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { - try { - // Start task execution in another thread. - if (sys) - ctx.getSystemExecutorService().execute(taskWorker); - else - ctx.getExecutorService().execute(taskWorker); - } - catch (RejectedExecutionException e) { - tasks.remove(sesId); + if (!ctx.clientDisconnected()) { + if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { + try { + // Start task execution in another thread. 
+ if (sys) + ctx.getSystemExecutorService().execute(taskWorker); + else + ctx.getExecutorService().execute(taskWorker); + } + catch (RejectedExecutionException e) { + tasks.remove(sesId); - release(dep); + release(dep); - handleException(new ComputeExecutionRejectedException("Failed to execute task " + - "due to thread pool execution rejection: " + taskName, e), fut); + handleException(new ComputeExecutionRejectedException("Failed to execute task " + + "due to thread pool execution rejection: " + taskName, e), fut); + } } + else + taskWorker.run(); } else - taskWorker.run(); - - if (ctx.clientDisconnected()) { taskWorker.finishTask(null, disconnectedError(null)); - - taskWorker.cancel(); - } } } else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index eb5fa77..133a31f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1070,10 +1070,17 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { PUBLIC_POOL); } catch (IgniteCheckedException e) { - if (!isDeadNode(nodeId)) - U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + - nodeId + ", taskName=" + ses.getTaskName() + - ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e); + try { + if (!isDeadNode(nodeId)) + U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + + nodeId + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() 
+ ']', e); + } + catch (IgniteClientDisconnectedCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send cancel request to node, client disconnected [nodeId=" + + nodeId + ", taskName=" + ses.getTaskName() + ']'); + } } } } @@ -1169,24 +1176,39 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } } catch (IgniteCheckedException e) { - boolean deadNode = isDeadNode(res.getNode().id()); + IgniteException fakeErr = null; - // Avoid stack trace if node has left grid. - if (deadNode) - U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " + - "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + - ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); - else - U.error(log, "Failed to send job request: " + req, e); + try { + boolean deadNode = isDeadNode(res.getNode().id()); + + // Avoid stack trace if node has left grid. + if (deadNode) { + U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " + + "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); + + fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e); + } + else + U.error(log, "Failed to send job request: " + req, e); + + } + catch (IgniteClientDisconnectedCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send job request, client disconnected [node=" + node + + ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" + + res.getJobContext().getJobId() + ']'); + + fakeErr = U.convertException(e0); + } GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), res.getJobContext().getJobId(), null, null, null, null, null, null, false); - if (deadNode) - 
fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + - node, e)); - else - fakeRes.setFakeException(U.convertException(e)); + if (fakeErr == null) + fakeErr = U.convertException(e); + + fakeRes.setFakeException(fakeErr); onResponse(fakeRes); } @@ -1345,8 +1367,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { * * @param uid UID of node to check. * @return {@code true} if node is dead, {@code false} is node is alive. + * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected. */ - private boolean isDeadNode(UUID uid) { + private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException { return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 91d8172..149222e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -682,6 +682,25 @@ public abstract class IgniteUtils { * @return Ignite runtime exception. */ public static IgniteException convertException(IgniteCheckedException e) { + IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class); + + if (e0 != null) { + assert e0.reconnectFuture() != null : e0; + + throw e0; + } + + IgniteClientDisconnectedCheckedException disconnectedErr = + e instanceof IgniteClientDisconnectedCheckedException ? 
+ (IgniteClientDisconnectedCheckedException)e + : e.getCause(IgniteClientDisconnectedCheckedException.class); + + if (disconnectedErr != null) { + assert disconnectedErr.reconnectFuture() != null : disconnectedErr; + + e = disconnectedErr; + } + C1<IgniteCheckedException, IgniteException> converter = exceptionConverters.get(e.getClass()); if (converter != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 7691e3f..8ea2b82 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2707,10 +2707,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { ClusterNode node = recoveryDesc.node(); - if (clients.containsKey(node.id()) || - !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || - !getSpiContext().pingNode(node.id())) + try { + if (clients.containsKey(node.id()) || + !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || + !getSpiContext().pingNode(node.id())) + return; + } + catch (IgniteClientDisconnectedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node, client disconnected."); + return; + } try { if (log.isDebugEnabled()) @@ -3100,6 +3108,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param nodeId Node ID. 
*/ private NodeIdMessage(UUID nodeId) { + assert nodeId != null; + nodeIdBytes = U.uuidToBytes(nodeId); nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 38ba8fd..b3793b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,6 +70,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private SocketReader sockReader; + /** */ + private volatile State state; + /** Last message ID. */ private volatile IgniteUuid lastMsgId; @@ -255,23 +258,36 @@ class ClientImpl extends TcpDiscoveryImpl { if (oldFut != null) fut = oldFut; else { - if (spi.getSpiContext().isStopping()) { + State state = this.state; + + if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) { if (pingFuts.remove(nodeId, fut)) fut.onDone(false); return false; } + else if (state == State.DISCONNECTED) { + if (pingFuts.remove(nodeId, fut)) + fut.onDone(new IgniteClientDisconnectedCheckedException(null, + "Failed to ping node, client node disconnected.")); + } + else { + final GridFutureAdapter<Boolean> finalFut = fut; + + timer.schedule(new TimerTask() { + @Override public void run() { + if (pingFuts.remove(nodeId, finalFut)) { + if (ClientImpl.this.state == State.DISCONNECTED) + finalFut.onDone(new IgniteClientDisconnectedCheckedException(null, + "Failed to ping node, client node disconnected.")); + else + finalFut.onDone(false); + } + } + }, spi.netTimeout); - final GridFutureAdapter<Boolean> finalFut = fut; - - timer.schedule(new TimerTask() 
{ - @Override public void run() { - if (pingFuts.remove(nodeId, finalFut)) - finalFut.onDone(false); - } - }, spi.netTimeout); - - sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + } } } @@ -282,7 +298,7 @@ class ClientImpl extends TcpDiscoveryImpl { return false; } catch (IgniteCheckedException e) { - throw new IgniteSpiException(e); // Should newer occur. + throw new IgniteSpiException(e); } } @@ -953,8 +969,7 @@ class ClientImpl extends TcpDiscoveryImpl { @Override protected void body() throws InterruptedException { assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.CONNECTED - || state == ClientImpl.State.STARTING : - state; + || state == ClientImpl.State.STARTING : state; boolean success = false; @@ -976,7 +991,7 @@ class ClientImpl extends TcpDiscoveryImpl { } else U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); return; } @@ -1171,21 +1186,36 @@ class ClientImpl extends TcpDiscoveryImpl { reconnector = null; - state = ClientImpl.State.DISCONNECTED; + if (spi.isClientReconnectDisabled()) { + state = ClientImpl.State.SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + else { + state = ClientImpl.State.DISCONNECTED; + + nodeAdded = false; - nodeAdded = false; + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( + null, "Failed to ping node, client node disconnected."); - notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { + GridFutureAdapter<Boolean> fut = e.getValue(); + + if (pingFuts.remove(e.getKey(), fut)) + 
fut.onDone(err); + } - UUID newId = UUID.randomUUID(); + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); - log.info("Change node id: " + newId); + UUID newId = UUID.randomUUID(); - rmtNodes.clear(); + log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME)); - locNode.onClientDisconnected(newId); + locNode.onClientDisconnected(newId); - tryJoin(); + tryJoin(); + } } else { TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; @@ -1298,11 +1328,13 @@ class ClientImpl extends TcpDiscoveryImpl { * @return {@code True} if client in process of join. */ private boolean joining() { + ClientImpl.State state = ClientImpl.this.state; + return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED; } /** - * @return {@code True} if disconnected. + * @return {@code True} if client disconnected. */ private boolean disconnected() { return state == ClientImpl.State.DISCONNECTED; @@ -1795,17 +1827,23 @@ class ClientImpl extends TcpDiscoveryImpl { } } - private volatile State state; - + /** + * + */ private enum State { + /** */ STARTING, + /** */ CONNECTED, + /** */ DISCONNECTED, + /** */ SEGMENTED, + /** */ STOPPED } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 9446d2d..3995207 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -327,6 +327,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** */ private boolean forceSrvMode; + 
/** */ + private boolean clientReconnectDisabled; + /** {@inheritDoc} */ @Override public String getSpiState() { return impl.getSpiState(); @@ -416,6 +419,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } + public boolean isClientReconnectDisabled() { + return clientReconnectDisabled; + } + + @IgniteSpiConfiguration(optional = true) + public void setClientReconnectDisabled(boolean clientReconnectDisabled) { + this.clientReconnectDisabled = clientReconnectDisabled; + } + /** * Inject resources * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index ec043f8..8fca97c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -218,6 +218,9 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra protected IgniteFuture<?> check(CacheException e) { log.info("Expected exception: " + e); + if (!(e.getCause() instanceof IgniteClientDisconnectedException)) + log.error("Unexpected cause: " + e.getCause(), e); + assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException); IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java new file mode 100644 index 0000000..551cb1a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest { + /** */ + private static final Integer THREADS = 1; + + /** */ + private volatile CyclicBarrier barrier; + + /** */ + protected static final long TEST_TIME = 90_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIME * 60_000; + } + + /** + * @param c Test closure. + * @throws Exception If failed. 
+ */ + protected final void reconnectFailover(final Callable<Void> c) throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final AtomicBoolean stop = new AtomicBoolean(false); + + final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + int iter = 0; + + while (!stop.get()) { + try { + c.call(); + } + catch (CacheException e) { + checkAndWait(e); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + } + + if (++iter % 100 == 0) + log.info("Iteration: " + iter); + + if (barrier != null) + barrier.await(); + } + + return null; + } catch (Throwable e) { + log.error("Unexpected error in operation thread: " + e, e); + + stop.set(true); + + throw e; + } + } + }, THREADS, "test-operation-thread"); + + final AtomicReference<CountDownLatch> disconnected = new AtomicReference<>(); + final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>(); + + IgnitePredicate<Event> p = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + CountDownLatch latch = reconnected.get(); + + assertNotNull(latch); + assertEquals(1, latch.getCount()); + + latch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + CountDownLatch latch = disconnected.get(); + + assertNotNull(latch); + assertEquals(1, latch.getCount()); + + latch.countDown(); + } + + return true; + } + }; + + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + try { + long stopTime = System.currentTimeMillis() + TEST_TIME; + + String err = null; + + while (System.currentTimeMillis() < stopTime && !fut.isDone()) { + U.sleep(100); + + 
CountDownLatch disconnectLatch = new CountDownLatch(1); + CountDownLatch reconnectLatch = new CountDownLatch(1); + + disconnected.set(disconnectLatch); + reconnected.set(reconnectLatch); + + UUID nodeId = client.cluster().localNode().id(); + + log.info("Fail client: " + nodeId); + + srvSpi.failNode(nodeId, null); + + if (!disconnectLatch.await(5000, MILLISECONDS)) { + err = "Failed to wait for disconnect"; + + break; + } + + if (!reconnectLatch.await(5000, MILLISECONDS)) { + err = "Failed to wait for reconnect"; + + break; + } + + barrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + barrier = null; + } + }); + + try { + barrier.await(10, SECONDS); + } + catch (TimeoutException e) { + err = "Operations hang or fail with unexpected error."; + + break; + } + } + + if (err != null) { + log.error(err); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = barrier; + + if (barrier0 != null) + barrier0.reset(); + + stop.set(true); + + fut.get(); + + fail(err); + } + + stop.set(true); + + fut.get(); + } + finally { + client.events().stopLocalListen(p); + + stop.set(true); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java index 35f86f5..7cfc329 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java @@ -19,37 +19,24 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import 
org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.*; import org.apache.ignite.transactions.*; -import javax.cache.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.transactions.TransactionIsolation.*; /** * */ -public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbstractTest { +public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest { /** */ - public final Integer THREADS = 8; + protected static final String ATOMIC_CACHE = "ATOMIC_CACHE"; /** */ - private volatile CyclicBarrier barrier; - - /** */ - private static final String ATOMIC_CACHE = "ATOMIC_CACHE"; - - /** */ - private static final String TX_CACHE = "TX_CACHE"; + protected static final String TX_CACHE = "TX_CACHE"; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -72,21 +59,6 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst return cfg; } - /** {@inheritDoc} */ - @Override protected int serverCount() { - return 3; - } - - /** {@inheritDoc} */ - @Override protected int clientCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 2 * 60_000; - } - /** * @throws Exception If failed. */ @@ -199,138 +171,33 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst } /** - * @param c Test closure. * @throws Exception If failed. 
*/ - public void reconnectFailover(final Callable<Void> c) throws Exception { + public void testReconnectStreamerApi() throws Exception { final Ignite client = grid(serverCount()); - assertTrue(client.cluster().localNode().isClient()); - - Ignite srv = clientRouter(client); - - TestTcpDiscoverySpi srvSpi = spi(srv); - - final AtomicBoolean stop = new AtomicBoolean(false); - - final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - int iter = 0; - - while (!stop.get()) { - try { - c.call(); - } - catch (CacheException e) { - checkAndWait(e); - } - catch (IgniteClientDisconnectedException e) { - checkAndWait(e); - } - - if (++iter % 100 == 0) - log.info("Iteration: " + iter); - - if (barrier != null) - barrier.await(); - } - - return null; - } - catch (Throwable e) { - stop.set(true); - - log.error("Unexpected error: " + e, e); - - throw e; - } - } - }, THREADS, "test-operation-thread"); - - final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>(); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - CountDownLatch latch = reconnected.get(); - - assertNotNull(latch); - assertEquals(1, latch.getCount()); + reconnectFailover(new Callable<Void>() { + @Override public Void call() throws Exception { + stream(ATOMIC_CACHE); - latch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) - info("Disconnected: " + evt); + stream(TX_CACHE); - return true; + return null; } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - try { - long stopTime = System.currentTimeMillis() + 60_000; - - String err = null; - while (System.currentTimeMillis() < stopTime && !fut.isDone()) { - U.sleep(100); - - CountDownLatch latch = new CountDownLatch(1); - - reconnected.set(latch); - - UUID nodeId = 
client.cluster().localNode().id(); - - log.info("Fail client: " + nodeId); - - srvSpi.failNode(nodeId, null); - - if (!latch.await(5000, MILLISECONDS)) { - err = "Failed to wait for reconnect"; - - break; - } + private void stream(String cacheName) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - barrier = new CyclicBarrier(THREADS + 1, new Runnable() { - @Override public void run() { - barrier = null; - } - }); + try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) { + streamer.allowOverwrite(true); - try { - barrier.await(10, SECONDS); - } - catch (TimeoutException e) { - err = "Operation hangs."; + streamer.perNodeBufferSize(10); - break; + for (int i = 0; i < 100; i++) + streamer.addData(rnd.nextInt(100_000), 0); } } - - if (err != null) { - log.error(err); - - U.dumpThreads(log); - - CyclicBarrier barrier0 = barrier; - - if (barrier0 != null) - barrier0.reset(); - - stop.set(true); - - fail(err); - } - - stop.set(true); - - fut.get(); - } - finally { - stop.set(true); - } + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java index 27c2a61..a392245 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java @@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + 
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true); if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName)) cfg.setClientMode(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java index 071341e..8703d32 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.IgniteInternalFuture; @@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; -import javax.cache.Cache; +import javax.cache.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest { @Override public void apply(IgniteFuture<?> f) { try { f.get(); - } catch (IgniteException ignore) { + } + catch (CacheException ignore) { // This may be debugged. 
} } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java index af3ea9d..30bf5dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.transactions.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 55fae9b..ba38dfc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -1332,6 +1332,27 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + public void testClientReconnectDisabled() throws Exception { + // TODO IGNTIE-901. + } + + /** + * @throws Exception If failed. + */ + public void testDisconnectAfterNetworkTimeout() throws Exception { + // TODO IGNTIE-901. + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeout() throws Exception { + // TODO IGNTIE-901. + } + + /** * @param clientIdx Client index. * @param srvIdx Server index. * @throws Exception In case of error. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 64e16bf..b531c35 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -492,7 +492,7 @@ public class GridReduceQueryExecutor { if (ctx.clientDisconnected()) { throw new CacheException("Query was cancelled, client node disconnected.", new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(), - "Client node disconnected.", null)); + "Client node disconnected.")); } Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); @@ -573,7 +573,17 @@ public class GridReduceQueryExecutor { if (e instanceof CacheException) throw (CacheException)e; - throw new CacheException("Failed to run reduce query locally.", e); + Throwable cause = e; + + if (e instanceof IgniteCheckedException) { + Throwable disconnectedErr = + 
((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class); + + if (disconnectedErr != null) + cause = disconnectedErr; + } + + throw new CacheException("Failed to run reduce query locally.", cause); } finally { if (!runs.remove(qryReqId, r)) @@ -1109,7 +1119,7 @@ public class GridReduceQueryExecutor { */ public void onDisconnected(IgniteFuture<?> reconnectFut) { CacheException err = new CacheException("Query was cancelled, client node disconnected.", - new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null)); + new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.")); for (Map.Entry<Long, QueryRun> e : runs.entrySet()) e.getValue().state(err, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java deleted file mode 100644 index b0dc965..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors;

import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cache.query.annotations.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;

import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;

import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMode.*;

/**
 * Tests SQL and scan queries issued through a client node's cache around a client reconnect.
 * Each test takes the node at index {@link #serverCount()} as the client and drives the reconnect
 * through {@code reconnectClientNode(...)} inherited from the base class.
 */
public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
    /** Name of the cache all queries in this test run against. */
    public static final String QUERY_CACHE = "query";

    /** {@inheritDoc} */
    @Override protected int serverCount() {
        return 3;
    }

    /** {@inheritDoc} */
    @Override protected int clientCount() {
        return 1;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);

        // Partitioned atomic cache with one backup; Person values are indexed for SQL.
        CacheConfiguration<Integer, Person> qryCacheCfg = new CacheConfiguration<>(QUERY_CACHE);

        qryCacheCfg.setCacheMode(PARTITIONED);
        qryCacheCfg.setAtomicityMode(ATOMIC);
        qryCacheCfg.setBackups(1);
        qryCacheCfg.setIndexedTypes(Integer.class, Person.class);

        igniteCfg.setCacheConfiguration(qryCacheCfg);

        return igniteCfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        // Empty the query cache so that entries do not carry over into the next test.
        grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
    }

    /**
     * Opens an SQL cursor (page size 1) before the reconnect, verifies that a query started from the
     * runnable handed to {@code reconnectClientNode(...)} fails with a {@link CacheException} accepted by
     * {@code check(...)}, and finally expects the cursor opened earlier to return all four entries.
     *
     * @throws Exception If failed.
     */
    public void testQueryReconnect() throws Exception {
        Ignite client = grid(serverCount());

        assertTrue(client.cluster().localNode().isClient());

        final Ignite router = clientRouter(client);

        final IgniteCache<Integer, Person> clientCache = client.getOrCreateCache(QUERY_CACHE);

        final IgniteCache<Integer, Person> routerCache = router.getOrCreateCache(QUERY_CACHE);

        putInitialPersons(clientCache);

        final SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "_key <> 0");

        sqlQry.setPageSize(1);

        QueryCursor<Cache.Entry<Integer, Person>> openCursor = clientCache.query(sqlQry);

        reconnectClientNode(client, router, new Runnable() {
            @Override public void run() {
                // Fourth entry is written through the server-side cache.
                routerCache.put(4, new Person(4, "name4", "surname4"));

                try {
                    clientCache.query(sqlQry);

                    fail();
                }
                catch (CacheException err) {
                    check(err);
                }
            }
        });

        List<Cache.Entry<Integer, Person>> entries = openCursor.getAll();

        assertNotNull(entries);
        assertEquals(4, entries.size());
    }

    /**
     * Blocks {@link GridQueryNextPageResponse} messages on all servers, starts fetching an SQL cursor in
     * a background thread, checks the fetch is still pending, then unblocks and reconnects the client.
     * The pending fetch must end with a {@link CacheException}; a fresh query must return three entries.
     *
     * @throws Exception If failed.
     */
    public void testReconnectQueryInProgress() throws Exception {
        Ignite client = grid(serverCount());

        assertTrue(client.cluster().localNode().isClient());

        final Ignite router = clientRouter(client);

        final IgniteCache<Integer, Person> clientCache = client.getOrCreateCache(QUERY_CACHE);

        putInitialPersons(clientCache);

        blockMessage(GridQueryNextPageResponse.class);

        final SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "_key <> 0");

        sqlQry.setPageSize(1);

        final QueryCursor<Cache.Entry<Integer, Person>> pendingCursor = clientCache.query(sqlQry);

        final IgniteInternalFuture<Object> fetchFut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() throws Exception {
                try {
                    pendingCursor.getAll();
                }
                catch (CacheException err) {
                    checkAndWait(err);

                    // Expected outcome: the fetch was interrupted by the reconnect.
                    return true;
                }

                return false;
            }
        });

        assertOperationPending(fetchFut);

        unblockMessage();

        reconnectClientNode(client, router, null);

        assertTrue((Boolean)fetchFut.get(2, SECONDS));

        QueryCursor<Cache.Entry<Integer, Person>> freshCursor = clientCache.query(sqlQry);

        assertEquals(3, freshCursor.getAll().size());
    }

    /**
     * Opens a scan cursor (page size 1) over 100 entries before the reconnect, verifies that a scan query
     * started from the runnable handed to {@code reconnectClientNode(...)} fails, that the cursor opened
     * earlier fails afterwards as well, and that a new scan query returns 101 entries.
     *
     * @throws Exception If failed.
     */
    public void testScanQueryReconnect() throws Exception {
        Ignite client = grid(serverCount());

        assertTrue(client.cluster().localNode().isClient());

        final Ignite router = clientRouter(client);

        final IgniteCache<Integer, Person> clientCache = client.getOrCreateCache(QUERY_CACHE);

        final IgniteCache<Integer, Person> routerCache = router.getOrCreateCache(QUERY_CACHE);

        for (int key = 0; key < 100; key++)
            clientCache.put(key, new Person(key, "name-" + key, "surname-" + key));

        final ScanQuery<Integer, Person> scan = new ScanQuery<>();

        scan.setPageSize(1);

        // Filter that accepts every entry.
        scan.setFilter(new IgniteBiPredicate<Integer, Person>() {
            @Override public boolean apply(Integer integer, Person person) {
                return true;
            }
        });

        QueryCursor<Cache.Entry<Integer, Person>> cursor = clientCache.query(scan);

        reconnectClientNode(client, router, new Runnable() {
            @Override public void run() {
                routerCache.put(1000, new Person(1000, "name", "surname"));

                try {
                    clientCache.query(scan);

                    fail();
                }
                catch (CacheException err) {
                    check(err);
                }
            }
        });

        // The cursor opened before the reconnect is expected to fail.
        try {
            cursor.getAll();

            fail();
        }
        catch (CacheException err) {
            checkAndWait(err);
        }

        cursor = clientCache.query(scan);

        assertEquals(101, cursor.getAll().size());
    }

    /**
     * Runs the in-progress scan query scenario without a partition set on the query.
     *
     * @throws Exception If failed.
     */
    public void testScanQueryReconnectInProgress1() throws Exception {
        scanQueryReconnectInProgress(false);
    }

    /**
     * Runs the in-progress scan query scenario with a partition set on the query.
     *
     * @throws Exception If failed.
     */
    public void testScanQueryReconnectInProgress2() throws Exception {
        scanQueryReconnectInProgress(true);
    }

    /**
     * Blocks {@link GridCacheQueryResponse} messages on all servers, runs a scan query in a background
     * thread, checks it is still pending, then unblocks and reconnects the client. The pending query must
     * end with a {@link CacheException}; a fresh query must return one entry when the partition is set
     * and three entries otherwise.
     *
     * @param setPart If {@code true} sets partition for scan query.
     * @throws Exception If failed.
     */
    private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
        Ignite client = grid(serverCount());

        assertTrue(client.cluster().localNode().isClient());

        final Ignite router = clientRouter(client);

        final IgniteCache<Integer, Person> clientCache = client.getOrCreateCache(QUERY_CACHE);

        putInitialPersons(clientCache);

        final ScanQuery<Integer, Person> scan = new ScanQuery<>();

        scan.setPageSize(1);

        // Filter that accepts every entry.
        scan.setFilter(new IgniteBiPredicate<Integer, Person>() {
            @Override public boolean apply(Integer integer, Person person) {
                return true;
            }
        });

        if (setPart)
            scan.setPartition(1);

        blockMessage(GridCacheQueryResponse.class);

        final IgniteInternalFuture<Object> scanFut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() throws Exception {
                try {
                    QueryCursor<Cache.Entry<Integer, Person>> pendingCursor = clientCache.query(scan);

                    pendingCursor.getAll();
                }
                catch (CacheException err) {
                    checkAndWait(err);

                    // Expected outcome: the scan was interrupted by the reconnect.
                    return true;
                }

                return false;
            }
        });

        assertOperationPending(scanFut);

        unblockMessage();

        reconnectClientNode(client, router, null);

        assertTrue((Boolean)scanFut.get(2, SECONDS));

        int expCnt = setPart ? 1 : 3;

        QueryCursor<Cache.Entry<Integer, Person>> freshCursor = clientCache.query(scan);

        assertEquals(expCnt, freshCursor.getAll().size());
    }

    /**
     * Puts the three persons with keys 1, 2 and 3 into the given cache.
     *
     * @param cache Cache to populate.
     */
    private void putInitialPersons(IgniteCache<Integer, Person> cache) {
        cache.put(1, new Person(1, "name1", "surname1"));
        cache.put(2, new Person(2, "name2", "surname2"));
        cache.put(3, new Person(3, "name3", "surname3"));
    }

    /**
     * Checks that client waiting operation: a 200 ms wait on the future must time out and the future
     * must still be not done afterwards.
     *
     * @param fut Future of the operation expected to be blocked.
     * @throws Exception If failed.
     */
    private void assertOperationPending(final IgniteInternalFuture<Object> fut) throws Exception {
        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                return fut.get(200);
            }
        }, IgniteFutureTimeoutCheckedException.class, null);

        assertNotDone(fut);
    }

    /**
     * Asks the communication SPI of every server node to block messages of the given class.
     *
     * @param clazz Message class.
     */
    private void blockMessage(Class<?> clazz) {
        for (int idx = 0; idx < serverCount(); idx++) {
            BlockTpcCommunicationSpi spi = commSpi(grid(idx));

            spi.blockMessage(clazz);
        }
    }

    /**
     * Asks the communication SPI of every server node to unblock messages.
     */
    private void unblockMessage() {
        for (int idx = 0; idx < serverCount(); idx++) {
            BlockTpcCommunicationSpi spi = commSpi(grid(idx));

            spi.unblockMessage();
        }
    }

    /**
     * Cache value type used by the queries; all three fields are SQL query fields.
     * Equality and hash code are based on {@link #id} only.
     */
    public static class Person {
        /** Person id. */
        @QuerySqlField
        public int id;

        /** First name. */
        @QuerySqlField
        public String name;

        /** Surname. */
        @QuerySqlField
        public String surname;

        /**
         * @param id Id.
         * @param name Name.
         * @param surname Surname.
         */
        public Person(int id, String name, String surname) {
            this.id = id;
            this.name = name;
            this.surname = surname;
        }

        /**
         * @return Id.
         */
        public int getId() {
            return id;
        }

        /**
         * @param id Set id.
         */
        public void setId(int id) {
            this.id = id;
        }

        /**
         * @return Name.
         */
        public String getName() {
            return name;
        }

        /**
         * @param name Name.
         */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * @return Surname.
         */
        public String getSurname() {
            return surname;
        }

        /**
         * @param surname Surname.
         */
        public void setSurname(String surname) {
            this.surname = surname;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            return id == ((Person)o).id;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return id;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(Person.class, this);
        }
    }
}