Repository: incubator-ignite Updated Branches: refs/heads/ignite-23 [created] a2a6f31fe
# ignite-23 avoid exchange for client node join/leave Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a2a6f31f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a2a6f31f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a2a6f31f Branch: refs/heads/ignite-23 Commit: a2a6f31fe4207dbb3cd2146c3f22a5c99d83fc5b Parents: 5c8591c Author: sboikov <sboi...@gridgain.com> Authored: Mon May 18 17:50:06 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon May 18 17:50:06 2015 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 27 +++ .../cache/GridCacheAffinityManager.java | 12 ++ .../GridDhtPartitionsExchangeFuture.java | 100 +++++++--- .../IgniteCacheClientNodeExchangeTest.java | 184 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite2.java | 2 + 5 files changed, 300 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index eccd9f9..18ac65a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -103,6 +103,32 @@ public class GridAffinityAssignmentCache { } /** + * @param node Node. + * @param topVer Topology version. + */ + public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) { + GridAffinityAssignment assignment = head.get(); + + assert assignment.primaryPartitions(node.id()).isEmpty() : node; + assert assignment.backupPartitions(node.id()).isEmpty() : node; + + GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, assignment.assignment()); + + affCache.put(topVer, assignmentCpy); + head.set(assignmentCpy); + + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future (use previous affinity) " + + "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + + entry.getValue().onDone(topVer); + } + } + } + + /** * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes * and brought to local node on partition map exchange. * @@ -422,6 +448,7 @@ public class GridAffinityAssignmentCache { /** * + * @param reqTopVer Required topology version. */ private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) { this.reqTopVer = reqTopVer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index fe7efd5..6541e9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -140,6 +140,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * * @param topVer Topology version to calculate affinity for. * @param discoEvt Discovery event that causes this topology change. + * @return Affinity assignments. */ public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { assert !cctx.isLocal(); @@ -148,6 +149,17 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** + * + * @param node Node. + * @param topVer Topology version. + */ + public void clientNodeTopologyChange(ClusterNode node, AffinityTopologyVersion topVer) { + assert !cctx.isLocal(); + + aff.clientNodeTopologyChange(node, topVer); + } + + /** * @return Partition count. */ public int partitions() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 599e391..f4dcf3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,6 +44,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** @@ -146,6 +147,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dynamic cache change requests. */ private Collection<DynamicCacheChangeRequest> reqs; + /** Cache validation results. */ private volatile Map<Integer, Boolean> cacheValidRes; /** @@ -200,6 +202,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param cctx Cache context. * @param busyLock Busy lock. * @param exchId Exchange ID. + * @param reqs Cache change requests. */ public GridDhtPartitionsExchangeFuture( GridCacheSharedContext cctx, @@ -322,7 +325,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * Rechecks topology. + * @param cacheCtx Cache context. + * @throws IgniteCheckedException If failed. */ private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { if (stopping(cacheCtx.cacheId())) @@ -471,6 +475,62 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // will return corresponding nodes. U.await(evtLatch); + if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave. + assert discoEvt != null; + + int type = discoEvt.type(); + + assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt; + + ClusterNode node = discoEvt.eventNode(); + + if (!node.isLocal()) { + boolean affNode = false; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) { + affNode = true; + + break; + } + } + + if (!affNode) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion()); + + GridDhtPartitionTopology top = cacheCtx.topology(); + + GridDhtPartitionMap parts = top.partitions(node.id()); + + assert parts == null || parts.size() == 0 : parts; + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + } + + if (!exchId.isLeft()) { + rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node)); + + rmtIds = F.asList(node.id()); + } + + ready.set(true); + + initFut.onDone(true); + + onDone(exchId.topologyVersion()); + + return; + } + } + } + startCaches(); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -484,8 +544,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<String> cachesWithoutNodes = null; - for (String name : cctx.cache().cacheNames()) { - if (exchId.isLeft()) { + if (exchId.isLeft()) { + for (String name : cctx.cache().cacheNames()) { if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -521,7 +581,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (cachesWithoutNodes != null) { - StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: "); + StringBuilder sb = + new StringBuilder("All server nodes for the following caches have left the cluster: "); for (int i = 0; i < cachesWithoutNodes.size(); i++) { String cache = cachesWithoutNodes.get(i); @@ -666,36 +727,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Initialized future: " + this); - if (canSkipExchange()) - onDone(exchId.topologyVersion()); + // If this node is not oldest. + if (!oldestNode.get().id().equals(cctx.localNodeId())) + sendPartitions(); else { - // If this node is not oldest. - if (!oldestNode.get().id().equals(cctx.localNodeId())) - sendPartitions(); - else { - boolean allReceived = allReceived(); - - if (allReceived && replied.compareAndSet(false, true)) { - if (spreadPartitions()) - onDone(exchId.topologyVersion()); - } - } + boolean allReceived = allReceived(); - scheduleRecheck(); + if (allReceived && replied.compareAndSet(false, true)) { + if (spreadPartitions()) + onDone(exchId.topologyVersion()); + } } + + scheduleRecheck(); } else assert false : "Skipped init future: " + this; } /** - * @return {@code True} if no distributed exchange is needed. - */ - private boolean canSkipExchange() { - return false; // TODO ignite-23; - } - - /** * */ private void dumpPendingObjects() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java new file mode 100644 index 0000000..66db3c6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.eclipse.jetty.util.*; + +import java.util.*; + +/** + * + */ +public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + cfg.setCacheConfiguration(ccfg); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNoPartitionExchangeForClient() throws Exception { + Ignite ignite0 = startGrid(0); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + Ignite ignite1 = startGrid(1); + + TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages().size()); + assertEquals(1, spi0.partitionsFullMessages().size()); + + assertEquals(1, spi1.partitionsSingleMessages().size()); + assertEquals(0, spi1.partitionsFullMessages().size()); + + spi0.reset(); + spi1.reset(); + + client = true; + + for (int i = 0; i < 3; i++) { + log.info("Start client node: " + i); + + Ignite ignite2 = startGrid(2); + + TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages().size()); + assertEquals(1, spi0.partitionsFullMessages().size()); + + assertEquals(0, spi1.partitionsSingleMessages().size()); + assertEquals(0, spi1.partitionsFullMessages().size()); + + assertEquals(1, spi2.partitionsSingleMessages().size()); + assertEquals(0, spi2.partitionsFullMessages().size()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + log.info("Stop client node."); + + ignite2.close(); + + assertEquals(0, spi0.partitionsSingleMessages().size()); + assertEquals(0, spi0.partitionsFullMessages().size()); + + assertEquals(0, spi1.partitionsSingleMessages().size()); + assertEquals(0, spi1.partitionsFullMessages().size()); + } + } + + /** + * Test communication SPI. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>(); + + /** */ + private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>(); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) { + super.sendMessage(node, msg); + + Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridDhtPartitionsSingleMessage) { + if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0); + } + } + else if (msg0 instanceof GridDhtPartitionsFullMessage) { + if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partFullMsgs.add((GridDhtPartitionsFullMessage) msg0); + } + } + } + + /** + * + */ + void reset() { + partSingleMsgs.clear(); + partFullMsgs.clear(); + } + + /** + * @return Sent partitions single messages. + */ + Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() { + return partSingleMsgs; + } + + /** + * @return Sent partitions full messages. + */ + Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() { + return partFullMsgs; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2a6f31f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 5738778..6031dcb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -138,6 +138,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(GridCacheNearPrimarySyncSelfTest.class)); suite.addTest(new TestSuite(GridCacheColocatedPrimarySyncSelfTest.class)); + suite.addTest(new TestSuite(IgniteCacheClientNodeExchangeTest.class)); + return suite; } }