IGNITE-104 - Ordered ATOMIC updates

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc394436
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc394436
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc394436

Branch: refs/heads/ignite-104
Commit: bc394436a89c945cbca1c4f158b00f19951cf47e
Parents: 6c7358d
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Aug 5 17:05:35 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Aug 5 17:05:35 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridAtomicRequestTopic.java      | 44 ++++++++++++++++----
 .../dht/atomic/GridDhtAtomicCache.java          |  8 ++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  2 +-
 4 files changed, 42 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
index 9feb409..709f739 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
@@ -28,13 +28,41 @@ class GridAtomicRequestTopic implements Externalizable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final byte NEAR_UPDATE_REQ = 1;
+
+    /** */
+    private static final byte DHT_UPDATE_REQ = 2;
+
+    /** */
     private int cacheId;
 
     /** */
     private int part;
 
     /** */
-    private boolean near;
+    private byte type;
+
+    /**
+     * Near request topic.
+     *
+     * @param cacheId Cache ID.
+     * @param part Partition.
+     * @return Topic.
+     */
+    static GridAtomicRequestTopic nearUpdateRequest(int cacheId, int part) {
+        return new GridAtomicRequestTopic(cacheId, part, NEAR_UPDATE_REQ);
+    }
+
+    /**
+     * DHT request topic.
+     *
+     * @param cacheId Cache ID.
+     * @param part Partition.
+     * @return Topic.
+     */
+    static GridAtomicRequestTopic dhtUpdateRequest(int cacheId, int part) {
+        return new GridAtomicRequestTopic(cacheId, part, DHT_UPDATE_REQ);
+    }
 
     /**
      * For {@link Externalizable}.
@@ -46,12 +74,12 @@ class GridAtomicRequestTopic implements Externalizable {
     /**
      * @param cacheId Cache ID.
      * @param part Partition.
-     * @param near Near flag.
+     * @param type Type.
      */
-    GridAtomicRequestTopic(int cacheId, int part, boolean near) {
+    private GridAtomicRequestTopic(int cacheId, int part, byte type) {
         this.cacheId = cacheId;
         this.part = part;
-        this.near = near;
+        this.type = type;
     }
 
     /** {@inheritDoc} */
@@ -62,7 +90,7 @@ class GridAtomicRequestTopic implements Externalizable {
 
         GridAtomicRequestTopic topic = (GridAtomicRequestTopic)o;
 
-        return cacheId == topic.cacheId && part == topic.part && near == 
topic.near;
+        return cacheId == topic.cacheId && part == topic.part && type == 
topic.type;
     }
 
     /** {@inheritDoc} */
@@ -70,7 +98,7 @@ class GridAtomicRequestTopic implements Externalizable {
         int res = cacheId;
 
         res = 31 * res + part;
-        res = 31 * res + (near ? 1 : 0);
+        res = 31 * res + type;
 
         return res;
     }
@@ -79,14 +107,14 @@ class GridAtomicRequestTopic implements Externalizable {
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(cacheId);
         out.writeInt(part);
-        out.writeBoolean(near);
+        out.writeByte(type);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
         cacheId = in.readInt();
         part = in.readInt();
-        near = in.readBoolean();
+        type = in.readByte();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 01694d7..6949ae2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -188,7 +188,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         });
 
         for (int part = 0; part < ctx.affinity().partitions(); part++) {
-            Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, 
true);
+            Object nearTopic = 
GridAtomicRequestTopic.nearUpdateRequest(ctx.cacheId(), part);
 
             ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, 
GridNearAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateRequest req) {
@@ -196,7 +196,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 }
             });
 
-            Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, 
false);
+            Object dhtTopic = 
GridAtomicRequestTopic.dhtUpdateRequest(ctx.cacheId(), part);
 
             ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, 
GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateRequest req) {
@@ -239,8 +239,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             buf.finish();
 
         for (int part = 0; part < ctx.affinity().partitions(); part++) {
-            ctx.io().removePerTopicHandler(new 
GridAtomicRequestTopic(ctx.cacheId(), part, true));
-            ctx.io().removePerTopicHandler(new 
GridAtomicRequestTopic(ctx.cacheId(), part, false));
+            
ctx.io().removePerTopicHandler(GridAtomicRequestTopic.nearUpdateRequest(ctx.cacheId(),
 part));
+            
ctx.io().removePerTopicHandler(GridAtomicRequestTopic.dhtUpdateRequest(ctx.cacheId(),
 part));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7823a52..63edcaa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -352,7 +352,7 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
                         log.debug("Sending DHT atomic update request [nodeId=" 
+ nodeId + ", req=" + req + ']');
 
                     if (part >= 0) {
-                        Object topic = new 
GridAtomicRequestTopic(cctx.cacheId(), part, false);
+                        Object topic = 
GridAtomicRequestTopic.dhtUpdateRequest(cctx.cacheId(), part);
 
                         cctx.io().sendSequentialMessage(nodeId, topic, req, 
cctx.ioPolicy());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 5b364a5..66f0300 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1091,7 +1091,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
     private void sendRequest(GridAtomicMappingKey mappingKey, 
GridNearAtomicUpdateRequest req)
         throws IgniteCheckedException {
         if (mappingKey.partition() >= 0) {
-            Object topic = new GridAtomicRequestTopic(cctx.cacheId(), 
mappingKey.partition(), true);
+            Object topic = 
GridAtomicRequestTopic.nearUpdateRequest(cctx.cacheId(), 
mappingKey.partition());
 
             cctx.io().sendSequentialMessage(mappingKey.nodeId(), topic, req, 
cctx.ioPolicy());
         }

Reply via email to