Repository: incubator-ignite Updated Branches: refs/heads/ignite-23 913b0efab -> a837fe1e3
# ignite-23 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a837fe1e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a837fe1e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a837fe1e Branch: refs/heads/ignite-23 Commit: a837fe1e358191fee15dc193090634b2e9a11b6a Parents: 913b0ef Author: sboikov <semen.boi...@inria.fr> Authored: Mon May 25 21:38:58 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Mon May 25 21:38:58 2015 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 59 ++++++----- .../cache/GridCacheAffinityManager.java | 9 +- .../GridDhtPartitionsExchangeFuture.java | 17 ++- ...teCacheClientNodePartitionsExchangeTest.java | 105 +++++++++++++++---- 4 files changed, 137 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 2992a6c..0969a57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -32,6 +32,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; + /** * Affinity cached function. */ @@ -103,34 +105,6 @@ public class GridAffinityAssignmentCache { } /** - * Copies previous affinity assignment when client node joins on leaves. - * - * @param node Node. - * @param topVer Topology version. - */ - public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) { - GridAffinityAssignment assignment = head.get(); - - assert assignment.primaryPartitions(node.id()).isEmpty() : node; - assert assignment.backupPartitions(node.id()).isEmpty() : node; - - GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment()); - - affCache.put(topVer, assignmentCpy); - head.set(assignmentCpy); - - for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey().compareTo(topVer) <= 0) { - if (log.isDebugEnabled()) - log.debug("Completing topology ready future (use previous affinity) " + - "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); - - entry.getValue().onDone(topVer); - } - } - } - - /** * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes * and brought to local node on partition map exchange. * @@ -249,6 +223,35 @@ public class GridAffinityAssignmentCache { } /** + * Copies previous affinity assignment when discovery event does not cause affinity assignment changes + * (e.g. client node joins on leaves). + * + * @param evt Event. + * @param topVer Topology version. + */ + public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { + GridAffinityAssignment aff = head.get(); + + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt; + + GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff.assignment()); + + affCache.put(topVer, assignmentCpy); + head.set(assignmentCpy); + + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future (use previous affinity) " + + "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + + entry.getValue().onDone(topVer); + } + } + } + + /** * @return Last calculated affinity version. */ public AffinityTopologyVersion lastVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 20fca7e..ea17df1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -149,15 +149,16 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** - * Copies previous affinity assignment when client node joins on leaves. + * Copies previous affinity assignment when discovery event does not cause affinity assignment changes + * (e.g. client node joins on leaves). * - * @param node Node. + * @param evt Event. * @param topVer Topology version. */ - public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) { + public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { assert !cctx.isLocal(); - aff.clientNodeTopologyChange(node, topVer); + aff.clientEventTopologyChange(evt, topVer); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2ff445f..7963c56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -482,18 +482,29 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT else { assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; - clientNodeEvt = false; + boolean clientOnlyStart = true; + + for (DynamicCacheChangeRequest req : reqs) { + if (!req.clientStartOnly()) { + clientOnlyStart = false; + + break; + } + } + + clientNodeEvt = clientOnlyStart; } if (clientNodeEvt) { ClusterNode node = discoEvt.eventNode(); - if (!node.isLocal()) { // Client need to initialize affinity for local join event. + // Client need to initialize affinity for local join event or for stated client caches. + if (!node.isLocal()) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; - cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion()); + cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); GridDhtPartitionTopology top = cacheCtx.topology(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a837fe1e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java index 726ff22..162aa81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java @@ -373,23 +373,32 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr * @throws Exception If failed. */ private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception { + final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0); + + waitForTopologyUpdate(expNodes, ver); + } + + /** + * @param expNodes Expected number of nodes. + * @param topVer Expected topology version. + * @throws Exception If failed. + */ + private void waitForTopologyUpdate(int expNodes, final AffinityTopologyVersion topVer) throws Exception { List<Ignite> nodes = G.allGrids(); assertEquals(expNodes, nodes.size()); - final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0); - for (Ignite ignite : nodes) { final IgniteKernal kernal = (IgniteKernal)ignite; GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion()); + return topVer.equals(kernal.context().cache().context().exchange().readyAffinityVersion()); } }, 10_000); assertEquals("Unexpected affinity version for " + ignite.name(), - ver, + topVer, kernal.context().cache().context().exchange().readyAffinityVersion()); } @@ -417,7 +426,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr GridDhtPartitionTopology top = cache.context().topology(); assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']', - ver, + topVer, top.topologyVersion()); } } @@ -429,35 +438,52 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr * @throws Exception If failed. */ public void testClientOnlyCacheStart() throws Exception { - clientOnlyCacheStart(false); + clientOnlyCacheStart(false, false); } /** * @throws Exception If failed. */ public void testNearOnlyCacheStart() throws Exception { - clientOnlyCacheStart(true); + clientOnlyCacheStart(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testClientOnlyCacheStartFromServerNode() throws Exception { + clientOnlyCacheStart(false, true); + } + + /** + * @throws Exception If failed. + */ + public void testNearOnlyCacheStartFromServerNode() throws Exception { + clientOnlyCacheStart(true, true); } /** * @param nearCache If {@code true} creates near cache on client. * @throws Exception If failed. */ - public void clientOnlyCacheStart(boolean nearCache) throws Exception { + private void clientOnlyCacheStart(boolean nearCache, boolean srvNode) throws Exception { Ignite ignite0 = startGrid(0); Ignite ignite1 = startGrid(1); waitForTopologyUpdate(2, 2); - final String CACHE_NAME = "cache1"; + final String CACHE_NAME1 = "cache1"; CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName(CACHE_NAME); + ccfg.setName(CACHE_NAME1); + + if (srvNode) + ccfg.setNodeFilter(new TestFilter(getTestGridName(2))); ignite0.createCache(ccfg); - client = true; + client = !srvNode; Ignite ignite2 = startGrid(2); @@ -474,9 +500,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1")); if (nearCache) - ignite2.getOrCreateNearCache(CACHE_NAME, new NearCacheConfiguration<>()); + ignite2.getOrCreateNearCache(CACHE_NAME1, new NearCacheConfiguration<>()); else - ignite2.cache(CACHE_NAME); + ignite2.cache(CACHE_NAME1); + + waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 1)); GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1"); @@ -484,10 +512,10 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr assertEquals(nearCache, cache.context().isNear()); assertEquals(0, spi0.partitionsSingleMessages()); - assertEquals(0, spi0.partitionsFullMessages()); + assertEquals(1, spi0.partitionsFullMessages()); assertEquals(0, spi1.partitionsSingleMessages()); assertEquals(0, spi1.partitionsFullMessages()); - assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(1, spi2.partitionsSingleMessages()); assertEquals(0, spi2.partitionsFullMessages()); ClusterNode clientNode = ((IgniteKernal)ignite2).localNode(); @@ -495,9 +523,50 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr for (Ignite ignite : Ignition.allGrids()) { GridDiscoveryManager disco = ((IgniteKernal)ignite).context().discovery(); - assertTrue(disco.cacheNode(clientNode, CACHE_NAME)); - assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME)); - assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME)); + assertTrue(disco.cacheNode(clientNode, CACHE_NAME1)); + assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME1)); + assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME1)); + } + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + final String CACHE_NAME2 = "cache2"; + + ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME2); + + ignite2.createCache(ccfg); + + waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2)); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(2, spi0.partitionsFullMessages()); + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + } + + /** + * + */ + private static class TestFilter implements IgnitePredicate<ClusterNode> { + /** */ + private String exclNodeName; + + /** + * @param exclNodeName Node name to exclude. + */ + public TestFilter(String exclNodeName) { + this.exclNodeName = exclNodeName; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return !exclNodeName.equals(clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME)); } }