IGNITE-45 - getOrCreate methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6cb8b4db Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6cb8b4db Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6cb8b4db Branch: refs/heads/ignite-541 Commit: 6cb8b4db2dbbca83e8e60eef854cfc042e38a929 Parents: 1a91054 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri Mar 20 18:26:43 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri Mar 20 18:26:43 2015 -0700 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeRequest.java | 14 +++- .../processors/cache/GridCacheProcessor.java | 68 ++++++++++++----- .../GridDhtPartitionsExchangeFuture.java | 8 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 77 ++++++++++++++++++-- 4 files changed, 131 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index be88217..5763a36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -51,13 +51,19 @@ public class DynamicCacheChangeRequest implements Serializable { /** Start only client cache, do not start data nodes. */ private boolean clientStartOnly; + /** Stop flag. */ + private boolean stop; + /** * Constructor creates cache stop request. * * @param cacheName Cache stop name. */ - public DynamicCacheChangeRequest(String cacheName) { + public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) { this.cacheName = cacheName; + this.initiatingNodeId = initiatingNodeId; + + stop = true; } /** @@ -88,15 +94,15 @@ public class DynamicCacheChangeRequest implements Serializable { /** * @return {@code True} if this is a start request. */ - public boolean isStart() { + public boolean start() { return startCfg != null; } /** * @return {@code True} if this is a stop request. */ - public boolean isStop() { - return initiatingNodeId == null && startCfg == null; + public boolean stop() { + return stop; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 278e2c8..2eb030a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -963,8 +963,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache context. * @throws IgniteCheckedException If failed to create cache. */ - @SuppressWarnings( {"unchecked"}) + @SuppressWarnings({"unchecked"}) private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { + assert cfg != null; + CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; validate(ctx.config(), cfg, cfgStore); @@ -1236,7 +1238,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc != null && desc.deploymentId().equals(req.deploymentId())) { - if (req.isStart()) + if (req.start()) return !desc.cancelled(); else return desc.cancelled(); @@ -1255,7 +1257,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { AffinityTopologyVersion topVer ) throws IgniteCheckedException { for (DynamicCacheChangeRequest req : reqs) { - assert req.isStart(); + assert req.start(); prepareCacheStart( req.startCacheConfiguration(), @@ -1308,6 +1310,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode); boolean clientNodeStart = locNode.id().equals(initiatingNodeId); + if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) + return; + if (affNodeStart || clientNodeStart) { if (clientNodeStart && !affNodeStart) { if (nearCfg != null) @@ -1335,7 +1340,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void blockGateway(DynamicCacheChangeRequest req) { - assert req.isStop(); + assert req.stop(); // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); @@ -1348,7 +1353,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void prepareCacheStop(DynamicCacheChangeRequest req) { - assert req.isStop(); + assert req.stop(); GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); @@ -1393,7 +1398,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheChangeRequest req : reqs) { String masked = maskNull(req.cacheName()); - if (req.isStop()) { + if (req.stop()) { prepareCacheStop(req); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -1407,7 +1412,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert req.deploymentId() != null; assert fut == null || fut.deploymentId != null; - if (fut != null && fut.deploymentId().equals(req.deploymentId())) + if (fut != null && fut.deploymentId().equals(req.deploymentId()) && + F.eq(req.initiatingNodeId(), ctx.localNodeId())) fut.onDone(); } } @@ -1451,7 +1457,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (!desc.cancelled()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest((UUID)null); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null); req.startCacheConfiguration(desc.cacheConfiguration()); @@ -1476,7 +1482,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheChangeRequest req : batch.requests()) { DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); - if (req.isStart() && !req.clientStartOnly()) { + if (req.start() && !req.clientStartOnly()) { CacheConfiguration ccfg = req.startCacheConfiguration(); if (existing != null) { @@ -1529,6 +1535,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param ccfg Cache configuration. * @return Future that will be completed when cache is deployed. */ + @SuppressWarnings("IfMayBeConditional") public IgniteInternalFuture<?> dynamicStartCache( @Nullable CacheConfiguration ccfg, String cacheName, @@ -1547,14 +1554,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(new IgniteCacheExistsException("Failed to start cache " + "(a cache with the same name is already started): " + cacheName)); else { + CacheConfiguration descCfg = desc.cacheConfiguration(); + // Check if we were asked to start a near cache. - if (nearCfg != null) - req.clientStartOnly(true); + if (nearCfg != null) { + if (descCfg.getNodeFilter().apply(ctx.discovery().localNode())) { + // If we are on a data node and near cache was enabled, return success, else - fail. + if (descCfg.getNearConfiguration() != null) + return new GridFinishedFuture<>(); + else + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start near " + + "cache (local node is an affinity node for cache): " + cacheName)); + } + else + // If local node has near cache, return success. + req.clientStartOnly(true); + } else return new GridFinishedFuture<>(); req.deploymentId(desc.deploymentId()); - req.startCacheConfiguration(ccfg); + + req.startCacheConfiguration(descCfg); } } else { @@ -1583,9 +1604,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(new IgniteCacheExistsException("Failed to start near cache " + "(a cache with the given name is not started): " + cacheName)); - if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) - return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start near cache " + - "(local node is an affinity node for cache): " + cacheName)); + if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) { + if (ccfg.getNearConfiguration() != null) + return new GridFinishedFuture<>(); + else + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start near cache " + + "(local node is an affinity node for cache): " + cacheName)); + } req.deploymentId(desc.deploymentId()); req.startCacheConfiguration(ccfg); @@ -1602,7 +1627,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Future that will be completed when cache is stopped. */ public IgniteInternalFuture<?> dynamicStopCache(String cacheName) { - return F.first(initiateCacheChanges(F.asList(new DynamicCacheChangeRequest(cacheName)))); + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + + return F.first(initiateCacheChanges(F.asList(t))); } /** @@ -1618,7 +1645,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId()); try { - if (req.isStop()) { + if (req.stop()) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc == null) @@ -1642,7 +1669,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { maskNull(req.cacheName()), fut); if (old != null) { - if (req.isStart() && !req.clientStartOnly()) { + if (req.start() && !req.clientStartOnly()) { fut.onDone(new IgniteCacheExistsException("Failed to start cache " + "(a cache with the same name is already being started or stopped): " + req.cacheName())); } @@ -1666,7 +1693,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sendReqs)); + if (!sendReqs.isEmpty()) + ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sendReqs)); return res; } @@ -1680,7 +1708,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheChangeRequest req : batch.requests()) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - if (req.isStart()) { + if (req.start()) { CacheConfiguration ccfg = req.startCacheConfiguration(); DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index bab9f57..b06f2de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -281,7 +281,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT public boolean isCacheAdded(int cacheId) { if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { - if (req.isStart() && !req.clientStartOnly()) { + if (req.start() && !req.clientStartOnly()) { if (CU.cacheId(req.cacheName()) == cacheId) return true; } @@ -584,7 +584,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (cacheId == CU.cacheId(req.cacheName())) { - stopping = req.isStop(); + stopping = req.stop(); break; } @@ -600,7 +600,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private void startCaches() throws IgniteCheckedException { cctx.cache().prepareCachesStart(F.view(reqs, new IgnitePredicate<DynamicCacheChangeRequest>() { @Override public boolean apply(DynamicCacheChangeRequest req) { - return req.isStart(); + return req.start(); } }), exchId.topologyVersion()); } @@ -610,7 +610,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void blockGateways() { for (DynamicCacheChangeRequest req : reqs) { - if (req.isStop()) + if (req.stop()) cctx.cache().blockGateway(req); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 3f82cbd..cfb0e9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import javax.cache.*; import java.util.*; import java.util.concurrent.*; @@ -257,7 +258,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - kernal.createCache(ccfg, null); + kernal.createCache(ccfg); for (int g = 0; g < nodeCount(); g++) { IgniteKernal kernal0 = (IgniteKernal) grid(g); @@ -319,7 +320,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - kernal.createCache(ccfg, null); + kernal.createCache(ccfg); info(">>>>>>> Deployed dynamic cache"); @@ -382,7 +383,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setNodeFilter(NODE_FILTER); - kernal.createCache(ccfg, null); + kernal.createCache(ccfg); startGrid(nodeCount() + 1); @@ -435,7 +436,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testFailWhenConfiguredCacheExists() throws Exception { - GridTestUtils.assertThrows(log, new Callable<Object>() { + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { @Override public Object call() throws Exception { final IgniteKernal kernal = (IgniteKernal) grid(0); @@ -448,9 +449,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setNodeFilter(NODE_FILTER); - return kernal.createCache(ccfg, null); + return kernal.createCache(ccfg); } - }, IgniteCheckedException.class, null); + }, IgniteCacheExistsException.class, null); } /** @@ -471,7 +472,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setNodeFilter(NODE_FILTER); - kernal.createCache(ccfg, null); + kernal.createCache(ccfg); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override @@ -516,7 +517,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { final IgniteKernal started = (IgniteKernal) grid(nodeCount()); - started.createCache(ccfg, null); + started.createCache(ccfg); GridCacheAdapter<Object, Object> cache = started.internalCache(DYNAMIC_CACHE_NAME); @@ -681,4 +682,64 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { stopGrid(nodeCount()); } } + + /** {@inheritDoc} */ + public void testGetOrCreate() throws Exception { + try { + final CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(DYNAMIC_CACHE_NAME); + cfg.setNodeFilter(NODE_FILTER); + + grid(0).getOrCreateCache(cfg); + grid(0).getOrCreateCache(cfg); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return grid(0).getOrCreateCache(cfg, new NearCacheConfiguration()); + } + }, CacheException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return grid(0).getOrCreateCache(DYNAMIC_CACHE_NAME, new NearCacheConfiguration()); + } + }, CacheException.class, null); + + testAttribute = false; + + startGrid(nodeCount()); + startGrid(nodeCount() + 1); + + try { + IgniteEx nearGrid = grid(nodeCount()); + + nearGrid.getOrCreateCache(cfg, new NearCacheConfiguration()); + nearGrid.getOrCreateCache(DYNAMIC_CACHE_NAME, new NearCacheConfiguration()); + + GridCacheContext<Object, Object> nCtx = ((IgniteKernal)nearGrid) + .internalCache(DYNAMIC_CACHE_NAME).context(); + + assertTrue(nCtx.isNear()); + assertFalse(nCtx.affinityNode()); + + IgniteEx clientGrid = grid(nodeCount() + 1); + + clientGrid.getOrCreateCache(cfg); + clientGrid.getOrCreateCache(cfg); + + GridCacheContext<Object, Object> cCtx = ((IgniteKernal)clientGrid) + .internalCache(DYNAMIC_CACHE_NAME).context(); + + assertFalse(cCtx.isNear()); + assertFalse(cCtx.affinityNode()); + } finally { + stopGrid(nodeCount() + 1); + stopGrid(nodeCount()); + } + } + finally { + grid(0).destroyCache(DYNAMIC_CACHE_NAME); + } + } }