Repository: incubator-ignite Updated Branches: refs/heads/ignite-579 4833ec989 -> 9e3557a63
IGNITE-589 - Use correct topology version Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d11b2110 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d11b2110 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d11b2110 Branch: refs/heads/ignite-579 Commit: d11b21106f0a1dd9f0af4d5dd9a0175282e9dabe Parents: 42e467c Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Mar 25 11:20:33 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Mar 25 11:20:33 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 1 + .../processors/cache/GridCacheMapEntry.java | 11 +++++----- .../GridDistributedTxRemoteAdapter.java | 16 +++++++++----- .../dht/atomic/GridDhtAtomicCache.java | 3 +++ .../distributed/near/GridNearAtomicCache.java | 2 ++ .../distributed/near/GridNearCacheEntry.java | 22 ++++++++++---------- .../distributed/near/GridNearLockFuture.java | 4 ++-- .../near/GridNearTxPrepareFuture.java | 2 +- .../processors/cache/GridCacheTestEntryEx.java | 1 + 9 files changed, 38 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index f4edb8c..d0c701c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -436,6 +436,7 @@ public interface GridCacheEntryEx { boolean metrics, boolean primary, boolean checkVer, + AffinityTopologyVersion topVer, @Nullable CacheEntryPredicate[] filter, GridDrType drType, long conflictTtl, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2f5a5b2..948f916 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -934,7 +934,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** * @param nodeId Node ID. */ - protected void recordNodeId(UUID nodeId) { + protected void recordNodeId(UUID nodeId, AffinityTopologyVersion topVer) { // No-op. } @@ -1050,7 +1050,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { drReplicate(drType, val, newVer); - recordNodeId(affNodeId); + recordNodeId(affNodeId, topVer); if (metrics && cctx.cache().configuration().isStatisticsEnabled()) cctx.cache().metrics0().onWrite(); @@ -1254,7 +1254,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { log.debug("Entry could not be marked obsolete (it is still used): " + this); } else { - recordNodeId(affNodeId); + recordNodeId(affNodeId, topVer); // If entry was not marked obsolete, then removed lock // will be registered whenever removeLock is called. @@ -1602,6 +1602,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean metrics, boolean primary, boolean verCheck, + AffinityTopologyVersion topVer, @Nullable CacheEntryPredicate[] filter, GridDrType drType, long explicitTtl, @@ -2062,7 +2063,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { drReplicate(drType, updated, newVer); - recordNodeId(affNodeId); + recordNodeId(affNodeId, topVer); if (evt) { CacheObject evtOld = null; @@ -2146,7 +2147,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { clearReaders(); - recordNodeId(affNodeId); + recordNodeId(affNodeId, topVer); drReplicate(drType, null, newVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index ceb8a7c..51d85a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -571,7 +571,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter val0, cached.expireTime(), cached.ttl(), - nodeId); + nodeId, + topVer); } } } @@ -582,7 +583,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Keep near entry up to date. if (nearCached != null) - nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId); + nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer); } else if (op == RELOAD) { CacheObject reloaded = cached.innerReload(); @@ -590,8 +591,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (nearCached != null) { nearCached.innerReload(); - nearCached.updateOrEvict(cached.version(), reloaded, - cached.expireTime(), cached.ttl(), nodeId); + nearCached.updateOrEvict(cached.version(), + reloaded, + cached.expireTime(), + cached.ttl(), + nodeId, + topVer); } } else if (op == READ) { @@ -617,7 +622,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter val0, cached.expireTime(), cached.ttl(), - nodeId); + nodeId, + topVer); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 35319ab..cf317a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1736,6 +1736,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, primary, ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + topVer, req.filter(), replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, newConflictTtl, @@ -2008,6 +2009,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, primary, ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + topVer, null, replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, CU.TTL_NOT_CHANGED, @@ -2511,6 +2513,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), + req.topologyVersion(), CU.empty0(), replicate ? DR_BACKUP : DR_NONE, ttl, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 27980d2..6e24261 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -228,6 +228,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/true, + topVer, CU.empty0(), DR_NONE, ttl, @@ -324,6 +325,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), + req.topologyVersion(), CU.empty0(), DR_NONE, ttl, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 1d0add9..ce80ad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -170,7 +170,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } } - recordNodeId(cctx.affinity().primary(key, topVer).id()); + recordNodeId(cctx.affinity().primary(key, topVer).id(), topVer); dhtVer = e.isNew() || e.isDeleted() ? null : e.version(); @@ -206,7 +206,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { public boolean resetFromPrimary(CacheObject val, GridCacheVersion ver, GridCacheVersion dhtVer, - UUID primaryNodeId) + UUID primaryNodeId, + AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException, IgniteCheckedException { assert dhtVer != null; @@ -216,7 +217,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { synchronized (this) { checkObsolete(); - primaryNode(primaryNodeId); + primaryNode(primaryNodeId, topVer); if (!F.eq(this.dhtVer, dhtVer)) { value(val); @@ -244,7 +245,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { @Nullable CacheObject val, long expireTime, long ttl, - UUID primaryNodeId) + UUID primaryNodeId, + AffinityTopologyVersion topVer) { assert dhtVer != null; @@ -264,7 +266,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { ttlAndExpireTimeExtras((int) ttl, expireTime); - primaryNode(primaryNodeId); + primaryNode(primaryNodeId, topVer); } } } @@ -299,10 +301,10 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void recordNodeId(UUID primaryNodeId) { + @Override protected void recordNodeId(UUID primaryNodeId, AffinityTopologyVersion topVer) { assert Thread.holdsLock(this); - primaryNode(primaryNodeId); + primaryNode(primaryNodeId, topVer); } /** @@ -376,7 +378,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { boolean hasVal = hasValueUnlocked(); if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) { - primaryNode(primaryNodeId); + primaryNode(primaryNodeId, topVer); // Change entry only if dht version has changed. if (!dhtVer.equals(dhtVersion())) { @@ -621,12 +623,10 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** * @param nodeId Primary node ID. */ - private void primaryNode(UUID nodeId) { + private void primaryNode(UUID nodeId, AffinityTopologyVersion topVer) { assert Thread.holdsLock(this); assert nodeId != null; - AffinityTopologyVersion topVer = cctx.discovery().topologyVersionEx(); - ClusterNode primary = cctx.affinity().primary(part, topVer); if (primary == null || !nodeId.equals(primary.id())) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c3d138e..d6d1a1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1017,7 +1017,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Lock is held at this point, so we can set the // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id()); + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get()); entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), res.pending()); @@ -1369,7 +1369,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Lock is held at this point, so we can set the // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id()); + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 4d70afb..a659b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -933,7 +933,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut CacheVersionedValue tup = entry.getValue(); nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(), - tup.version(), m.node().id()); + tup.version(), m.node().id(), tx.topologyVersion()); } else if (txEntry.cached().detached()) { GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d11b2110/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index bacd832..345b6c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -493,6 +493,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean metrics, boolean primary, boolean checkVer, + AffinityTopologyVersion topVer, @Nullable CacheEntryPredicate[] filter, GridDrType drType, long conflictTtl,