IGNITE-45 - Client mode fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9373ed3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9373ed3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9373ed3c Branch: refs/heads/ignite-421 Commit: 9373ed3c3379e50a3e7898688c1ec424f1da20b6 Parents: 52e4a96 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Mar 10 22:36:54 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Mar 10 22:36:54 2015 -0700 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeRequest.java | 13 +++- .../GridCachePartitionExchangeManager.java | 2 + .../processors/cache/GridCacheProcessor.java | 77 +++++++++++++++----- .../GridDhtPartitionsExchangeFuture.java | 16 +++- .../cache/IgniteDynamicCacheStartSelfTest.java | 51 ++++++++++++- 5 files changed, 132 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index f56a700..4c061f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -71,10 +71,12 @@ public class DynamicCacheChangeRequest implements Serializable { * Constructor creates near cache start request. * * @param clientNodeId Client node ID. + * @param startCfg Start cache configuration. * @param nearCacheCfg Near cache configuration. */ - public DynamicCacheChangeRequest(UUID clientNodeId, NearCacheConfiguration nearCacheCfg) { + public DynamicCacheChangeRequest(UUID clientNodeId, CacheConfiguration startCfg, NearCacheConfiguration nearCacheCfg) { this.clientNodeId = clientNodeId; + this.startCfg = startCfg; this.nearCacheCfg = nearCacheCfg; } @@ -96,7 +98,7 @@ public class DynamicCacheChangeRequest implements Serializable { * @return {@code True} if this is a start request. */ public boolean isStart() { - return startCfg != null; + return clientNodeId == null && startCfg != null; } /** @@ -107,6 +109,13 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @return {@code True} if this is a stop request. + */ + public boolean isStop() { + return clientNodeId == null && startCfg == null; + } + + /** * @return Cache name. */ public String cacheName() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f7f1f9d..571c1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -99,6 +99,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private ExchangeFutureSet exchFuts = new ExchangeFutureSet(); + public static volatile boolean stop = false; + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e689581..5d94d2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1244,31 +1244,63 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Request to check. * @return {@code True} if change request was registered to apply. */ + @SuppressWarnings("IfMayBeConditional") public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); - return desc != null && desc.deploymentId().equals(req.deploymentId()) && desc.cancelled() != req.isStart(); + if (desc != null && desc.deploymentId().equals(req.deploymentId())) { + if (req.isStart() || req.isClientStart()) + return !desc.cancelled(); + else + return desc.cancelled(); + } + + return false; } /** * @param req Start request. */ public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { - assert req.isStart(); + assert req.isStart() || req.isClientStart(); IgnitePredicate nodeFilter = req.startCacheConfiguration().getNodeFilter(); - if (nodeFilter.apply(ctx.discovery().localNode())) { - GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); + ClusterNode locNode = ctx.discovery().localNode(); - cacheCtx.dynamicDeploymentId(req.deploymentId()); + if (req.isStart()) { + if (nodeFilter.apply(locNode)) { + GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); - sharedCtx.addCacheContext(cacheCtx); + cacheCtx.dynamicDeploymentId(req.deploymentId()); + + sharedCtx.addCacheContext(cacheCtx); + + startCache(cacheCtx.cache()); + onKernalStart(cacheCtx.cache()); + + caches.put(cacheCtx.name(), cacheCtx.cache()); + } + } + else if (req.isClientStart()) { + if (req.clientNodeId().equals(locNode.id())) { + if (nodeFilter.apply(locNode)) { + U.warn(log, "Requested to start client cache on affinity node (will ignore): " + req); + + return; + } + + GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); - startCache(cacheCtx.cache()); - onKernalStart(cacheCtx.cache()); + cacheCtx.dynamicDeploymentId(req.deploymentId()); - caches.put(cacheCtx.name(), cacheCtx.cache()); + sharedCtx.addCacheContext(cacheCtx); + + startCache(cacheCtx.cache()); + onKernalStart(cacheCtx.cache()); + + caches.put(cacheCtx.name(), cacheCtx.cache()); + } } } @@ -1276,7 +1308,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. */ public void prepareCacheStop(DynamicCacheChangeRequest req) { - assert !req.isStart(); + assert req.isStop(); // Break the proxy before exchange future is done. IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName()); @@ -1306,7 +1338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public void onExchangeDone(DynamicCacheChangeRequest req) { - if (req.isStart()) { + if (req.isStart() || req.isClientStart()) { GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); if (cache != null) @@ -1474,19 +1506,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { continue; if (req.isStart()) { - if (caches.containsKey(req.cacheName())) { - fut.onDone(new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + req.cacheName()))); + if (registeredCaches.containsKey(req.cacheName())) { + fut.onDone(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + req.cacheName())); } } - else { - GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); + else if (!req.isClientStart()) { + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); - if (cache == null) + if (desc == null) // No-op. fut.onDone(); else { - IgniteUuid dynamicDeploymentId = cache.context().dynamicDeploymentId(); + IgniteUuid dynamicDeploymentId = desc.deploymentId(); assert dynamicDeploymentId != null; @@ -2032,10 +2064,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(name); if (cache == null) { - if (!registeredCaches.containsKey(name)) + DynamicCacheDescriptor desc = registeredCaches.get(name); + + if (desc == null || desc.cancelled()) throw new IllegalArgumentException("Cache is not started: " + name); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(), null); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(), + desc.cacheConfiguration(), null); + + req.deploymentId(desc.deploymentId()); F.first(initiateCacheChanges(F.asList(req))).get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e8a47ef..234538a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -427,6 +427,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff assert oldestNode.get() != null; if (init.compareAndSet(false, true)) { + U.debug(log, "Initializing exchange future: " + reqs); + if (isDone()) return; @@ -578,7 +580,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff */ private void startCaches() throws IgniteCheckedException { for (DynamicCacheChangeRequest req : reqs) { - if (req.isStart()) + if (req.isStart() || req.isClientStart()) ctx.cache().prepareCacheStart(req); } } @@ -588,7 +590,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff */ private void stopCaches() { for (DynamicCacheChangeRequest req : reqs) { - if (!req.isStart()) + if (req.isStop()) ctx.cache().prepareCacheStop(req); } } @@ -689,8 +691,14 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { - if (req.isStart() && F.eq(cacheCtx.name(), req.cacheName())) - cacheCtx.preloader().onInitialExchangeComplete(err); + if (F.eq(cacheCtx.name(), req.cacheName())) { + if (req.isStart()) + cacheCtx.preloader().onInitialExchangeComplete(err); + else if (req.isClientStart()) { + if (req.clientNodeId().equals(ctx.localNodeId())) + cacheCtx.preloader().onInitialExchangeComplete(err); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373ed3c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 7aa82f9..7e311cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -120,6 +120,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { try { fut.get(); + info("Succeeded: " + System.identityHashCode(fut)); + succeeded++; } catch (IgniteCheckedException e) { @@ -179,6 +181,8 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { try { fut.get(); + info("Succeeded: " + System.identityHashCode(fut)); + succeeded++; } catch (IgniteCheckedException e) { @@ -389,7 +393,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { else GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - return kernal0.jcache(DYNAMIC_CACHE_NAME); + return kernal0.cache(DYNAMIC_CACHE_NAME); } }, IllegalArgumentException.class, null); } @@ -424,4 +428,49 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { } }, IgniteCheckedException.class, null); } + + /** + * @throws Exception If failed. + */ + public void _testClientCache() throws Exception { + try { + testAttribute = false; + + startGrid(nodeCount()); + + final IgniteKernal kernal = (IgniteKernal)grid(0); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setName(DYNAMIC_CACHE_NAME); + + ccfg.setNodeFilter(NODE_FILTER); + + kernal.context().cache().dynamicStartCache(ccfg).get(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteKernal ignite = (IgniteKernal)grid(nodeCount()); + + return ignite.cache(DYNAMIC_CACHE_NAME); + } + }, IllegalArgumentException.class, null); + + GridCachePartitionExchangeManager.stop = true; + + // Should obtain client cache on new node. + IgniteCache<Object, Object> clientCache = ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME); + + clientCache.put("1", "1"); + + for (int g = 0; g < nodeCount() + 1; g++) + assertEquals("1", ignite(g).jcache(DYNAMIC_CACHE_NAME).get("1")); + + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); + } + finally { + stopGrid(nodeCount()); + } + } }