GG-9655 - Fixing tests after merge.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/212293eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/212293eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/212293eb

Branch: refs/heads/sprint-1
Commit: 212293eb2bede9cca6610e7c43e870ab386bd3f8
Parents: 886299d
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Wed Feb 4 17:53:05 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Wed Feb 4 17:53:05 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  30 ++--
 .../processors/cache/GridCacheMapEntry.java     | 166 +++++++------------
 .../processors/cache/GridCacheMvcc.java         |   8 +-
 .../cache/GridCacheMvccCandidate.java           |  18 --
 .../distributed/dht/GridDhtCacheEntry.java      |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      | 111 +------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |   3 +
 .../near/GridNearTxPrepareFuture.java           |  32 +++-
 .../GridCacheAbstractJobExecutionTest.java      |  11 ++
 9 files changed, 131 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f624f8..c738b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1482,7 +1482,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      */
     public boolean dhtMap(UUID nearNodeId, long topVer, GridDhtCacheEntry<K, 
V> entry, IgniteLogger log,
         Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap,
-        Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) throws 
GridCacheEntryRemovedException {
+        @Nullable Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap) 
throws GridCacheEntryRemovedException {
         assert topVer != -1;
 
         Collection<ClusterNode> dhtNodes = 
dht().topology().nodes(entry.partition(), topVer);
@@ -1490,25 +1490,27 @@ public class GridCacheContext<K, V> implements 
Externalizable {
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + 
U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
 
-        Collection<UUID> readers = entry.readers();
+        Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, 
F.remoteNodes(nodeId())); // Exclude local node.
 
-        Collection<ClusterNode> nearNodes = null;
+        boolean ret = map(entry, dhtRemoteNodes, dhtMap);
 
-        if (!F.isEmpty(readers)) {
-            nearNodes = discovery().nodes(readers, F0.notEqualTo(nearNodeId));
+        if (nearMap != null) {
+            Collection<UUID> readers = entry.readers();
 
-            if (log.isDebugEnabled())
-                log.debug("Mapping entry to near nodes [nodes=" + 
U.nodeIds(nearNodes) + ", entry=" + entry + ']');
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Entry has no near readers: " + entry);
+            Collection<ClusterNode> nearNodes = null;
 
-        Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, 
F.remoteNodes(nodeId())); // Exclude local node.
+            if (!F.isEmpty(readers)) {
+                nearNodes = discovery().nodes(readers, 
F0.notEqualTo(nearNodeId));
 
-        boolean ret = map(entry, dhtRemoteNodes, dhtMap);
+                if (log.isDebugEnabled())
+                    log.debug("Mapping entry to near nodes [nodes=" + 
U.nodeIds(nearNodes) + ", entry=" + entry + ']');
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Entry has no near readers: " + entry);
 
-        if (nearNodes != null && !nearNodes.isEmpty())
-            ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), nearMap);
+            if (nearNodes != null && !nearNodes.isEmpty())
+                ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), 
nearMap);
+        }
 
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 61d4959..ada0c1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1089,16 +1089,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             assert newVer != null : "Failed to get write version for tx: " + 
tx;
 
-            if (tx != null && !tx.local() && tx.onePhaseCommit() && 
explicitVer == null) {
-                if (!(isNew() || !valid) && ver.compareTo(newVer) > 0) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping entry update for one-phase commit 
since current entry version is " +
-                            "greater than write version [entry=" + this + ", 
newVer=" + newVer + ']');
-
-                    return new GridCacheUpdateTxResult<>(false, null);
-                }
-            }
-
             old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : 
this.val;
 
             GridCacheValueBytes oldBytes = valueBytesUnlocked();
@@ -1219,129 +1209,101 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
 
         GridCacheVersion obsoleteVer = null;
 
-        GridCacheVersion enqueueVer = null;
-
         boolean intercept = cctx.config().getInterceptor() != null;
 
         IgniteBiTuple<Boolean, V> interceptRes = null;
 
