# ignite-23 remap for tx updates from client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0163cece Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0163cece Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0163cece Branch: refs/heads/ignite-943 Commit: 0163cecebdf310b2c7bd7c2cea4780facdb16d0c Parents: cceb303 Author: sboikov <sboi...@gridgain.com> Authored: Fri May 22 14:01:04 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri May 22 18:33:16 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 9 +- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheUtils.java | 182 ----------------- .../dht/GridClientPartitionTopology.java | 8 +- .../distributed/dht/GridDhtCacheAdapter.java | 29 +++ .../dht/GridDhtPartitionTopologyImpl.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 23 ++- .../distributed/near/GridNearLockFuture.java | 23 ++- .../distributed/near/GridNearLockMapping.java | 17 -- .../cache/query/GridCacheQueryAdapter.java | 10 +- .../cache/transactions/IgniteTxHandler.java | 33 ++- .../GridDiscoveryManagerAliveCacheSelfTest.java | 4 +- ...niteCacheClientNodeChangingTopologyTest.java | 203 ++++++++++++++++++- ...teCacheClientNodePartitionsExchangeTest.java | 43 ++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 +- 18 files changed, 355 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 7130421..a36873a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2134,12 +2134,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { allNodes = Collections.unmodifiableList(all); - Map<String, Collection<ClusterNode>> cacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = - new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> dhtNodesMap =new HashMap<>(allNodes.size(), 1.0f); Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 41a13ba..30ec46a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -571,7 +571,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { - rmts = CU.remoteNodes(cctx); + rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 4041f13..32d6acb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -114,13 +114,6 @@ public class GridCacheUtils { } }; - /** Not evicted partitions. */ - private static final IgnitePredicate PART_NOT_EVICTED = new P1<GridDhtLocalPartition>() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.state() != GridDhtPartitionState.EVICTED; - } - }; - /** */ private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY = new C1<Integer, GridCacheVersion[]>() { @@ -403,25 +396,6 @@ public class GridCacheUtils { } /** - * @return Not evicted partitions. - */ - @SuppressWarnings( {"unchecked"}) - public static <K, V> IgnitePredicate<GridDhtLocalPartition> notEvicted() { - return PART_NOT_EVICTED; - } - - /** - * Gets all nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return All nodes on which cache with the same name is started (including nodes - * that may have already left). - */ - public static Collection<ClusterNode> allNodes(GridCacheContext ctx) { - return allNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** * Gets all nodes on which cache with the same name is started. * * @param ctx Cache context. @@ -446,59 +420,6 @@ public class GridCacheUtils { } /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote node with at least one cache configured. - * - * @param ctx Shared cache context. - * @return Collection of nodes with at least one cache configured. - */ - public static Collection<ClusterNode> remoteNodes(GridCacheSharedContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().remoteCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveRemoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteCacheNodes(ctx.namex(), topOrder); - } - - /** * Gets remote nodes with at least one cache configured. * * @param ctx Cache shared context. @@ -510,17 +431,6 @@ public class GridCacheUtils { } /** - * Gets alive nodes with at least one cache configured. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveNodesWithCaches(topOrder); - } - - /** * Gets alive remote nodes with at least one cache configured. * * @param ctx Cache context. @@ -578,74 +488,6 @@ public class GridCacheUtils { } /** - * Checks if given node has specified cache started. - * - * @param cacheName Cache name. - * @param node Node to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, ClusterNode node) { - return cacheNode(cacheName, (GridCacheAttributes[])node.attribute(ATTR_CACHE)); - } - - /** - * Checks if given attributes relate the the node which has (or had) specified cache started. - * - * @param cacheName Cache name. - * @param caches Node cache attributes. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, GridCacheAttributes[] caches) { - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return true; - - return false; - } - - /** - * Gets oldest alive node for specified topology version. - * - * @param cctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Oldest node for the given topology version. - */ - public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) { - ClusterNode oldest = null; - - for (ClusterNode n : aliveNodes(cctx, topOrder)) - if (oldest == null || n.order() < oldest.order()) - oldest = n; - - assert oldest != null : "Failed to find oldest node for cache context [name=" + cctx.name() + ", topOrder=" + topOrder + ']'; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); - - return oldest; - } - - /** - * Gets oldest alive node with at least one cache configured for specified topology version. - * - * @param cctx Shared cache context. - * @param topOrder Maximum allowed node order. - * @return Oldest node for the given topology version. - */ - public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) { - ClusterNode oldest = oldest(aliveCacheNodes(cctx, topOrder)); - - for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) { - if (oldest == null || n.order() < oldest.order()) - oldest = n; - } - - assert oldest != null : "Failed to find oldest node with caches: " + topOrder; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); - - return oldest; - } - - /** * Gets oldest alive server node with at least one cache configured for specified topology version. * * @param ctx Context. @@ -731,30 +573,6 @@ public class GridCacheUtils { } /** - * @return Closure that converts tx entry to key. - */ - @SuppressWarnings({"unchecked"}) - public static <K, V> IgniteClosure<IgniteTxEntry, K> tx2key() { - return (IgniteClosure<IgniteTxEntry, K>)tx2key; - } - - /** - * @return Closure that converts tx entry collection to key collection. - */ - @SuppressWarnings({"unchecked"}) - public static <K, V> IgniteClosure<Collection<IgniteTxEntry>, Collection<K>> txCol2Key() { - return (IgniteClosure<Collection<IgniteTxEntry>, Collection<K>>)txCol2key; - } - - /** - * @return Converts transaction entry to cache entry. - */ - @SuppressWarnings( {"unchecked"}) - public static <K, V> IgniteClosure<IgniteTxEntry, GridCacheEntryEx> tx2entry() { - return (IgniteClosure<IgniteTxEntry, GridCacheEntryEx>)tx2entry; - } - - /** * @return Closure which converts transaction entry xid to XID version. */ @SuppressWarnings( {"unchecked"}) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 331de4e..2049d03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -210,7 +210,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -665,7 +667,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -715,7 +717,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index b5641b9..dc29c8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -932,6 +932,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * @param expVer Expected topology version. + * @param curVer Current topology version. + * @param keys Keys. + * @return {@code True} if cache affinity changed and operation should be remapped. + */ + protected final boolean needRemap(AffinityTopologyVersion expVer, + AffinityTopologyVersion curVer, + Collection<?> keys) + { + if (expVer.equals(curVer)) + return false; + + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); + + if (!cacheNodes0.equals(cacheNodes1)) { + for (Object key : keys) { + Collection<ClusterNode> keyNodes0 = ctx.affinity().nodes(key, expVer); + Collection<ClusterNode> keyNodes1 = ctx.affinity().nodes(key, curVer); + + if (!keyNodes0.equals(keyNodes1)) + return true; + } + } + + return false; + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @return Local entries iterator. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 761dbce..1ae4ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -662,7 +662,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 60e891c..fd58ef4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -731,7 +731,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } try { - if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req.keys())) { if (log.isDebugEnabled()) { log.debug("Client topology version mismatch, need remap lock request [" + "reqTopVer=" + req.topologyVersion() + @@ -828,7 +828,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } try { - if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req.keys())) { if (log.isDebugEnabled()) { log.debug("Client topology version mismatch, need remap lock request [" + "reqTopVer=" + req.topologyVersion() + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9315b28..301943a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1059,7 +1059,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. if ((req.fastMap() && !clientReq) || req.topologyLocked() || - topology().topologyVersion().equals(req.topologyVersion())) { + !needRemap(req.topologyVersion(), topology().topologyVersion(), req.keys())) { boolean hasNear = ctx.discovery().cacheNearNode(node, name()); GridCacheVersion ver = req.updateVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index f78ced3..82659ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -345,7 +345,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (res.remapKeys() != null) { assert !fastMap || cctx.kernalContext().clientNode(); - mapOnTopology(res.remapKeys(), true, nodeId, true); + Collection<?> remapKeys = fastMap && cctx.kernalContext().clientNode() ? null : res.remapKeys(); + + mapOnTopology(remapKeys, true, nodeId, true); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 788a101..500495a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -647,7 +647,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture boolean clientNode = cctx.kernalContext().clientNode(); - assert !remap || (clientNode && !tx.hasRemoteLocks()); + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); // First assume this node is primary for all keys passed in. if (!clientNode && mapAsPrimary(keys, topVer)) @@ -658,18 +658,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture // Assign keys to primary nodes. GridNearLockMapping map = null; - boolean first = true; - for (KeyCacheObject key : keys) { GridNearLockMapping updated = map(key, map, topVer); - if (first) { - if (clientNode) - updated.clientFirst(tx == null || !tx.hasRemoteLocks()); - - first = false; - } - // If new mapping was created, add to collection. if (updated != map) { mappings.add(updated); @@ -693,6 +684,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture boolean hasRmtNodes = false; + boolean first = true; + // Create mini futures. for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { GridNearLockMapping mapping = iter.next(); @@ -761,6 +754,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (cand != null && !cand.reentry()) { if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -783,7 +784,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, skipStore, - mapping.clientFirst()); + clientFirst); mapping.request(req); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 001c78c..3d28018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -778,25 +778,16 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean boolean clientNode = cctx.kernalContext().clientNode(); - assert !remap || (clientNode && !tx.hasRemoteLocks()); + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping map = null; - boolean first = true; - for (KeyCacheObject key : keys) { GridNearLockMapping updated = map(key, map, topVer); - if (first) { - if (clientNode) - updated.clientFirst(tx == null || !tx.hasRemoteLocks()); - - first = false; - } - // If new mapping was created, add to collection. if (updated != map) { mappings.add(updated); @@ -818,6 +809,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean first = true; + // Create mini futures. for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { GridNearLockMapping mapping = iter.next(); @@ -895,6 +888,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (!cand.reentry()) { if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -917,7 +918,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, skipStore, - mapping.clientFirst()); + clientFirst); mapping.request(req); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java index 3ea5b7c..51000ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java @@ -44,9 +44,6 @@ public class GridNearLockMapping { @GridToStringInclude private Collection<KeyCacheObject> distributedKeys; - /** {@code True} if this is first mapping for lock operation on client node. */ - private boolean clientFirst; - /** * Creates near lock mapping for specified node and key. * @@ -63,20 +60,6 @@ public class GridNearLockMapping { } /** - * @return {@code True} if this is first mapping for lock operation on client node. - */ - public boolean clientFirst() { - return clientFirst; - } - - /** - * @param clientFirst {@code True} if this is first mapping for lock operation on client node. - */ - public void clientFirst(boolean clientFirst) { - this.clientFirst = clientFirst; - } - - /** * @return Node to which keys are mapped. */ public ClusterNode node() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 4b1fc87..7e3fb26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -470,10 +470,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { assert cctx != null; - return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { + Collection<ClusterNode> affNodes = CU.affinityNodes(cctx); + + if (prj == null) + return affNodes; + + return F.view(affNodes, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return cctx.discovery().cacheAffinityNode(n, cctx.name()) && - (prj == null || prj.node(n.id()) != null); + return prj.node(n.id()) != null; } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index aba1185..c5d5240 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -273,7 +274,7 @@ public class IgniteTxHandler { } try { - if (top != null && !top.topologyVersion().equals(req.topologyVersion())) { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) { if (log.isDebugEnabled()) { log.debug("Client topology version mismatch, need remap transaction [" + "reqTopVer=" + req.topologyVersion() + @@ -403,6 +404,36 @@ public class IgniteTxHandler { } /** + * @param expVer Expected topology version. + * @param curVer Current topology version. + * @param req Request. + * @return {@code True} if cache affinity changed and request should be remapped. + */ + private boolean needRemap(AffinityTopologyVersion expVer, + AffinityTopologyVersion curVer, + GridNearTxPrepareRequest req) { + if (expVer.equals(curVer)) + return false; + + for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { + GridCacheContext ctx = e.context(); + + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); + + if (!cacheNodes0.equals(cacheNodes1)) { + Collection<ClusterNode> keyNodes0 = ctx.affinity().nodes(e.key(), expVer); + Collection<ClusterNode> keyNodes1 = ctx.affinity().nodes(e.key(), curVer); + + if (!keyNodes0.equals(keyNodes1)) + return true; + } + } + + return false; + } + + /** * @param nodeId Node ID. * @param res Response. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index dce7d4b..6490d21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -124,7 +124,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe } /** - * + * @throws Exception If failed. */ private void doTestAlive() throws Exception { for (int i = 0; i < ITERATIONS; i++) { @@ -220,7 +220,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe }); assertTrue( - currTop.contains(GridCacheUtils.oldest(k.internalCache().context(), new AffinityTopologyVersion(currVer)))); + currTop.contains(GridCacheUtils.oldestAliveCacheServerNode(k.context().cache().context(), new AffinityTopologyVersion(currVer)))); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index c0c7614..c01ef6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -298,7 +298,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(0); + ccfg.setBackups(1); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setRebalanceMode(SYNC); @@ -315,7 +315,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac final Map<Integer, Integer> map = new HashMap<>(); - for (int i = 0; i < 1; i++) + for (int i = 0; i < 100; i++) map.put(i, i); TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); @@ -401,6 +401,204 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ + public void _testLock() throws Exception { + lock(null); + } + + /** + * @throws Exception If failed. + */ + public void testLockNearEnabled() throws Exception { + lock(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void lock(NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + final CountDownLatch lockedLatch = new CountDownLatch(1); + + final CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture<Lock> lockFut = GridTestUtils.runAsync(new Callable<Lock>() { + @Override public Lock call() throws Exception { + Thread.currentThread().setName("put-thread"); + + Lock lock = cache.lockAll(keys); + + lock.lock(); + + log.info("Locked"); + + lockedLatch.countDown(); + + unlockLatch.await(); + + lock.unlock(); + + return lock; + } + }); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); + + assertEquals(1, lockedLatch.getCount()); + + spi.stopBlock(); + + assertTrue(lockedLatch.await(3000, TimeUnit.MILLISECONDS)); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertFalse(lock.tryLock()); + } + + unlockLatch.countDown(); + + lockFut.get(); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertTrue(lock.tryLock()); + + lock.unlock(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + spi.record(GridNearLockRequest.class); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 3); + + Map<Integer, Integer> map = new HashMap<>(); + + map.put(4, 4); + map.put(5, 5); + map.put(6, 6); + map.put(7, 7); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 4); + + spi.record(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearLockRequest.class); + + List<Integer> keys = primaryKeys(ignite1.cache(null), 3, 0); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache0.put(keys.get(0), 0); + cache0.put(keys.get(1), 1); + cache0.put(keys.get(2), 2); + + tx.commit(); + } + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearLockRequest)msg).firstClientRequest()); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientLockMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ public void testOptimisticTxMessageClientFirstFlag() throws Exception { ccfg = new CacheConfiguration(); @@ -543,6 +741,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @param map Expected data. + * @param clientCache Client cache. * @param expNodes Expected nodes number. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java index 3fac400..d680d26 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java @@ -425,6 +425,49 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr } /** + * @throws Exception If failed. + */ + public void testClientOnlyCacheStart() throws Exception { + Ignite ignite0 = startGrid(0); + Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("cache1"); + + ignite0.createCache(ccfg); + + client = true; + + Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + assertNull(((IgniteKernal) ignite2).context().cache().context().cache().internalCache("cache1")); + + ignite2.cache("cache1"); + + assertNotNull(((IgniteKernal) ignite2).context().cache().context().cache().internalCache("cache1")); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + } + + /** * Test communication SPI. */ private static class TestCommunicationSpi extends TcpCommunicationSpi { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0163cece/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 506fa50..9634e9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -148,8 +148,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class); - // TODO IGNITE-23 temporary disabled. - // suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite()); + suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite()); // Heuristic exception handling. TODO IGNITE-257 // suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);