IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/73f1be64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/73f1be64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/73f1be64
Branch: refs/heads/ignite-104
Commit: 73f1be64fe7beef1b687aaadcc3184991e49fa10
Parents: b181411
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Jul 22 18:38:45 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Jul 22 18:38:45 2015 -0700
----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java | 9 ++++--
 .../dht/atomic/GridDhtAtomicCache.java | 31 ++++++++++++++------
 .../distributed/near/GridNearAtomicCache.java | 22 +++++++++++---
 3 files changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4680994..b0237f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1611,7 +1611,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 CacheObject oldVal;
 CacheObject updated;
- GridCacheVersion enqueueVer = null;
+ GridCacheVersion rmvVer = null;
 GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -2120,7 +2120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 }
 }
- enqueueVer = newVer;
+ rmvVer = newVer;
 boolean hasValPtr = hasOffHeapPointer();
@@ -2163,6 +2163,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 }
 }
+ if (!cctx.deferredDelete())
+ markObsolete(rmvVer);
+
 res = hadVal;
 }
@@ -2194,7 +2197,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 invokeRes,
 newSysTtl,
 newSysExpireTime,
- enqueueVer,
+ rmvVer,
 conflictCtx,
 true);
 }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aaf373d..fb309c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1181,13 +1181,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 if (locked != null)
 unlockEntries(locked, req.topologyVersion());
- // Enqueue if necessary after locks release.
 if (deleted != null) {
 assert !deleted.isEmpty();
- assert ctx.deferredDelete(this) : this;
- for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
- ctx.onDeferredDelete(e.get1(), e.get2());
+ boolean deferred = ctx.deferredDelete();
+
+ for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) {
+ if (deferred)
+ ctx.onDeferredDelete(e.get1(), e.get2());
+ else {
+ GridDhtCacheEntry entry = e.get1();
+
+ assert entry.obsolete();
+
+ removeEntry(entry);
+ }
+ }
 }
 }
 }
@@ -2182,9 +2191,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 * @param topVer Topology version.
 */
 private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
- // Process deleted entries before locks release.
- assert ctx.deferredDelete(this) : this;
-
 // Entries to skip eviction manager notification for.
 // Enqueue entries while holding locks.
 Collection<KeyCacheObject> skip = null;
@@ -2468,8 +2474,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 req.subjectId(),
 taskName);
- if (updRes.removeVersion() != null)
- ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.removeVersion() != null) {
+ if (ctx.deferredDelete())
+ ctx.onDeferredDelete(entry, updRes.removeVersion());
+ else {
+ assert entry.obsolete();
+
+ removeEntry(entry);
+ }
+ }
 entry.onUnlock();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ed07f8f..f8fa573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -241,8 +241,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 subjId,
 taskName);
- if (updRes.removeVersion() != null)
- ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.removeVersion() != null) {
+ if (ctx.deferredDelete())
+ ctx.onDeferredDelete(entry, updRes.removeVersion());
+ else {
+ assert entry.obsolete();
+
+ removeEntry(entry);
+ }
+ }
 break; // While.
 }
@@ -339,8 +346,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 req.subjectId(),
 taskName);
- if (updRes.removeVersion() != null)
- ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.removeVersion() != null) {
+ if (ctx.deferredDelete())
+ ctx.onDeferredDelete(entry, updRes.removeVersion());
+ else {
+ assert entry.obsolete();
+
+ removeEntry(entry);
+ }
+ }
 break;
 }