-        try {
-            synchronized (this) {
-                checkObsolete();
-
-                if (tx != null && tx.groupLock() && 
cctx.kernalContext().config().isCacheSanityCheckEnabled())
-                    groupLockSanityCheck(tx);
-                else
-                    assert tx == null || (!tx.local() && tx.onePhaseCommit()) 
|| tx.ownsLock(this) :
-                        "Transaction does not own lock for remove[entry=" + 
this + ", tx=" + tx + ']';
-
-                boolean startVer = isStartVersion();
-
-                if (startVer) {
-                    if (tx != null && !tx.local() && tx.onePhaseCommit())
-                        // Must promote to check version for one-phase commit 
tx.
-                        unswap(true, retval);
-                    else
-                        // Release swap.
-                        releaseSwap();
-                }
+        synchronized (this) {
+            checkObsolete();
 
-                newVer = explicitVer != null ? explicitVer : tx == null ? 
nextVersion() : tx.writeVersion();
+            if (tx != null && tx.groupLock() && 
cctx.kernalContext().config().isCacheSanityCheckEnabled())
+                groupLockSanityCheck(tx);
+            else
+                assert tx == null || (!tx.local() && tx.onePhaseCommit()) || 
tx.ownsLock(this) :
+                    "Transaction does not own lock for remove[entry=" + this + 
", tx=" + tx + ']';
 
-                if (tx != null && !tx.local() && tx.onePhaseCommit() && 
explicitVer == null) {
-                    if (!startVer && ver.compareTo(newVer) > 0) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping entry removal for one-phase 
commit since current entry version is " +
-                                "greater than write version [entry=" + this + 
", newVer=" + newVer + ']');
+            boolean startVer = isStartVersion();
 
-                        return new GridCacheUpdateTxResult<>(false, null);
-                    }
+            if (startVer) {
+                // Release swap.
+                releaseSwap();
+            }
 
-                    if (!detached())
-                        enqueueVer = newVer;
-                }
+            newVer = explicitVer != null ? explicitVer : tx == null ? 
nextVersion() : tx.writeVersion();
 
-                old = (retval || intercept) ? 
rawGetOrUnmarshalUnlocked(!retval) : val;
+            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : 
val;
 
-                if (intercept) {
-                    interceptRes = cctx.config().<K, 
V>getInterceptor().onBeforeRemove(key, old);
+            if (intercept) {
+                interceptRes = cctx.config().<K, 
V>getInterceptor().onBeforeRemove(key, old);
 
-                    if (cctx.cancelRemove(interceptRes))
-                        return new GridCacheUpdateTxResult<>(false, 
cctx.<V>unwrapTemporary(interceptRes.get2()));
-                }
+                if (cctx.cancelRemove(interceptRes))
+                    return new GridCacheUpdateTxResult<>(false, 
cctx.<V>unwrapTemporary(interceptRes.get2()));
+            }
 
-                GridCacheValueBytes oldBytes = valueBytesUnlocked();
+            GridCacheValueBytes oldBytes = valueBytesUnlocked();
 
-                if (old == null)
-                    old = saveValueForIndexUnlocked();
+            if (old == null)
+                old = saveValueForIndexUnlocked();
 
-                // Clear indexes inside of synchronization since indexes
-                // can be updated without actually holding entry lock.
-                clearIndex(old);
+            // Clear indexes inside of synchronization since indexes
+            // can be updated without actually holding entry lock.
+            clearIndex(old);
 
-                boolean hadValPtr = valPtr != 0;
+            boolean hadValPtr = valPtr != 0;
 
-                update(null, null, 0, 0, newVer);
+            update(null, null, 0, 0, newVer);
 
-                if (cctx.offheapTiered() && hadValPtr) {
-                    boolean rmv = cctx.swap().removeOffheap(key, 
getOrMarshalKeyBytes());
+            if (cctx.offheapTiered() && hadValPtr) {
+                boolean rmv = cctx.swap().removeOffheap(key, 
getOrMarshalKeyBytes());
 
-                    assert rmv;
-                }
+                assert rmv;
+            }
 
-                if (cctx.deferredDelete() && !detached() && !isInternal()) {
-                    if (!deletedUnlocked()) {
-                        deletedUnlocked(true);
+            if (cctx.deferredDelete() && !detached() && !isInternal()) {
+                if (!deletedUnlocked()) {
+                    deletedUnlocked(true);
 
-                        if (tx != null) {
-                            GridCacheMvcc<K> mvcc = mvccExtras();
+                    if (tx != null) {
+                        GridCacheMvcc<K> mvcc = mvccExtras();
 
-                            if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
-                                clearReaders();
-                            else
-                                clearReader(tx.originatingNodeId());
-                        }
+                        if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
+                            clearReaders();
+                        else
+                            clearReader(tx.originatingNodeId());
                     }
                 }
+            }
 
-                drReplicate(drType, null, null, newVer);
-
-                if (metrics && 
cctx.cache().configuration().isStatisticsEnabled())
-                    cctx.cache().metrics0().onRemove();
+            drReplicate(drType, null, null, newVer);
 
-                if (tx == null)
-                    obsoleteVer = newVer;
-                else {
-                    // Only delete entry if the lock is not explicit.
-                    if (tx.groupLock() || lockedBy(tx.xidVersion()))
-                        obsoleteVer = tx.xidVersion();
-                    else if (log.isDebugEnabled())
-                        log.debug("Obsolete version was not set because lock 
was explicit: " + this);
-                }
+            if (metrics && cctx.cache().configuration().isStatisticsEnabled())
+                cctx.cache().metrics0().onRemove();
 
-                if (evt && newVer != null && 
cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
-                    V evtOld = cctx.unwrapTemporary(old);
+            if (tx == null)
+                obsoleteVer = newVer;
+            else {
+                // Only delete entry if the lock is not explicit.
+                if (tx.groupLock() || lockedBy(tx.xidVersion()))
+                    obsoleteVer = tx.xidVersion();
+                else if (log.isDebugEnabled())
+                    log.debug("Obsolete version was not set because lock was 
explicit: " + this);
+            }
 
-                    cctx.events().addEvent(partition(), key, evtNodeId, tx == 
null ? null : tx.xid(), newVer,
-                        EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld 
!= null || hasValueUnlocked(), subjId,
-                        null, taskName);
-                }
+            if (evt && newVer != null && 
cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                V evtOld = cctx.unwrapTemporary(old);
 
-                CacheMode mode = cctx.config().getCacheMode();
+                cctx.events().addEvent(partition(), key, evtNodeId, tx == null 
? null : tx.xid(), newVer,
+                    EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != 
null || hasValueUnlocked(), subjId,
+                    null, taskName);
+            }
 
-                if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                    (tx != null && tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdate(this, key, null, 
null, old, oldBytes, false);
+            CacheMode mode = cctx.config().getCacheMode();
 
-                cctx.dataStructures().onEntryUpdated(key, true);
-            }
-        }
-        finally {
-            if (enqueueVer != null) {
-                assert cctx.deferredDelete();
+            if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
+                (tx != null && tx.local() && !isNear()))
+                cctx.continuousQueries().onEntryUpdate(this, key, null, null, 
old, oldBytes, false);
 
-                cctx.onDeferredDelete(this, enqueueVer);
-            }
+            cctx.dataStructures().onEntryUpdated(key, true);
         }
 
         // Persist outside of synchronization. The correctness of the

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 5489246..51cc2b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -685,8 +685,12 @@ public final class GridCacheMvcc<K> {
                     break;
                 }
                 else {
-                    assert !c.ready() : "Cannot have more then one ready 
near-local candidate: " + c;
-                    assert !c.owner() : "Cannot have ready near-local 
candidate while exists near-local owner: " + c;
+                    if (c.owner())
+                        continue;
+
+                    assert !c.ready() :
+                        "Cannot have more then one ready near-local candidate 
[c=" + c + ", cand=" + cand +
+                            ", mvcc=" + this + ']';
 
                     it.remove();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
index 43aa691..4857757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
@@ -93,10 +93,6 @@ public class GridCacheMvccCandidate<K> implements Externalizable,
     /** Other lock version (near version vs dht version). */
     private transient GridCacheVersion otherVer;
 
-    /** Mapped node IDS. */
-    @GridToStringInclude
-    private transient volatile Collection<UUID> mappedNodeIds;
-
     /** Owned lock version by the moment this candidate was added. */
     @GridToStringInclude
     private transient volatile GridCacheVersion ownerVer;
@@ -285,20 +281,6 @@ public class GridCacheMvccCandidate<K> implements 
Externalizable,
     }
 
     /**
-     * @return Mapped node IDs.
-     */
-    public Collection<UUID> mappedNodeIds() {
-        return mappedNodeIds;
-    }
-
-    /**
-     * @param mappedNodeIds Mapped node IDs.
-     */
-    public void mappedNodeIds(Collection<UUID> mappedNodeIds) {
-        this.mappedNodeIds = mappedNodeIds;
-    }
-
-    /**
      * @return Near version.
      */
     public GridCacheVersion otherVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 550a693..cc36335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -612,22 +612,16 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
      * Sets mappings into entry.
      *
      * @param ver Version.
-     * @param mappings Mappings to set.
      * @return Candidate, if one existed for the version, or {@code null} if 
candidate was not found.
      * @throws GridCacheEntryRemovedException If removed.
      */
-    @Nullable public synchronized GridCacheMvccCandidate<K> 
mappings(GridCacheVersion ver, Collection<UUID> mappings)
+    @Nullable public synchronized GridCacheMvccCandidate<K> 
mappings(GridCacheVersion ver)
         throws GridCacheEntryRemovedException {
         checkObsolete();
 
         GridCacheMvcc<K> mvcc = mvccExtras();
 
-        GridCacheMvccCandidate<K> cand = mvcc == null ? null : 
mvcc.candidate(ver);
-
-        if (cand != null)
-            cand.mappedNodeIds(mappings);
-
-        return cand;
+        return mvcc == null ? null : mvcc.candidate(ver);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index efc3452..9e9f088 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -76,10 +76,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
     @GridToStringExclude
     private List<GridDhtCacheEntry<K, V>> entries;
 
-    /** Near mappings. */
-    private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearMap =
-        new ConcurrentHashMap8<>();
-
     /** DHT mappings. */
     private Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtMap =
         new ConcurrentHashMap8<>();
@@ -752,10 +748,9 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                 try {
                     while (true) {
                         try {
-                            hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, 
entry, log, dhtMap, nearMap);
+                            hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, 
entry, log, dhtMap, null);
 
-                            GridCacheMvccCandidate<K> cand = 
entry.mappings(lockVer,
-                                F.nodeIds(F.concat(false, dhtMap.keySet(), 
nearMap.keySet())));
+                            GridCacheMvccCandidate<K> cand = 
entry.mappings(lockVer);
 
                             // Possible in case of lock cancellation.
                             if (cand == null) {
@@ -791,18 +786,7 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
             }
 
             if (log.isDebugEnabled())
-                log.debug("Mapped DHT lock future [dhtMap=" + 
F.nodeIds(dhtMap.keySet()) + ", nearMap=" +
-                    F.nodeIds(nearMap.keySet()) + ", dhtLockFut=" + this + 
']');
-
-            if (inTx() && tx.onePhaseCommit()) {
-                if (dhtMap.size() == 1 && nearMap.isEmpty()) {
-                    if (log.isDebugEnabled())
-                        log.debug("One-phase commit transaction mapped to 
single node (will send locks on commit): " + tx);
-
-                    // Will mark initialized in finally block.
-                    return;
-                }
-            }
+                log.debug("Mapped DHT lock future [dhtMap=" + 
F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']');
 
             // Create mini futures.
             for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped 
: dhtMap.entrySet()) {
@@ -815,9 +799,7 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                 if (cnt > 0) {
                     assert !n.id().equals(ctx.localNodeId());
 
-                    List<GridDhtCacheEntry<K, V>> nearMapping = nearMap.get(n);
-
-                    MiniFuture fut = new MiniFuture(n, dhtMapping, 
nearMapping);
+                    MiniFuture fut = new MiniFuture(n, dhtMapping);
 
                     GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>(
                         cctx.cacheId(),
@@ -834,7 +816,7 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                         isInvalidate(),
                         timeout,
                         cnt,
-                        F.size(nearMapping),
+                        0,
                         inTx() ? tx.size() : cnt,
                         inTx() ? tx.groupLockKey() : null,
                         inTx() && tx.partitionLock(),
@@ -903,70 +885,6 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                     }
                 }
             }
-
-            for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapped 
: nearMap.entrySet()) {
-                ClusterNode n = mapped.getKey();
-
-                List<GridDhtCacheEntry<K, V>> nearMapping = mapped.getValue();
-
-                int cnt = F.size(nearMapping);
-
-                if (cnt > 0) {
-                    MiniFuture fut = new MiniFuture(n, null, nearMapping);
-
-                    GridDhtLockRequest<K, V> req = new GridDhtLockRequest<>(
-                        cctx.cacheId(),
-                        nearNodeId,
-                        inTx() ? tx.nearXidVersion() : null,
-                        threadId,
-                        futId,
-                        fut.futureId(),
-                        lockVer,
-                        topVer,
-                        inTx(),
-                        read,
-                        isolation(),
-                        isInvalidate(),
-                        timeout,
-                        0,
-                        cnt,
-                        inTx() ? tx.size() : cnt,
-                        inTx() ? tx.groupLockKey() : null,
-                        inTx() && tx.partitionLock(),
-                        inTx() ? tx.subjectId() : null,
-                        inTx() ? tx.taskNameHash() : 0,
-                        read ? accessTtl : -1L);
-
-                    try {
-                        for (ListIterator<GridDhtCacheEntry<K, V>> it = 
nearMapping.listIterator(); it.hasNext();) {
-                            GridDhtCacheEntry<K, V> e = it.next();
-
-                            req.addNearKey(e.key(), e.getOrMarshalKeyBytes(), 
cctx.shared());
-
-                            it.set(addOwned(req, e));
-                        }
-
-                        add(fut); // Append new future.
-
-                        // Primary node can never be a reader.
-                        assert !n.id().equals(ctx.localNodeId());
-
-                        if (log.isDebugEnabled())
-                            log.debug("Sending DHT lock request to near node 
[node=" + n.id() +
-                                ", req=" + req + ']');
-
-                        cctx.io().send(n, req, cctx.system() ? 
UTILITY_CACHE_POOL : SYSTEM_POOL);
-                    }
-                    catch (ClusterTopologyException e) {
-                        fut.onResult(e);
-                    }
-                    catch (IgniteCheckedException e) {
-                        onError(e);
-
-                        break; // For
-                    }
-                }
-            }
         }
         finally {
             markInitialized();
@@ -1061,10 +979,6 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
         @GridToStringInclude
         private List<GridDhtCacheEntry<K, V>> dhtMapping;
 
-        /** Near mapping. */
-        @GridToStringInclude
-        private List<GridDhtCacheEntry<K, V>> nearMapping;
-
         /**
          * Empty constructor required for {@link Externalizable}.
          */
@@ -1075,16 +989,14 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
         /**
          * @param node Node.
          * @param dhtMapping Mapping.
-         * @param nearMapping nearMapping.
          */
-        MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping, 
List<GridDhtCacheEntry<K, V>> nearMapping) {
+        MiniFuture(ClusterNode node, List<GridDhtCacheEntry<K, V>> dhtMapping) 
{
             super(cctx.kernalContext());
 
             assert node != null;
 
             this.node = node;
             this.dhtMapping = dhtMapping;
-            this.nearMapping = nearMapping;
         }
 
         /**
@@ -1133,17 +1045,6 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                 // Fail the whole compound future.
                 onError(res.error());
             else {
-                if (nearMapping != null && !F.isEmpty(res.nearEvicted())) {
-                    if (tx != null) {
-                        GridDistributedTxMapping<K, V> m = 
tx.nearMapping(node.id());
-
-                        if (m != null)
-                            m.evictReaders(res.nearEvicted());
-                    }
-
-                    evictReaders(cctx, res.nearEvicted(), node.id(), 
res.messageId(), nearMapping);
-                }
-
                 Set<Integer> invalidParts = res.invalidPartitions();
 
                 // Removing mappings for invalid partitions.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ea9aa00..ee05fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -811,6 +811,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                 }
 
                 tx.needsCompletedVersions(hasRemoteNodes);
+
+                tx.addDhtMapping(futDhtMap);
+                tx.addNearMapping(futNearMap);
             }
 
             if (isDone())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index deb991a..6496ef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
@@ -485,7 +486,7 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
         GridDistributedTxMapping<K, V> cur = null;
 
         for (IgniteTxEntry<K, V> read : reads) {
-            GridDistributedTxMapping<K, V> updated = map(read, topVer, cur);
+            GridDistributedTxMapping<K, V> updated = map(read, topVer, cur, 
false);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -502,7 +503,7 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
         }
 
         for (IgniteTxEntry<K, V> write : writes) {
-            GridDistributedTxMapping<K, V> updated = map(write, topVer, cur);
+            GridDistributedTxMapping<K, V> updated = map(write, topVer, cur, 
true);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -746,8 +747,12 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
      * @throws IgniteCheckedException If transaction is group-lock and local 
node is not primary for key.
      * @return Mapping.
      */
-    private GridDistributedTxMapping<K, V> map(IgniteTxEntry<K, V> entry, long 
topVer,
-        GridDistributedTxMapping<K, V> cur) throws IgniteCheckedException {
+    private GridDistributedTxMapping<K, V> map(
+        IgniteTxEntry<K, V> entry,
+        long topVer,
+        GridDistributedTxMapping<K, V> cur,
+        boolean waitLock
+    ) throws IgniteCheckedException {
         GridCacheContext<K, V> cacheCtx = entry.context();
 
         List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), 
topVer);
@@ -769,14 +774,19 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
                 " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() 
+ ']');
 
         // Must re-initialize cached entry while holding topology lock.
-        if (cacheCtx.isNear())
+        if (cacheCtx.isNear()) {
             entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), 
entry.keyBytes());
+
+            if (waitLock && entry.explicitVersion() == null)
+                lockKeys.add(entry.txKey());
+        }
         else if (!cacheCtx.isLocal())
             entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, 
true), entry.keyBytes());
         else {
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), 
entry.keyBytes());
 
