IGNITE-45 - WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/378112ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/378112ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/378112ed

Branch: refs/heads/ignite-45
Commit: 378112edf19ec7ae9dc0d607731109c23b810155
Parents: ff5c047
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Thu Mar 5 18:15:04 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Thu Mar 5 18:15:04 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  58 ++++++--
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 141 ++++++++++++++++---
 2 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/378112ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e7bc1ec..7aed925 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -665,23 +665,29 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         // Start dynamic caches received from collect discovery data.
         for (DynamicCacheDescriptor desc : dynamicCaches.values()) {
-            GridCacheContext ctx = createCache(desc.cacheConfiguration());
+            if (hasStaticCache(desc.cacheConfiguration().getName()))
+                throw new IgniteCheckedException("Failed to start node 
(current grid has dynamic cache which " +
+                    "conflicts with locally configured static cache: " + 
desc.cacheConfiguration().getName());
 
-            ctx.dynamicDeploymentId(desc.deploymentId());
+            if (desc.nodeFilter().apply(ctx.discovery().localNode())) {
+                GridCacheContext ctx = createCache(desc.cacheConfiguration());
 
-            sharedCtx.addCacheContext(ctx);
+                ctx.dynamicDeploymentId(desc.deploymentId());
 
-            GridCacheAdapter cache = ctx.cache();
+                sharedCtx.addCacheContext(ctx);
 
-            String name = desc.cacheConfiguration().getName();
+                GridCacheAdapter cache = ctx.cache();
 
-            caches.put(name, cache);
+                String name = desc.cacheConfiguration().getName();
 
-            startCache(cache);
+                caches.put(name, cache);
+
+                startCache(cache);
 
-            proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));
+                proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));
 
-            jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, 
false));
+                jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, 
false));
+            }
         }
 
         if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
@@ -1203,16 +1209,25 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     public void prepareCacheStart(DynamicCacheChangeRequest req) throws 
IgniteCheckedException {
         assert req.isStart();
 
-        GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());
+        if (hasStaticCache(req.cacheName())) {
+            U.warn(log, "Failed to start dynamic cache (a static cache with 
the same name is already " +
+                "configured: " + req.cacheName());
+
+            return;
+        }
 
