GG-9655 - Fixing tests after merge.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9a995a3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9a995a3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9a995a3a

Branch: refs/heads/ingite-9655-merge
Commit: 9a995a3aba1cc3cef0dd8027407d234dcf87c885
Parents: 61c102c
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Fri Jan 30 16:41:37 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Fri Jan 30 16:41:37 2015 -0800

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  1 +
 .../distributed/dht/GridDhtTxPrepareFuture.java | 74 ++++++++----------
 .../distributed/near/GridNearLockRequest.java   |  5 --
 .../cache/distributed/near/GridNearTxLocal.java | 11 ++-
 .../near/GridNearTxPrepareFuture.java           | 81 ++++++++------------
 .../transactions/IgniteTxLocalAdapter.java      |  8 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  5 +-
 7 files changed, 76 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 33509ab..ae8e3c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -451,6 +451,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
                 entry.valueBytes(e.valueBytes());
                 entry.ttl(e.ttl());
                 entry.filters(e.filters());
+                entry.expiry(e.expiry());
                 entry.drExpireTime(e.drExpireTime());
                 entry.drVersion(e.drVersion());
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 407b8fe..256c9f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -116,6 +116,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
     /** Keys that did not pass the filter. */
     private Collection<IgniteTxKey<K>> filterFailedKeys;
 
+    /** Keys that should be locked. */
+    private GridConcurrentHashSet<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>();
+
+    /** Locks ready flag. */
+    private volatile boolean locksReady;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -209,9 +215,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
         if (log.isDebugEnabled())
             log.debug("Transaction future received owner changed callback: " + entry);
 
-        boolean ret = tx.hasWriteKey(entry.txKey());
+        boolean rmv = lockKeys.remove(entry.txKey());
 
-        return ret && mapIfLocked();
+        return rmv && mapIfLocked();
     }
 
     /** {@inheritDoc} */
@@ -235,44 +241,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
      * @return {@code True} if all locks are owned.
      */
     private boolean checkLocks() {
-        for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) {
-            while (true) {
-                GridCacheEntryEx<K, V> cached = txEntry.cached();
-
-                try {
-                    if (txEntry.explicitVersion() == null) {
-                        // Don't compare entry against itself.
-                        if (!cached.lockedLocally(tx.xidVersion())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Transaction entry is not locked by transaction (will wait) [entry=" +
-                                    cached + ", tx=" + tx + ']');
-
-                            return false;
-                        }
-                    }
-                    else {
-                        if (!cached.lockedBy(txEntry.explicitVersion())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Transaction entry is not locked by explicit version (will wait) [entry=" +
-                                    cached + ", tx=" + tx + ']');
-
-                            return false;
-                        }
-                    }
-
-                    break; // While.
-                }
-                // Possible if entry cached within transaction is obsolete.
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);
-
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
-                }
-            }
-        }
-
-        return true;
+        return locksReady && lockKeys.isEmpty();
     }
 
     /** {@inheritDoc} */
@@ -460,13 +429,26 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
             Collections.singletonList(tx.groupLockEntry()) : writes;
 
         for (IgniteTxEntry<K, V> txEntry : checkEntries) {
-            if (txEntry.cached().isLocal())
+            GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+            if (cacheCtx.isLocal())
                 continue;
 
-            while (true) {
-                GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
+            GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
+
+            if (entry == null) {
+                entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());
+
+                txEntry.cached(entry, txEntry.keyBytes());
+            }
+
+            if (tx.optimistic() && txEntry.explicitVersion() == null)
+                lockKeys.add(txEntry.txKey());
 
+            while (true) {
                 try {
+                    assert txEntry.explicitVersion() == null || entry.lockedBy(txEntry.explicitVersion());
+
                     GridCacheMvccCandidate<K> c = entry.readyLock(tx.xidVersion());
 
                     if (log.isDebugEnabled())
@@ -479,10 +461,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);
 
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
+                    entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key());
+
+                    txEntry.cached(entry, txEntry.keyBytes());
                 }
             }
         }
+
+        locksReady = true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e707e85..732703e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -301,11 +301,6 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
         return dhtVers[idx];
     }
 
-    /** {@inheritDoc} */
-    @Override protected boolean transferExpiryPolicy() {
-        return true;
-    }
-
     /**
      * @return TTL for read operation.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f006508..b8bda46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -82,7 +82,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     private boolean colocatedLocallyMapped;
 
     /** Info for entries accessed locally in optimistic transaction. */
-    private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
+    private Map<IgniteTxKey<K>, IgniteCacheExpiryPolicy> accessMap;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -560,8 +560,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
             while (true) {
                 GridCacheContext<K, V> cacheCtx = txEntry.cached().context();
 
-                if (!cacheCtx.isNear())
-                    break;
+                assert cacheCtx.isNear();
 
                 GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached();
 
@@ -1156,7 +1155,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
-        IgniteTxKey key,
+        IgniteTxKey<K> key,
         @Nullable ExpiryPolicy expiryPlc)
     {
         assert optimistic();
@@ -1187,7 +1186,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
      */
     private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) {
         if (accessMap != null) {
-            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+            for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
                 if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key()))
                     return e.getValue();
             }
