Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 f4e9c473a -> 60d63a61a
IGNITE-45 - Fixed getOrCreate. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/60d63a61 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/60d63a61 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/60d63a61 Branch: refs/heads/ignite-45 Commit: 60d63a61a33e7c1dbe4e1283793ba1dd11a64955 Parents: f4e9c47 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sun Mar 22 12:44:27 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sun Mar 22 12:44:27 2015 -0700 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeRequest.java | 17 ++ .../GridCachePartitionExchangeManager.java | 5 +- .../processors/cache/GridCacheProcessor.java | 47 +++-- .../GridDhtPartitionsExchangeFuture.java | 3 + .../ignite/internal/util/IgniteUtils.java | 3 - .../cache/IgniteDynamicCacheStartSelfTest.java | 170 +++++++++++++++++++ 6 files changed, 224 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index a539e1d..830078c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -54,6 +54,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Stop flag. */ private boolean stop; + /** Fail if exists flag. */ + private boolean failIfExists; + /** * Constructor creates cache stop request. * @@ -169,6 +172,20 @@ public class DynamicCacheChangeRequest implements Serializable { this.clientStartOnly = clientStartOnly; } + /** + * @return Fail if exists flag. + */ + public boolean failIfExists() { + return failIfExists; + } + + /** + * @param failIfExists Fail if exists flag. + */ + public void failIfExists(boolean failIfExists) { + this.failIfExists = failIfExists; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 201801f..e44d666 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -151,6 +151,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (DynamicCacheChangeRequest req : batch.requests()) { if (cctx.cache().dynamicCacheRegistered(req)) valid.add(req); + else + cctx.cache().completeStartFuture(req); } if (!F.isEmpty(valid)) { @@ -672,9 +674,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (fut.onAdded()) { exchWorker.addFuture(fut); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onExchangeFutureAdded(); - return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 543e8c7..5bbba49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1248,11 +1248,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - if (desc != null && desc.deploymentId().equals(req.deploymentId())) { - if (req.start()) - return !desc.cancelled(); - else - return desc.cancelled(); + if (desc != null) { + if (desc.deploymentId().equals(req.deploymentId())) { + if (req.start()) + return !desc.cancelled(); + else + return desc.cancelled(); + } + + // If client requested cache start + if (req.initiatingNodeId() != null) + return true; } return false; @@ -1418,19 +1424,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { registeredCaches.remove(masked, desc); } - DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(masked); - - assert req.deploymentId() != null; - assert fut == null || fut.deploymentId != null; - - if (fut != null && fut.deploymentId().equals(req.deploymentId()) && - F.eq(req.initiatingNodeId(), ctx.localNodeId())) - fut.onDone(); + completeStartFuture(req); } } } /** + * @param req Request to complete future for. + */ + public void completeStartFuture(DynamicCacheChangeRequest req) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + + assert req.deploymentId() != null; + assert fut == null || fut.deploymentId != null; + + if (fut != null && fut.deploymentId().equals(req.deploymentId()) && + F.eq(req.initiatingNodeId(), ctx.localNodeId())) + fut.onDone(); + } + + /** * Creates shared context. * * @param kernalCtx Kernal context. @@ -1559,6 +1572,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); + req.failIfExists(failIfExists); + if (ccfg != null) { if (desc != null && !desc.cancelled()) { if (failIfExists) @@ -1727,7 +1742,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Check if cache with the same name was concurrently started form different node. if (desc != null) { - if (!req.clientStartOnly()) { + if (!req.clientStartOnly() && req.failIfExists()) { // If local node initiated start, fail the start future. if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { startFut.onDone(new IgniteCacheExistsException("Failed to start cache " + @@ -1736,6 +1751,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } + + req.clientStartOnly(true); } else { if (req.clientStartOnly()) { @@ -1748,7 +1765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - if (!req.clientStartOnly()) { + if (!req.clientStartOnly() && desc == null) { DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.deploymentId()); DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b06f2de..8d21bc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -434,6 +434,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT startCaches(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) + cacheCtx.preloader().onExchangeFutureAdded(); + assert discoEvt != null; assert exchId.nodeId().equals(discoEvt.eventNode().id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index af327ce..1e5c3a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.util; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; @@ -7370,8 +7369,6 @@ public abstract class IgniteUtils { try { for (Class<?> c = cls != null ? cls : obj.getClass(); cls != Object.class; cls = cls.getSuperclass()) { - Method[] mtds = c.getDeclaredMethods(); - Method mtd = null; for (Method declaredMtd : c.getDeclaredMethods()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60d63a61/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 0d30511..3caadb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.testframework.junits.common.*; import javax.cache.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * Test for dynamic cache start. @@ -745,4 +746,173 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { grid(0).destroyCache(DYNAMIC_CACHE_NAME); } } + + /** + * @throws Exception If failed. + */ + public void testGetOrCreateMultiNode() throws Exception { + try { + final AtomicInteger cnt = new AtomicInteger(); + final AtomicReference<Throwable> err = new AtomicReference<>(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = cnt.getAndIncrement(); + + try { + CacheConfiguration cfg = new CacheConfiguration(DYNAMIC_CACHE_NAME); + + ignite(idx).getOrCreateCache(cfg); + } + catch (Exception e) { + err.compareAndSet(null, e); + } + + return null; + } + }, nodeCount(), "starter"); + + assertNull(err.get()); + + for (int i = 0; i < nodeCount(); i++) { + GridCacheContext<Object, Object> ctx = ((IgniteKernal) ignite(i)).internalCache(DYNAMIC_CACHE_NAME) + .context(); + + assertTrue(ctx.affinityNode()); + assertFalse(ctx.isNear()); + } + + lightCheckDynamicCache(); + } + finally { + ignite(0).destroyCache(DYNAMIC_CACHE_NAME); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetOrCreateNearOnlyMultiNode() throws Exception { + checkGetOrCreateNear(true); + } + + /** + * @throws Exception If failed. + */ + public void testGetOrCreateNearMultiNode() throws Exception { + checkGetOrCreateNear(false); + } + + /** + * @throws Exception If failed. + */ + public void checkGetOrCreateNear(final boolean nearOnly) throws Exception { + try { + final AtomicInteger cnt = new AtomicInteger(nodeCount()); + final AtomicReference<Throwable> err = new AtomicReference<>(); + + final int clientCnt = 2; + + try { + testAttribute = false; + + for (int i = 0; i < clientCnt; i++) + startGrid(nodeCount() + i); + + cnt.set(nodeCount()); + + final CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(DYNAMIC_CACHE_NAME); + cacheCfg.setNodeFilter(NODE_FILTER); + + if (nearOnly) + ignite(0).createCache(cacheCfg); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = cnt.getAndIncrement(); + + try { + if (nearOnly) + ignite(idx).getOrCreateCache(DYNAMIC_CACHE_NAME, new NearCacheConfiguration<>()); + else + ignite(idx).getOrCreateCache(cacheCfg, new NearCacheConfiguration<>()); + } + catch (Exception ex) { + err.compareAndSet(null, ex); + } + + return null; + } + }, clientCnt, "starter"); + + assertNull(err.get()); + + for (int i = 0; i < nodeCount(); i++) { + GridCacheContext<Object, Object> ctx = ((IgniteKernal) ignite(i)).internalCache(DYNAMIC_CACHE_NAME) + .context(); + + assertTrue(ctx.affinityNode()); + assertFalse(ctx.isNear()); + } + + for (int i = 0; i < clientCnt; i++) { + GridCacheContext<Object, Object> ctx = ((IgniteKernal) ignite(nodeCount() + i)) + .internalCache(DYNAMIC_CACHE_NAME).context(); + + assertFalse(ctx.affinityNode()); + assertTrue("Cache is not near for index: " + (nodeCount() + i), ctx.isNear()); + } + + lightCheckDynamicCache(); + } + finally { + for (int i = 0; i < clientCnt; i++) + stopGrid(nodeCount() + i); + } + } + finally { + ignite(0).destroyCache(DYNAMIC_CACHE_NAME); + } + } + + /** + * @throws Exception If failed. + */ + private void lightCheckDynamicCache() throws Exception { + int nodes = F.size(G.allGrids()); + + for (int i = 0; i < nodes; i++) { + IgniteCache<Object, Object> jcache = ignite(i).jcache(DYNAMIC_CACHE_NAME); + + for (int k = 0; k < 20; k++) { + int key = i + k * nodes; + + jcache.put(key, key); + } + } + + for (int i = 0; i < nodes; i++) { + IgniteCache<Object, Object> jcache = ignite(i).jcache(DYNAMIC_CACHE_NAME); + + for (int k = 0; k < 20 * nodes; k++) + assertEquals(k, jcache.get(k)); + } + + for (int i = 0; i < nodes; i++) { + IgniteCache<Object, Object> jcache = ignite(i).jcache(DYNAMIC_CACHE_NAME); + + for (int k = 0; k < 20; k++) { + int key = i + k * nodes; + + assertEquals(key, jcache.getAndRemove(key)); + } + } + + for (int i = 0; i < nodes; i++) { + IgniteCache<Object, Object> jcache = ignite(i).jcache(DYNAMIC_CACHE_NAME); + + for (int k = 0; k < 20 * nodes; k++) + assertNull(jcache.get(k)); + } + } }