GG-9655 - Fixing tests after merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/212293eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/212293eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/212293eb Branch: refs/heads/sprint-1 Commit: 212293eb2bede9cca6610e7c43e870ab386bd3f8 Parents: 886299d Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Feb 4 17:53:05 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Feb 4 17:53:05 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 30 ++-- .../processors/cache/GridCacheMapEntry.java | 166 +++++++------------ .../processors/cache/GridCacheMvcc.java | 8 +- .../cache/GridCacheMvccCandidate.java | 18 -- .../distributed/dht/GridDhtCacheEntry.java | 10 +- .../distributed/dht/GridDhtLockFuture.java | 111 +------------ .../distributed/dht/GridDhtTxPrepareFuture.java | 3 + .../near/GridNearTxPrepareFuture.java | 32 +++- .../GridCacheAbstractJobExecutionTest.java | 11 ++ 9 files changed, 131 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 1f624f8..c738b27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1482,7 +1482,7 @@ public class GridCacheContext<K, V> implements Externalizable { */ public boolean dhtMap(UUID nearNodeId, long topVer, GridDhtCacheEntry<K, V> entry, IgniteLogger log, Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap, - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) throws GridCacheEntryRemovedException { + @Nullable Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) throws GridCacheEntryRemovedException { assert topVer != -1; Collection<ClusterNode> dhtNodes = dht().topology().nodes(entry.partition(), topVer); @@ -1490,25 +1490,27 @@ public class GridCacheContext<K, V> implements Externalizable { if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); - Collection<UUID> readers = entry.readers(); + Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node. - Collection<ClusterNode> nearNodes = null; + boolean ret = map(entry, dhtRemoteNodes, dhtMap); - if (!F.isEmpty(readers)) { - nearNodes = discovery().nodes(readers, F0.notEqualTo(nearNodeId)); + if (nearMap != null) { + Collection<UUID> readers = entry.readers(); - if (log.isDebugEnabled()) - log.debug("Mapping entry to near nodes [nodes=" + U.nodeIds(nearNodes) + ", entry=" + entry + ']'); - } - else if (log.isDebugEnabled()) - log.debug("Entry has no near readers: " + entry); + Collection<ClusterNode> nearNodes = null; - Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node. + if (!F.isEmpty(readers)) { + nearNodes = discovery().nodes(readers, F0.notEqualTo(nearNodeId)); - boolean ret = map(entry, dhtRemoteNodes, dhtMap); + if (log.isDebugEnabled()) + log.debug("Mapping entry to near nodes [nodes=" + U.nodeIds(nearNodes) + ", entry=" + entry + ']'); + } + else if (log.isDebugEnabled()) + log.debug("Entry has no near readers: " + entry); - if (nearNodes != null && !nearNodes.isEmpty()) - ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), nearMap); + if (nearNodes != null && !nearNodes.isEmpty()) + ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), nearMap); + } return ret; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 61d4959..ada0c1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1089,16 +1089,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> assert newVer != null : "Failed to get write version for tx: " + tx; - if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) { - if (!(isNew() || !valid) && ver.compareTo(newVer) > 0) { - if (log.isDebugEnabled()) - log.debug("Skipping entry update for one-phase commit since current entry version is " + - "greater than write version [entry=" + this + ", newVer=" + newVer + ']'); - - return new GridCacheUpdateTxResult<>(false, null); - } - } - old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val; GridCacheValueBytes oldBytes = valueBytesUnlocked(); @@ -1219,129 +1209,101 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheVersion obsoleteVer = null; - GridCacheVersion enqueueVer = null; - boolean intercept = cctx.config().getInterceptor() != null; IgniteBiTuple<Boolean, V> interceptRes = null; - try { - synchronized (this) { - checkObsolete(); - - if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) - groupLockSanityCheck(tx); - else - assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; - - boolean startVer = isStartVersion(); - - if (startVer) { - if (tx != null && !tx.local() && tx.onePhaseCommit()) - // Must promote to check version for one-phase commit tx. - unswap(true, retval); - else - // Release swap. - releaseSwap(); - } + synchronized (this) { + checkObsolete(); - newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); + if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) + groupLockSanityCheck(tx); + else + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; - if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) { - if (!startVer && ver.compareTo(newVer) > 0) { - if (log.isDebugEnabled()) - log.debug("Skipping entry removal for one-phase commit since current entry version is " + - "greater than write version [entry=" + this + ", newVer=" + newVer + ']'); + boolean startVer = isStartVersion(); - return new GridCacheUpdateTxResult<>(false, null); - } + if (startVer) { + // Release swap. + releaseSwap(); + } - if (!detached()) - enqueueVer = newVer; - } + newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); - old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; + old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; - if (intercept) { - interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); + if (intercept) { + interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); - if (cctx.cancelRemove(interceptRes)) - return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2())); - } + if (cctx.cancelRemove(interceptRes)) + return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2())); + } - GridCacheValueBytes oldBytes = valueBytesUnlocked(); + GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (old == null) - old = saveValueForIndexUnlocked(); + if (old == null) + old = saveValueForIndexUnlocked(); - // Clear indexes inside of synchronization since indexes - // can be updated without actually holding entry lock. - clearIndex(old); + // Clear indexes inside of synchronization since indexes + // can be updated without actually holding entry lock. + clearIndex(old); - boolean hadValPtr = valPtr != 0; + boolean hadValPtr = valPtr != 0; - update(null, null, 0, 0, newVer); + update(null, null, 0, 0, newVer); - if (cctx.offheapTiered() && hadValPtr) { - boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + if (cctx.offheapTiered() && hadValPtr) { + boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); - assert rmv; - } + assert rmv; + } - if (cctx.deferredDelete() && !detached() && !isInternal()) { - if (!deletedUnlocked()) { - deletedUnlocked(true); + if (cctx.deferredDelete() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + deletedUnlocked(true); - if (tx != null) { - GridCacheMvcc<K> mvcc = mvccExtras(); + if (tx != null) { + GridCacheMvcc<K> mvcc = mvccExtras(); - if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) - clearReaders(); - else - clearReader(tx.originatingNodeId()); - } + if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) + clearReaders(); + else + clearReader(tx.originatingNodeId()); } } + } - drReplicate(drType, null, null, newVer); - - if (metrics && cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRemove(); + drReplicate(drType, null, null, newVer); - if (tx == null) - obsoleteVer = newVer; - else { - // Only delete entry if the lock is not explicit. - if (tx.groupLock() || lockedBy(tx.xidVersion())) - obsoleteVer = tx.xidVersion(); - else if (log.isDebugEnabled()) - log.debug("Obsolete version was not set because lock was explicit: " + this); - } + if (metrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRemove(); - if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - V evtOld = cctx.unwrapTemporary(old); + if (tx == null) + obsoleteVer = newVer; + else { + // Only delete entry if the lock is not explicit. + if (tx.groupLock() || lockedBy(tx.xidVersion())) + obsoleteVer = tx.xidVersion(); + else if (log.isDebugEnabled()) + log.debug("Obsolete version was not set because lock was explicit: " + this); + } - cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer, - EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId, - null, taskName); - } + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + V evtOld = cctx.unwrapTemporary(old); - CacheMode mode = cctx.config().getCacheMode(); + cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId, + null, taskName); + } - if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || - (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); + CacheMode mode = cctx.config().getCacheMode(); - cctx.dataStructures().onEntryUpdated(key, true); - } - } - finally { - if (enqueueVer != null) { - assert cctx.deferredDelete(); + if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || + (tx != null && tx.local() && !isNear())) + cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); - cctx.onDeferredDelete(this, enqueueVer); - } + cctx.dataStructures().onEntryUpdated(key, true); } // Persist outside of synchronization. The correctness of the http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index 5489246..51cc2b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -685,8 +685,12 @@ public final class GridCacheMvcc<K> { break; } else { - assert !c.ready() : "Cannot have more then one ready near-local candidate: " + c; - assert !c.owner() : "Cannot have ready near-local candidate while exists near-local owner: " + c; + if (c.owner()) + continue; + + assert !c.ready() : + "Cannot have more then one ready near-local candidate [c=" + c + ", cand=" + cand + + ", mvcc=" + this + ']'; it.remove(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java index 43aa691..4857757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java @@ -93,10 +93,6 @@ public class GridCacheMvccCandidate<K> implements Externalizable, /** Other lock version (near version vs dht version). */ private transient GridCacheVersion otherVer; - /** Mapped node IDS. */ - @GridToStringInclude - private transient volatile Collection<UUID> mappedNodeIds; - /** Owned lock version by the moment this candidate was added. */ @GridToStringInclude private transient volatile GridCacheVersion ownerVer; @@ -285,20 +281,6 @@ public class GridCacheMvccCandidate<K> implements Externalizable, } /** - * @return Mapped node IDs. - */ - public Collection<UUID> mappedNodeIds() { - return mappedNodeIds; - } - - /** - * @param mappedNodeIds Mapped node IDs. - */ - public void mappedNodeIds(Collection<UUID> mappedNodeIds) { - this.mappedNodeIds = mappedNodeIds; - } - - /** * @return Near version. */ public GridCacheVersion otherVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 550a693..cc36335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -612,22 +612,16 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { * Sets mappings into entry. * * @param ver Version. - * @param mappings Mappings to set. * @return Candidate, if one existed for the version, or {@code null} if candidate was not found. * @throws GridCacheEntryRemovedException If removed. */ - @Nullable public synchronized GridCacheMvccCandidate<K> mappings(GridCacheVersion ver, Collection<UUID> mappings) + @Nullable public synchronized GridCacheMvccCandidate<K> mappings(GridCacheVersion ver) throws GridCacheEntryRemovedException { checkObsolete(); GridCacheMvcc<K> mvcc = mvccExtras(); - GridCacheMvccCandidate<K> cand = mvcc == null ? null : mvcc.candidate(ver); - - if (cand != null) - cand.mappedNodeIds(mappings); - - return cand; + return mvcc == null ? null : mvcc.candidate(ver); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index efc3452..9e9f088 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -76,10 +76,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo @GridToStringExclude private List<GridDhtCacheEntry<K, V>> entries; - /** Near mappings. */ - private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap = - new ConcurrentHashMap8<>(); - /** DHT mappings. */ private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap = new ConcurrentHashMap8<>(); @@ -752,10 +748,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo try { while (true) { try { - hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, nearMap); + hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null); - GridCacheMvccCandidate<K> cand = entry.mappings(lockVer, - F.nodeIds(F.concat(false, dhtMap.keySet(), nearMap.keySet()))); + GridCacheMvccCandidate<K> cand = entry.mappings(lockVer); // Possible in case of lock cancellation. if (cand == null) { @@ -791,18 +786,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } if (log.isDebugEnabled()) - log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", nearMap=" + - F.nodeIds(nearMap.keySet()) + ", dhtLockFut=" + this + ']'); - - if (inTx() && tx.onePhaseCommit()) { - if (dhtMap.size() == 1 && nearMap.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("One-phase commit transaction mapped to single node (will send locks on commit): " + tx); - - // Will mark initialized in finally block. - return; - } - } + log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']'); // Create mini futures. for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped : dhtMap.entrySet()) { @@ -815,9 +799,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo if (cnt > 0) { assert !n.id().equals(ctx.localNodeId()); - List<GridDhtCacheEntry<K, V>> nearMapping = nearMap.get(n); - - MiniFuture fut = new MiniFuture(n, dhtMapping, nearMapping); + MiniFuture fut = new MiniFuture(n, dhtMapping); GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>( cctx.cacheId(), @@ -834,7 +816,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo isInvalidate(), timeout, cnt, - F.size(nearMapping), + 0, inTx() ? tx.size() : cnt, inTx() ? tx.groupLockKey() : null, inTx() && tx.partitionLock(), @@ -903,70 +885,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } } } - - for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped : nearMap.entrySet()) { - ClusterNode n = mapped.getKey(); - - List<GridDhtCacheEntry<K, V>> nearMapping = mapped.getValue(); - - int cnt = F.size(nearMapping); - - if (cnt > 0) { - MiniFuture fut = new MiniFuture(n, null, nearMapping); - - GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>( - cctx.cacheId(), - nearNodeId, - inTx() ? tx.nearXidVersion() : null, - threadId, - futId, - fut.futureId(), - lockVer, - topVer, - inTx(), - read, - isolation(), - isInvalidate(), - timeout, - 0, - cnt, - inTx() ? tx.size() : cnt, - inTx() ? tx.groupLockKey() : null, - inTx() && tx.partitionLock(), - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L); - - try { - for (ListIterator<GridDhtCacheEntry<K, V>> it = nearMapping.listIterator(); it.hasNext();) { - GridDhtCacheEntry<K, V> e = it.next(); - - req.addNearKey(e.key(), e.getOrMarshalKeyBytes(), cctx.shared()); - - it.set(addOwned(req, e)); - } - - add(fut); // Append new future. - - // Primary node can never be a reader. - assert !n.id().equals(ctx.localNodeId()); - - if (log.isDebugEnabled()) - log.debug("Sending DHT lock request to near node [node=" + n.id() + - ", req=" + req + ']'); - - cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (ClusterTopologyException e) { - fut.onResult(e); - } - catch (IgniteCheckedException e) { - onError(e); - - break; // For - } - } - } } finally { markInitialized(); @@ -1061,10 +979,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo @GridToStringInclude private List<GridDhtCacheEntry<K, V>> dhtMapping; - /** Near mapping. */ - @GridToStringInclude - private List<GridDhtCacheEntry<K, V>> nearMapping; - /** * Empty constructor required for {@link Externalizable}. */ @@ -1075,16 +989,14 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** * @param node Node. * @param dhtMapping Mapping. - * @param nearMapping nearMapping. */ - MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping, List<GridDhtCacheEntry<K, V>> nearMapping) { + MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping) { super(cctx.kernalContext()); assert node != null; this.node = node; this.dhtMapping = dhtMapping; - this.nearMapping = nearMapping; } /** @@ -1133,17 +1045,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo // Fail the whole compound future. onError(res.error()); else { - if (nearMapping != null && !F.isEmpty(res.nearEvicted())) { - if (tx != null) { - GridDistributedTxMapping<K, V> m = tx.nearMapping(node.id()); - - if (m != null) - m.evictReaders(res.nearEvicted()); - } - - evictReaders(cctx, res.nearEvicted(), node.id(), res.messageId(), nearMapping); - } - Set<Integer> invalidParts = res.invalidPartitions(); // Removing mappings for invalid partitions. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ea9aa00..ee05fa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -811,6 +811,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } tx.needsCompletedVersions(hasRemoteNodes); + + tx.addDhtMapping(futDhtMap); + tx.addNearMapping(futNearMap); } if (isDone()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index deb991a..6496ef6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; @@ -485,7 +486,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut GridDistributedTxMapping<K, V> cur = null; for (IgniteTxEntry<K, V> read : reads) { - GridDistributedTxMapping<K, V> updated = map(read, topVer, cur); + GridDistributedTxMapping<K, V> updated = map(read, topVer, cur, false); if (cur != updated) { mappings.offer(updated); @@ -502,7 +503,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } for (IgniteTxEntry<K, V> write : writes) { - GridDistributedTxMapping<K, V> updated = map(write, topVer, cur); + GridDistributedTxMapping<K, V> updated = map(write, topVer, cur, true); if (cur != updated) { mappings.offer(updated); @@ -746,8 +747,12 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. * @return Mapping. */ - private GridDistributedTxMapping<K, V> map(IgniteTxEntry<K, V> entry, long topVer, - GridDistributedTxMapping<K, V> cur) throws IgniteCheckedException { + private GridDistributedTxMapping<K, V> map( + IgniteTxEntry<K, V> entry, + long topVer, + GridDistributedTxMapping<K, V> cur, + boolean waitLock + ) throws IgniteCheckedException { GridCacheContext<K, V> cacheCtx = entry.context(); List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); @@ -769,14 +774,19 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']'); // Must re-initialize cached entry while holding topology lock. - if (cacheCtx.isNear()) + if (cacheCtx.isNear()) { entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), entry.keyBytes()); + + if (waitLock && entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); + } else if (!cacheCtx.isLocal()) entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), entry.keyBytes()); else { entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), entry.keyBytes()); - lockKeys.add(entry.txKey()); + if (waitLock && entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); } if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { @@ -791,9 +801,6 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut entry.nodeId(primary.id()); if (cacheCtx.isNear()) { - if (entry.explicitVersion() == null) - lockKeys.add(entry.txKey()); - while (true) { try { GridNearCacheEntry<K, V> cached = (GridNearCacheEntry<K, V>)entry.cached(); @@ -943,6 +950,13 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut nearEntry.resetFromPrimary(tup.get2(), tup.get3(), tx.xidVersion(), tup.get1(), m.node().id()); } + else if (txEntry.cached().detached()) { + GridDhtDetachedCacheEntry<K, V> detachedEntry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached(); + + GridTuple3<GridCacheVersion, V, byte[]> tup = entry.getValue(); + + detachedEntry.resetFromPrimary(tup.get2(), tup.get3(), tx.xidVersion()); + } break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java index 172e914..2c82cd6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; @@ -162,6 +163,16 @@ public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstra fut.get(); // Wait for completion. for (int i = 0; i < GRID_CNT; i++) { + info("Running iteration: " + i); + + for (int g = 0; g < GRID_CNT; g++) { + info("Will check grid: " + g); + + GridCacheEntryEx<Object, Object> testEntry = ((IgniteKernal)grid(i)).internalCache(null).peekEx("TestKey"); + + info("Entry: " + testEntry); + } + CacheProjection<String, int[]> c = grid(i).cache(null).projection(String.class, int[].class); // Do within transaction to make sure that lock is acquired