@@ -1203,7 +1202,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
         if (accessMap != null) {
             assert optimistic();
 
-            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+            for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
                 if (e.getValue().entries() != null) {
                     GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index e7b3601..4d9dadf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
+import org.apache.ignite.client.util.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -78,6 +79,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
     /** Full information about transaction nodes mapping. */
     private GridDhtTxMapping<K, V> txMapping;
 
+    /** */
+    private Collection<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>();
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -128,6 +132,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             log.debug("Transaction future received owner changed callback: " + entry);
 
         if (entry.context().isNear() && owner != null && tx.hasWriteKey(entry.txKey())) {
+            lockKeys.remove(entry.txKey());
+
             // This will check for locks.
             onDone();
 
@@ -213,45 +219,16 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
      * @return {@code True} if all locks are owned.
      */
     private boolean checkLocks() {
-        Collection<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ?
-            Collections.singletonList(tx.groupLockEntry()) :
-            tx.writeEntries();
-
-        for (IgniteTxEntry<K, V> txEntry : checkEntries) {
-            // Wait for near locks only.
-            if (!txEntry.context().isNear())
-                continue;
-
-            while (true) {
-                GridCacheEntryEx<K, V> cached = txEntry.cached();
-
-                try {
-                    GridCacheVersion ver = txEntry.explicitVersion() != null ?
-                        txEntry.explicitVersion() : tx.xidVersion();
+        boolean locked = lockKeys.isEmpty();
 
-                    // If locks haven't been acquired yet, keep waiting.
-                    if (!cached.lockedBy(ver)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached +
-                                ", tx=" + tx + ']');
-
-                        return false;
-                    }
-
-                    break; // While.
-                }
-                // Possible if entry cached within transaction is obsolete.
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);
-
-                    txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
-                }
-            }
+        if (locked) {
+            if (log.isDebugEnabled())
+                log.debug("All locks are acquired for near prepare future: " + this);
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
         }
-
-        if (log.isDebugEnabled())
-            log.debug("All locks are acquired for near prepare future: " + this);
 
         return true;
     }
@@ -564,7 +541,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
      *
      */
     private void preparePessimistic() {
-        Map<ClusterNode, GridDistributedTxMapping<K, V>> mappings = new HashMap<>();
+        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping<K, V>> mappings = new HashMap<>();
 
         long topVer = tx.topologyVersion();
 
@@ -577,12 +554,18 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
             ClusterNode primary = F.first(nodes);
 
-            GridDistributedTxMapping<K, V> nodeMapping = mappings.get(primary);
+            boolean near = cacheCtx.isNear();
+
+            IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+
+            GridDistributedTxMapping<K, V> nodeMapping = mappings.get(key);
 
             if (nodeMapping == null) {
                 nodeMapping = new GridDistributedTxMapping<>(primary);
 
-                mappings.put(primary, nodeMapping);
+                nodeMapping.near(cacheCtx.isNear());
+
+                mappings.put(key, nodeMapping);
             }
 
             txEntry.nodeId(primary.id());
@@ -663,13 +646,6 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                                 tx.addDhtVersion(m.node().id(), dhtTx.xidVersion());
 
                                 m.dhtVersion(dhtTx.xidVersion());
-
-                                GridCacheVersion min = dhtTx.minVersion();
-
-                                IgniteTxManager<K, V> tm = cctx.tm();
-
-                                tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
-                                    tm.committedVersions(min), tm.rolledbackVersions(min));
                             }
 
                             
tx.implicitSingleResult(dhtTx.implicitSingleResult());
@@ -821,8 +797,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                             IgniteTxManager<K, V> tm = cctx.tm();
 
-                            tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
-                                tm.committedVersions(min), tm.rolledbackVersions(min));
+                            if (m.near())
+                                tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
+                                    tm.committedVersions(min), tm.rolledbackVersions(min));
                         }
 
                         // Continue prepare before completing the future.
@@ -902,6 +879,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
         entry.nodeId(primary.id());
 
         if (cacheCtx.isNear()) {
+            if (entry.explicitVersion() == null)
+                lockKeys.add(entry.txKey());
+
             while (true) {
                 try {
                     GridNearCacheEntry<K, V> cached = (GridNearCacheEntry<K, V>)entry.cached();
@@ -1083,7 +1063,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                         m.dhtVersion(res.dhtVersion());
 
-                        tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+                        if (m.near())
+                            tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
                     }
 
                     // Proceed prepare before finishing mini future.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f097833..e481f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1360,9 +1360,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param expiryPlc Expiry policy.
      * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
      */
-    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
-        IgniteTxKey key,
-        @Nullable ExpiryPolicy expiryPlc) {
+    protected IgniteCacheExpiryPolicy accessPolicy(
+        GridCacheContext ctx,
+        IgniteTxKey<K> key,
+        @Nullable ExpiryPolicy expiryPlc
+    ) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 237ee2f..5526c7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4477,10 +4477,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         try {
             grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 1);
+
+            if (tx != null)
+                tx.commit();
         }
         finally {
             if (tx != null)
-                tx.commit();
+                tx.close();
         }
 
         long[] expireTimes = new long[gridCount()];

Reply via email to