IGNITE-104 - Ordered ATOMIC updates

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0f54d4c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0f54d4c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0f54d4c4

Branch: refs/heads/ignite-104
Commit: 0f54d4c4530ee750a6d333d5346832def9556830
Parents: 73f1be6
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Jul 22 19:48:38 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Jul 22 19:48:38 2015 -0700

----------------------------------------------------------------------
 .../internal/processors/cache/GridCacheMapEntry.java      |  6 ++++--
 .../cache/distributed/dht/atomic/GridDhtAtomicCache.java  | 10 ++++++----
 .../distributed/dht/atomic/GridDhtAtomicUpdateFuture.java |  3 ++-
 .../cache/distributed/near/GridNearAtomicCache.java       |  4 ++++
 4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0237f8..78e975e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2164,7 +2164,7 @@ public abstract class GridCacheMapEntry implements 
GridCacheEntryEx {
                 }
 
                 if (!cctx.deferredDelete())
-                    markObsolete(rmvVer);
+                    markObsolete0(rmvVer, true);
 
                 res = hadVal;
             }
@@ -4016,7 +4016,9 @@ public abstract class GridCacheMapEntry implements 
GridCacheEntryEx {
      */
     protected void deletedUnlocked(boolean deleted) {
         assert Thread.holdsLock(this);
-        assert cctx.deferredDelete();
+
+        if (!cctx.deferredDelete())
+            return;
 
         if (deleted) {
             assert !deletedUnlocked() : this;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fb309c3..bb036da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1194,6 +1194,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                             assert entry.obsolete();
 
+                            entry.onMarkedObsolete();
+
                             removeEntry(entry);
                         }
                     }
@@ -1749,12 +1751,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         else if (conflictCtx.isMerge())
                             newConflictVer = null; // Conflict version is 
discarded in case of merge.
 
-                        EntryProcessor<Object, Object, Object> entryProcessor 
= null;
-
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
-                                entryProcessor,
+                                op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
                                 newConflictVer);
@@ -1764,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                entryProcessor,
+                                null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                     }
@@ -2480,6 +2480,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             else {
                                 assert entry.obsolete();
 
+                                entry.onMarkedObsolete();
+
                                 removeEntry(entry);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index f1d37f5..3a68263 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,7 +123,8 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         waitForExchange = !topLocked;
 
         // We can send entry processor instead of value to backup if updates 
are ordered.
-        forceTransformBackups = cctx.config().isAtomicOrderedUpdates();
+        forceTransformBackups = updateReq.operation() == 
GridCacheOperation.TRANSFORM &&
+            cctx.config().isAtomicOrderedUpdates();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0f54d4c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index f8fa573..ab3b06f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -247,6 +247,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                         else {
                             assert entry.obsolete();
 
+                            entry.onMarkedObsolete();
+
                             removeEntry(entry);
                         }
                     }
@@ -352,6 +354,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                             else {
                                 assert entry.obsolete();
 
+                                entry.onMarkedObsolete();
+
                                 removeEntry(entry);
                             }
                         }

Reply via email to