IGNITE-45 - getOrCreate methods.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6cb8b4db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6cb8b4db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6cb8b4db

Branch: refs/heads/ignite-45
Commit: 6cb8b4db2dbbca83e8e60eef854cfc042e38a929
Parents: 1a91054
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Fri Mar 20 18:26:43 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Fri Mar 20 18:26:43 2015 -0700

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeRequest.java        | 14 +++-
 .../processors/cache/GridCacheProcessor.java    | 68 ++++++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        |  8 +-
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 77 ++++++++++++++++++--
 4 files changed, 131 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index be88217..5763a36 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -51,13 +51,19 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     /** Start only client cache, do not start data nodes. */
     private boolean clientStartOnly;
 
+    /** Stop flag. */
+    private boolean stop;
+
     /**
      * Constructor creates cache stop request.
      *
      * @param cacheName Cache stop name.
      */
-    public DynamicCacheChangeRequest(String cacheName) {
+    public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
         this.cacheName = cacheName;
+        this.initiatingNodeId = initiatingNodeId;
+
+        stop = true;
     }
 
     /**
@@ -88,15 +94,15 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     /**
      * @return {@code True} if this is a start request.
      */
-    public boolean isStart() {
+    public boolean start() {
         return startCfg != null;
     }
 
     /**
      * @return {@code True} if this is a stop request.
      */
-    public boolean isStop() {
-        return initiatingNodeId == null && startCfg == null;
+    public boolean stop() {
+        return stop;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 278e2c8..2eb030a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -963,8 +963,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */
-    @SuppressWarnings( {"unchecked"})
+    @SuppressWarnings({"unchecked"})
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, 
CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+        assert cfg != null;
+
         CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? 
cfg.getCacheStoreFactory().create() : null;
 
         validate(ctx.config(), cfg, cfgStore);
@@ -1236,7 +1238,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         DynamicCacheDescriptor desc = 
registeredCaches.get(maskNull(req.cacheName()));
 
         if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
-            if (req.isStart())
+            if (req.start())
                 return !desc.cancelled();
             else
                 return desc.cancelled();
@@ -1255,7 +1257,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         for (DynamicCacheChangeRequest req : reqs) {
-            assert req.isStart();
+            assert req.start();
 
             prepareCacheStart(
                 req.startCacheConfiguration(),
@@ -1308,6 +1310,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode);
         boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
 
+        if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
+            return;
+
         if (affNodeStart || clientNodeStart) {
             if (clientNodeStart && !affNodeStart) {
                 if (nearCfg != null)
@@ -1335,7 +1340,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param req Stop request.
      */
     public void blockGateway(DynamicCacheChangeRequest req) {
-        assert req.isStop();
+        assert req.stop();
 
         // Break the proxy before exchange future is done.
         IgniteCacheProxy<?, ?> proxy = 
jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1348,7 +1353,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param req Stop request.
      */
     public void prepareCacheStop(DynamicCacheChangeRequest req) {
-        assert req.isStop();
+        assert req.stop();
 
         GridCacheAdapter<?, ?> cache = 
caches.remove(maskNull(req.cacheName()));
 
@@ -1393,7 +1398,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             for (DynamicCacheChangeRequest req : reqs) {
                 String masked = maskNull(req.cacheName());
 
-                if (req.isStop()) {
+                if (req.stop()) {
                     prepareCacheStop(req);
 
                     DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -1407,7 +1412,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 assert req.deploymentId() != null;
                 assert fut == null || fut.deploymentId != null;
 
-                if (fut != null && 
fut.deploymentId().equals(req.deploymentId()))
+                if (fut != null && 
fut.deploymentId().equals(req.deploymentId()) &&
+                    F.eq(req.initiatingNodeId(), ctx.localNodeId()))
                     fut.onDone();
             }
         }
@@ -1451,7 +1457,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
             if (!desc.cancelled()) {
-                DynamicCacheChangeRequest req = new 
DynamicCacheChangeRequest((UUID)null);
+                DynamicCacheChangeRequest req = new 
DynamicCacheChangeRequest(null);
 
                 req.startCacheConfiguration(desc.cacheConfiguration());
 
@@ -1476,7 +1482,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             for (DynamicCacheChangeRequest req : batch.requests()) {
                 DynamicCacheDescriptor existing = 
registeredCaches.get(maskNull(req.cacheName()));
 
-                if (req.isStart() && !req.clientStartOnly()) {
+                if (req.start() && !req.clientStartOnly()) {
                     CacheConfiguration ccfg = req.startCacheConfiguration();
 
                     if (existing != null) {
@@ -1529,6 +1535,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param ccfg Cache configuration.
      * @return Future that will be completed when cache is deployed.
      */
+    @SuppressWarnings("IfMayBeConditional")
     public IgniteInternalFuture<?> dynamicStartCache(
         @Nullable CacheConfiguration ccfg,
         String cacheName,
@@ -1547,14 +1554,28 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     return new GridFinishedFuture<>(new 
IgniteCacheExistsException("Failed to start cache " +
                             "(a cache with the same name is already started): 
" + cacheName));
                 else {
+                    CacheConfiguration descCfg = desc.cacheConfiguration();
+
                     // Check if we were asked to start a near cache.
-                    if (nearCfg != null)
-                        req.clientStartOnly(true);
+                    if (nearCfg != null) {
+                        if 
(descCfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+                            // If we are on a data node and near cache was 
enabled, return success, else - fail.
+                            if (descCfg.getNearConfiguration() != null)
+                                return new GridFinishedFuture<>();
+                            else
+                                return new GridFinishedFuture<>(new 
IgniteCheckedException("Failed to start near " +
+                                        "cache (local node is an affinity node 
for cache): " + cacheName));
+                        }
+                        else
+                            // If local node has near cache, return success.
+                            req.clientStartOnly(true);
+                    }
                     else
                         return new GridFinishedFuture<>();
 
                     req.deploymentId(desc.deploymentId());
-                    req.startCacheConfiguration(ccfg);
+
+                    req.startCacheConfiguration(descCfg);
                 }
             }
             else {
@@ -1583,9 +1604,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 return new GridFinishedFuture<>(new 
IgniteCacheExistsException("Failed to start near cache " +
                     "(a cache with the given name is not started): " + 
cacheName));
 
-            if (ccfg.getNodeFilter().apply(ctx.discovery().localNode()))
-                return new GridFinishedFuture<>(new 
IgniteCheckedException("Failed to start near cache " +
-                    "(local node is an affinity node for cache): " + 
cacheName));
+            if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+                if (ccfg.getNearConfiguration() != null)
+                    return new GridFinishedFuture<>();
+                else
+                    return new GridFinishedFuture<>(new 
IgniteCheckedException("Failed to start near cache " +
+                        "(local node is an affinity node for cache): " + 
cacheName));
+            }
 
             req.deploymentId(desc.deploymentId());
             req.startCacheConfiguration(ccfg);
@@ -1602,7 +1627,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Future that will be completed when cache is stopped.
      */
     public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
-        return F.first(initiateCacheChanges(F.asList(new 
DynamicCacheChangeRequest(cacheName))));
+        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, 
ctx.localNodeId());
+
+        return F.first(initiateCacheChanges(F.asList(t)));
     }
 
     /**
@@ -1618,7 +1645,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             DynamicCacheStartFuture fut = new 
DynamicCacheStartFuture(req.cacheName(), req.deploymentId());
 
             try {
-                if (req.isStop()) {
+                if (req.stop()) {
                     DynamicCacheDescriptor desc = 
registeredCaches.get(maskNull(req.cacheName()));
 
                     if (desc == null)
@@ -1642,7 +1669,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     maskNull(req.cacheName()), fut);
 
                 if (old != null) {
-                    if (req.isStart() && !req.clientStartOnly()) {
+                    if (req.start() && !req.clientStartOnly()) {
                         fut.onDone(new IgniteCacheExistsException("Failed to 
start cache " +
                             "(a cache with the same name is already being 
started or stopped): " + req.cacheName()));
                     }
@@ -1666,7 +1693,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             }
         }
 
-        ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sendReqs));
+        if (!sendReqs.isEmpty())
+            ctx.discovery().sendCustomEvent(new 
DynamicCacheChangeBatch(sendReqs));
 
         return res;
     }
@@ -1680,7 +1708,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         for (DynamicCacheChangeRequest req : batch.requests()) {
             DynamicCacheDescriptor desc = 
registeredCaches.get(maskNull(req.cacheName()));
 
-            if (req.isStart()) {
+            if (req.start()) {
                 CacheConfiguration ccfg = req.startCacheConfiguration();
 
                 DynamicCacheStartFuture startFut = 
(DynamicCacheStartFuture)pendingFuts.get(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index bab9f57..b06f2de 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -281,7 +281,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     public boolean isCacheAdded(int cacheId) {
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
-                if (req.isStart() && !req.clientStartOnly()) {
+                if (req.start() && !req.clientStartOnly()) {
                     if (CU.cacheId(req.cacheName()) == cacheId)
                         return true;
                 }
@@ -584,7 +584,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
                 if (cacheId == CU.cacheId(req.cacheName())) {
-                    stopping = req.isStop();
+                    stopping = req.stop();
 
                     break;
                 }
@@ -600,7 +600,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private void startCaches() throws IgniteCheckedException {
         cctx.cache().prepareCachesStart(F.view(reqs, new 
IgnitePredicate<DynamicCacheChangeRequest>() {
             @Override public boolean apply(DynamicCacheChangeRequest req) {
-                return req.isStart();
+                return req.start();
             }
         }), exchId.topologyVersion());
     }
@@ -610,7 +610,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      */
     private void blockGateways() {
         for (DynamicCacheChangeRequest req : reqs) {
-            if (req.isStop())
+            if (req.stop())
                 cctx.cache().blockGateway(req);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6cb8b4db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 3f82cbd..cfb0e9f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -257,7 +258,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
         ccfg.setName(DYNAMIC_CACHE_NAME);
 
-        kernal.createCache(ccfg, null);
+        kernal.createCache(ccfg);
 
         for (int g = 0; g < nodeCount(); g++) {
             IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -319,7 +320,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
         ccfg.setName(DYNAMIC_CACHE_NAME);
 
-        kernal.createCache(ccfg, null);
+        kernal.createCache(ccfg);
 
         info(">>>>>>> Deployed dynamic cache");
 
@@ -382,7 +383,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             ccfg.setNodeFilter(NODE_FILTER);
 
-            kernal.createCache(ccfg, null);
+            kernal.createCache(ccfg);
 
             startGrid(nodeCount() + 1);
 
@@ -435,7 +436,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testFailWhenConfiguredCacheExists() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
+        GridTestUtils.assertThrowsInherited(log, new Callable<Object>() {
             @Override
             public Object call() throws Exception {
                 final IgniteKernal kernal = (IgniteKernal) grid(0);
@@ -448,9 +449,9 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
                 ccfg.setNodeFilter(NODE_FILTER);
 
-                return kernal.createCache(ccfg, null);
+                return kernal.createCache(ccfg);
             }
-        }, IgniteCheckedException.class, null);
+        }, IgniteCacheExistsException.class, null);
     }
 
     /**
@@ -471,7 +472,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             ccfg.setNodeFilter(NODE_FILTER);
 
-            kernal.createCache(ccfg, null);
+            kernal.createCache(ccfg);
 
             GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override
@@ -516,7 +517,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
 
             final IgniteKernal started = (IgniteKernal) grid(nodeCount());
 
-            started.createCache(ccfg, null);
+            started.createCache(ccfg);
 
             GridCacheAdapter<Object, Object> cache = 
started.internalCache(DYNAMIC_CACHE_NAME);
 
@@ -681,4 +682,64 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             stopGrid(nodeCount());
         }
     }
+
+    /** {@inheritDoc} */
+    public void testGetOrCreate() throws Exception {
+        try {
+            final CacheConfiguration cfg = new CacheConfiguration();
+
+            cfg.setName(DYNAMIC_CACHE_NAME);
+            cfg.setNodeFilter(NODE_FILTER);
+
+            grid(0).getOrCreateCache(cfg);
+            grid(0).getOrCreateCache(cfg);
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return grid(0).getOrCreateCache(cfg, new 
NearCacheConfiguration());
+                }
+            }, CacheException.class, null);
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return grid(0).getOrCreateCache(DYNAMIC_CACHE_NAME, new 
NearCacheConfiguration());
+                }
+            }, CacheException.class, null);
+
+            testAttribute = false;
+
+            startGrid(nodeCount());
+            startGrid(nodeCount() + 1);
+
+            try {
+                IgniteEx nearGrid = grid(nodeCount());
+
+                nearGrid.getOrCreateCache(cfg, new NearCacheConfiguration());
+                nearGrid.getOrCreateCache(DYNAMIC_CACHE_NAME, new 
NearCacheConfiguration());
+
+                GridCacheContext<Object, Object> nCtx = 
((IgniteKernal)nearGrid)
+                        .internalCache(DYNAMIC_CACHE_NAME).context();
+
+                assertTrue(nCtx.isNear());
+                assertFalse(nCtx.affinityNode());
+
+                IgniteEx clientGrid = grid(nodeCount() + 1);
+
+                clientGrid.getOrCreateCache(cfg);
+                clientGrid.getOrCreateCache(cfg);
+
+                GridCacheContext<Object, Object> cCtx = 
((IgniteKernal)clientGrid)
+                        .internalCache(DYNAMIC_CACHE_NAME).context();
+
+                assertFalse(cCtx.isNear());
+                assertFalse(cCtx.affinityNode());
+            } finally {
+                stopGrid(nodeCount() + 1);
+                stopGrid(nodeCount());
+            }
+        }
+        finally {
+            grid(0).destroyCache(DYNAMIC_CACHE_NAME);
+        }
+    }
 }

Reply via email to