ignite-981 Do not access cache in exchange future before cache is ready
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1603fe50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1603fe50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1603fe50 Branch: refs/heads/ignite-389 Commit: 1603fe502b03d5f3e57e7837e14f0d33af002236 Parents: 97d0bc1 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jun 3 13:21:32 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jun 3 13:34:01 2015 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 8 ++- .../GridDhtPartitionsExchangeFuture.java | 10 +++- .../cache/IgniteDynamicCacheStartSelfTest.java | 62 ++++++++++++++++++++ 3 files changed, 77 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1ae4ae7..68652c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -740,7 +740,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']'; + ", cache=" + cctx.name() + + ", started=" + cctx.started() + + ", stopping=" + stopping + + ", locNodeId=" + cctx.localNode().id() + + ", locName=" + cctx.gridName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -758,6 +762,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + assert partMap != null; + lock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index db43c6c..e0bfee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -902,8 +902,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT id.topologyVersion()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + if (!cacheCtx.isLocal()) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; + + if (ready) + m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + } } // It is important that client topologies be added after contexts. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1603fe50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 095221e..db9e6a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -68,6 +69,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { private boolean testAttribute = true; /** */ + private boolean client; + + /** */ private boolean daemon; /** @@ -85,6 +89,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + if (client) { + cfg.setClientMode(true); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + } + cfg.setUserAttributes(F.asMap(TEST_ATTRIBUTE_NAME, testAttribute)); CacheConfiguration cacheCfg = new CacheConfiguration(); @@ -1024,4 +1034,56 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { stopGrid(nodeCount()); } } + + /** + * @throws Exception If failed. + */ + public void testStartStopWithClientJoin() throws Exception { + Ignite ignite1 = ignite(1); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + client = true; + + int iter = 0; + + while (!stop.get()) { + if (iter % 10 == 0) + log.info("Client start/stop iteration: " + iter); + + iter++; + + try (Ignite ignite = startGrid(nodeCount())) { + assertTrue(ignite.configuration().isClientMode()); + } + } + + return null; + } + }, 1, "client-start-stop"); + + try { + long stopTime = U.currentTimeMillis() + 30_000; + + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + if (iter % 10 == 0) + log.info("Cache start/stop iteration: " + iter); + + try (IgniteCache<Object, Object> cache = ignite1.getOrCreateCache("cache-" + iter)) { + assertNotNull(cache); + } + + iter++; + } + } + finally { + stop.set(true); + } + + fut.get(); + } }