IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0f54d4c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0f54d4c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0f54d4c4

Branch: refs/heads/ignite-104
Commit: 0f54d4c4530ee750a6d333d5346832def9556830
Parents: 73f1be6
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Jul 22 19:48:38 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Jul 22 19:48:38 2015 -0700

----------------------------------------------------------------------
 .../internal/processors/cache/GridCacheMapEntry.java      |  6 ++++--
 .../cache/distributed/dht/atomic/GridDhtAtomicCache.java  | 10 ++++++----
 .../distributed/dht/atomic/GridDhtAtomicUpdateFuture.java |  3 ++-
 .../cache/distributed/near/GridNearAtomicCache.java       |  4 ++++
 4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0237f8..78e975e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2164,7 +2164,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 }

                 if (!cctx.deferredDelete())
-                    markObsolete(rmvVer);
+                    markObsolete0(rmvVer, true);

                 res = hadVal;
             }
@@ -4016,7 +4016,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      */
     protected void deletedUnlocked(boolean deleted) {
         assert Thread.holdsLock(this);
-        assert cctx.deferredDelete();
+
+        if (!cctx.deferredDelete())
+            return;

         if (deleted) {
             assert !deletedUnlocked() : this;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fb309c3..bb036da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1194,6 +1194,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {

                     assert entry.obsolete();

+                    entry.onMarkedObsolete();
+
                     removeEntry(entry);
                 }
             }
@@ -1749,12 +1751,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (conflictCtx.isMerge())
                             newConflictVer = null; // Conflict version is discarded in case of merge.

-                        EntryProcessor<Object, Object, Object> entryProcessor = null;
-
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
-                                entryProcessor,
+                                op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
                                 newConflictVer);
@@ -1764,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                entryProcessor,
+                                null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                         }
@@ -2480,6 +2480,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     else {
                         assert entry.obsolete();

+                        entry.onMarkedObsolete();
+
                         removeEntry(entry);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index f1d37f5..3a68263 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,7 +123,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !topLocked;

         // We can send entry processor instead of value to backup if updates are ordered.
-        forceTransformBackups = cctx.config().isAtomicOrderedUpdates();
+        forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM &&
+            cctx.config().isAtomicOrderedUpdates();
     }

     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index f8fa573..ab3b06f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -247,6 +247,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                     else {
                         assert entry.obsolete();

+                        entry.onMarkedObsolete();
+
                         removeEntry(entry);
                     }
                 }
@@ -352,6 +354,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                     else {
                         assert entry.obsolete();

+                        entry.onMarkedObsolete();
+
                         removeEntry(entry);
                     }
                 }