Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 1b2e0b113 -> c9019888b
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9019888 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9019888 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9019888 Branch: refs/heads/ignite-901 Commit: c9019888b70c13af599411b32b910e80aaea6081 Parents: 1b2e0b1 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 14 12:01:10 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 14 12:11:36 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 15 +++++++++++ .../discovery/GridDiscoveryManager.java | 27 +++++++++++++++----- .../processors/cache/GridCacheContext.java | 4 +-- .../GridCachePartitionExchangeManager.java | 17 ++++++++++++ .../IgniteClientReconnectAbstractTest.java | 1 - 5 files changed, 54 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9019888/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 29f7e9d..90218ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2869,6 +2869,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { for (GridComponent comp : ctx.components()) comp.onReconnected(clusterRestarted); + + ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + ctx.gateway().onReconnected(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reconnect, will stop node", e); + + close(); + } + } + }); } catch (IgniteCheckedException e) { err = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9019888/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 64a5e89..a95a200 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -380,11 +380,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private long gridStartTime; @Override public void onDiscovery( - int type, - long topVer, - ClusterNode node, - Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> snapshots, + final int type, + final long topVer, + final ClusterNode node, + final Collection<ClusterNode> topSnapshot, + final Map<Long, Collection<ClusterNode>> snapshots, @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) { @@ -433,7 +433,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { verChanged = false; } - AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); + final AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -521,7 +521,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); - ctx.gateway().onReconnected(); + ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> fut) { + try { + fut.get(); + + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null); + } + catch (IgniteException ignore) { + // No-op. + } + } + }); + + return; } discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9019888/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index eb813c3..c6c9f6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -765,7 +765,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Partition topology. */ public GridDhtPartitionTopology topology() { - assert isNear() || isDht() || isColocated() || isDhtAtomic(); + assert isNear() || isDht() || isColocated() || isDhtAtomic() : cache; return isNear() ? near().dht().topology() : dht().topology(); } @@ -774,7 +774,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Topology version future. */ public GridDhtTopologyFuture topologyVersionFuture() { - assert isNear() || isDht() || isColocated() || isDhtAtomic(); + assert isNear() || isDht() || isColocated() || isDhtAtomic() : cache; GridDhtTopologyFuture fut = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9019888/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index dd3d1d2..0db5273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -97,6 +97,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<>(AffinityTopologyVersion.NONE); + /** */ + private GridFutureAdapter<?> reconnectExchangeFut; + /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -237,6 +240,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } + /** + * @return Reconnect partition exchange future. + */ + public IgniteInternalFuture<?> reconnectExchangeFuture() { + return reconnectExchangeFut; + } + /** {@inheritDoc} */ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { super.onKernalStart0(reconnect); @@ -260,6 +270,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null); + if (reconnect) + reconnectExchangeFut = new GridFutureAdapter<>(); + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); onDiscoveryEvent(cctx.localNodeId(), fut); @@ -275,10 +288,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.preloader().onInitialExchangeComplete(null); + + reconnectExchangeFut.onDone(); } catch (IgniteCheckedException e) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.preloader().onInitialExchangeComplete(e); + + reconnectExchangeFut.onDone(e); } } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9019888/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index c034b12..af892ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -67,7 +67,6 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra disco.setIpFinder(ipFinder); disco.setJoinTimeout(2 * 60_000); - disco.setNetworkTimeout(1000); disco.setSocketTimeout(1000); cfg.setDiscoverySpi(disco);