Repository: incubator-ignite Updated Branches: refs/heads/ignite-929 b6c7eaee3 -> adad6cc4f
# ignite-929 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/adad6cc4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/adad6cc4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/adad6cc4 Branch: refs/heads/ignite-929 Commit: adad6cc4f83fdb314a24f057eb64ceb6418da09f Parents: b6c7eae Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 9 12:42:24 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 9 13:20:46 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheGateway.java | 23 -- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCacheProcessor.java | 98 +++-- .../processors/cache/IgniteCacheProxy.java | 406 +++++++++++++------ .../GridDhtPartitionsExchangeFuture.java | 2 +- .../cache/CacheStopAndDestroySelfTest.java | 234 +++++++---- 6 files changed, 496 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index df450d0..f2beb0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -35,9 +35,6 @@ public class GridCacheGateway<K, V> { /** Stopped flag for dynamic caches. */ private volatile boolean stopped; - /** Closed flag for dynamic caches. */ - private final ThreadLocal<Boolean> closed = new ThreadLocal<>(); - /** */ private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); @@ -153,12 +150,6 @@ public class GridCacheGateway<K, V> { throw new IllegalStateException("Cache has been stopped: " + ctx.name()); } - if (closed.get() != null) { - rwLock.readUnlock(); - - throw new IllegalStateException("Cache has been closed: " + ctx.name()); - } - // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. try { @@ -245,20 +236,6 @@ public class GridCacheGateway<K, V> { /** * */ - public void open() { - closed.remove(); - } - - /** - * - */ - public void close() { - closed.set(Boolean.TRUE); - } - - /** - * - */ public void onStopped() { boolean interrupted = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index af87685..4398b4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -156,16 +156,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Validate requests to check if event should trigger partition exchange. for (DynamicCacheChangeRequest req : batch.requests()) { - if (cctx.cache().dynamicCacheRegistered(req)) + if (cctx.cache().exchangeNeeded(req)) valid.add(req); else cctx.cache().completeStartFuture(req); } if (!F.isEmpty(valid)) { - exchId = exchangeId(n.id(), - affinityTopologyVersion(e), - e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); exchFut = exchangeFuture(exchId, e, valid); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e494cd4..f564cb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1390,7 +1390,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return {@code True} if change request was registered to apply. */ @SuppressWarnings("IfMayBeConditional") - public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { + public boolean exchangeNeeded(DynamicCacheChangeRequest req) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc != null) { @@ -1519,20 +1519,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void blockGateway(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() || req.close(); - // Break the proxy before exchange future is done. - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); + if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { + // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); - if (proxy != null) - proxy.gate().block(); + if (proxy != null) { + if (req.stop()) + proxy.gate().block(); + else + proxy.closeProxy(); + } + } } /** * @param req Request. */ private void stopGateway(DynamicCacheChangeRequest req) { - assert req.stop() || req.close() : req; + assert req.stop() : req; // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); @@ -1601,6 +1607,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) registeredCaches.remove(masked, desc); } + else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) { + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked); + + if (proxy != null) { + if (proxy.context().affinityNode()) { + GridCacheAdapter<?, ?> cache = caches.get(masked); + + if (cache != null) + jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); + } + else + prepareCacheStop(req); + } + } completeStartFuture(req); } @@ -2034,29 +2054,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) { IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName)); - // Closing gateway first. - if (proxy != null) - proxy.gate().close(); - - CacheConfiguration cfg = ctx.cache().cacheConfiguration(cacheName); - - if (cfg.getCacheMode() == LOCAL) - return dynamicDestroyCache(cacheName); - else { - GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName)); - - if (cache != null && !cache.context().affinityNode()) { - GridCacheContext<?, ?> ctx = cache.context(); + if (proxy == null || proxy.proxyClosed()) + return new GridFinishedFuture<>(); // No-op. - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + checkEmptyTransactions(); - t.close(true); + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); - return F.first(initiateCacheChanges(F.asList(t), false)); - } + t.close(true); - return new GridFinishedFuture<>(); // No-op. - } + return F.first(initiateCacheChanges(F.asList(t), false)); } /** @@ -2081,9 +2088,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No-op. fut.onDone(); else { + assert desc.cacheConfiguration() != null : desc; + + if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) { + req.close(false); + + req.stop(true); + } + IgniteUuid dynamicDeploymentId = desc.deploymentId(); - assert dynamicDeploymentId != null; + assert dynamicDeploymentId != null : desc; // Save deployment ID to avoid concurrent stops. req.deploymentId(dynamicDeploymentId); @@ -2230,21 +2245,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.initiatingNodeId(), req.nearCacheConfiguration() != null); } - else if (req.close()) { - if (req.initiatingNodeId().equals(ctx.localNodeId())) { - stopGateway(req); - - prepareCacheStop(req); - } - - ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); - - completeStartFuture(req); - } else { + assert req.stop() || req.close() : req; + if (desc == null) { - // If local node initiated start, fail the start future. - DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + // If local node initiated start, finish future. + DynamicCacheStartFuture changeFut = + (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) { // No-op. @@ -2254,9 +2261,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } - desc.onCancelled(); + if (req.stop()) { + desc.onCancelled(); - ctx.discovery().removeCacheFilter(req.cacheName()); + ctx.discovery().removeCacheFilter(req.cacheName()); + } + else + ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); } } } @@ -2728,9 +2739,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) cache = startJCache(cacheName, failIfNotStarted); - if (cache != null) - cache.gate().open(); - return (IgniteCacheProxy<K, V>)cache; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3a5e27f..080502c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -171,19 +171,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public CacheMetrics metrics() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().metrics(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); @@ -202,19 +206,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { - CacheOperationContext prev = gate.enter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().mxBean(); } finally { - gate.leave(prev); + onLeave(gate, prev); } } @@ -230,19 +236,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Nullable @Override public Cache.Entry<K, V> randomEntry() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().randomEntry(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) : @@ -251,7 +261,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -262,7 +272,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public IgniteCache<K, V> withNoRetries() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { boolean noRetries = opCtx != null && opCtx.noRetries(); @@ -280,14 +292,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -296,7 +310,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V ctx.cache().globalLoadCache(p, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -307,7 +321,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -316,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.localLoadCache(p, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -327,7 +343,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -339,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndPutIfAbsent(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -359,13 +377,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -517,7 +537,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <R> QueryCursor<R> query(Query<R> qry) { A.notNull(qry, "qry"); - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -558,7 +580,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw new CacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -589,7 +611,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localEntries(peekModes); @@ -598,37 +622,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.context().queries().metrics(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.evictAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localPeek(key, peekModes, null); @@ -637,20 +667,22 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.promoteAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -660,7 +692,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -675,13 +709,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localSize(peekModes); @@ -690,14 +726,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public V get(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -709,7 +747,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.get(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -720,7 +758,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -732,7 +772,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -743,7 +783,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -755,7 +797,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAllOutTx(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -769,7 +811,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V */ public Map<K, V> getAll(Collection<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -781,7 +825,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -796,19 +840,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Entry set. */ public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.entrySetx(filter); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public boolean containsKey(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -820,13 +868,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.containsKey(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -838,7 +888,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.containsKeys(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -848,7 +898,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V boolean replaceExisting, @Nullable final CompletionListener completionLsnr ) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); @@ -869,14 +921,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void put(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -896,7 +950,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.put(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -907,7 +961,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -919,7 +975,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndPut(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -930,7 +986,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -939,7 +997,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.putAll(map); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -950,7 +1008,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -962,7 +1022,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.putIfAbsent(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -973,7 +1033,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean remove(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -985,7 +1047,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.remove(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -996,7 +1058,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1008,7 +1072,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.remove(key, oldVal); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1019,7 +1083,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndRemove(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1031,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndRemove(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1042,7 +1108,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1054,7 +1122,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.replace(key, oldVal, newVal); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1065,7 +1133,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1077,7 +1147,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.replace(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1088,7 +1158,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1100,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndReplace(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1111,7 +1183,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1120,7 +1194,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.removeAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1130,7 +1204,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void removeAll() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1142,13 +1218,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clear(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1160,13 +1238,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1178,13 +1258,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clear() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1196,32 +1278,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localClear(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.clearLocally(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localClearAll(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { for (K key : keys) delegate.clearLocally(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1229,7 +1315,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1255,7 +1343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1267,7 +1355,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1293,7 +1383,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1303,10 +1393,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, - Object... args) { + EntryProcessor<K, V, T> entryProcessor, + Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1318,7 +1410,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1331,7 +1423,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1343,7 +1437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1356,7 +1450,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1368,7 +1464,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(map, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1395,7 +1491,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void destroy() { - if (!onEnterIfNoStop()) + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) return; IgniteInternalFuture<?> fut; @@ -1404,7 +1502,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name()); } finally { - onLeave(); + onLeave(gate); } try { @@ -1417,7 +1515,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void close() { - if (!onEnterIfNoStop()) + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) return; IgniteInternalFuture<?> fut; @@ -1426,7 +1526,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V fut = ctx.kernalContext().cache().dynamicCloseCache(ctx.name()); } finally { - onLeave(); + onLeave(gate); } try { @@ -1439,14 +1539,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean isClosed() { - if (!onEnterIfNoStop()) + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) return true; try { return ctx.kernalContext().cache().context().closed(ctx); } finally { - onLeave(); + onLeave(gate); } } @@ -1470,7 +1572,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); @@ -1479,13 +1583,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); @@ -1494,19 +1600,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().igniteIterator(); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1539,7 +1647,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Projection for portable objects. */ public <K1, V1> IgniteCache<K1, V1> keepPortable() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { CacheOperationContext opCtx0 = @@ -1557,7 +1667,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1565,7 +1675,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cache with skip store enabled. */ public IgniteCache<K, V> skipStore() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { boolean skip = opCtx != null && opCtx.skipStore(); @@ -1587,7 +1699,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1614,10 +1726,68 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * @return {@code True} if proxy was closed. + */ + public boolean proxyClosed() { + return !gate.getClass().equals(GridCacheGateway.class); + } + + /** + * Closes this proxy instance. + */ + public void closeProxy() { + gate = new GridCacheGateway<K, V>(ctx) { + @Override public void enter() { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Override public boolean enterIfNotStopped() { + return false; + } + + @Override public boolean enterIfNotStoppedNoLock() { + return false; + } + + @Override public void leaveNoLock() { + assert false; + } + + @Override public void leave() { + assert false; + } + + @Nullable @Override public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Nullable @Override public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Override public void leave(CacheOperationContext prev) { + assert false; + } + + @Override public void leaveNoLock(CacheOperationContext prev) { + assert false; + } + + @Override public void block() { + // No-op. + } + + @Override public void onStopped() { + // No-op. + } + }; + } + + /** * @param opCtx Cache operation context to guard. * @return Previous projection set on this thread. */ - private CacheOperationContext onEnter(CacheOperationContext opCtx) { + private CacheOperationContext onEnter(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { if (lock) return gate.enter(opCtx); else @@ -1629,7 +1799,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * * @return {@code True} if enter successful. */ - private boolean onEnterIfNoStop() { + private boolean onEnterIfNoStop(GridCacheGateway<K, V> gate) { if (lock) return gate.enterIfNotStopped(); else @@ -1639,7 +1809,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** * @param opCtx Operation context to guard. */ - private void onLeave(CacheOperationContext opCtx) { + private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { if (lock) gate.leave(opCtx); else @@ -1649,7 +1819,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** * On leave. */ - private void onLeave() { + private void onLeave(GridCacheGateway<K, V> gate) { if (lock) gate.leave(); else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index bae55ca..5701749 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -911,7 +911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void blockGateways() { for (DynamicCacheChangeRequest req : reqs) { - if (req.stop()) + if (req.stop() || req.close()) cctx.cache().blockGateway(req); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java index 536ddc3..79c2a2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java @@ -40,6 +40,8 @@ import javax.cache.configuration.*; import java.util.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheMode.*; + /** * Checks stop and destroy methods behavior. */ @@ -123,7 +125,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(CACHE_NAME_DHT); - cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setCacheMode(PARTITIONED); cfg.setNearConfiguration(null); return cfg; @@ -136,7 +138,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(CACHE_NAME_CLIENT); - cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setCacheMode(PARTITIONED); cfg.setNearConfiguration(null); return cfg; @@ -149,7 +151,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(CACHE_NAME_NEAR); - cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setCacheMode(PARTITIONED); cfg.setNearConfiguration(new NearCacheConfiguration()); return cfg; @@ -162,7 +164,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setName(CACHE_NAME_LOC); - cfg.setCacheMode(CacheMode.LOCAL); + cfg.setCacheMode(LOCAL); cfg.setNearConfiguration(null); return cfg; @@ -175,6 +177,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { */ public void testDhtDoubleDestroy() throws Exception { dhtDestroy(); + dhtDestroy(); } @@ -186,29 +189,36 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { private void dhtDestroy() throws Exception { grid(0).getOrCreateCache(getDhtConfig()); - assert grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL) == null; + assertNull(grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL)); grid(0).cache(CACHE_NAME_DHT).put(KEY_VAL, KEY_VAL); - assert grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL).equals(KEY_VAL); + assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL)); + assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_DHT).get(KEY_VAL)); + assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_DHT).get(KEY_VAL)); - //DHT Destroy. Cache should be removed from each node. + assertFalse(grid(0).configuration().isClientMode()); + + // DHT Destroy. Cache should be removed from each node. grid(0).cache(CACHE_NAME_DHT).destroy(); try { grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored0) { try { grid(1).cache(CACHE_NAME_DHT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored1) { try { grid(2).cache(CACHE_NAME_DHT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored2) { // No-op @@ -224,6 +234,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { */ public void testClientDoubleDestroy() throws Exception { clientDestroy(); + clientDestroy(); } @@ -235,29 +246,36 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { private void clientDestroy() throws Exception { grid(0).getOrCreateCache(getClientConfig()); - assert grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL) == null; + assertNull(grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL)); grid(0).cache(CACHE_NAME_CLIENT).put(KEY_VAL, KEY_VAL); - assert grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL).equals(KEY_VAL); + assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL)); + assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_CLIENT).get(KEY_VAL)); + assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_CLIENT).get(KEY_VAL)); - //DHT Destroy from client node. Cache should be removed from each node. + // DHT Destroy from client node. Cache should be removed from each node. + + assertTrue(grid(2).configuration().isClientMode()); grid(2).cache(CACHE_NAME_CLIENT).destroy();// Client node. try { grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored0) { try { grid(1).cache(CACHE_NAME_CLIENT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored1) { try { grid(2).cache(CACHE_NAME_CLIENT).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored2) { // No-op @@ -273,6 +291,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { */ public void testNearDoubleDestroy() throws Exception { nearDestroy(); + nearDestroy(); } @@ -286,31 +305,34 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration()); - assert grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL) == null; - assert grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL) == null; + assertNull(grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL)); + assertNull(grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL)); grid(2).cache(CACHE_NAME_NEAR).put(KEY_VAL, KEY_VAL); grid(0).cache(CACHE_NAME_NEAR).put(KEY_VAL, "near-test"); - assert grid(2).cache(CACHE_NAME_NEAR).localPeek(KEY_VAL).equals("near-test"); + assertEquals("near-test", grid(2).cache(CACHE_NAME_NEAR).localPeek(KEY_VAL)); - //Local destroy. Cache should be removed from each node. + // Local destroy. Cache should be removed from each node. grid(2).cache(CACHE_NAME_NEAR).destroy(); try { grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored0) { try { grid(1).cache(CACHE_NAME_NEAR).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored1) { try { grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored2) { // No-op @@ -326,6 +348,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { */ public void testLocalDoubleDestroy() throws Exception { localDestroy(); + localDestroy(); } @@ -352,17 +375,20 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { try { grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored0) { try { grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored1) { try { grid(2).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored2) { // No-op @@ -377,43 +403,48 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDhtClose() throws Exception { - IgniteCache<String, String> dhtCache0 = grid(0).getOrCreateCache(getDhtConfig()); + IgniteCache<Integer, Integer> dhtCache0 = grid(0).getOrCreateCache(getDhtConfig()); - assert dhtCache0.get(KEY_VAL) == null; + final Integer key = primaryKey(dhtCache0); - dhtCache0.put(KEY_VAL, KEY_VAL); + assertNull(dhtCache0.get(key)); - assert dhtCache0.get(KEY_VAL).equals(KEY_VAL); + dhtCache0.put(key, key); - //DHT Close. No-op. + assertEquals(key, dhtCache0.get(key)); - IgniteCache<String, String> dhtCache1 = grid(1).cache(CACHE_NAME_DHT); - IgniteCache<String, String> dhtCache2 = grid(2).cache(CACHE_NAME_DHT); + // DHT Close. No-op. + + IgniteCache<Integer, Integer> dhtCache1 = grid(1).cache(CACHE_NAME_DHT); + IgniteCache<Integer, Integer> dhtCache2 = grid(2).cache(CACHE_NAME_DHT); dhtCache0.close(); try { - dhtCache0.get(KEY_VAL);// Not affected, but can not be taken. - assert false; + dhtCache0.get(key);// Not affected, but can not be taken. + + fail(); } catch (IllegalStateException ignored) { // No-op } - assert dhtCache1.get(KEY_VAL).equals(KEY_VAL);// Not affected. - assert dhtCache2.get(KEY_VAL).equals(KEY_VAL);// Not affected. + assertEquals(key, dhtCache1.get(key)); // Not affected. + assertEquals(key, dhtCache2.get(key));// Not affected. - //DHT Creation after closed. + // DHT Creation after closed. - dhtCache0 = grid(0).cache(CACHE_NAME_DHT); + IgniteCache<Integer, Integer> dhtCache0New = grid(0).cache(CACHE_NAME_DHT); - assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);// Not affected, can be taken since cache reopened. + assertNotSame(dhtCache0, dhtCache0New); - dhtCache2.put(KEY_VAL, KEY_VAL + "recreated"); + assertEquals(key, dhtCache0New.get(key)); // Not affected, can be taken since cache reopened. - assert dhtCache0.get(KEY_VAL).equals(KEY_VAL + "recreated"); + dhtCache2.put(key, key + 1); - //Check close at last node. + assertEquals((Object)(key + 1), dhtCache0New.get(key)); + + // Check close at last node. stopAllGrids(true); @@ -421,18 +452,19 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { dhtCache0 = grid(0).getOrCreateCache(getDhtConfig()); - assert dhtCache0.get(KEY_VAL) == null; + assertNull(dhtCache0.get(key)); - dhtCache0.put(KEY_VAL, KEY_VAL); + dhtCache0.put(key, key); - assert dhtCache0.get(KEY_VAL).equals(KEY_VAL); + assertEquals(key, dhtCache0.get(key)); // Closing last node. dhtCache0.close(); try { - dhtCache0.get(KEY_VAL);// Can not be taken. - assert false; + dhtCache0.get(key);// Can not be taken. + + fail(); } catch (IllegalStateException ignored) { // No-op @@ -441,7 +473,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { // Reopening cache. dhtCache0 = grid(0).cache(CACHE_NAME_DHT); - assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);// Entry not loosed. + assertEquals(key, dhtCache0.get(key)); // Entry not loosed. } /** @@ -457,9 +489,16 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_DHT); IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_DHT); - assert cache0.get(KEY_VAL) == null || cache0.get(KEY_VAL).equals(curVal); - assert cache1.get(KEY_VAL) == null || cache1.get(KEY_VAL).equals(curVal); - assert cache2.get(KEY_VAL) == null || cache2.get(KEY_VAL).equals(curVal); + if (i == 0) { + assert cache0.get(KEY_VAL) == null; + assert cache1.get(KEY_VAL) == null; + assert cache2.get(KEY_VAL) == null; + } + else { + assert cache0.get(KEY_VAL).equals(curVal); + assert cache1.get(KEY_VAL).equals(curVal); + assert cache2.get(KEY_VAL).equals(curVal); + } curVal = KEY_VAL + curVal; @@ -486,7 +525,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { assert cache0.get(KEY_VAL).equals(KEY_VAL); - //DHT Close from client node. Should affect only client node. + // DHT Close from client node. Should affect only client node. IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT); IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_CLIENT); @@ -500,22 +539,25 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { try { cache2.get(KEY_VAL);// Affected. + assert false; } catch (IllegalStateException ignored) { // No-op } - //DHT Creation from client node after closed. - cache2 = grid(2).cache(CACHE_NAME_CLIENT); + // DHT Creation from client node after closed. + IgniteCache<String, String> cache2New = grid(2).cache(CACHE_NAME_CLIENT); - assert cache2.get(KEY_VAL).equals(KEY_VAL); + assertNotSame(cache2, cache2New); + + assert cache2New.get(KEY_VAL).equals(KEY_VAL); cache0.put(KEY_VAL, KEY_VAL + "recreated"); assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated"); assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated"); - assert cache2.get(KEY_VAL).equals(KEY_VAL + "recreated"); + assert cache2New.get(KEY_VAL).equals(KEY_VAL + "recreated"); } /** @@ -531,9 +573,16 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_CLIENT); IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT); - assert cache0.get(KEY_VAL) == null || cache0.get(KEY_VAL).equals(curVal); - assert cache1.get(KEY_VAL) == null || cache1.get(KEY_VAL).equals(curVal); - assert cache2.get(KEY_VAL) == null || cache2.get(KEY_VAL).equals(curVal); + if (i == 0) { + assert cache0.get(KEY_VAL) == null; + assert cache1.get(KEY_VAL) == null; + assert cache2.get(KEY_VAL) == null; + } + else { + assert cache0.get(KEY_VAL).equals(curVal); + assert cache1.get(KEY_VAL).equals(curVal); + assert cache2.get(KEY_VAL).equals(curVal); + } curVal = KEY_VAL + curVal; @@ -556,17 +605,17 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { public void testNearClose() throws Exception { IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getNearConfig()); - //GridDhtTxPrepareRequest requests to Client node will be counted. + // GridDhtTxPrepareRequest requests to Client node will be counted. CountingTxRequestsToClientNodeTcpCommunicationSpi.nodeFilter = grid(2).context().localNodeId(); - //Near Close from client node. + // Near Close from client node. IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR); IgniteCache<String, String> cache2 = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration()); assert cache2.get(KEY_VAL) == null; - //Subscribing to events. + // Subscribing to events. cache2.put(KEY_VAL, KEY_VAL); CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0); @@ -584,12 +633,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0); - //Should not produce messages to client node. + // Should not produce messages to client node. cache0.put(KEY_VAL, KEY_VAL + 0); U.sleep(1000); - //Ensure near cache was NOT automatically updated. + // Ensure near cache was NOT automatically updated. assert CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.get() == 0; assert cache0.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected. @@ -597,26 +646,29 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { try { cache2.get(KEY_VAL);// Affected. + assert false; } catch (IllegalArgumentException | IllegalStateException ignored) { // No-op } - //Near Creation from client node after closed. + // Near Creation from client node after closed. - cache2 = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration()); + IgniteCache<String, String> cache2New = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration()); - //Subscribing to events. - cache2.put(KEY_VAL, KEY_VAL); + assertNotSame(cache2, cache2New); - assert cache2.localPeek(KEY_VAL).equals(KEY_VAL); + // Subscribing to events. + cache2New.put(KEY_VAL, KEY_VAL); + + assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL); cache0.put(KEY_VAL, KEY_VAL + "recreated"); assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated"); assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated"); - assert cache2.localPeek(KEY_VAL).equals(KEY_VAL + "recreated"); + assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL + "recreated"); } /** @@ -629,8 +681,10 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { grid(0).getOrCreateCache(getNearConfig()); + NearCacheConfiguration nearCfg = new NearCacheConfiguration(); + for (int i = 0; i < 3; i++) { - try (IgniteCache<String, String> cache2 = grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration())) { + try (IgniteCache<String, String> cache2 = grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, nearCfg)) { IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_NEAR); IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR); @@ -670,23 +724,26 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 0); assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 1); - //Local close. Same as Local destroy. + // Local close. Same as Local destroy. grid(1).cache(CACHE_NAME_LOC).close(); try { grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored0) { try { grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored1) { try { grid(2).cache(CACHE_NAME_LOC).get(KEY_VAL); - assert false; + + fail(); } catch (IllegalArgumentException | IllegalStateException ignored2) { // No-op @@ -694,7 +751,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { } } - //Local creation after closed. + // Local creation after closed. grid(0).getOrCreateCache(getLocalConfig()); @@ -742,7 +799,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { * * @throws Exception If failed. */ - public void _testConcurrentCloseSetWithTry() throws Exception { + public void testConcurrentCloseSetWithTry() throws Exception { final AtomicInteger a1 = new AtomicInteger(); final AtomicInteger a2 = new AtomicInteger(); final AtomicInteger a3 = new AtomicInteger(); @@ -750,21 +807,29 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { Thread t1 = new Thread(new Runnable() { @Override public void run() { + Thread.currentThread().setName("test-thread-1"); + closeWithTry(a1, 0); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { + Thread.currentThread().setName("test-thread-2"); + closeWithTry(a2, 0); } }); Thread t3 = new Thread(new Runnable() { @Override public void run() { + Thread.currentThread().setName("test-thread-3"); + closeWithTry(a3, 2); } }); Thread t4 = new Thread(new Runnable() { @Override public void run() { + Thread.currentThread().setName("test-thread-4"); + closeWithTry(a4, 2); } }); @@ -778,9 +843,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { t3.start(); t4.start(); - U.sleep(1000); - - stop = true; + try { + U.sleep(1000); + } + finally { + stop = true; + } t1.join(); t2.join(); @@ -835,10 +903,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { cache.close(); + cache.close(); + try { cache.get("key"); - assert false; + fail(); } catch (IllegalStateException e) { // No-op;