#ignite-732: wip.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62ff4924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62ff4924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62ff4924

Branch: refs/heads/ignite-732
Commit: 62ff4924b6b9c16e3dc5f52480804a7b864adbdc
Parents: d04bc6d
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Thu Apr 30 18:06:27 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Thu Apr 30 18:06:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 109 +++++++++++--------
 1 file changed, 63 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62ff4924/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..77fa104 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Must use JDK marshaller since it is used by discovery to fire custom 
events. */
     private Marshaller marshaller = new JdkMarshaller();
 
+    /** Count down latch for caches. */
+    private CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
     /**
      * @param ctx Kernal context.
      */
@@ -657,87 +660,92 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
+        try {
+            if (ctx.config().isDaemon())
+                return;
 
-        ClusterNode locNode = ctx.discovery().localNode();
+            ClusterNode locNode = ctx.discovery().localNode();
 
-        // Init cache plugin managers.
-        final Map<String, CachePluginManager> cache2PluginMgr = new 
HashMap<>();
+            // Init cache plugin managers.
+            final Map<String, CachePluginManager> cache2PluginMgr = new 
HashMap<>();
 
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            CacheConfiguration locCcfg = desc.cacheConfiguration();
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                CacheConfiguration locCcfg = desc.cacheConfiguration();
 
-            CachePluginManager pluginMgr = new CachePluginManager(ctx, 
locCcfg);
+                CachePluginManager pluginMgr = new CachePluginManager(ctx, 
locCcfg);
 
-            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-        }
+                cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+            }
 
-        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                checkTransactionConfiguration(n);
+            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+                for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    checkTransactionConfiguration(n);
 
-                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                DeploymentMode rmtDepMode = 
n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+                    DeploymentMode locDepMode = 
ctx.config().getDeploymentMode();
+                    DeploymentMode rmtDepMode = 
n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", 
"Deployment mode",
-                    locDepMode, rmtDepMode, true);
+                    CU.checkAttributeMismatch(log, null, n.id(), 
"deploymentMode", "Deployment mode",
+                        locDepMode, rmtDepMode, true);
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    CacheConfiguration rmtCfg = 
desc.remoteConfiguration(n.id());
+                    for (DynamicCacheDescriptor desc : 
registeredCaches.values()) {
+                        CacheConfiguration rmtCfg = 
desc.remoteConfiguration(n.id());
 
-                    if (rmtCfg != null) {
-                        CacheConfiguration locCfg = desc.cacheConfiguration();
+                        if (rmtCfg != null) {
+                            CacheConfiguration locCfg = 
desc.cacheConfiguration();
 
-                        checkCache(locCfg, rmtCfg, n);
+                            checkCache(locCfg, rmtCfg, n);
 
-                        // Check plugin cache configurations.
-                        CachePluginManager pluginMgr = 
cache2PluginMgr.get(locCfg.getName());
+                            // Check plugin cache configurations.
+                            CachePluginManager pluginMgr = 
cache2PluginMgr.get(locCfg.getName());
 
-                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                            assert pluginMgr != null : " Map=" + 
cache2PluginMgr;
 
-                        pluginMgr.validateRemotes(rmtCfg, n);
+                            pluginMgr.validateRemotes(rmtCfg, n);
+                        }
                     }
                 }
             }
-        }
 
-        // Start dynamic caches received from collect discovery data.
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            boolean started = desc.onStart();
+            // Start dynamic caches received from collect discovery data.
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                boolean started = desc.onStart();
 
-            assert started : "Failed to change started flag for locally 
configured cache: " + desc;
+                assert started : "Failed to change started flag for locally 
configured cache: " + desc;
 
-            desc.clearRemoteConfigurations();
+                desc.clearRemoteConfigurations();
 
-            CacheConfiguration ccfg = desc.cacheConfiguration();
+                CacheConfiguration ccfg = desc.cacheConfiguration();
 
-            IgnitePredicate filter = ccfg.getNodeFilter();
+                IgnitePredicate filter = ccfg.getNodeFilter();
 
-            if (filter.apply(locNode)) {
-                CacheObjectContext cacheObjCtx = 
ctx.cacheObjects().contextForCache(ccfg);
+                if (filter.apply(locNode)) {
+                    CacheObjectContext cacheObjCtx = 
ctx.cacheObjects().contextForCache(ccfg);
 
-                CachePluginManager pluginMgr = 
cache2PluginMgr.get(ccfg.getName());
+                    CachePluginManager pluginMgr = 
cache2PluginMgr.get(ccfg.getName());
 
-                assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                    assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                GridCacheContext ctx = createCache(ccfg, pluginMgr, 
desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(ccfg, pluginMgr, 
desc.cacheType(), cacheObjCtx);
 
-                ctx.dynamicDeploymentId(desc.deploymentId());
+                    ctx.dynamicDeploymentId(desc.deploymentId());
 
-                sharedCtx.addCacheContext(ctx);
+                    sharedCtx.addCacheContext(ctx);
 
-                GridCacheAdapter cache = ctx.cache();
+                    GridCacheAdapter cache = ctx.cache();
 
-                String name = ccfg.getName();
+                    String name = ccfg.getName();
 
-                caches.put(maskNull(name), cache);
+                    caches.put(maskNull(name), cache);
 
-                startCache(cache);
+                    startCache(cache);
 
-                jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, 
cache, null, false));
+                    jCacheProxies.put(maskNull(name), new 
IgniteCacheProxy(ctx, cache, null, false));
+                }
             }
         }
+        finally {
+            cacheStartedLatch.countDown();
+        }
 
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
@@ -835,6 +843,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStop(boolean cancel) {
+        cacheStartedLatch.countDown();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -2686,6 +2696,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting internal cache adapter: " + name);
 
+        try {
+            cacheStartedLatch.await();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteException("Failed to wait starting caches.");
+        }
+
         return (GridCacheAdapter<K, V>)caches.get(maskNull(name));
     }
 

Reply via email to