#ignite-732: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62ff4924 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62ff4924 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62ff4924 Branch: refs/heads/ignite-732 Commit: 62ff4924b6b9c16e3dc5f52480804a7b864adbdc Parents: d04bc6d Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Apr 30 18:06:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Apr 30 18:06:27 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 109 +++++++++++-------- 1 file changed, 63 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62ff4924/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c0026ab..77fa104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Must use JDK marshaller since it is used by discovery to fire custom events. */ private Marshaller marshaller = new JdkMarshaller(); + /** Count down latch for caches. */ + private CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** * @param ctx Kernal context. */ @@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; + try { + if (ctx.config().isDaemon()) + return; - ClusterNode locNode = ctx.discovery().localNode(); + ClusterNode locNode = ctx.discovery().localNode(); - // Init cache plugin managers. - final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); + // Init cache plugin managers. + final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration locCcfg = desc.cacheConfiguration(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration locCcfg = desc.cacheConfiguration(); - CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); + CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); - cache2PluginMgr.put(locCcfg.getName(), pluginMgr); - } + cache2PluginMgr.put(locCcfg.getName(), pluginMgr); + } - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - checkTransactionConfiguration(n); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + // Check plugin cache configurations. + CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - pluginMgr.validateRemotes(rmtCfg, n); + pluginMgr.validateRemotes(rmtCfg, n); + } } } } - } - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - boolean started = desc.onStart(); + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean started = desc.onStart(); - assert started : "Failed to change started flag for locally configured cache: " + desc; + assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); + desc.clearRemoteConfigurations(); - CacheConfiguration ccfg = desc.cacheConfiguration(); + CacheConfiguration ccfg = desc.cacheConfiguration(); - IgnitePredicate filter = ccfg.getNodeFilter(); + IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + if (filter.apply(locNode)) { + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); + CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); - ctx.dynamicDeploymentId(desc.deploymentId()); + ctx.dynamicDeploymentId(desc.deploymentId()); - sharedCtx.addCacheContext(ctx); + sharedCtx.addCacheContext(ctx); - GridCacheAdapter cache = ctx.cache(); + GridCacheAdapter cache = ctx.cache(); - String name = ccfg.getName(); + String name = ccfg.getName(); - caches.put(maskNull(name), cache); + caches.put(maskNull(name), cache); - startCache(cache); + startCache(cache); - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + } } } + finally { + cacheStartedLatch.countDown(); + } ctx.marshallerContext().onMarshallerCacheStarted(ctx); @@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStop(boolean cancel) { + cacheStartedLatch.countDown(); + if (ctx.config().isDaemon()) return; @@ -2686,6 +2696,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting internal cache adapter: " + name); + try { + cacheStartedLatch.await(); + } + catch (InterruptedException e) { + throw new IgniteException("Failed to wait starting caches."); + } + return (GridCacheAdapter<K, V>)caches.get(maskNull(name)); }