GG-9655 - Fixing tests after merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9a995a3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9a995a3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9a995a3a Branch: refs/heads/ingite-9655-merge Commit: 9a995a3aba1cc3cef0dd8027407d234dcf87c885 Parents: 61c102c Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri Jan 30 16:41:37 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri Jan 30 16:41:37 2015 -0800 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxLocalAdapter.java | 1 + .../distributed/dht/GridDhtTxPrepareFuture.java | 74 ++++++++---------- .../distributed/near/GridNearLockRequest.java | 5 -- .../cache/distributed/near/GridNearTxLocal.java | 11 ++- .../near/GridNearTxPrepareFuture.java | 81 ++++++++------------ .../transactions/IgniteTxLocalAdapter.java | 8 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 5 +- 7 files changed, 76 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 33509ab..ae8e3c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -451,6 +451,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K entry.valueBytes(e.valueBytes()); entry.ttl(e.ttl()); entry.filters(e.filters()); + entry.expiry(e.expiry()); entry.drExpireTime(e.drExpireTime()); entry.drVersion(e.drVersion()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 407b8fe..256c9f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -116,6 +116,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** Keys that did not pass the filter. */ private Collection<IgniteTxKey<K>> filterFailedKeys; + /** Keys that should be locked. */ + private GridConcurrentHashSet<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>(); + + /** Locks ready flag. */ + private volatile boolean locksReady; + /** * Empty constructor required for {@link Externalizable}. */ @@ -209,9 +215,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (log.isDebugEnabled()) log.debug("Transaction future received owner changed callback: " + entry); - boolean ret = tx.hasWriteKey(entry.txKey()); + boolean rmv = lockKeys.remove(entry.txKey()); - return ret && mapIfLocked(); + return rmv && mapIfLocked(); } /** {@inheritDoc} */ @@ -235,44 +241,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @return {@code True} if all locks are owned. */ private boolean checkLocks() { - for (IgniteTxEntry<K, V> txEntry : tx.optimisticLockEntries()) { - while (true) { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - try { - if (txEntry.explicitVersion() == null) { - // Don't compare entry against itself. - if (!cached.lockedLocally(tx.xidVersion())) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + - cached + ", tx=" + tx + ']'); - - return false; - } - } - else { - if (!cached.lockedBy(txEntry.explicitVersion())) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by explicit version (will wait) [entry=" + - cached + ", tx=" + tx + ']'); - - return false; - } - } - - break; // While. - } - // Possible if entry cached within transaction is obsolete. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - return true; + return locksReady && lockKeys.isEmpty(); } /** {@inheritDoc} */ @@ -460,13 +429,26 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu Collections.singletonList(tx.groupLockEntry()) : writes; for (IgniteTxEntry<K, V> txEntry : checkEntries) { - if (txEntry.cached().isLocal()) + GridCacheContext<K, V> cacheCtx = txEntry.context(); + + if (cacheCtx.isLocal()) continue; - while (true) { - GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); + GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); + + if (entry == null) { + entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key()); + + txEntry.cached(entry, txEntry.keyBytes()); + } + + if (tx.optimistic() && txEntry.explicitVersion() == null) + lockKeys.add(txEntry.txKey()); + while (true) { try { + assert txEntry.explicitVersion() == null || entry.lockedBy(txEntry.explicitVersion()); + GridCacheMvccCandidate<K> c = entry.readyLock(tx.xidVersion()); if (log.isDebugEnabled()) @@ -479,10 +461,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (log.isDebugEnabled()) log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); + entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key()); + + txEntry.cached(entry, txEntry.keyBytes()); } } } + + locksReady = true; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e707e85..732703e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -301,11 +301,6 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> return dhtVers[idx]; } - /** {@inheritDoc} */ - @Override protected boolean transferExpiryPolicy() { - return true; - } - /** * @return TTL for read operation. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f006508..b8bda46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -82,7 +82,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { private boolean colocatedLocallyMapped; /** Info for entries accessed locally in optimistic transaction. */ - private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap; + private Map<IgniteTxKey<K>, IgniteCacheExpiryPolicy> accessMap; /** * Empty constructor required for {@link Externalizable}. @@ -560,8 +560,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { while (true) { GridCacheContext<K, V> cacheCtx = txEntry.cached().context(); - if (!cacheCtx.isNear()) - break; + assert cacheCtx.isNear(); GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); @@ -1156,7 +1155,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { /** {@inheritDoc} */ @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx, - IgniteTxKey key, + IgniteTxKey<K> key, @Nullable ExpiryPolicy expiryPlc) { assert optimistic(); @@ -1187,7 +1186,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { */ private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) { if (accessMap != null) { - for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { + for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key())) return e.getValue(); } @@ -1203,7 +1202,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { if (accessMap != null) { assert optimistic(); - for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { + for (Map.Entry<IgniteTxKey<K>, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { if (e.getValue().entries() != null) { GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index e7b3601..4d9dadf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; +import org.apache.ignite.client.util.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -78,6 +79,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut /** Full information about transaction nodes mapping. */ private GridDhtTxMapping<K, V> txMapping; + /** */ + private Collection<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>(); + /** * Empty constructor required for {@link Externalizable}. */ @@ -128,6 +132,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut log.debug("Transaction future received owner changed callback: " + entry); if (entry.context().isNear() && owner != null && tx.hasWriteKey(entry.txKey())) { + lockKeys.remove(entry.txKey()); + // This will check for locks. onDone(); @@ -213,45 +219,16 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @return {@code True} if all locks are owned. */ private boolean checkLocks() { - Collection<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ? - Collections.singletonList(tx.groupLockEntry()) : - tx.writeEntries(); - - for (IgniteTxEntry<K, V> txEntry : checkEntries) { - // Wait for near locks only. - if (!txEntry.context().isNear()) - continue; - - while (true) { - GridCacheEntryEx<K, V> cached = txEntry.cached(); - - try { - GridCacheVersion ver = txEntry.explicitVersion() != null ? - txEntry.explicitVersion() : tx.xidVersion(); + boolean locked = lockKeys.isEmpty(); - // If locks haven't been acquired yet, keep waiting. - if (!cached.lockedBy(ver)) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + cached + - ", tx=" + tx + ']'); - - return false; - } - - break; // While. - } - // Possible if entry cached within transaction is obsolete. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } + if (locked) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); } - - if (log.isDebugEnabled()) - log.debug("All locks are acquired for near prepare future: " + this); return true; } @@ -564,7 +541,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * */ private void preparePessimistic() { - Map<ClusterNode, GridDistributedTxMapping<K, V>> mappings = new HashMap<>(); + Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping<K, V>> mappings = new HashMap<>(); long topVer = tx.topologyVersion(); @@ -577,12 +554,18 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut ClusterNode primary = F.first(nodes); - GridDistributedTxMapping<K, V> nodeMapping = mappings.get(primary); + boolean near = cacheCtx.isNear(); + + IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near); + + GridDistributedTxMapping<K, V> nodeMapping = mappings.get(key); if (nodeMapping == null) { nodeMapping = new GridDistributedTxMapping<>(primary); - mappings.put(primary, nodeMapping); + nodeMapping.near(cacheCtx.isNear()); + + mappings.put(key, nodeMapping); } txEntry.nodeId(primary.id()); @@ -663,13 +646,6 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); m.dhtVersion(dhtTx.xidVersion()); - - GridCacheVersion min = dhtTx.minVersion(); - - IgniteTxManager<K, V> tm = cctx.tm(); - - tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), - tm.committedVersions(min), tm.rolledbackVersions(min)); } tx.implicitSingleResult(dhtTx.implicitSingleResult()); @@ -821,8 +797,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut IgniteTxManager<K, V> tm = cctx.tm(); - tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), - tm.committedVersions(min), tm.rolledbackVersions(min)); + if (m.near()) + tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), + tm.committedVersions(min), tm.rolledbackVersions(min)); } // Continue prepare before completing the future. @@ -902,6 +879,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut entry.nodeId(primary.id()); if (cacheCtx.isNear()) { + if (entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); + while (true) { try { GridNearCacheEntry<K, V> cached = (GridNearCacheEntry<K, V>)entry.cached(); @@ -1083,7 +1063,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut m.dhtVersion(res.dhtVersion()); - tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); + if (m.near()) + tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); } // Proceed prepare before finishing mini future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f097833..e481f84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1360,9 +1360,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param expiryPlc Expiry policy. * @return Expiry policy wrapper for entries accessed locally in optimistic transaction. */ - protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx, - IgniteTxKey key, - @Nullable ExpiryPolicy expiryPlc) { + protected IgniteCacheExpiryPolicy accessPolicy( + GridCacheContext ctx, + IgniteTxKey<K> key, + @Nullable ExpiryPolicy expiryPlc + ) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a995a3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 237ee2f..5526c7b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4477,10 +4477,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract try { grid(0).jcache(null).withExpiryPolicy(expiry).put(key, 1); + + if (tx != null) + tx.commit(); } finally { if (tx != null) - tx.commit(); + tx.close(); } long[] expireTimes = new long[gridCount()];