#gg-10186: NullPointerException at CacheDrStateTransferHandler.java:320 #gg-10187: NullPointerException at GridEntSecurityProcessor.java:263
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/587103fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/587103fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/587103fd Branch: refs/heads/ignite-sprint-5 Commit: 587103fdd1273e1d98897a07f98594dac85e38bc Parents: 99c7e22 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed May 6 12:40:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed May 6 12:40:27 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 109 ++++++++++-------- .../processors/cache/CacheGetFromJobTest.java | 110 +++++++++++++++++++ .../testsuites/IgniteCacheRestartTestSuite.java | 1 + 3 files changed, 174 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c0026ab..d22d224 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Must use JDK marshaller since it is used by discovery to fire custom events. */ private Marshaller marshaller = new JdkMarshaller(); + /** Count down latch for caches. */ + private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** * @param ctx Kernal context. */ @@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; + try { + if (ctx.config().isDaemon()) + return; - ClusterNode locNode = ctx.discovery().localNode(); + ClusterNode locNode = ctx.discovery().localNode(); - // Init cache plugin managers. - final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); + // Init cache plugin managers. + final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration locCcfg = desc.cacheConfiguration(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration locCcfg = desc.cacheConfiguration(); - CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); + CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); - cache2PluginMgr.put(locCcfg.getName(), pluginMgr); - } + cache2PluginMgr.put(locCcfg.getName(), pluginMgr); + } - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - checkTransactionConfiguration(n); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + // Check plugin cache configurations. + CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - pluginMgr.validateRemotes(rmtCfg, n); + pluginMgr.validateRemotes(rmtCfg, n); + } } } } - } - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - boolean started = desc.onStart(); + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean started = desc.onStart(); - assert started : "Failed to change started flag for locally configured cache: " + desc; + assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); + desc.clearRemoteConfigurations(); - CacheConfiguration ccfg = desc.cacheConfiguration(); + CacheConfiguration ccfg = desc.cacheConfiguration(); - IgnitePredicate filter = ccfg.getNodeFilter(); + IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + if (filter.apply(locNode)) { + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); + CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); - ctx.dynamicDeploymentId(desc.deploymentId()); + ctx.dynamicDeploymentId(desc.deploymentId()); - sharedCtx.addCacheContext(ctx); + sharedCtx.addCacheContext(ctx); - GridCacheAdapter cache = ctx.cache(); + GridCacheAdapter cache = ctx.cache(); - String name = ccfg.getName(); + String name = ccfg.getName(); - caches.put(maskNull(name), cache); + caches.put(maskNull(name), cache); - startCache(cache); + startCache(cache); - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + } } } + finally { + cacheStartedLatch.countDown(); + } ctx.marshallerContext().onMarshallerCacheStarted(ctx); @@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStop(boolean cancel) { + cacheStartedLatch.countDown(); + if (ctx.config().isDaemon()) return; @@ -959,6 +969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @throws IgniteCheckedException If failed to wait. + */ + public void awaitStarted() throws IgniteCheckedException { + U.await(cacheStartedLatch); + } + + /** * @param cache Cache. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java new file mode 100644 index 0000000..5859bec --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.atomic.*; + +/** + * Job tries to get cache during topology change. + */ +public class CacheGetFromJobTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTopologyChange() throws Exception { + final AtomicReference<Exception> err = new AtomicReference<>(); + + final AtomicInteger id = new AtomicInteger(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + info("Run topology change."); + + try { + for (int i = 0; i < 5; i++) { + info("Topology change: " + i); + + startGrid(id.getAndIncrement()); + } + } + catch (Exception e) { + err.set(e); + + log.error("Unexpected exception in topology-change-thread: " + e, e); + } + } + }, 3, "topology-change-thread"); + + int cntr = 0; + + while (!fut.isDone()) { + grid(0).compute().broadcast(new TestJob()); + + cntr++; + } + + log.info("Job execution count: " + cntr); + + Exception err0 = err.get(); + + if (err0 != null) + throw err0; + } + + /** + * Test job. + */ + private static class TestJob implements IgniteCallable<Object> { + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + public TestJob() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache cache = ignite.cache(null); + + assertNotNull(cache); + + assertEquals(0, cache.localSize()); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java index 0ced1c8..796d138 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java @@ -48,6 +48,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheAtomicPutAllFailoverSelfTest.class); suite.addTestSuite(IgniteCachePutAllRestartTest.class); suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); + suite.addTestSuite(CacheGetFromJobTest.class); return suite; }