Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 f4e9c473a -> 60d63a61a


IGNITE-45 - Fixed getOrCreate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/60d63a61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/60d63a61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/60d63a61

Branch: refs/heads/ignite-45
Commit: 60d63a61a33e7c1dbe4e1283793ba1dd11a64955
Parents: f4e9c47
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Sun Mar 22 12:44:27 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Sun Mar 22 12:44:27 2015 -0700

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeRequest.java        |  17 ++
 .../GridCachePartitionExchangeManager.java      |   5 +-
 .../processors/cache/GridCacheProcessor.java    |  47 +++--
 .../GridDhtPartitionsExchangeFuture.java        |   3 +
 .../ignite/internal/util/IgniteUtils.java       |   3 -
 .../cache/IgniteDynamicCacheStartSelfTest.java  | 170 +++++++++++++++++++
 6 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index a539e1d..830078c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -54,6 +54,9 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     /** Stop flag. */
     private boolean stop;
 
+    /** Fail if exists flag. */
+    private boolean failIfExists;
+
     /**
      * Constructor creates cache stop request.
      *
@@ -169,6 +172,20 @@ public class DynamicCacheChangeRequest implements 
Serializable {
         this.clientStartOnly = clientStartOnly;
     }
 
+    /**
+     * @return Fail if exists flag.
+     */
+    public boolean failIfExists() {
+        return failIfExists;
+    }
+
+    /**
+     * @param failIfExists Fail if exists flag.
+     */
+    public void failIfExists(boolean failIfExists) {
+        this.failIfExists = failIfExists;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", 
cacheName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 201801f..e44d666 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -151,6 +151,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         for (DynamicCacheChangeRequest req : batch.requests()) 
{
                             if (cctx.cache().dynamicCacheRegistered(req))
                                 valid.add(req);
+                            else
+                                cctx.cache().completeStartFuture(req);
                         }
 
                         if (!F.isEmpty(valid)) {
@@ -672,9 +674,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (fut.onAdded()) {
             exchWorker.addFuture(fut);
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                cacheCtx.preloader().onExchangeFutureAdded();
-
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 543e8c7..5bbba49 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1248,11 +1248,17 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
         DynamicCacheDescriptor desc = 
registeredCaches.get(maskNull(req.cacheName()));
 
-        if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
-            if (req.start())
-                return !desc.cancelled();
-            else
-                return desc.cancelled();
+        if (desc != null) {
+            if (desc.deploymentId().equals(req.deploymentId())) {
+                if (req.start())
+                    return !desc.cancelled();
+                else
+                    return desc.cancelled();
+            }
+
+            // If client requested cache start
+            if (req.initiatingNodeId() != null)
+                return true;
         }
 
         return false;
@@ -1418,19 +1424,26 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                         registeredCaches.remove(masked, desc);
                 }
 
-                DynamicCacheStartFuture fut = 
(DynamicCacheStartFuture)pendingFuts.get(masked);
-
-                assert req.deploymentId() != null;
-                assert fut == null || fut.deploymentId != null;
-
-                if (fut != null && 
fut.deploymentId().equals(req.deploymentId()) &&
-                    F.eq(req.initiatingNodeId(), ctx.localNodeId()))
-                    fut.onDone();
+                completeStartFuture(req);
             }
         }
     }
 
     /**
+     * @param req Request to complete future for.
+     */
+    public void completeStartFuture(DynamicCacheChangeRequest req) {
+        DynamicCacheStartFuture fut = 
(DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+
+        assert req.deploymentId() != null;
+        assert fut == null || fut.deploymentId != null;
+
+        if (fut != null && fut.deploymentId().equals(req.deploymentId()) &&
+            F.eq(req.initiatingNodeId(), ctx.localNodeId()))
+            fut.onDone();
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -1559,6 +1572,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         DynamicCacheChangeRequest req = new 
DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
 
+        req.failIfExists(failIfExists);
+
         if (ccfg != null) {
             if (desc != null && !desc.cancelled()) {
                 if (failIfExists)
@@ -1727,7 +1742,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                 // Check if cache with the same name was concurrently started 
form different node.
                 if (desc != null) {
-                    if (!req.clientStartOnly()) {
+                    if (!req.clientStartOnly() && req.failIfExists()) {
                         // If local node initiated start, fail the start 
future.
                         if (startFut != null && 
startFut.deploymentId().equals(req.deploymentId())) {
                             startFut.onDone(new 
IgniteCacheExistsException("Failed to start cache " +
@@ -1736,6 +1751,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                         return;
                     }
+
+                    req.clientStartOnly(true);
                 }
                 else {
                     if (req.clientStartOnly()) {
@@ -1748,7 +1765,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     }
                 }
 
-                if (!req.clientStartOnly()) {
+                if (!req.clientStartOnly() && desc == null) {
                     DynamicCacheDescriptor startDesc = new 
DynamicCacheDescriptor(ccfg, req.deploymentId());
 
                     DynamicCacheDescriptor old = 
registeredCaches.put(maskNull(ccfg.getName()), startDesc);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b06f2de..8d21bc3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -434,6 +434,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                 startCaches();
 
+                for (GridCacheContext cacheCtx : cctx.cacheContexts())
+                    cacheCtx.preloader().onExchangeFutureAdded();
+
                 assert discoEvt != null;
 
                 assert exchId.nodeId().equals(discoEvt.eventNode().id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index af327ce..1e5c3a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
@@ -7370,8 +7369,6 @@ public abstract class IgniteUtils {
 
         try {
             for (Class<?> c = cls != null ? cls : obj.getClass(); cls != 
Object.class; cls = cls.getSuperclass()) {
-                Method[] mtds = c.getDeclaredMethods();
-
                 Method mtd = null;
 
                 for (Method declaredMtd : c.getDeclaredMethods()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 0d30511..3caadb0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 /**
  * Test for dynamic cache start.
@@ -745,4 +746,173 @@ public class IgniteDynamicCacheStartSelfTest extends 
GridCommonAbstractTest {
             grid(0).destroyCache(DYNAMIC_CACHE_NAME);
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetOrCreateMultiNode() throws Exception {
+        try {
+            final AtomicInteger cnt = new AtomicInteger();
+            final AtomicReference<Throwable> err = new AtomicReference<>();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = cnt.getAndIncrement();
+
+                    try {
+                        CacheConfiguration cfg = new 
CacheConfiguration(DYNAMIC_CACHE_NAME);
+
+                        ignite(idx).getOrCreateCache(cfg);
+                    }
+                    catch (Exception e) {
+                        err.compareAndSet(null, e);
+                    }
+
+                    return null;
+                }
+            }, nodeCount(), "starter");
+
+            assertNull(err.get());
+
+            for (int i = 0; i < nodeCount(); i++) {
+                GridCacheContext<Object, Object> ctx = ((IgniteKernal) 
ignite(i)).internalCache(DYNAMIC_CACHE_NAME)
+                    .context();
+
+                assertTrue(ctx.affinityNode());
+                assertFalse(ctx.isNear());
+            }
+
+            lightCheckDynamicCache();
+        }
+        finally {
+            ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetOrCreateNearOnlyMultiNode() throws Exception {
+        checkGetOrCreateNear(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetOrCreateNearMultiNode() throws Exception {
+        checkGetOrCreateNear(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+        try {
+            final AtomicInteger cnt = new AtomicInteger(nodeCount());
+            final AtomicReference<Throwable> err = new AtomicReference<>();
+
+            final int clientCnt = 2;
+
+            try {
+                testAttribute = false;
+
+                for (int i = 0; i < clientCnt; i++)
+                    startGrid(nodeCount() + i);
+
+                cnt.set(nodeCount());
+
+                final CacheConfiguration<Object, Object> cacheCfg = new 
CacheConfiguration<>(DYNAMIC_CACHE_NAME);
+                cacheCfg.setNodeFilter(NODE_FILTER);
+
+                if (nearOnly)
+                    ignite(0).createCache(cacheCfg);
+
+                GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int idx = cnt.getAndIncrement();
+
+                        try {
+                            if (nearOnly)
+                                
ignite(idx).getOrCreateCache(DYNAMIC_CACHE_NAME, new 
NearCacheConfiguration<>());
+                            else
+                                ignite(idx).getOrCreateCache(cacheCfg, new 
NearCacheConfiguration<>());
+                        }
+                        catch (Exception ex) {
+                            err.compareAndSet(null, ex);
+                        }
+
+                        return null;
+                    }
+                }, clientCnt, "starter");
+
+                assertNull(err.get());
+
+                for (int i = 0; i < nodeCount(); i++) {
+                    GridCacheContext<Object, Object> ctx = ((IgniteKernal) 
ignite(i)).internalCache(DYNAMIC_CACHE_NAME)
+                        .context();
+
+                    assertTrue(ctx.affinityNode());
+                    assertFalse(ctx.isNear());
+                }
+
+                for (int i = 0; i < clientCnt; i++) {
+                    GridCacheContext<Object, Object> ctx = ((IgniteKernal) 
ignite(nodeCount() + i))
+                        .internalCache(DYNAMIC_CACHE_NAME).context();
+
+                    assertFalse(ctx.affinityNode());
+                    assertTrue("Cache is not near for index: " + (nodeCount() 
+ i), ctx.isNear());
+                }
+
+                lightCheckDynamicCache();
+            }
+            finally {
+                for (int i = 0; i < clientCnt; i++)
+                    stopGrid(nodeCount() + i);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void lightCheckDynamicCache() throws Exception {
+        int nodes = F.size(G.allGrids());
+
+        for (int i = 0; i < nodes; i++) {
+            IgniteCache<Object, Object> jcache = 
ignite(i).jcache(DYNAMIC_CACHE_NAME);
+
+            for (int k = 0; k < 20; k++) {
+                int key = i + k * nodes;
+
+                jcache.put(key, key);
+            }
+        }
+
+        for (int i = 0; i < nodes; i++) {
+            IgniteCache<Object, Object> jcache = 
ignite(i).jcache(DYNAMIC_CACHE_NAME);
+
+            for (int k = 0; k < 20 * nodes; k++)
+                assertEquals(k, jcache.get(k));
+        }
+
+        for (int i = 0; i < nodes; i++) {
+            IgniteCache<Object, Object> jcache = 
ignite(i).jcache(DYNAMIC_CACHE_NAME);
+
+            for (int k = 0; k < 20; k++) {
+                int key = i + k * nodes;
+
+                assertEquals(key, jcache.getAndRemove(key));
+            }
+        }
+
+        for (int i = 0; i < nodes; i++) {
+            IgniteCache<Object, Object> jcache = 
ignite(i).jcache(DYNAMIC_CACHE_NAME);
+
+            for (int k = 0; k < 20 * nodes; k++)
+                assertNull(jcache.get(k));
+        }
+    }
 }

Reply via email to