# ignite-23 skip client nodes from partition exchange
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/72d6ea5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/72d6ea5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/72d6ea5d Branch: refs/heads/ignite-23 Commit: 72d6ea5d1d772594d9ef4c567b2011024268475b Parents: d6f5f15 Author: sboikov <sboi...@gridgain.com> Authored: Tue May 19 14:49:37 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue May 19 17:47:15 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 5 + .../ignite/internal/GridKernalContextImpl.java | 5 + .../discovery/GridDiscoveryManager.java | 64 ++- .../GridCachePartitionExchangeManager.java | 76 ++- .../processors/cache/GridCachePreloader.java | 4 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../cache/GridCacheSharedContext.java | 1 + .../processors/cache/GridCacheUtils.java | 58 +-- .../dht/GridDhtPartitionTopologyImpl.java | 4 +- .../preloader/GridDhtPartitionDemandPool.java | 20 +- .../preloader/GridDhtPartitionSupplyPool.java | 6 +- .../GridDhtPartitionsExchangeFuture.java | 223 +++++---- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- .../GridDhtPartitionsSingleMessage.java | 33 +- .../dht/preloader/GridDhtPreloader.java | 14 +- .../preloader/GridDhtPreloaderAssignments.java | 3 +- .../GridCacheAbstractRemoveFailureTest.java | 20 + ...niteCacheClientNodeChangingTopologyTest.java | 42 ++ .../IgniteCacheClientNodeExchangeTest.java | 184 ------- ...teCacheClientNodePartitionsExchangeTest.java | 486 +++++++++++++++++++ .../GridCacheDhtClientRemoveFailureTest.java | 28 ++ ...cClientInvalidPartitionHandlingSelfTest.java | 29 ++ .../GridCacheAtomicClientRemoveFailureTest.java | 28 ++ ...eAtomicInvalidPartitionHandlingSelfTest.java | 12 + .../IgniteCacheFailoverTestSuite.java | 3 + .../testsuites/IgniteCacheTestSuite2.java | 2 +- 26 files changed, 985 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ad7d562..d6542f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Marshaller context. */ public MarshallerContextImpl marshallerContext(); + + /** + * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). + */ + public boolean clientNode(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1ff483e..f921d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean clientNode() { + return cfg.isClientMode(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 62548d8..7130421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1246,13 +1246,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets alive remote nodes with at least one cache configured. + * Gets alive remote server nodes with at least one cache configured. * * @param topVer Topology version (maximum allowed node order). * @return Collection of alive cache nodes. */ - public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion()); + public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion()); + } + + /** + * Gets alive server nodes with at least one cache configured. + * + * @param topVer Topology version (maximum allowed node order). + * @return Collection of alive cache nodes. + */ + public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion()); } /** @@ -1350,7 +1360,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Find the eldest acceptable discovery cache. Map.Entry<AffinityTopologyVersion, DiscoCache> eldest = Collections.min(discoCacheHist.entrySet(), histCmp); - if (topVer.compareTo(eldest.getKey()) < 0) + if (topVer.compareTo(eldest.getKey()) <= 0) cache = eldest.getValue(); } @@ -2094,9 +2104,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final Collection<ClusterNode> aliveNodesWithCaches; /** - * Cached alive remote nodes with caches. + * Cached alive server remote nodes with caches. + */ + private final Collection<ClusterNode> aliveSrvNodesWithCaches; + + /** + * Cached alive remote server nodes with caches. */ - private final Collection<ClusterNode> aliveRmtNodesWithCaches; + private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; /** * @param loc Local node. @@ -2131,7 +2146,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; @@ -2183,8 +2199,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (alive(node.id())) { aliveNodesWithCaches.add(node); - if (!loc.id().equals(node.id())) - aliveRmtNodesWithCaches.add(node); + if (!CU.clientNode(node)) { + aliveSrvNodesWithCaches.add(node); + + if (!loc.id().equals(node.id())) + aliveRmtSrvNodesWithCaches.add(node); + } } } @@ -2269,13 +2289,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @return All nodes with at least one cache configured. - */ - Collection<ClusterNode> allNodesWithCaches() { - return allNodesWithCaches; - } - - /** * Gets collection of nodes which have version equal or greater than {@code ver}. * * @param ver Version to check. @@ -2374,13 +2387,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all alive remote nodes with at least one cache configured. + * Gets all alive remote server nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveRmtSrvNodesWithCaches); + } + + /** + * Gets all alive server nodes with at least one cache configured. * * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtNodesWithCaches); + Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveSrvNodesWithCaches); } /** @@ -2417,7 +2440,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { filterNodeMap(aliveRmtCacheNodes, leftNode); aliveNodesWithCaches.remove(leftNode); - aliveRmtNodesWithCaches.remove(leftNode); + aliveSrvNodesWithCaches.remove(leftNode); + aliveRmtSrvNodesWithCaches.remove(leftNode); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c399c23..cc06d4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -554,7 +554,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Partition refresh callback. */ void refreshPartitions() { - ClusterNode oldest = CU.oldest(cctx); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + + if (oldest == null) { + if (log.isDebugEnabled()) + log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']'); + + return; + } if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); @@ -641,7 +648,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), + cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { @@ -687,6 +696,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param reqs Cache change requests. * @return Exchange future. */ GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @@ -827,7 +837,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param node Node ID. * @param msg Message. */ - private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) { + private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { if (!enterBusy()) return; @@ -858,8 +868,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (updated) scheduleResendPartitions(); } - else - exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + else { + if (msg.client()) { + IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion()); + + if (fut != null) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + processSinglePartitionClientUpdate(node, msg); + } + }); + } + else + processSinglePartitionClientUpdate(node, msg); + } + else + exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + } } finally { leaveBusy(); @@ -867,6 +892,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param node Node. + * @param msg Message. + */ + private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + null, + null); + + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + // Finished future should reply only to sender client node. + exchFut.onReceive(node.id(), msg); + } + }); + } + + /** * @param node Node ID. * @param msg Message. */ @@ -982,7 +1024,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana busy = true; - Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>(); + Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; boolean dummyReassign = exchFut.dummyReassign(); boolean forcePreload = exchFut.forcePreload(); @@ -1017,7 +1059,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana changed |= cacheCtx.topology().afterExchange(exchFut); // Preload event notification. - if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { + if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { if (!cacheCtx.isReplicated() || !startEvtFired) { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); @@ -1043,16 +1085,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); + if (!exchFut.skipPreload()) { + assignsMap = new HashMap<>(); - GridDhtPreloaderAssignments<K, V> assigns = null; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + long delay = cacheCtx.config().getRebalanceDelay(); + + GridDhtPreloaderAssignments assigns = null; - // Don't delay for dummy reassigns to avoid infinite recursion. - if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); + // Don't delay for dummy reassigns to avoid infinite recursion. + if (delay == 0 || forcePreload) + assigns = cacheCtx.preloader().assign(exchFut); - assignsMap.put(cacheCtx.cacheId(), assigns); + assignsMap.put(cacheCtx.cacheId(), assigns); + } } } finally { @@ -1061,7 +1107,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (assignsMap != null) { - for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) { + for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { int cacheId = e.getKey(); GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 2e181f9..5a73843 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> { * @param exchFut Exchange future to assign. * @return Assignments. */ - public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut); + public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. @@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> { * @param assignments Assignments to add. * @param forcePreload Force preload flag. */ - public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload); + public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload); /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 80d3d6b..8cd5264 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -131,12 +131,12 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return null; } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { // No-op. } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 294c2b0..4c08beb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -499,6 +499,7 @@ public class GridCacheSharedContext<K, V> { /** * @param tx Transaction to rollback. * @throws IgniteCheckedException If failed. + * @return Rollback future. */ public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { Collection<Integer> cacheIds = tx.activeCacheIds(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index ef04ff4..b7bc115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -527,8 +527,9 @@ public class GridCacheUtils { * @param topOrder Maximum allowed node order. * @return Affinity nodes. */ - public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteNodesWithCaches(topOrder); + public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx, + AffinityTopologyVersion topOrder) { + return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder); } /** @@ -607,26 +608,6 @@ public class GridCacheUtils { * Gets oldest alive node for specified topology version. * * @param cctx Cache context. - * @return Oldest node for the current topology version. - */ - public static ClusterNode oldest(GridCacheContext cctx) { - return oldest(cctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node across nodes with at least one cache configured. - * - * @param ctx Cache context. - * @return Oldest node. - */ - public static ClusterNode oldest(GridCacheSharedContext ctx) { - return oldest(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node for specified topology version. - * - * @param cctx Cache context. * @param topOrder Maximum allowed node order. * @return Oldest node for the given topology version. */ @@ -665,6 +646,23 @@ public class GridCacheUtils { } /** + * Gets oldest alive server node with at least one cache configured for specified topology version. + * + * @param ctx Context. + * @param topVer Maximum allowed topology version. + * @return Oldest alive cache server node. + */ + @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx, + AffinityTopologyVersion topVer) { + Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer); + + if (nodes.isEmpty()) + return null; + + return oldest(nodes); + } + + /** * @param nodes Nodes. * @return Oldest node for the given topology version. */ @@ -1802,16 +1800,22 @@ public class GridCacheUtils { /** * @param node Node. - * @param filter Node filter. - * @return {@code True} if node is not client node and pass given filter. + * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). */ - public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) { + public static boolean clientNode(ClusterNode node) { Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); assert clientModeAttr != null : node; - boolean clientMode = clientModeAttr != null && clientModeAttr; + return clientModeAttr != null && clientModeAttr; + } - return !clientMode && filter.apply(node); + /** + * @param node Node. + * @param filter Node filter. + * @return {@code True} if node is not client node and pass given filter. + */ + public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) { + return !clientNode(node) && filter.apply(node); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 073e0e7..56f6a62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx.shared(), topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 633f237..1071468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; * and populating local cache. */ @SuppressWarnings("NonConstantFieldWithUpperCaseName") -public class GridDhtPartitionDemandPool<K, V> { +public class GridDhtPartitionDemandPool { /** Dummy message to wake up a blocking queue if a node leaves. */ private final SupplyMessage DUMMY_TOP = new SupplyMessage(); /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -327,7 +327,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ - void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) { + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -399,7 +399,7 @@ public class GridDhtPartitionDemandPool<K, V> { private int id; /** Partition-to-node assignments. */ - private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); /** Message queue. */ private final LinkedBlockingDeque<SupplyMessage> msgQ = @@ -425,7 +425,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @param assigns Assignments. */ - void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) { + void addAssignments(GridDhtPreloaderAssignments assigns) { assert assigns != null; assignQ.offer(assigns); @@ -885,7 +885,7 @@ public class GridDhtPartitionDemandPool<K, V> { } // Sync up all demand threads at this step. - GridDhtPreloaderAssignments<K, V> assigns = null; + GridDhtPreloaderAssignments assigns = null; while (assigns == null) assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); @@ -995,12 +995,12 @@ public class GridDhtPartitionDemandPool<K, V> { * @param exchFut Exchange future. * @return Assignments of partitions to nodes. */ - GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. GridDhtPartitionTopology top = cctx.dht().topology(); if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); int partCnt = cctx.affinity().partitions(); @@ -1009,7 +1009,7 @@ public class GridDhtPartitionDemandPool<K, V> { "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; - GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 5d9677d..84ac7c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Thread pool for supplying partitions to demanding nodes. */ -class GridDhtPartitionSupplyPool<K, V> { +class GridDhtPartitionSupplyPool { /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f4dcf3b..102176e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** @@ -118,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private GridFutureAdapter<Boolean> initFut; /** Topology snapshot. */ - private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = - new AtomicReference<>(); + private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); @@ -150,6 +150,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Cache validation results. */ private volatile Map<Integer, Boolean> cacheValidRes; + /** Skip preload flag. */ + private boolean skipPreload; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -227,23 +230,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut = new GridFutureAdapter<>(); // Grab all nodes with order of equal or less than last joined node. - Collection<ClusterNode> nodes = CU.aliveCacheNodes(cctx, exchId.topologyVersion()); - - if (nodes.isEmpty()) { - initFut.onDone(true); + ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); - onDone(exchId.topologyVersion()); - - return; - } - - oldestNode.set(CU.oldest(nodes)); - - assert oldestNode.get() != null; + oldestNode.set(node); if (log.isDebugEnabled()) - log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + - ", fut=" + this + ']'); + log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); } /** {@inheritDoc} */ @@ -263,6 +255,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Skip preload flag. + */ + public boolean skipPreload() { + return skipPreload; + } + + /** * @return Dummy flag. */ public boolean dummy() { @@ -415,13 +414,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Exchange id. - */ - GridDhtPartitionExchangeId key() { - return exchId; - } - - /** * @return Exchange ID. */ public GridDhtPartitionExchangeId exchangeId() { @@ -429,13 +421,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Init future. - */ - IgniteInternalFuture<?> initFuture() { - return initFut; - } - - /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -464,8 +449,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone()) return; - assert oldestNode.get() != null; - if (init.compareAndSet(false, true)) { if (isDone()) return; @@ -475,63 +458,98 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // will return corresponding nodes. U.await(evtLatch); - if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave. - assert discoEvt != null; + assert discoEvt != null : this; + assert !dummy && !forcePreload : this; + startCaches(); + + // True if client node joined or failed. + boolean clientNodeEvt; + + if (F.isEmpty(reqs)) { int type = discoEvt.type(); assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt; - ClusterNode node = discoEvt.eventNode(); + clientNodeEvt = CU.clientNode(discoEvt.eventNode()); + } + else { + assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; + + clientNodeEvt = false; + } - if (!node.isLocal()) { - boolean affNode = false; + if (clientNodeEvt) { + ClusterNode node = discoEvt.eventNode(); + if (!node.isLocal()) { // Client need to initialize affinity for local join event. for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; - if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) { - affNode = true; + cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion()); - break; - } + GridDhtPartitionTopology top = cacheCtx.topology(); + + GridDhtPartitionMap parts = top.partitions(node.id()); + + assert parts == null || parts.size() == 0 : parts; + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); } - if (!affNode) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) - continue; + if (exchId.isLeft()) + cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); - cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion()); + onDone(exchId.topologyVersion()); - GridDhtPartitionTopology top = cacheCtx.topology(); + skipPreload = true; - GridDhtPartitionMap parts = top.partitions(node.id()); + return; + } + } - assert parts == null || parts.size() == 0 : parts; + if (cctx.kernalContext().clientNode()) { + skipPreload = true; - top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); - } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; - if (!exchId.isLeft()) { - rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node)); + GridDhtPartitionTopology top = cacheCtx.topology(); - rmtIds = F.asList(node.id()); - } + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + } - ready.set(true); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; - initFut.onDone(true); + initTopology(cacheCtx); + } - onDone(exchId.topologyVersion()); + if (oldestNode.get() != null) { + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, + exchId.topologyVersion())); - return; - } + rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); + + ready.set(true); + + initFut.onDone(true); + + if (log.isDebugEnabled()) + log.debug("Initialized future: " + this); + + sendPartitions(); } + else + onDone(exchId.topologyVersion()); + + return; } - startCaches(); + assert oldestNode.get() != null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (isCacheAdded(cacheCtx.cacheId())) { @@ -614,7 +632,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } // Grab all alive remote nodes with order of equal or less than last joined node. - rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, exchId.topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -821,7 +839,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -1091,9 +1110,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + ", unexpectedNodeId=" + nodeId + ']'); - ClusterNode sender = cctx.discovery().node(nodeId); + ClusterNode snd = cctx.discovery().node(nodeId); - if (sender == null) { + if (snd == null) { if (log.isDebugEnabled()) log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + ", exchId=" + msg.exchangeId() + ']'); @@ -1102,7 +1121,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } // Will process message later if sender node becomes oldest node. - if (sender.order() > curOldest.order()) + if (snd.order() > curOldest.order()) fullMsgs.put(nodeId, msg); return; @@ -1141,8 +1160,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue()); - else if (CU.oldest(cctx).isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + else { + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + + if (oldest != null && oldest.isLocal()) + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + } } } @@ -1201,40 +1224,47 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean set = false; - ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion()); - - // If local node is now oldest. - if (newOldest.id().equals(cctx.localNodeId())) { - synchronized (mux) { - if (oldestNode.compareAndSet(oldest, newOldest)) { - // If local node is just joining. - if (exchId.nodeId().equals(cctx.localNodeId())) { - try { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange( - GridDhtPartitionsExchangeFuture.this); + for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) { + if (it.next().id().equals(nodeId)) + it.remove(); + } + + ClusterNode newOldest = CU.oldest(rmtNodes); + + if (newOldest != null) { + // If local node is now oldest. + if (newOldest.id().equals(cctx.localNodeId())) { + synchronized (mux) { + if (oldestNode.compareAndSet(oldest, newOldest)) { + // If local node is just joining. + if (exchId.nodeId().equals(cctx.localNodeId())) { + try { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) + cacheCtx.topology().beforeExchange( + GridDhtPartitionsExchangeFuture.this); + } } - } - catch (IgniteCheckedException e) { - onDone(e); + catch (IgniteCheckedException e) { + onDone(e); - return; + return; + } } - } - set = true; + set = true; + } } } - } - else { - synchronized (mux) { - set = oldestNode.compareAndSet(oldest, newOldest); - } + else { + synchronized (mux) { + set = oldestNode.compareAndSet(oldest, newOldest); + } - if (set && log.isDebugEnabled()) - log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + - ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + if (set && log.isDebugEnabled()) + log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + + ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + } } if (set) { @@ -1256,9 +1286,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert rmtNodes != null; - for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) + for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) { if (it.next().id().equals(nodeId)) it.remove(); + } if (allReceived() && ready.get() && replied.compareAndSet(false, true)) if (spreadPartitions()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 8256274..73794ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param id Exchange ID. * @param lastVer Last version. + * @param topVer Topology version. */ - public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, + public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, + @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 66140cd..713a80b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions. */ private byte[] partsBytes; + /** */ + private boolean client; + /** * Required by {@link Externalizable}. */ @@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @param exchId Exchange ID. + * @param client Client message flag. * @param lastVer Last version. */ - public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) { + public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, + boolean client, + @Nullable GridCacheVersion lastVer) { super(exchId, lastVer); + + this.client = client; + } + + /** + * @return {@code True} if sent from client node. + */ + public boolean client() { + return client; } /** @@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (writer.state()) { case 5: + if (!writer.writeBoolean("client", client)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (reader.state()) { case 5: + client = reader.readBoolean("client"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index d6373f0..61ba8b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -60,10 +60,10 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool<K, V> supplyPool; + private GridDhtPartitionSupplyPool supplyPool; /** Partition demanders. */ - private GridDhtPartitionDemandPool<K, V> demandPool; + private GridDhtPartitionDemandPool demandPool; /** Start future. */ private final GridFutureAdapter<Object> startFut; @@ -158,8 +158,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } }); - supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock); + supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); + demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -253,12 +253,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return demandPool.assign(exchFut); } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { demandPool.addAssignments(assignments, forcePreload); } @@ -271,7 +271,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>() : demandPool.syncFuture(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 369fc68..2f6ef6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -27,8 +27,7 @@ import java.util.concurrent.*; /** * Partition to node assignments. */ -public class GridDhtPreloaderAssignments<K, V> extends - ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { +public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java index c6ede61..d5d80ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ -71,6 +72,16 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra private String sizePropVal; /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (testClientNode() && getTestGridName(0).equals(gridName)) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ @Override protected int gridCount() { return GRID_CNT; } @@ -106,9 +117,18 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra } /** + * @return {@code True} if test updates from client node. + */ + protected boolean testClientNode() { + return false; + } + + /** * @throws Exception If failed. */ public void testPutAndRemove() throws Exception { + assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode()); + final IgniteCache<Integer, Integer> sndCache0 = grid(0).cache(null); final AtomicBoolean stop = new AtomicBoolean(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java new file mode 100644 index 0000000..c233bb9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +/** + * + */ +public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java deleted file mode 100644 index 66db3c6..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.eclipse.jetty.util.*; - -import java.util.*; - -/** - * - */ -public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest { - /** */ - protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - - cfg.setClientMode(client); - - CacheConfiguration ccfg = new CacheConfiguration(); - - cfg.setCacheConfiguration(ccfg); - - cfg.setCommunicationSpi(new TestCommunicationSpi()); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testNoPartitionExchangeForClient() throws Exception { - Ignite ignite0 = startGrid(0); - - TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); - - Ignite ignite1 = startGrid(1); - - TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); - - assertEquals(0, spi0.partitionsSingleMessages().size()); - assertEquals(1, spi0.partitionsFullMessages().size()); - - assertEquals(1, spi1.partitionsSingleMessages().size()); - assertEquals(0, spi1.partitionsFullMessages().size()); - - spi0.reset(); - spi1.reset(); - - client = true; - - for (int i = 0; i < 3; i++) { - log.info("Start client node: " + i); - - Ignite ignite2 = startGrid(2); - - TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); - - assertEquals(0, spi0.partitionsSingleMessages().size()); - assertEquals(1, spi0.partitionsFullMessages().size()); - - assertEquals(0, spi1.partitionsSingleMessages().size()); - assertEquals(0, spi1.partitionsFullMessages().size()); - - assertEquals(1, spi2.partitionsSingleMessages().size()); - assertEquals(0, spi2.partitionsFullMessages().size()); - - spi0.reset(); - spi1.reset(); - spi2.reset(); - - log.info("Stop client node."); - - ignite2.close(); - - assertEquals(0, spi0.partitionsSingleMessages().size()); - assertEquals(0, spi0.partitionsFullMessages().size()); - - assertEquals(0, spi1.partitionsSingleMessages().size()); - assertEquals(0, spi1.partitionsFullMessages().size()); - } - } - - /** - * Test communication SPI. - */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** */ - private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>(); - - /** */ - private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>(); - - /** */ - @LoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) { - super.sendMessage(node, msg); - - Object msg0 = ((GridIoMessage)msg).message(); - - if (msg0 instanceof GridDhtPartitionsSingleMessage) { - if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) { - log.info("Partitions message: " + msg0.getClass().getSimpleName()); - - partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0); - } - } - else if (msg0 instanceof GridDhtPartitionsFullMessage) { - if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) { - log.info("Partitions message: " + msg0.getClass().getSimpleName()); - - partFullMsgs.add((GridDhtPartitionsFullMessage) msg0); - } - } - } - - /** - * - */ - void reset() { - partSingleMsgs.clear(); - partFullMsgs.clear(); - } - - /** - * @return Sent partitions single messages. - */ - Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() { - return partSingleMsgs; - } - - /** - * @return Sent partitions full messages. - */ - Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() { - return partFullMsgs; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java new file mode 100644 index 0000000..3fac400 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private boolean fairAffinity; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + if (fairAffinity) + ccfg.setAffinity(new FairAffinityFunction()); + + cfg.setCacheConfiguration(ccfg); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeave() throws Exception { + Ignite ignite0 = startGrid(0); + + client = true; + + final Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + final Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + ignite0.close(); + + waitForTopologyUpdate(2, 4); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite1.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite2.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + + ignite1.close(); + + waitForTopologyUpdate(1, 5); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite2.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testSkipPreload() throws Exception { + Ignite ignite0 = startGrid(0); + + final CountDownLatch evtLatch0 = new CountDownLatch(1); + + ignite0.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + log.info("Rebalance event: " + evt); + + evtLatch0.countDown(); + + return true; + } + }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED); + + client = true; + + Ignite ignite1 = startGrid(1); + + assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + ignite1.close(); + + assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + ignite1 = startGrid(1); + + final CountDownLatch evtLatch1 = new CountDownLatch(1); + + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + log.info("Rebalance event: " + evt); + + evtLatch1.countDown(); + + return true; + } + }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED); + + assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + client = false; + + startGrid(2); + + assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionsExchange() throws Exception { + partitionsExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionsExchangeFairAffinity() throws Exception { + fairAffinity = true; + + partitionsExchange(); + } + + /** + * @throws Exception If failed. + */ + private void partitionsExchange() throws Exception { + Ignite ignite0 = startGrid(0); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + + client = true; + + log.info("Start client node1."); + + Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + log.info("Start client node2."); + + Ignite ignite3 = startGrid(3); + + waitForTopologyUpdate(4, 4); + + TestCommunicationSpi spi3 = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + spi3.reset(); + + log.info("Start one more server node."); + + client = false; + + Ignite ignite4 = startGrid(4); + + waitForTopologyUpdate(5, 5); + + TestCommunicationSpi spi4 = (TestCommunicationSpi)ignite4.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(4, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + assertEquals(1, spi4.partitionsSingleMessages()); + assertEquals(0, spi4.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + spi3.reset(); + + log.info("Stop server node."); + + ignite4.close(); + + waitForTopologyUpdate(4, 6); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(3, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + log.info("Stop client node2."); + + ignite3.close(); + + waitForTopologyUpdate(3, 7); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + + log.info("Stop client node1."); + + ignite2.close(); + + waitForTopologyUpdate(2, 8); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + log.info("Stop server node."); + + ignite1.close(); + + waitForTopologyUpdate(1, 9); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + } + + /** + * @param expNodes Expected number of nodes. + * @param topVer Expected topology version. + * @throws Exception If failed. + */ + private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception { + List<Ignite> nodes = G.allGrids(); + + assertEquals(expNodes, nodes.size()); + + final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0); + + for (Ignite ignite : nodes) { + final IgniteKernal kernal = (IgniteKernal)ignite; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion()); + } + }, 10_000); + + assertEquals("Unexpected affinity version for " + ignite.name(), + ver, + kernal.context().cache().context().exchange().readyAffinityVersion()); + } + + Iterator<Ignite> it = nodes.iterator(); + + Ignite ignite0 = it.next(); + + Affinity<Integer> aff0 = ignite0.affinity(null); + + while (it.hasNext()) { + Ignite ignite = it.next(); + + Affinity<Integer> aff = ignite.affinity(null); + + assertEquals(aff0.partitions(), aff.partitions()); + + for (int part = 0; part < aff.partitions(); part++) + assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), aff.mapPartitionToPrimaryAndBackups(part)); + } + + for (Ignite ignite : nodes) { + final IgniteKernal kernal = (IgniteKernal)ignite; + + for (IgniteInternalCache cache : kernal.context().cache().caches()) { + GridDhtPartitionTopology top = cache.context().topology(); + + assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']', + ver, + top.topologyVersion()); + } + } + + awaitPartitionMapExchange(); + } + + /** + * Test communication SPI. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private AtomicInteger partSingleMsgs = new AtomicInteger(); + + /** */ + private AtomicInteger partFullMsgs = new AtomicInteger(); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) { + super.sendMessage(node, msg); + + Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridDhtPartitionsSingleMessage) { + if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partSingleMsgs.incrementAndGet(); + } + } + else if (msg0 instanceof GridDhtPartitionsFullMessage) { + if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partFullMsgs.incrementAndGet(); + } + } + } + + /** + * + */ + void reset() { + partSingleMsgs.set(0); + partFullMsgs.set(0); + } + + /** + * @return Sent partitions single messages. + */ + int partitionsSingleMessages() { + return partSingleMsgs.get(); + } + + /** + * @return Sent partitions full messages. + */ + int partitionsFullMessages() { + return partFullMsgs.get(); + } + } + +}