Ignite-929
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/71b81e05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/71b81e05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/71b81e05 Branch: refs/heads/ignite-929 Commit: 71b81e058c0e441f405d1083b6240cb3332819df Parents: 7442023 Author: avinogradov <avinogra...@gridgain.com> Authored: Tue May 26 20:07:49 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Tue May 26 20:07:49 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 14 +++++++ .../cache/DynamicCacheChangeRequest.java | 39 ++++++++++++------- .../processors/cache/GridCacheProcessor.java | 41 +++++++++++++++----- .../distributed/dht/GridDhtCacheEntry.java | 3 +- 4 files changed, 72 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0950774..9737002 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -240,6 +240,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param clientNodeId Near node ID. + * @param nearEnabled Near cache enabled. */ public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { CachePredicate predicate = registeredCaches.get(cacheName); @@ -249,6 +250,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Removes near node ID from cache filter. + * + * @param cacheName Cache name. + * @param clientNodeId Near node ID. + */ + public void onNodeLeft(String cacheName, UUID clientNodeId) { + CachePredicate predicate = registeredCaches.get(cacheName); + + if (predicate != null) + predicate.onNodeLeft(clientNodeId); + } + + /** * @return Client nodes map. */ public Map<String, Map<UUID, Boolean>> clientNodesMap() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index c08a179..7af1572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -57,6 +57,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Stop flag. */ private boolean stop; + /** Close flag. */ + private boolean close; + /** Fail if exists flag. */ private boolean failIfExists; @@ -68,23 +71,10 @@ public class DynamicCacheChangeRequest implements Serializable { * * @param cacheName Cache stop name. * @param initiatingNodeId Initiating node ID. - * @param stop Stop flag. */ - public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId, boolean stop) { + public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) { this.cacheName = cacheName; this.initiatingNodeId = initiatingNodeId; - - this.stop = stop; - } - - /** - * Constructor means for start requests. - * - * @param cacheName Cache name. - * @param initiatingNodeId Initiating node ID. - */ - public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) { - this(cacheName, initiatingNodeId, false); } /** @@ -130,6 +120,13 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @param stop New stop flag. + */ + public void stop(boolean stop) { + this.stop = stop; + } + + /** * @return Cache name. */ public String cacheName() { @@ -220,6 +217,20 @@ public class DynamicCacheChangeRequest implements Serializable { this.failIfExists = failIfExists; } + /** + * @return Close flag. + */ + public boolean close() { + return close; + } + + /** + * @param close New close flag. + */ + public void close(boolean close) { + this.close = close; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4edfd8b..ccbf10a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1469,7 +1469,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Request. */ private void stopGateway(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() || req.close(); // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); @@ -1482,7 +1482,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void prepareCacheStop(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() || req.close(); GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); @@ -1939,7 +1939,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Future that will be completed when cache is destroyed. */ public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) { - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true); + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + + t.stop(true); return F.first(initiateCacheChanges(F.asList(t))); } @@ -1958,14 +1960,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName)); if (cache != null && !cache.context().affinityNode()) { - if (caches.remove(maskNull(cacheName)) != null) { - GridCacheContext<?, ?> ctx = cache.context(); + GridCacheContext<?, ?> ctx = cache.context(); - sharedCtx.removeCacheContext(ctx); + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); - onKernalStop(cache, true); - stopCache(cache, true); - } + t.close(true); + + return F.first(initiateCacheChanges(F.asList(t))); } return null; // No-op. @@ -1985,7 +1986,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req); try { - if (req.stop()) { + if (req.stop() || req.close()) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc == null) @@ -2127,6 +2128,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.initiatingNodeId(), req.nearCacheConfiguration() != null); } + else if (req.close()) { + + if (req.initiatingNodeId().equals(ctx.localNodeId())) { + stopGateway(req); + + prepareCacheStop(req); + + if (desc != null) + registeredCaches.remove(maskNull(req.cacheName()), desc); + } + + ctx.discovery().onNodeLeft(req.cacheName(), req.initiatingNodeId()); + + DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + + if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) { + // No-op. + changeFut.onDone(); + } + } else { if (desc == null) { // If local node initiated start, fail the start future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index c9a7af8..1ecc63c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -601,7 +601,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { List<ReaderId> newRdrs = null; for (int i = 0; i < rdrs.length; i++) { - if (!cctx.discovery().alive(rdrs[i].nodeId())) { + if (!cctx.discovery().alive(rdrs[i].nodeId()) || + !cctx.discovery().cacheNode(cctx.discovery().node(rdrs[i].nodeId), cacheName())) { // Node has left and if new list has already been created, just skip. // Otherwise, create new list and add alive nodes. if (newRdrs == null) {