IGNITE-45 - Client mode fixes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9373ed3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9373ed3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9373ed3c

Branch: refs/heads/ignite-421
Commit: 9373ed3c3379e50a3e7898688c1ec424f1da20b6
Parents: 52e4a96
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Mar 10 22:36:54 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Mar 10 22:36:54 2015 -0700

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeRequest.java        | 13 +++-
 .../GridCachePartitionExchangeManager.java      |  2 +
 .../processors/cache/GridCacheProcessor.java    | 77 +++++++++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        | 16 +++-
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 51 ++++++++++++-
 5 files changed, 132 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index f56a700..4c061f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -71,10 +71,12 @@ public class DynamicCacheChangeRequest implements 
Serializable {
      * Constructor creates near cache start request.
      *
      * @param clientNodeId Client node ID.
+     * @param startCfg Start cache configuration.
      * @param nearCacheCfg Near cache configuration.
      */
-    public DynamicCacheChangeRequest(UUID clientNodeId, NearCacheConfiguration 
nearCacheCfg) {
+    public DynamicCacheChangeRequest(UUID clientNodeId, CacheConfiguration 
startCfg, NearCacheConfiguration nearCacheCfg) {
         this.clientNodeId = clientNodeId;
+        this.startCfg = startCfg;
         this.nearCacheCfg = nearCacheCfg;
     }
 
@@ -96,7 +98,7 @@ public class DynamicCacheChangeRequest implements 
Serializable {
      * @return {@code True} if this is a start request.
      */
     public boolean isStart() {
-        return startCfg != null;
+        return clientNodeId == null && startCfg != null;
     }
 
     /**
@@ -107,6 +109,13 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     }
 
     /**
+     * @return {@code True} if this is a stop request.
+     */
+    public boolean isStop() {
+        return clientNodeId == null && startCfg == null;
+    }
+
+    /**
      * @return Cache name.
      */
     public String cacheName() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f7f1f9d..571c1a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -99,6 +99,8 @@ public class GridCachePartitionExchangeManager<K, V> extends 
GridCacheSharedMana
      */
     private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
 
+    public static volatile boolean stop = false;
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
         @Override public void onEvent(Event evt) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e689581..5d94d2f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1244,31 +1244,63 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param req Request to check.
      * @return {@code True} if change request was registered to apply.
      */
+    @SuppressWarnings("IfMayBeConditional")
     public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
         DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
 
-        return desc != null && desc.deploymentId().equals(req.deploymentId()) 
&& desc.cancelled() != req.isStart();
+        if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
+            if (req.isStart() || req.isClientStart())
+                return !desc.cancelled();
+            else
+                return desc.cancelled();
+        }
+
+        return false;
     }
 
     /**
      * @param req Start request.
      */
     public void prepareCacheStart(DynamicCacheChangeRequest req) throws 
IgniteCheckedException {
-        assert req.isStart();
+        assert req.isStart() || req.isClientStart();
 
         IgnitePredicate nodeFilter = 
req.startCacheConfiguration().getNodeFilter();
 
-        if (nodeFilter.apply(ctx.discovery().localNode())) {
-            GridCacheContext cacheCtx = 
createCache(req.startCacheConfiguration());
+        ClusterNode locNode = ctx.discovery().localNode();
 
-            cacheCtx.dynamicDeploymentId(req.deploymentId());
+        if (req.isStart()) {
+            if (nodeFilter.apply(locNode)) {
+                GridCacheContext cacheCtx = 
createCache(req.startCacheConfiguration());
 
-            sharedCtx.addCacheContext(cacheCtx);
+                cacheCtx.dynamicDeploymentId(req.deploymentId());
+
+                sharedCtx.addCacheContext(cacheCtx);
+
+                startCache(cacheCtx.cache());
+                onKernalStart(cacheCtx.cache());
+
+                caches.put(cacheCtx.name(), cacheCtx.cache());
+            }
+        }
+        else if (req.isClientStart()) {
+            if (req.clientNodeId().equals(locNode.id())) {
+                if (nodeFilter.apply(locNode)) {
+                    U.warn(log, "Requested to start client cache on affinity 
node (will ignore): " + req);
+
+                    return;
+                }
+
+                GridCacheContext cacheCtx = 
createCache(req.startCacheConfiguration());
 
-            startCache(cacheCtx.cache());
-            onKernalStart(cacheCtx.cache());
+                cacheCtx.dynamicDeploymentId(req.deploymentId());
 
-            caches.put(cacheCtx.name(), cacheCtx.cache());
+                sharedCtx.addCacheContext(cacheCtx);
+
+                startCache(cacheCtx.cache());
+                onKernalStart(cacheCtx.cache());
+
+                caches.put(cacheCtx.name(), cacheCtx.cache());
+            }
         }
     }
 
@@ -1276,7 +1308,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param req Stop request.
      */
     public void prepareCacheStop(DynamicCacheChangeRequest req) {
-        assert !req.isStart();
+        assert req.isStop();
 
         // Break the proxy before exchange future is done.
         IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName());
@@ -1306,7 +1338,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     public void onExchangeDone(DynamicCacheChangeRequest req) {
-        if (req.isStart()) {
+        if (req.isStart() || req.isClientStart()) {
             GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
 
             if (cache != null)
@@ -1474,19 +1506,19 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     continue;
 
                 if (req.isStart()) {
-                    if (caches.containsKey(req.cacheName())) {
-                        fut.onDone(new GridFinishedFutureEx<>(new 
IgniteCheckedException("Failed to start cache " +
-                            "(a cache with the same name is already started): 
" + req.cacheName())));
+                    if (registeredCaches.containsKey(req.cacheName())) {
+                        fut.onDone(new IgniteCheckedException("Failed to start 
cache " +
+                            "(a cache with the same name is already started): 
" + req.cacheName()));
                     }
                 }
-                else {
-                    GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
+                else if (!req.isClientStart()) {
+                    DynamicCacheDescriptor desc = 
registeredCaches.get(req.cacheName());
 
-                    if (cache == null)
+                    if (desc == null)
                         // No-op.
                         fut.onDone();
                     else {
-                        IgniteUuid dynamicDeploymentId = 
cache.context().dynamicDeploymentId();
+                        IgniteUuid dynamicDeploymentId = desc.deploymentId();
 
                         assert dynamicDeploymentId != null;
 
@@ -2032,10 +2064,15 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             IgniteCache<K,V> cache = (IgniteCache<K, 
V>)jCacheProxies.get(name);
 
             if (cache == null) {
-                if (!registeredCaches.containsKey(name))
+                DynamicCacheDescriptor desc = registeredCaches.get(name);
+
+                if (desc == null || desc.cancelled())
                     throw new IllegalArgumentException("Cache is not started: 
" + name);
 
-                DynamicCacheChangeRequest req = new 
DynamicCacheChangeRequest(ctx.localNodeId(), null);
+                DynamicCacheChangeRequest req = new 
DynamicCacheChangeRequest(ctx.localNodeId(),
+                    desc.cacheConfiguration(), null);
+
+                req.deploymentId(desc.deploymentId());
 
                 F.first(initiateCacheChanges(F.asList(req))).get();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e8a47ef..234538a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -427,6 +427,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends 
GridFutureAdapter<Aff
         assert oldestNode.get() != null;
 
         if (init.compareAndSet(false, true)) {
+            U.debug(log, "Initializing exchange future: " + reqs);
+
             if (isDone())
                 return;
 
@@ -578,7 +580,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends 
GridFutureAdapter<Aff
      */
     private void startCaches() throws IgniteCheckedException {
         for (DynamicCacheChangeRequest req : reqs) {
-            if (req.isStart())
+            if (req.isStart() || req.isClientStart())
                 ctx.cache().prepareCacheStart(req);
         }
     }
@@ -588,7 +590,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends 
GridFutureAdapter<Aff
      */
     private void stopCaches() {
         for (DynamicCacheChangeRequest req : reqs) {
-            if (!req.isStart())
+            if (req.isStop())
                 ctx.cache().prepareCacheStop(req);
         }
     }
@@ -689,8 +691,14 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends 
GridFutureAdapter<Aff
 
             if (!F.isEmpty(reqs)) {
                 for (DynamicCacheChangeRequest req : reqs) {
-                    if (req.isStart() && F.eq(cacheCtx.name(), 
req.cacheName()))
-                        cacheCtx.preloader().onInitialExchangeComplete(err);
+                    if (F.eq(cacheCtx.name(), req.cacheName())) {
+                        if (req.isStart())
+                            
cacheCtx.preloader().onInitialExchangeComplete(err);
+                        else if (req.isClientStart()) {
+                            if (req.clientNodeId().equals(ctx.localNodeId()))
+                                
cacheCtx.preloader().onInitialExchangeComplete(err);
+                        }
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 7aa82f9..7e311cc 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -120,6 +120,8 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             try {
                 fut.get();
 
+                info("Succeeded: " + System.identityHashCode(fut));
+
                 succeeded++;
             }
             catch (IgniteCheckedException e) {
@@ -179,6 +181,8 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             try {
                 fut.get();
 
+                info("Succeeded: " + System.identityHashCode(fut));
+
                 succeeded++;
             }
             catch (IgniteCheckedException e) {
@@ -389,7 +393,7 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
                 else
                     GridTestUtils.assertThrows(log, new Callable<Object>() {
                         @Override public Object call() throws Exception {
-                            return kernal0.jcache(DYNAMIC_CACHE_NAME);
+                            return kernal0.cache(DYNAMIC_CACHE_NAME);
                         }
                     }, IllegalArgumentException.class, null);
             }
@@ -424,4 +428,49 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             }
         }, IgniteCheckedException.class, null);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testClientCache() throws Exception {
+        try {
+            testAttribute = false;
+
+            startGrid(nodeCount());
+
+            final IgniteKernal kernal = (IgniteKernal)grid(0);
+
+            CacheConfiguration ccfg = new CacheConfiguration();
+            
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+            ccfg.setName(DYNAMIC_CACHE_NAME);
+
+            ccfg.setNodeFilter(NODE_FILTER);
+
+            kernal.context().cache().dynamicStartCache(ccfg).get();
+
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteKernal ignite = (IgniteKernal)grid(nodeCount());
+
+                    return ignite.cache(DYNAMIC_CACHE_NAME);
+                }
+            }, IllegalArgumentException.class, null);
+
+            GridCachePartitionExchangeManager.stop = true;
+
+            // Should obtain client cache on new node.
+            IgniteCache<Object, Object> clientCache = 
ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME);
+
+            clientCache.put("1", "1");
+
+            for (int g = 0; g < nodeCount() + 1; g++)
+                assertEquals("1", 
ignite(g).jcache(DYNAMIC_CACHE_NAME).get("1"));
+
+            
kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+        }
+        finally {
+            stopGrid(nodeCount());
+        }
+    }
 }

Reply via email to