IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/378112ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/378112ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/378112ed Branch: refs/heads/ignite-45 Commit: 378112edf19ec7ae9dc0d607731109c23b810155 Parents: ff5c047 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Mar 5 18:15:04 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Mar 5 18:15:04 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 58 ++++++-- .../cache/IgniteDynamicCacheStartSelfTest.java | 141 ++++++++++++++++--- 2 files changed, 165 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/378112ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e7bc1ec..7aed925 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -665,23 +665,29 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Start dynamic caches received from collect discovery data. for (DynamicCacheDescriptor desc : dynamicCaches.values()) { - GridCacheContext ctx = createCache(desc.cacheConfiguration()); + if (hasStaticCache(desc.cacheConfiguration().getName())) + throw new IgniteCheckedException("Failed to start node (current grid has dynamic cache which " + + "conflicts with locally configured static cache: " + desc.cacheConfiguration().getName()); - ctx.dynamicDeploymentId(desc.deploymentId()); + if (desc.nodeFilter().apply(ctx.discovery().localNode())) { + GridCacheContext ctx = createCache(desc.cacheConfiguration()); - sharedCtx.addCacheContext(ctx); + ctx.dynamicDeploymentId(desc.deploymentId()); - GridCacheAdapter cache = ctx.cache(); + sharedCtx.addCacheContext(ctx); - String name = desc.cacheConfiguration().getName(); + GridCacheAdapter cache = ctx.cache(); - caches.put(name, cache); + String name = desc.cacheConfiguration().getName(); - startCache(cache); + caches.put(name, cache); + + startCache(cache); - proxies.put(name, new GridCacheProxyImpl(ctx, cache, null)); + proxies.put(name, new GridCacheProxyImpl(ctx, cache, null)); - jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); + jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); + } } if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { @@ -1203,16 +1209,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { assert req.isStart(); - GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); + if (hasStaticCache(req.cacheName())) { + U.warn(log, "Failed to start dynamic cache (a static cache with the same name is already " + + "configured: " + req.cacheName()); + + return; + } - cacheCtx.dynamicDeploymentId(req.deploymentId()); + if (req.startNodeFilter().apply(ctx.discovery().localNode())) { + GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); - sharedCtx.addCacheContext(cacheCtx); + cacheCtx.dynamicDeploymentId(req.deploymentId()); + + sharedCtx.addCacheContext(cacheCtx); - startCache(cacheCtx.cache()); - onKernalStart(cacheCtx.cache()); + startCache(cacheCtx.cache()); + onKernalStart(cacheCtx.cache()); - caches.put(cacheCtx.name(), cacheCtx.cache()); + caches.put(cacheCtx.name(), cacheCtx.cache()); + } } /** @@ -1369,6 +1384,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param cacheName Cache name to check. + * @return {@code True} if local node has a static cache with the given name configured. + */ + private boolean hasStaticCache(String cacheName) { + for (CacheConfiguration ccfg : ctx.config().getCacheConfiguration()) { + if (F.eq(cacheName, ccfg.getName())) + return true; + } + + return false; + } + + /** * Dynamically starts cache. * * @param ccfg Cache configuration. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/378112ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 6c96b36..a0b593e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; @@ -35,7 +36,26 @@ import java.util.concurrent.*; @SuppressWarnings("unchecked") public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** */ - private static final String CACHE_NAME = "TestDynamicCache"; + private static final String DYNAMIC_CACHE_NAME = "TestDynamicCache"; + + /** */ + private static final String STATIC_CACHE_NAME = "TestStaticCache"; + + /** */ + private static final String TEST_ATTRIBUTE_NAME = "TEST_ATTRIBUTE_NAME"; + + /** */ + public static final IgnitePredicate<ClusterNode> NODE_FILTER = new IgnitePredicate<ClusterNode>() { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode n) { + Boolean val = n.attribute(TEST_ATTRIBUTE_NAME); + + return val != null && val; + } + }; + + /** */ + private boolean testAttribute = true; /** * @return Number of nodes for this test. @@ -45,6 +65,21 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute)); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(STATIC_CACHE_NAME); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { startGridsMultiThreaded(nodeCount()); } @@ -68,7 +103,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName(CACHE_NAME); + ccfg.setName(DYNAMIC_CACHE_NAME); futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); @@ -101,7 +136,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { - futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME)); + futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME)); return null; } @@ -125,7 +160,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName(CACHE_NAME); + ccfg.setName(DYNAMIC_CACHE_NAME); IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); @@ -162,7 +197,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); - futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME)); + futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME)); return null; } @@ -198,7 +233,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg.setAtomicityMode(mode); - ccfg.setName(CACHE_NAME); + ccfg.setName(DYNAMIC_CACHE_NAME); kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); @@ -208,21 +243,21 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) f.get(); - assertNotNull(grid(g).jcache(CACHE_NAME)); + assertNotNull(grid(g).jcache(DYNAMIC_CACHE_NAME)); } - grid(0).jcache(CACHE_NAME).put("1", "1"); + grid(0).jcache(DYNAMIC_CACHE_NAME).put("1", "1"); for (int g = 0; g < nodeCount(); g++) - assertEquals("1", grid(g).jcache(CACHE_NAME).get("1")); + assertEquals("1", grid(g).jcache(DYNAMIC_CACHE_NAME).get("1")); // Grab caches before stop. final IgniteCache[] caches = new IgniteCache[nodeCount()]; for (int g = 0; g < nodeCount(); g++) - caches[g] = grid(g).jcache(CACHE_NAME); + caches[g] = grid(g).jcache(DYNAMIC_CACHE_NAME); - kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); for (int g = 0; g < nodeCount(); g++) { final IgniteKernal kernal0 = (IgniteKernal)grid(g); @@ -234,7 +269,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - return kernal0.jcache(CACHE_NAME); + return kernal0.jcache(DYNAMIC_CACHE_NAME); } }, IllegalArgumentException.class, null); @@ -255,7 +290,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setName(CACHE_NAME); + ccfg.setName(DYNAMIC_CACHE_NAME); kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get(); @@ -263,20 +298,23 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { try { // Check that cache got deployed on new node. - IgniteCache<Object, Object> cache = ignite(nodeCount()).jcache(CACHE_NAME); + IgniteCache<Object, Object> cache = ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME); cache.put("1", "1"); - for (int g = 0; g < nodeCount(); g++) - assertEquals("1", grid(g).jcache(CACHE_NAME).get("1")); + for (int g = 0; g < nodeCount() + 1; g++) { + assertEquals("1", grid(g).jcache(DYNAMIC_CACHE_NAME).get("1")); + + assertEquals(nodeCount() + 1, grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(0).size()); + } // Undeploy cache. - kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); startGrid(nodeCount() + 1); // Check that cache is not deployed on new node after undeploy. - for (int g = 0; g < nodeCount(); g++) { + for (int g = 0; g < nodeCount() + 2; g++) { final IgniteKernal kernal0 = (IgniteKernal)grid(g); for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) @@ -284,7 +322,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - return kernal0.jcache(CACHE_NAME); + return kernal0.jcache(DYNAMIC_CACHE_NAME); } }, IllegalArgumentException.class, null); } @@ -294,4 +332,69 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { stopGrid(nodeCount()); } } + + /** + * @throws Exception If failed. + */ + public void testDeployFilter() throws Exception { + try { + testAttribute = false; + + startGrid(nodeCount()); + + final IgniteKernal kernal = (IgniteKernal)grid(0); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setName(DYNAMIC_CACHE_NAME); + + kernal.context().cache().dynamicStartCache(ccfg, NODE_FILTER).get(); + + startGrid(nodeCount() + 1); + + for (int i = 0; i < 100; i++) + grid(0).jcache(DYNAMIC_CACHE_NAME).put(i, i); + + for (int i = 0; i < 100; i++) + assertEquals(i, grid(1).jcache(DYNAMIC_CACHE_NAME).get(i)); + + info("Affinity nodes: " + grid(0).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(0)); + + for (int g = 0; g < nodeCount(); g++) { + for (int i = 0; i < 100; i++) { + assertFalse(grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i) + .contains(grid(nodeCount()).cluster().localNode())); + + assertFalse(grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i) + .contains(grid(nodeCount() + 1).cluster().localNode())); + } + } + + // Check that cache is not deployed on new node after undeploy. + for (int g = 0; g < nodeCount() + 2; g++) { + final IgniteKernal kernal0 = (IgniteKernal)grid(g); + + for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures()) + f.get(); + + if (g < nodeCount()) + assertNotNull(grid(g).jcache(DYNAMIC_CACHE_NAME)); + else + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return kernal0.jcache(DYNAMIC_CACHE_NAME); + } + }, IllegalArgumentException.class, null); + } + + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); + + stopGrid(nodeCount() + 1); + stopGrid(nodeCount()); + } + finally { + testAttribute = true; + } + } }