IGNITE-991 - Fix cache start from client node config.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1de11fff Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1de11fff Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1de11fff Branch: refs/heads/ignite-389 Commit: 1de11fff3cea7883a2f28a35107d7c9dfc75d5e0 Parents: 97d0bc1 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Jun 3 16:34:12 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Jun 3 16:34:12 2015 -0700 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 6 +- .../GridDhtPartitionsExchangeFuture.java | 18 +++- ...niteDynamicCacheWithConfigStartSelfTest.java | 98 ++++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 1 + 5 files changed, 116 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 2049d03..c3f3e7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -220,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1ae4ae7..af121c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -249,7 +249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -276,7 +276,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (cctx.rebalanceEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - boolean added = exchFut.isCacheAdded(cctx.cacheId()); + boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()); if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) { assert exchId.isJoined() || added; @@ -668,7 +668,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", allIds=" + allIds + ", node2part=" + node2part + ']'; + ", allIds=" + allIds + ", node2part=" + node2part + ", cache=" + cctx.name() + ']'; Collection<UUID> nodeIds = part2node.get(p); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index db43c6c..a03e2e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -295,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param cacheId Cache ID to check. * @return {@code True} if cache was added during this exchange. */ - public boolean isCacheAdded(int cacheId) { + public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) { if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (req.start() && !req.clientStartOnly()) { @@ -305,7 +305,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - return false; + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); } /** @@ -505,11 +507,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx.isLocal()) continue; - cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); - GridDhtPartitionTopology top = cacheCtx.topology(); top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + + if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) { + initTopology(cacheCtx); + + top.beforeExchange(this); + } + else + cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); } if (exchId.isLeft()) @@ -566,7 +574,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert oldestNode.get() != null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (isCacheAdded(cacheCtx.cacheId())) { + if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) { if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java new file mode 100644 index 0000000..dcd6a69 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * + */ +public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "partitioned"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + if (client) + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME); + + ccfg.setIndexedTypes(String.class, String.class); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnClient() throws Exception { + int srvCnt = 3; + + startGrids(srvCnt); + + try { + client = true; + + IgniteEx client = startGrid(srvCnt); + + for (int i = 0; i < 100; i++) + client.cache(CACHE_NAME).put(i, i); + + for (int i = 0; i < 100; i++) + assertEquals(i, grid(0).cache(CACHE_NAME).get(i)); + + client.cache(CACHE_NAME).removeAll(); + + for (int i = 0; i < 100; i++) + assertNull(grid(0).cache(CACHE_NAME).get(i)); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1de11fff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index a8019d2..15756d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -99,6 +99,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class); suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class); + suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class); suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class); suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class); suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);