-            lockKeys.add(entry.txKey());
+            if (waitLock && entry.explicitVersion() == null)
+                lockKeys.add(entry.txKey());
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() 
!= cacheCtx.isNear()) {
@@ -791,9 +801,6 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
         entry.nodeId(primary.id());
 
         if (cacheCtx.isNear()) {
-            if (entry.explicitVersion() == null)
-                lockKeys.add(entry.txKey());
-
             while (true) {
                 try {
                     GridNearCacheEntry<K, V> cached = (GridNearCacheEntry<K, 
V>)entry.cached();
@@ -943,6 +950,13 @@ public final class GridNearTxPrepareFuture<K, V> extends 
GridCompoundIdentityFut
                                     nearEntry.resetFromPrimary(tup.get2(), 
tup.get3(), tx.xidVersion(),
                                         tup.get1(), m.node().id());
                                 }
+                                else if (txEntry.cached().detached()) {
+                                    GridDhtDetachedCacheEntry<K, V> 
detachedEntry = (GridDhtDetachedCacheEntry<K, V>)txEntry.cached();
+
+                                    GridTuple3<GridCacheVersion, V, byte[]> 
tup = entry.getValue();
+
+                                    detachedEntry.resetFromPrimary(tup.get2(), 
tup.get3(), tx.xidVersion());
+                                }
 
                                 break;
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/212293eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
index 172e914..2c82cd6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
@@ -162,6 +163,16 @@ public abstract class GridCacheAbstractJobExecutionTest 
extends GridCommonAbstra
             fut.get(); // Wait for completion.
 
         for (int i = 0; i < GRID_CNT; i++) {
+            info("Running iteration: " + i);
+
+            for (int g = 0; g < GRID_CNT; g++) {
+                info("Will check grid: " + g);
+
+                GridCacheEntryEx<Object, Object> testEntry = 
((IgniteKernal)grid(i)).internalCache(null).peekEx("TestKey");
+
+                info("Entry: " + testEntry);
+            }
+
             CacheProjection<String, int[]> c = 
grid(i).cache(null).projection(String.class, int[].class);
 
             // Do within transaction to make sure that lock is acquired

Reply via email to