-        cacheCtx.dynamicDeploymentId(req.deploymentId());
+        if (req.startNodeFilter().apply(ctx.discovery().localNode())) {
+            GridCacheContext cacheCtx = 
createCache(req.startCacheConfiguration());
 
-        sharedCtx.addCacheContext(cacheCtx);
+            cacheCtx.dynamicDeploymentId(req.deploymentId());
+
+            sharedCtx.addCacheContext(cacheCtx);
 
-        startCache(cacheCtx.cache());
-        onKernalStart(cacheCtx.cache());
+            startCache(cacheCtx.cache());
+            onKernalStart(cacheCtx.cache());
 
-        caches.put(cacheCtx.name(), cacheCtx.cache());
+            caches.put(cacheCtx.name(), cacheCtx.cache());
+        }
     }
 
     /**
@@ -1369,6 +1384,19 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param cacheName Cache name to check.
+     * @return {@code True} if local node has a static cache with the given 
name configured.
+     */
+    private boolean hasStaticCache(String cacheName) {
+        for (CacheConfiguration ccfg : ctx.config().getCacheConfiguration()) {
+            if (F.eq(cacheName, ccfg.getName()))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * Dynamically starts cache.
      *
      * @param ccfg Cache configuration.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/378112ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 6c96b36..a0b593e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
@@ -35,7 +36,26 @@ import java.util.concurrent.*;
 @SuppressWarnings("unchecked")
 public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final String CACHE_NAME = "TestDynamicCache";
+    private static final String DYNAMIC_CACHE_NAME = "TestDynamicCache";
+
+    /** */
+    private static final String STATIC_CACHE_NAME = "TestStaticCache";
+
+    /** */
+    private static final String TEST_ATTRIBUTE_NAME = "TEST_ATTRIBUTE_NAME";
+
+    /** */
+    public static final IgnitePredicate<ClusterNode> NODE_FILTER = new 
IgnitePredicate<ClusterNode>() {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode n) {
+            Boolean val = n.attribute(TEST_ATTRIBUTE_NAME);
+
+            return val != null && val;
+        }
+    };
+
+    /** */
+    private boolean testAttribute = true;
 
     /**
      * @return Number of nodes for this test.
@@ -45,6 +65,21 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute));
+
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName(STATIC_CACHE_NAME);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGridsMultiThreaded(nodeCount());
     }
@@ -68,7 +103,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 CacheConfiguration ccfg = new CacheConfiguration();
 
-                ccfg.setName(CACHE_NAME);
+                ccfg.setName(DYNAMIC_CACHE_NAME);
 
                 futs.add(kernal.context().cache().dynamicStartCache(ccfg, 
F.<ClusterNode>alwaysTrue()));
 
@@ -101,7 +136,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                
futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME));
+                
futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
 
                 return null;
             }
@@ -125,7 +160,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 CacheConfiguration ccfg = new CacheConfiguration();
 
-                ccfg.setName(CACHE_NAME);
+                ccfg.setName(DYNAMIC_CACHE_NAME);
 
                 IgniteKernal kernal = 
(IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
@@ -162,7 +197,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 IgniteKernal kernal = 
(IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                
futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME));
+                
futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
 
                 return null;
             }
@@ -198,7 +233,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
         ccfg.setAtomicityMode(mode);
 
-        ccfg.setName(CACHE_NAME);
+        ccfg.setName(DYNAMIC_CACHE_NAME);
 
         kernal.context().cache().dynamicStartCache(ccfg, 
F.<ClusterNode>alwaysTrue()).get();
 
@@ -208,21 +243,21 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             for (IgniteInternalFuture f : 
kernal0.context().cache().context().exchange().exchangeFutures())
                 f.get();
 
-            assertNotNull(grid(g).jcache(CACHE_NAME));
+            assertNotNull(grid(g).jcache(DYNAMIC_CACHE_NAME));
         }
 
-        grid(0).jcache(CACHE_NAME).put("1", "1");
+        grid(0).jcache(DYNAMIC_CACHE_NAME).put("1", "1");
 
         for (int g = 0; g < nodeCount(); g++)
-            assertEquals("1", grid(g).jcache(CACHE_NAME).get("1"));
+            assertEquals("1", grid(g).jcache(DYNAMIC_CACHE_NAME).get("1"));
 
         // Grab caches before stop.
         final IgniteCache[] caches = new IgniteCache[nodeCount()];
 
         for (int g = 0; g < nodeCount(); g++)
-            caches[g] = grid(g).jcache(CACHE_NAME);
+            caches[g] = grid(g).jcache(DYNAMIC_CACHE_NAME);
 
-        kernal.context().cache().dynamicStopCache(CACHE_NAME).get();
+        kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
 
         for (int g = 0; g < nodeCount(); g++) {
             final IgniteKernal kernal0 = (IgniteKernal)grid(g);
@@ -234,7 +269,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    return kernal0.jcache(CACHE_NAME);
+                    return kernal0.jcache(DYNAMIC_CACHE_NAME);
                 }
             }, IllegalArgumentException.class, null);
 
@@ -255,7 +290,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
         CacheConfiguration ccfg = new CacheConfiguration();
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
-        ccfg.setName(CACHE_NAME);
+        ccfg.setName(DYNAMIC_CACHE_NAME);
 
         kernal.context().cache().dynamicStartCache(ccfg, 
F.<ClusterNode>alwaysTrue()).get();
 
@@ -263,20 +298,23 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
         try {
             // Check that cache got deployed on new node.
-            IgniteCache<Object, Object> cache = 
ignite(nodeCount()).jcache(CACHE_NAME);
+            IgniteCache<Object, Object> cache = 
ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME);
 
             cache.put("1", "1");
 
-            for (int g = 0; g < nodeCount(); g++)
-                assertEquals("1", grid(g).jcache(CACHE_NAME).get("1"));
+            for (int g = 0; g < nodeCount() + 1; g++) {
+                assertEquals("1", grid(g).jcache(DYNAMIC_CACHE_NAME).get("1"));
+
+                assertEquals(nodeCount() + 1, 
grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(0).size());
+            }
 
             // Undeploy cache.
-            kernal.context().cache().dynamicStopCache(CACHE_NAME).get();
+            
kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
 
             startGrid(nodeCount() + 1);
 
             // Check that cache is not deployed on new node after undeploy.
-            for (int g = 0; g < nodeCount(); g++) {
+            for (int g = 0; g < nodeCount() + 2; g++) {
                 final IgniteKernal kernal0 = (IgniteKernal)grid(g);
 
                 for (IgniteInternalFuture f : 
kernal0.context().cache().context().exchange().exchangeFutures())
@@ -284,7 +322,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
                 GridTestUtils.assertThrows(log, new Callable<Object>() {
                     @Override public Object call() throws Exception {
-                        return kernal0.jcache(CACHE_NAME);
+                        return kernal0.jcache(DYNAMIC_CACHE_NAME);
                     }
                 }, IllegalArgumentException.class, null);
             }
@@ -294,4 +332,69 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             stopGrid(nodeCount());
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFilter() throws Exception {
+        try {
+            testAttribute = false;
+
+            startGrid(nodeCount());
+
+            final IgniteKernal kernal = (IgniteKernal)grid(0);
+
+            CacheConfiguration ccfg = new CacheConfiguration();
+            
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+            ccfg.setName(DYNAMIC_CACHE_NAME);
+
+            kernal.context().cache().dynamicStartCache(ccfg, 
NODE_FILTER).get();
+
+            startGrid(nodeCount() + 1);
+
+            for (int i = 0; i < 100; i++)
+                grid(0).jcache(DYNAMIC_CACHE_NAME).put(i, i);
+
+            for (int i = 0; i < 100; i++)
+                assertEquals(i, grid(1).jcache(DYNAMIC_CACHE_NAME).get(i));
+
+            info("Affinity nodes: " + 
grid(0).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(0));
+
+            for (int g = 0; g < nodeCount(); g++) {
+                for (int i = 0; i < 100; i++) {
+                    
assertFalse(grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i)
+                        .contains(grid(nodeCount()).cluster().localNode()));
+
+                    
assertFalse(grid(g).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i)
+                        .contains(grid(nodeCount() + 
1).cluster().localNode()));
+                }
+            }
+
+            // Check that cache is not deployed on new node after undeploy.
+            for (int g = 0; g < nodeCount() + 2; g++) {
+                final IgniteKernal kernal0 = (IgniteKernal)grid(g);
+
+                for (IgniteInternalFuture f : 
kernal0.context().cache().context().exchange().exchangeFutures())
+                    f.get();
+
+                if (g < nodeCount())
+                    assertNotNull(grid(g).jcache(DYNAMIC_CACHE_NAME));
+                else
+                    GridTestUtils.assertThrows(log, new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            return kernal0.jcache(DYNAMIC_CACHE_NAME);
+                        }
+                    }, IllegalArgumentException.class, null);
+            }
+
+            
kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+
+            stopGrid(nodeCount() + 1);
+            stopGrid(nodeCount());
+        }
+        finally {
+            testAttribute = true;
+        }
+    }
 }

Reply via email to