# ignite-929 close does not destroy cache (cherry picked from commit e3fba88)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a233fa00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a233fa00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a233fa00 Branch: refs/heads/ignite-gg-9615 Commit: a233fa00fcfb1266acdecfec45b7ac6024cc9791 Parents: 3dcf891 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jul 10 10:20:11 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jul 10 10:48:44 2015 +0300 ---------------------------------------------------------------------- .../examples/ScalarCacheAffinityExample.scala | 2 +- .../scalar/examples/ScalarCacheExample.scala | 2 +- .../ScalarCachePopularNumbersExample.scala | 2 +- .../examples/ScalarCacheQueryExample.scala | 2 +- .../examples/ScalarSnowflakeSchemaExample.scala | 4 +- .../java/org/apache/ignite/IgniteCache.java | 14 +- .../org/apache/ignite/cache/CacheManager.java | 13 +- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../discovery/GridDiscoveryManager.java | 23 +- .../cache/DynamicCacheChangeRequest.java | 39 +- .../processors/cache/GridCacheGateway.java | 4 +- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCacheProcessor.java | 102 ++- .../processors/cache/IgniteCacheProxy.java | 448 +++++++--- .../distributed/dht/GridDhtCacheEntry.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 30 +- .../visor/cache/VisorCacheStopTask.java | 2 +- .../affinity/IgniteClientNodeAffinityTest.java | 14 +- .../IgniteFairAffinityDynamicCacheSelfTest.java | 3 +- ...cheStoreSessionListenerAbstractSelfTest.java | 111 ++- .../GridCacheTxLoadFromStoreOnLockSelfTest.java | 34 +- .../CacheMetricsForClusterGroupSelfTest.java | 10 +- .../cache/CacheOffheapMapEntrySelfTest.java | 7 +- .../cache/CacheStopAndDestroySelfTest.java | 859 +++++++++++++++++++ ...eUsageMultinodeDynamicStartAbstractTest.java | 2 +- ...ProjectionForCachesOnDaemonNodeSelfTest.java | 2 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 140 +-- ...teCacheClientNodePartitionsExchangeTest.java | 29 +- ...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 2 +- .../DataStreamerMultinodeCreateCacheTest.java | 14 +- .../testsuites/IgniteCacheTestSuite4.java | 2 + .../CacheConfigurationP2PTestClient.java | 4 +- 32 files changed, 1593 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala index fbf66bc..40b947d 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala @@ -62,7 +62,7 @@ object ScalarCacheAffinityExample extends App { visitUsingMapKeysToNodes(cache) } finally { - cache.close() + cache.destroy() } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala index 42e8ca4..0bf8d6f 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala @@ -50,7 +50,7 @@ object ScalarCacheExample extends App { basicOperations() } finally { - cache.close() + cache.destroy() } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala index 828c5a3..d113297 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala @@ -93,7 +93,7 @@ object ScalarCachePopularNumbersExample extends App { } } finally { - cache.close() + cache.destroy() } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala index b8054eb..1a42947 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala @@ -55,7 +55,7 @@ object ScalarCacheQueryExample { example(ignite$) } finally { - cache.close() + cache.destroy() } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala index 2656f44..33b2fcc 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala @@ -86,11 +86,11 @@ object ScalarSnowflakeSchemaExample { queryProductPurchases() } finally { - factCache.close() + factCache.destroy() } } finally { - dimCache.close() + dimCache.destroy() } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index c8d6d7a..4938ab1 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -543,9 +543,21 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS CacheEntryProcessor<K, V, T> entryProcessor, Object... args); /** + * Closes this cache instance. + * <p> + * For local cache equivalent to {@link #destroy()}. + * For distributed caches, if called on clients, stops client cache, if called on a server node, + * just closes this cache instance and does not destroy cache data. + * <p> + * After cache instance is closed another {@link IgniteCache} instance for the same + * cache can be created using {@link Ignite#cache(String)} method. + */ + @Override public void close(); + + /** * Completely deletes the cache with all its data from the system on all cluster nodes. */ - @Override void close(); + public void destroy(); /** * This cache node to re-balance its partitions. This method is usually used when http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java index 9ba50d1..bc6df76 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java @@ -130,6 +130,7 @@ public class CacheManager implements javax.cache.CacheManager { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(String cacheName, C cacheCfg) throws IllegalArgumentException { kernalGateway.readLock(); @@ -155,11 +156,11 @@ public class CacheManager implements javax.cache.CacheManager { IgniteCache<K, V> res = ignite.createCache(igniteCacheCfg); - ((IgniteCacheProxy<K, V>)res).setCacheManager(this); - if (res == null) throw new CacheException(); + ((IgniteCacheProxy<K, V>)res).setCacheManager(this); + if (igniteCacheCfg.isManagementEnabled()) enableManagement(cacheName, true); @@ -219,6 +220,7 @@ public class CacheManager implements javax.cache.CacheManager { /** * @param cacheName Cache name. + * @return Cache. */ @Nullable private <K, V> IgniteCache<K, V> getCache0(String cacheName) { if (cacheName == null) @@ -272,11 +274,13 @@ public class CacheManager implements javax.cache.CacheManager { } if (cache != null) - cache.close(); + cache.destroy(); } /** * @param cacheName Cache name. + * @param objName Object name. + * @return Object name instance. */ private ObjectName getObjectName(String cacheName, String objName) { String mBeanName = "javax.cache:type=" + objName + ",CacheManager=" @@ -339,7 +343,8 @@ public class CacheManager implements javax.cache.CacheManager { /** * @param mxbean MXBean. - * @param name cache name. + * @param name Cache name. + * @param beanType Bean type. */ private void registerCacheObject(Object mxbean, String name, String beanType) { MBeanServer mBeanSrv = ignite.configuration().getMBeanServer(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index d6ddf79..024dc7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2436,7 +2436,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { IgniteInternalFuture<?> stopFut; try { - stopFut = ctx.cache().dynamicStopCache(cacheName); + stopFut = ctx.cache().dynamicDestroyCache(cacheName); } finally { unguard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a8ce8ff..eae07ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -263,6 +263,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Removes near node ID from cache filter. + * + * @param cacheName Cache name. + * @param clientNodeId Near node ID. + */ + public void onClientCacheClose(String cacheName, UUID clientNodeId) { + CachePredicate predicate = registeredCaches.get(cacheName); + + if (predicate != null) + predicate.onNodeLeft(clientNodeId); + } + + /** * @return Client nodes map. */ public Map<String, Map<UUID, Boolean>> clientNodesMap() { @@ -1079,9 +1092,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node for given ID is alive. */ public boolean alive(UUID nodeId) { + return getAlive(nodeId) != null; + } + + /** + * @param nodeId Node ID. + * @return Node if node is alive. + */ + @Nullable public ClusterNode getAlive(UUID nodeId) { assert nodeId != null; - return getSpi().getNode(nodeId) != null; // Go directly to SPI without checking disco cache. + return getSpi().getNode(nodeId); // Go directly to SPI without checking disco cache. } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index c08a179..7af1572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -57,6 +57,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Stop flag. */ private boolean stop; + /** Close flag. */ + private boolean close; + /** Fail if exists flag. */ private boolean failIfExists; @@ -68,23 +71,10 @@ public class DynamicCacheChangeRequest implements Serializable { * * @param cacheName Cache stop name. * @param initiatingNodeId Initiating node ID. - * @param stop Stop flag. */ - public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId, boolean stop) { + public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) { this.cacheName = cacheName; this.initiatingNodeId = initiatingNodeId; - - this.stop = stop; - } - - /** - * Constructor means for start requests. - * - * @param cacheName Cache name. - * @param initiatingNodeId Initiating node ID. - */ - public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) { - this(cacheName, initiatingNodeId, false); } /** @@ -130,6 +120,13 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @param stop New stop flag. + */ + public void stop(boolean stop) { + this.stop = stop; + } + + /** * @return Cache name. */ public String cacheName() { @@ -220,6 +217,20 @@ public class DynamicCacheChangeRequest implements Serializable { this.failIfExists = failIfExists; } + /** + * @return Close flag. + */ + public boolean close() { + return close; + } + + /** + * @param close New close flag. + */ + public void close(boolean close) { + this.close = close; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index d9d151c..f2beb0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -68,7 +68,7 @@ public class GridCacheGateway<K, V> { * * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ - public boolean enterIfNotClosed() { + public boolean enterIfNotStopped() { onEnter(); // Must unlock in case of unexpected errors to avoid @@ -89,7 +89,7 @@ public class GridCacheGateway<K, V> { * * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ - public boolean enterIfNotClosedNoLock() { + public boolean enterIfNotStoppedNoLock() { onEnter(); return !stopped; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index af87685..4398b4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -156,16 +156,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Validate requests to check if event should trigger partition exchange. for (DynamicCacheChangeRequest req : batch.requests()) { - if (cctx.cache().dynamicCacheRegistered(req)) + if (cctx.cache().exchangeNeeded(req)) valid.add(req); else cctx.cache().completeStartFuture(req); } if (!F.isEmpty(valid)) { - exchId = exchangeId(n.id(), - affinityTopologyVersion(e), - e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); exchFut = exchangeFuture(exchId, e, valid); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index de1eac2..bb87a86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1390,10 +1390,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return {@code True} if change request was registered to apply. */ @SuppressWarnings("IfMayBeConditional") - public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { + public boolean exchangeNeeded(DynamicCacheChangeRequest req) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc != null) { + if (req.close()) { + assert req.initiatingNodeId() != null : req; + + return true; + } + if (desc.deploymentId().equals(req.deploymentId())) { if (req.start()) return !desc.cancelled(); @@ -1515,20 +1521,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void blockGateway(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() || req.close(); - // Break the proxy before exchange future is done. - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); + if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { + // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); - if (proxy != null) - proxy.gate().block(); + if (proxy != null) { + if (req.stop()) + proxy.gate().block(); + else + proxy.closeProxy(); + } + } } /** * @param req Request. */ private void stopGateway(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() : req; // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); @@ -1541,7 +1553,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void prepareCacheStop(DynamicCacheChangeRequest req) { - assert req.stop(); + assert req.stop() || req.close() : req; GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); @@ -1597,6 +1609,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) registeredCaches.remove(masked, desc); } + else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) { + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked); + + if (proxy != null) { + if (proxy.context().affinityNode()) { + GridCacheAdapter<?, ?> cache = caches.get(masked); + + if (cache != null) + jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); + } + else { + proxy.context().gate().onStopped(); + + prepareCacheStop(req); + } + } + } completeStartFuture(req); } @@ -2005,13 +2034,35 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param cacheName Cache name to stop. - * @return Future that will be completed when cache is stopped. + * @param cacheName Cache name to destroy. + * @return Future that will be completed when cache is destroyed. */ - public IgniteInternalFuture<?> dynamicStopCache(String cacheName) { + public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) { checkEmptyTransactions(); - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true); + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + + t.stop(true); + + return F.first(initiateCacheChanges(F.asList(t), false)); + } + + + /** + * @param cacheName Cache name to close. + * @return Future that will be completed when cache is closed. + */ + public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) { + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName)); + + if (proxy == null || proxy.proxyClosed()) + return new GridFinishedFuture<>(); // No-op. + + checkEmptyTransactions(); + + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + + t.close(true); return F.first(initiateCacheChanges(F.asList(t), false)); } @@ -2031,16 +2082,24 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req); try { - if (req.stop()) { + if (req.stop() || req.close()) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc == null) // No-op. fut.onDone(); else { + assert desc.cacheConfiguration() != null : desc; + + if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) { + req.close(false); + + req.stop(true); + } + IgniteUuid dynamicDeploymentId = desc.deploymentId(); - assert dynamicDeploymentId != null; + assert dynamicDeploymentId != null : desc; // Save deployment ID to avoid concurrent stops. req.deploymentId(dynamicDeploymentId); @@ -2188,9 +2247,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.nearCacheConfiguration() != null); } else { + assert req.stop() || req.close() : req; + if (desc == null) { - // If local node initiated start, fail the start future. - DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + // If local node initiated start, finish future. + DynamicCacheStartFuture changeFut = + (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) { // No-op. @@ -2200,9 +2262,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } - desc.onCancelled(); + if (req.stop()) { + desc.onCancelled(); - ctx.discovery().removeCacheFilter(req.cacheName()); + ctx.discovery().removeCacheFilter(req.cacheName()); + } + else + ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b31b2e8..9767f49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; @@ -171,19 +171,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public CacheMetrics metrics() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().metrics(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); @@ -202,19 +206,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { - CacheOperationContext prev = gate.enter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().mxBean(); } finally { - gate.leave(prev); + onLeave(gate, prev); } } @@ -230,19 +236,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Nullable @Override public Cache.Entry<K, V> randomEntry() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().randomEntry(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) : @@ -251,7 +261,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -262,7 +272,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public IgniteCache<K, V> withNoRetries() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { boolean noRetries = opCtx != null && opCtx.noRetries(); @@ -280,14 +292,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -296,7 +310,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V ctx.cache().globalLoadCache(p, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -307,7 +321,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -316,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.localLoadCache(p, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -327,7 +343,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -339,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndPutIfAbsent(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -359,13 +377,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -379,7 +399,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V final CacheQuery<Map.Entry<K,V>> qry; final CacheQueryFuture<Map.Entry<K,V>> fut; - boolean isKeepPortable = opCtx != null ? opCtx.isKeepPortable() : false; + boolean isKeepPortable = opCtx != null && opCtx.isKeepPortable(); if (filter instanceof ScanQuery) { IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); @@ -444,11 +464,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * @param local Enforce local. + * @param loc Enforce local. * @return Local node cluster group. */ - private ClusterGroup projection(boolean local) { - if (local || ctx.isLocal() || isReplicatedDataNode()) + private ClusterGroup projection(boolean loc) { + if (loc || ctx.isLocal() || isReplicatedDataNode()) return ctx.kernalContext().grid().cluster().forLocal(); if (ctx.isReplicated()) @@ -517,7 +537,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <R> QueryCursor<R> query(Query<R> qry) { A.notNull(qry, "qry"); - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -558,7 +580,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw new CacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -589,7 +611,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localEntries(peekModes); @@ -598,37 +622,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.context().queries().metrics(); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.evictAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localPeek(key, peekModes, null); @@ -637,20 +667,22 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.promoteAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -660,7 +692,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -675,13 +709,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.localSize(peekModes); @@ -690,14 +726,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public V get(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -709,7 +747,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.get(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -720,7 +758,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -732,7 +772,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -743,7 +783,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -755,7 +797,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAllOutTx(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -769,7 +811,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V */ public Map<K, V> getAll(Collection<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -781,7 +825,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -796,19 +840,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Entry set. */ public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return delegate.entrySetx(filter); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public boolean containsKey(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -820,13 +868,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.containsKey(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -838,7 +888,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.containsKeys(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -848,7 +898,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V boolean replaceExisting, @Nullable final CompletionListener completionLsnr ) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); @@ -869,14 +921,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void put(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -896,7 +950,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.put(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -907,7 +961,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -919,7 +975,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndPut(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -930,7 +986,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -939,7 +997,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.putAll(map); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -950,7 +1008,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -962,7 +1022,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.putIfAbsent(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -973,7 +1033,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean remove(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -985,7 +1047,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.remove(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -996,7 +1058,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1008,7 +1072,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.remove(key, oldVal); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1019,7 +1083,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndRemove(K key) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1031,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndRemove(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1042,7 +1108,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1054,7 +1122,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.replace(key, oldVal, newVal); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1065,7 +1133,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1077,7 +1147,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.replace(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1088,7 +1158,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1100,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.getAndReplace(key, val); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1111,7 +1183,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1120,7 +1194,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V delegate.removeAll(keys); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1130,7 +1204,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void removeAll() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1142,13 +1218,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clear(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1160,13 +1238,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1178,13 +1258,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void clear() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) @@ -1196,32 +1278,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localClear(K key) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { delegate.clearLocally(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void localClearAll(Set<? extends K> keys) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { for (K key : keys) delegate.clearLocally(key); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1229,7 +1315,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1255,7 +1343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1267,7 +1355,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1293,7 +1383,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1303,10 +1393,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, - Object... args) { + EntryProcessor<K, V, T> entryProcessor, + Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1318,7 +1410,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1331,7 +1423,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1343,7 +1437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1356,7 +1450,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { try { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { if (isAsync()) { @@ -1368,7 +1464,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return delegate.invokeAll(map, args); } finally { - onLeave(prev); + onLeave(gate, prev); } } catch (IgniteCheckedException e) { @@ -1394,17 +1490,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public void destroy() { + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) + return; + + IgniteInternalFuture<?> fut; + + try { + fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name()); + } + finally { + onLeave(gate); + } + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public void close() { - if (!onEnterIfNoClose()) + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) return; IgniteInternalFuture<?> fut; try { - fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name()); + fut = ctx.kernalContext().cache().dynamicCloseCache(ctx.name()); } finally { - onLeave(); + onLeave(gate); } try { @@ -1417,14 +1539,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public boolean isClosed() { - if (!onEnterIfNoClose()) + GridCacheGateway<K, V> gate = this.gate; + + if (!onEnterIfNoStop(gate)) return true; try { return ctx.kernalContext().cache().context().closed(ctx); } finally { - onLeave(); + onLeave(gate); } } @@ -1448,7 +1572,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); @@ -1457,13 +1583,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); @@ -1472,19 +1600,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw cacheException(e); } finally { - onLeave(prev); + onLeave(gate, prev); } } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { return ctx.cache().igniteIterator(); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1516,8 +1646,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * * @return Projection for portable objects. */ + @SuppressWarnings("unchecked") public <K1, V1> IgniteCache<K1, V1> keepPortable() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { CacheOperationContext opCtx0 = @@ -1535,7 +1668,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1543,7 +1676,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cache with skip store enabled. */ public IgniteCache<K, V> skipStore() { - CacheOperationContext prev = onEnter(opCtx); + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); try { boolean skip = opCtx != null && opCtx.skipStore(); @@ -1565,7 +1700,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V lock); } finally { - onLeave(prev); + onLeave(gate, prev); } } @@ -1592,10 +1727,69 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * @return {@code True} if proxy was closed. + */ + public boolean proxyClosed() { + return !gate.getClass().equals(GridCacheGateway.class); + } + + /** + * Closes this proxy instance. + */ + public void closeProxy() { + gate = new GridCacheGateway<K, V>(ctx) { + @Override public void enter() { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Override public boolean enterIfNotStopped() { + return false; + } + + @Override public boolean enterIfNotStoppedNoLock() { + return false; + } + + @Override public void leaveNoLock() { + assert false; + } + + @Override public void leave() { + assert false; + } + + @Nullable @Override public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Nullable @Override public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) { + throw new IllegalStateException("Cache has been closed: " + ctx.name()); + } + + @Override public void leave(CacheOperationContext prev) { + assert false; + } + + @Override public void leaveNoLock(CacheOperationContext prev) { + assert false; + } + + @Override public void block() { + // No-op. + } + + @Override public void onStopped() { + // No-op. + } + }; + } + + /** + * @param gate Cache gateway. * @param opCtx Cache operation context to guard. * @return Previous projection set on this thread. */ - private CacheOperationContext onEnter(CacheOperationContext opCtx) { + private CacheOperationContext onEnter(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { if (lock) return gate.enter(opCtx); else @@ -1603,21 +1797,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * On enter. - * + * @param gate Cache gateway. * @return {@code True} if enter successful. */ - private boolean onEnterIfNoClose() { + private boolean onEnterIfNoStop(GridCacheGateway<K, V> gate) { if (lock) - return gate.enterIfNotClosed(); + return gate.enterIfNotStopped(); else - return gate.enterIfNotClosedNoLock(); + return gate.enterIfNotStoppedNoLock(); } /** + * @param gate Cache gateway. * @param opCtx Operation context to guard. */ - private void onLeave(CacheOperationContext opCtx) { + private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { if (lock) gate.leave(opCtx); else @@ -1625,9 +1819,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * On leave. + * @param gate Cache gateway. */ - private void onLeave() { + private void onLeave(GridCacheGateway<K, V> gate) { if (lock) gate.leave(); else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 89b85c4..3b411b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -597,7 +597,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { List<ReaderId> newRdrs = null; for (int i = 0; i < rdrs.length; i++) { - if (!cctx.discovery().alive(rdrs[i].nodeId())) { + ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId()); + + if (node == null || !cctx.discovery().cacheNode(node, cacheName())) { // Node has left and if new list has already been created, just skip. // Otherwise, create new list and add alive nodes. if (newRdrs == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 38a0d55..5701749 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -474,6 +474,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT oldestNode.set(oldest); + if (!F.isEmpty(reqs)) + blockGateways(); + startCaches(); // True if client node joined or failed. @@ -489,24 +492,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT else { assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; - boolean clientOnlyStart = true; + boolean clientOnlyCacheEvt = true; for (DynamicCacheChangeRequest req : reqs) { - if (!req.clientStartOnly()) { - clientOnlyStart = false; + if (req.clientStartOnly() || req.close()) + continue; - break; - } + clientOnlyCacheEvt = false; + + break; } - clientNodeEvt = clientOnlyStart; + clientNodeEvt = clientOnlyCacheEvt; } if (clientNodeEvt) { ClusterNode node = discoEvt.eventNode(); // Client need to initialize affinity for local join event or for stated client caches. - if (!node.isLocal()) { + if (!node.isLocal() || clientCacheClose()) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -733,9 +737,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("After waiting for partition release future: " + this); - if (!F.isEmpty(reqs)) - blockGateways(); - if (exchId.isLeft()) cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); @@ -839,6 +840,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return {@code True} if exchange initiated for client cache close. + */ + private boolean clientCacheClose() { + return reqs != null && reqs.size() == 1 && reqs.iterator().next().close(); + } + + /** * */ private void dumpPendingObjects() { @@ -903,7 +911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void blockGateways() { for (DynamicCacheChangeRequest req : reqs) { - if (req.stop()) + if (req.stop() || req.close()) cctx.cache().blockGateway(req); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java index 0e848f9..83d19f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java @@ -56,7 +56,7 @@ public class VisorCacheStopTask extends VisorOneNodeTask<String, Void> { @Override protected Void run(String cacheName) { IgniteCache cache = ignite.cache(cacheName); - cache.close(); + cache.destroy(); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java index 467349f..da27fb2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java @@ -127,13 +127,23 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest { ccfg.setNodeFilter(new TestNodesFilter()); - try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) { + IgniteCache<Integer, Integer> cache = client.createCache(ccfg); + + try { checkCache(null, 1); } + finally { + cache.destroy(); + } - try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) { + cache = client.createCache(ccfg, new NearCacheConfiguration()); + + try { checkCache(null, 1); } + finally { + cache.destroy(); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java index 18b77e0..e51be58 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java @@ -84,8 +84,7 @@ public class IgniteFairAffinityDynamicCacheSelfTest extends GridCommonAbstractTe cache.put(i, i); IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { ignite(0).destroyCache(cache.getName()); return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java index 0634197..8e53f05 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java @@ -113,12 +113,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm public void testAtomicCache() throws Exception { CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC); - try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { + IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg); + + try { cache.loadCache(null); cache.get(1); cache.put(1, 1); cache.remove(1); } + finally { + cache.destroy(); + } assertEquals(3, loadCacheCnt.get()); assertEquals(1, loadCnt.get()); @@ -133,12 +138,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm public void testTransactionalCache() throws Exception { CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL); - try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { + IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg); + + try { cache.loadCache(null); cache.get(1); cache.put(1, 1); cache.remove(1); } + finally { + cache.destroy(); + } assertEquals(3, loadCacheCnt.get()); assertEquals(1, loadCnt.get()); @@ -153,15 +163,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm public void testExplicitTransaction() throws Exception { CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL); - try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) { - try (Transaction tx = ignite(0).transactions().txStart()) { - cache.put(1, 1); - cache.put(2, 2); - cache.remove(3); - cache.remove(4); + IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg); - tx.commit(); - } + try (Transaction tx = ignite(0).transactions().txStart()) { + cache.put(1, 1); + cache.put(2, 2); + cache.remove(3); + cache.remove(4); + + tx.commit(); + } + finally { + cache.destroy(); } assertEquals(2, writeCnt.get()); @@ -176,18 +189,20 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); - try ( - IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); - IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2) - ) { - try (Transaction tx = ignite(0).transactions().txStart()) { - cache1.put(1, 1); - cache2.put(2, 2); - cache1.remove(3); - cache2.remove(4); - - tx.commit(); - } + IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); + IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2); + + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); + cache1.remove(3); + cache2.remove(4); + + tx.commit(); + } + finally { + cache1.destroy(); + cache2.destroy(); } assertEquals(2, writeCnt.get()); @@ -204,16 +219,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); - try ( - IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); - IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2) - ) { - try (Transaction tx = ignite(0).transactions().txStart()) { - cache1.put(1, 1); - cache2.put(2, 2); + IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); + IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2); - tx.commit(); - } + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); + + tx.commit(); + } + finally { + cache1.destroy(); + cache2.destroy(); } try (Connection conn = DriverManager.getConnection(URL)) { @@ -232,25 +249,27 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL); CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL); - try ( - IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); - IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2) - ) { - try (Transaction tx = ignite(0).transactions().txStart()) { - cache1.put(1, 1); - cache2.put(2, 2); + IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1); + IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2); - tx.commit(); + try (Transaction tx = ignite(0).transactions().txStart()) { + cache1.put(1, 1); + cache2.put(2, 2); - assert false : "Exception was not thrown."; - } - catch (IgniteException e) { - CacheWriterException we = X.cause(e, CacheWriterException.class); + tx.commit(); - assertNotNull(we); + assert false : "Exception was not thrown."; + } + catch (IgniteException e) { + CacheWriterException we = X.cause(e, CacheWriterException.class); + + assertNotNull(we); - assertEquals("Expected failure.", we.getMessage()); - } + assertEquals("Expected failure.", we.getMessage()); + } + finally { + cache1.destroy(); + cache2.destroy(); } try (Connection conn = DriverManager.getConnection(URL)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java index 7b01f0f..bc6b443 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java @@ -92,31 +92,33 @@ public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTe cacheCfg.setBackups(backups); cacheCfg.setLoadPreviousValue(true); - try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg)) { - for (int i = 0; i < 10; i++) - assertEquals((Integer)i, cache.get(i)); + IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg); - cache.removeAll(); + for (int i = 0; i < 10; i++) + assertEquals((Integer)i, cache.get(i)); - assertEquals(0, cache.size()); + cache.removeAll(); - for (TransactionConcurrency conc : TransactionConcurrency.values()) { - for (TransactionIsolation iso : TransactionIsolation.values()) { - info("Checking transaction [conc=" + conc + ", iso=" + iso + ']'); + assertEquals(0, cache.size()); - try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) { - for (int i = 0; i < 10; i++) - assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']', - (Integer)i, cache.get(i)); + for (TransactionConcurrency conc : TransactionConcurrency.values()) { + for (TransactionIsolation iso : TransactionIsolation.values()) { + info("Checking transaction [conc=" + conc + ", iso=" + iso + ']'); - tx.commit(); - } + try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) { + for (int i = 0; i < 10; i++) + assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']', + (Integer)i, cache.get(i)); - cache.removeAll(); - assertEquals(0, cache.size()); + tx.commit(); } + + cache.removeAll(); + assertEquals(0, cache.size()); } } + + cache.destroy(); } /**