IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62d39de2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62d39de2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62d39de2 Branch: refs/heads/ignite-45 Commit: 62d39de2acfa00d8cf6abf1d1596d9e0116bc534 Parents: c0e60f8 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Mar 4 18:54:47 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Mar 4 18:54:47 2015 -0800 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 3 + .../discovery/GridDiscoveryManager.java | 4 +- .../processors/cache/GridCacheProcessor.java | 84 +++++++++++++++++--- .../processors/cache/GridCacheUtils.java | 2 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 9 ++- 5 files changed, 87 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index d496c9a..e226bf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -36,6 +36,9 @@ public interface GridComponent { CONTINUOUS_PROC, /** */ + CACHE_PROC, + + /** */ PLUGIN } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 04ca3d0..e510303 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -224,9 +224,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param filter Cache filter. */ public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) { - IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter); - - assert old == null; + dynamicCacheFilters.put(cacheName, filter); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b7ac0af..ea7cb9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -663,6 +663,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : dynamicCaches.values()) { + GridCacheContext ctx = createCache(desc.cacheConfiguration()); + + sharedCtx.addCacheContext(ctx); + + GridCacheAdapter cache = ctx.cache(); + + String name = desc.cacheConfiguration().getName(); + + caches.put(name, cache); + + startCache(cache); + + proxies.put(name, new GridCacheProxyImpl(ctx, cache, null)); + + jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); + } + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { for (ClusterNode n : ctx.discovery().remoteNodes()) checkCache(n); @@ -1182,11 +1201,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { assert req.isStart(); - CacheConfiguration cfg = new CacheConfiguration(req.startCacheConfiguration()); - - initialize(cfg); - - GridCacheContext cacheCtx = createCache(cfg); + GridCacheContext cacheCtx = createCache(req.startCacheConfiguration()); cacheCtx.dynamicDeploymentId(req.deploymentId()); @@ -1309,6 +1324,44 @@ public class GridCacheProcessor extends GridProcessorAdapter { attrs.put(ATTR_CACHE_INTERCEPTORS, interceptors); } + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.CACHE_PROC; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object collectDiscoveryData(UUID nodeId) { + // Collect dynamically started caches to a single object. + Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(dynamicCaches.size()); + + for (DynamicCacheDescriptor desc : dynamicCaches.values()) { + if (!desc.cancelled()) + reqs.add(new DynamicCacheChangeRequest(desc.cacheConfiguration(), desc.nodeFilter())); + } + + U.debug(log, "Collected discovery data for cache: " + reqs.size()); + + return new DynamicCacheChangeBatch(reqs); + } + + /** {@inheritDoc} */ + @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) { + if (data instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; + + U.debug(log, "Received discovery data: " + batch.requests()); + + for (DynamicCacheChangeRequest req : batch.requests()) { + dynamicCaches.put(req.cacheName(), new DynamicCacheDescriptor( + req.startCacheConfiguration(), + req.startNodeFilter(), + req.deploymentId())); + + ctx.discovery().addDynamicCacheFilter(req.cacheName(), req.startNodeFilter()); + } + } + } + /** * Dynamically starts cache. * @@ -1317,12 +1370,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Future that will be completed when cache is deployed. */ public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) { - if (nodeFilter == null) - nodeFilter = F.alwaysTrue(); + try { + if (nodeFilter == null) + nodeFilter = F.alwaysTrue(); + + CacheConfiguration cfg = new CacheConfiguration(ccfg); + + initialize(cfg); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ccfg, nodeFilter); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg, nodeFilter); - return F.first(initiateCacheChanges(F.asList(req))); + return F.first(initiateCacheChanges(F.asList(req))); + } + catch (IgniteCheckedException e) { + return new GridFinishedFutureEx<>(e); + } } /** @@ -1425,6 +1487,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param batch Change request batch. */ private void onCacheChangeRequested(DynamicCacheChangeBatch batch) { + U.debug(log, "<><><>Received cache change request: " + batch.requests().size()); + for (DynamicCacheChangeRequest req : batch.requests()) { if (req.isStart()) { CacheConfiguration ccfg = req.startCacheConfiguration(); @@ -1471,6 +1535,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } + U.debug(log, "Cancelling descriptor: " + desc); + desc.onCancelled(); ctx.discovery().removeDynamicCacheFilter(req.cacheName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index d6974f3..43062b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -741,7 +741,7 @@ public class GridCacheUtils { if (oldest == null || n.order() < oldest.order()) oldest = n; - assert oldest != null; + assert oldest != null : "Failed to find oldest node for cache context: " + cctx.name(); assert oldest.order() <= topOrder || topOrder < 0; return oldest; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 29eac7c..3b40dc3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -249,7 +249,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testStartStopCacheAddNode() throws Exception { + public void testStartStopCacheAddNode() throws Exception { final IgniteKernal kernal = (IgniteKernal)grid(0); CacheConfiguration ccfg = new CacheConfiguration(); @@ -271,7 +271,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { assertEquals("1", grid(g).jcache(CACHE_NAME).get("1")); // Undeploy cache. - kernal.context().cache().dynamicStopCache(CACHE_NAME); + kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); // TODO debug without get(). + + info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); + info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); + info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); + info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); startGrid(nodeCount() + 1);