Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-104 9dec3b7b1 -> 2ac79893c


IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2ac79893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2ac79893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2ac79893

Branch: refs/heads/ignite-104
Commit: 2ac79893cc21517c73e85a1bcc18c33b59140fb2
Parents: 9dec3b7
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Tue Jul 21 21:12:23 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Tue Jul 21 21:12:23 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 36 +++++++++++---------
 1 file changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ac79893/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bdaa994..aaf373d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -188,24 +188,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         processNearAtomicUpdateRequest(nodeId, req);
                     }
                 });
-            }
-        }
-        else {
-            ctx.io().addHandler(ctx.cacheId(), 
GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() 
{
-                @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateRequest req) {
-                    processNearAtomicUpdateRequest(nodeId, req);
-                }
-            });
-        }
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, 
new CI2<UUID, GridNearAtomicUpdateResponse>() {
-            @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateResponse res) {
-                processNearAtomicUpdateResponse(nodeId, res);
-            }
-        });
-
-        if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
                 ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, 
false), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateRequest req) {
                         processDhtAtomicUpdateRequest(nodeId, req);
@@ -214,6 +197,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
+            ctx.io().addHandler(ctx.cacheId(), 
GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() 
{
+                @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateRequest req) {
+                    processNearAtomicUpdateRequest(nodeId, req);
+                }
+            });
+
             ctx.io().addHandler(ctx.cacheId(), 
GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
@@ -221,6 +210,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             });
         }
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, 
new CI2<UUID, GridNearAtomicUpdateResponse>() {
+            @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateResponse res) {
+                processNearAtomicUpdateResponse(nodeId, res);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, 
new CI2<UUID, GridDhtAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateResponse res) {
                 processDhtAtomicUpdateResponse(nodeId, res);
@@ -247,6 +242,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void stop() {
         for (DeferredResponseBuffer buf : pendingResponses.values())
             buf.finish();
+
+        if (ctx.config().isAtomicOrderedUpdates()) {
+            for (int part = 0; part < ctx.affinity().partitions(); part++) {
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, 
part, true));
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, 
part, false));
+            }
+        }
     }
 
     /**

Reply via email to