IGNITE-104 - Ordered ATOMIC updates

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9781ea43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9781ea43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9781ea43

Branch: refs/heads/ignite-104
Commit: 9781ea4384a553e5126b8a7320f7070f6a340809
Parents: 7c73fc5
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Jul 29 17:57:49 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Jul 29 17:57:49 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |  17 +-
 .../processors/cache/GridCacheIoManager.java    |   3 +-
 .../processors/cache/GridCacheUtils.java        |   4 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  29 ++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 172 ++++++-------------
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  27 +--
 .../atomic/GridNearAtomicUpdateResponse.java    |  28 +--
 8 files changed, 87 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3cf92f8..e9da40c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -185,11 +185,10 @@ public enum GridTopic {
     /**
      * @param id1 ID1.
      * @param id2 ID2.
-     * @param id3 ID3.
      * @return Grid message topic with specified IDs.
      */
-    public Object topic(int id1, int id2, byte id3) {
-        return new T9(this, id1, id2, id3);
+    public Object topic(int id1, int id2) {
+        return new T9(this, id1, id2);
     }
 
     /**
@@ -782,9 +781,6 @@ public enum GridTopic {
         /** */
         private int id2;
 
-        /** */
-        private int id3;
-
         /**
          * No-arg constructor needed for {@link Serializable}.
          */
@@ -796,13 +792,11 @@ public enum GridTopic {
          * @param topic Topic.
          * @param id1 ID1.
          * @param id2 ID2.
-         * @param id3 ID3.
          */
-        private T9(GridTopic topic, int id1, int id2, byte id3) {
+        private T9(GridTopic topic, int id1, int id2) {
             this.topic = topic;
             this.id1 = id1;
             this.id2 = id2;
-            this.id3 = id3;
         }
 
         /** {@inheritDoc} */
@@ -811,7 +805,6 @@ public enum GridTopic {
 
             res += 31 * res + id1;
             res += 31 * res + id2;
-            res += 31 * res + id3;
 
             return res;
         }
@@ -821,7 +814,7 @@ public enum GridTopic {
             if (obj.getClass() == T9.class) {
                 T9 that = (T9)obj;
 
-                return topic == that.topic && id1 == that.id1 && id2 == that.id2 && id3 == that.id3;
+                return topic == that.topic && id1 == that.id1 && id2 == that.id2;
             }
 
             return false;
@@ -832,7 +825,6 @@ public enum GridTopic {
             out.writeByte(topic.ordinal());
             out.writeInt(id1);
             out.writeInt(id2);
-            out.writeByte(id3);
         }
 
         /** {@inheritDoc} */
@@ -840,7 +832,6 @@ public enum GridTopic {
             topic = fromOrdinal(in.readByte());
             id1 = in.readInt();
             id2 = in.readInt();
-            id3 = in.readByte();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index dec6aef..5858424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -394,8 +394,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
-                    req.partition());
+                    req.futureVersion());
 
                 res.error(req.classError());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 96df7c5..d82acca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1739,9 +1739,9 @@ public class GridCacheUtils {
      * @param part Partition.
      * @return Per-partition message topic.
      */
-    public static Object partitionMessageTopic(GridCacheContext ctx, int part, boolean nearMsg) {
+    public static Object partitionMessageTopic(GridCacheContext ctx, int part) {
         assert part >= 0;
 
-        return TOPIC_CACHE.topic(ctx.cacheId(), part, (byte)(nearMsg ? 1 : 0));
+        return TOPIC_CACHE.topic(ctx.cacheId(), part);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 31606b2..3084e68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,15 +181,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                processNearAtomicUpdateRequest(nodeId, req);
+            }
+        });
+
         if (ctx.config().isAtomicOrderedUpdates()) {
             for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, true), new CI2<UUID, GridNearAtomicUpdateRequest>() {
-                    @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                        processNearAtomicUpdateRequest(nodeId, req);
-                    }
-                });
-
-                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                     @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                         processDhtAtomicUpdateRequest(nodeId, req);
                     }
@@ -197,12 +197,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            ctx.io().addHandler(ctx.cacheId(), 
GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() 
{
-                @Override public void apply(UUID nodeId, 
GridNearAtomicUpdateRequest req) {
-                    processNearAtomicUpdateRequest(nodeId, req);
-                }
-            });
-
             ctx.io().addHandler(ctx.cacheId(), 
GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, 
GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
@@ -244,10 +238,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             buf.finish();
 
         if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true));
-                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false));
-            }
+            for (int part = 0; part < ctx.affinity().partitions(); part++)
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part));
         }
     }
 
@@ -1041,8 +1033,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     ) {
         GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion(),
-            req.partition());
+            req.futureVersion());
 
         List<KeyCacheObject> keys = req.keys();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ab0c2e1..8595dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         log.debug("Sending DHT atomic update request [nodeId=" 
+ nodeId + ", req=" + req + ']');
 
                     if (part >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, part, false);
+                        Object topic = CU.partitionMessageTopic(cctx, part);
 
                         cctx.io().sendOrderedMessage(nodeId, topic, req, 
cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9b2a5e2..4c8a161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -246,11 +246,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
-                return cctx.kernalContext().discovery().node(mappingKey.nodeId());
-            }
-        }), F.notNull());
+        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
     }
 
     /**
@@ -287,24 +283,13 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             return false;
         }
 
-        Collection<GridAtomicMappingKey> mappingKeys = new 
ArrayList<>(mappings.size());
-        Collection<KeyCacheObject> failedKeys = new ArrayList<>();
-
-        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : 
mappings.entrySet()) {
-            if (e.getKey().nodeId().equals(nodeId)) {
-                mappingKeys.add(e.getKey());
-
-                failedKeys.addAll(e.getValue().keys());
-            }
-        }
+        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
-        if (!mappingKeys.isEmpty()) {
-            if (!failedKeys.isEmpty())
-                addFailedKeys(failedKeys, new 
ClusterTopologyCheckedException("Primary node left grid before " +
-                    "response is received: " + nodeId));
+        if (req != null) {
+            addFailedKeys(req.keys(), new 
ClusterTopologyCheckedException("Primary node left grid before response is " +
+                "received: " + nodeId));
 
-            for (GridAtomicMappingKey key : mappingKeys)
-                mappings.remove(key);
+            mappings.remove(nodeId);
 
             checkComplete();
 
@@ -544,9 +529,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             }
         }
         else {
-            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, 
res.partition());
-
-            GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
+            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
             if (req != null) { // req can be null if onResult is being 
processed concurrently with onNodeLeft.
                 updateNear(req, res);
@@ -564,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                         opRes = ret;
                 }
 
-                mappings.remove(mappingKey);
+                mappings.remove(nodeId);
             }
 
             checkComplete();
@@ -780,11 +763,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            int part = cctx.affinity().partition(cacheKey);
-            ClusterNode primary = cctx.affinity().primary(part, topVer);
-
-            if (!ccfg.isAtomicOrderedUpdates())
-                part = -1;
+            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
 
             if (primary == null) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache (all partition nodes " +
@@ -810,8 +789,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 subjId,
                 taskNameHash,
                 skipStore,
-                cctx.kernalContext().clientNode(),
-                part);
+                cctx.kernalContext().clientNode());
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -827,7 +805,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
+            mapSingle(primary.id(), req);
 
             return;
         }
@@ -847,18 +825,13 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings 
= new HashMap<>(topNodes.size(), 1.0f);
+        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new 
HashMap<>(topNodes.size(), 1.0f);
 
         // Must do this in synchronized block because we need to atomically 
remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (oldNodeId != null) {
-                // TODO: IGNITE-104 - Try to avoid iteration.
-                for (Map.Entry<GridAtomicMappingKey, 
GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-                    if (e.getKey().nodeId().equals(oldNodeId))
-                        mappings.remove(e.getKey());
-                }
-            }
+            if (oldNodeId != null)
+                removeMapping(oldNodeId);
 
             // For fastMap mode wait for all responses before remapping.
             if (remap && fastMap && !mappings.isEmpty()) {
@@ -928,10 +901,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
-                T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, 
topVer, fastMap);
-
-                int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
-                Collection<ClusterNode> affNodes = t.get2();
+                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, 
fastMap);
 
                 if (affNodes.isEmpty()) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed 
to map keys for cache " +
@@ -952,9 +922,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
                     UUID nodeId = affNode.id();
 
-                    GridAtomicMappingKey mappingKey = new 
GridAtomicMappingKey(nodeId, part);
-
-                    GridNearAtomicUpdateRequest mapped = 
pendingMappings.get(mappingKey);
+                    GridNearAtomicUpdateRequest mapped = 
pendingMappings.get(nodeId);
 
                     if (mapped == null) {
                         mapped = new GridNearAtomicUpdateRequest(
@@ -974,12 +942,11 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                             subjId,
                             taskNameHash,
                             skipStore,
-                            cctx.kernalContext().clientNode(),
-                            part);
+                            cctx.kernalContext().clientNode());
 
-                        pendingMappings.put(mappingKey, mapped);
+                        pendingMappings.put(nodeId, mapped);
 
-                        GridNearAtomicUpdateRequest old = 
mappings.put(mappingKey, mapped);
+                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, 
mapped);
 
                         assert old == null || (old != null && remap) :
                             "Invalid mapping state [old=" + old + ", remap=" + 
remap + ']';
@@ -997,7 +964,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry 
= F.first(pendingMappings.entrySet());
+            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = 
F.first(pendingMappings.entrySet());
 
             single = true;
 
@@ -1020,35 +987,31 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      * @param fastMap Flag indicating whether mapping is performed for 
fast-circuit update.
      * @return Collection of nodes to which key is mapped.
      */
-    private T2<Integer, Collection<ClusterNode>> mapKey(
+    private Collection<ClusterNode> mapKey(
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean fastMap
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();
 
-        int part = affMgr.partition(key);
-
         // If we can send updates in parallel - do it.
-        Collection<ClusterNode> nodes = fastMap ?
-            cctx.topology().nodes(part, topVer) :
-            Collections.singletonList(affMgr.primary(part, topVer));
-
-        return new T2<>(part, nodes);
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
     }
 
     /**
      * Maps future to single node.
      *
-     * @param mappingKey Mapping key.
+     * @param nodeId Node ID.
      * @param req Request.
      */
-    private void mapSingle(GridAtomicMappingKey mappingKey, 
GridNearAtomicUpdateRequest req) {
-        singleNodeId = mappingKey.nodeId();
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+        singleNodeId = nodeId;
         singleReq = req;
 
-        if (cctx.localNodeId().equals(mappingKey.nodeId())) {
-            cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
+        if (cctx.localNodeId().equals(nodeId)) {
+            cache.updateAllAsyncInternal(nodeId, req,
                 new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest 
req,
                         GridNearAtomicUpdateResponse res) {
@@ -1063,7 +1026,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + 
req.nodeId() + ", req=" + req + ']');
 
-                sendRequest(mappingKey, req);
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (syncMode == FULL_ASYNC && 
cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1079,37 +1042,34 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<GridAtomicMappingKey, 
GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        Collection<GridNearAtomicUpdateRequest> locUpdates = null;
+        GridNearAtomicUpdateRequest locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : 
mappings.entrySet()) {
-            GridAtomicMappingKey mappingKey = e.getKey();
-            GridNearAtomicUpdateRequest req = e.getValue();
-
+        for (GridNearAtomicUpdateRequest req : mappings.values()) {
             if (locNodeId.equals(req.nodeId())) {
-                if (locUpdates == null)
-                    locUpdates = new ArrayList<>(mappings.size());
+                assert locUpdate == null : "Cannot have more than one local 
mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
 
-                locUpdates.add(req);
+                locUpdate = req;
             }
             else {
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request 
[nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    sendRequest(mappingKey, req);
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
-                catch (IgniteCheckedException ex) {
-                    addFailedKeys(req.keys(), ex);
+                catch (IgniteCheckedException e) {
+                    addFailedKeys(req.keys(), e);
 
-                    removeMapping(mappingKey);
+                    removeMapping(req.nodeId());
                 }
 
                 if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(mappingKey);
+                    removeMapping(req.nodeId());
             }
         }
 
@@ -1117,52 +1077,28 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             // In FULL_ASYNC mode always return (null, true).
             opRes = new GridCacheReturn(cctx, true, null, true);
 
-        if (locUpdates != null) {
-            for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
-                cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                    new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
-                        @Override public void 
apply(GridNearAtomicUpdateRequest req,
-                            GridNearAtomicUpdateResponse res) {
-                            assert res.futureVersion().equals(futVer) : futVer;
+        if (locUpdate != null) {
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest 
req,
+                        GridNearAtomicUpdateResponse res) {
+                        assert res.futureVersion().equals(futVer) : futVer;
 
-                            onResult(res.nodeId(), res);
-                        }
-                    });
-            }
+                        onResult(res.nodeId(), res);
+                    }
+                });
         }
 
         checkComplete();
     }
 
     /**
-     * Sends request.
-     *
-     * @param mappingKey Mapping key.
-     * @param req Update request.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(GridAtomicMappingKey mappingKey, 
GridNearAtomicUpdateRequest req)
-        throws IgniteCheckedException {
-        if (mappingKey.partition() >= 0) {
-            Object topic = CU.partitionMessageTopic(cctx, 
mappingKey.partition(), true);
-
-            cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, 
cctx.ioPolicy(),
-                2 * cctx.gridConfig().getNetworkTimeout());
-        }
-        else {
-            assert mappingKey.partition() == -1;
-
-            cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-        }
-    }
-
-    /**
      * Removes mapping from future mappings map.
      *
-     * @param mappingKey Mapping key.
+     * @param nodeId Node ID to remove mapping for.
      */
-    private void removeMapping(GridAtomicMappingKey mappingKey) {
-        mappings.remove(mappingKey);
+    private void removeMapping(UUID nodeId) {
+        mappings.remove(nodeId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index b3075c4..734cf6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,9 +135,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** */
     private boolean clientReq;
 
-    /** Partition. */
-    private int part;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -165,7 +162,6 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param clientReq Client node request flag.
-     * @param part Partition.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -184,8 +180,7 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
         @Nullable UUID subjId,
         int taskNameHash,
         boolean skipStore,
-        boolean clientReq,
-        int part
+        boolean clientReq
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -205,7 +200,6 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
         this.clientReq = clientReq;
-        this.part = part;
 
         keys = new ArrayList<>();
     }
@@ -321,13 +315,6 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
     }
 
     /**
-     * @return Partition.
-     */
-    public int partition() {
-        return part;
-    }
-
-    /**
      * @param key Key to add.
      * @param val Optional update value.
      * @param conflictTtl Conflict TTL (optional).
@@ -679,8 +666,8 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeInt("part", part))
-                    return false;
+//                if (!writer.writeInt("part", part))
+//                    return false;
 
                 writer.incrementState();
 
@@ -863,10 +850,10 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 16:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
+//                part = reader.readInt("part");
+//
+//                if (!reader.isLastRead())
+//                    return false;
 
                 reader.incrementState();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index e2d33d5..2b30536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,9 +92,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
-    /** Partition. */
-    private int part;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -106,13 +103,11 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
      * @param futVer Future version.
-     * @param part Partition.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
+    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
-        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -143,7 +138,7 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
 
     /**
      * Sets update error.
-     * @param err
+     * @param err Exception.
      */
     public void error(IgniteCheckedException err){
         this.err = err;
@@ -193,13 +188,6 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     }
 
     /**
-     * @return Partition.
-     */
-    public int partition() {
-        return part;
-    }
-
-    /**
      * Adds value to be put in near cache on originating node.
      *
      * @param keyIdx Key index.
@@ -497,8 +485,8 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeInt("part", part))
-                    return false;
+//                if (!writer.writeInt("part", part))
+//                    return false;
 
                 writer.incrementState();
 
@@ -603,10 +591,10 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 12:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
+//                part = reader.readInt("part");
+//
+//                if (!reader.isLastRead())
+//                    return false;
 
                 reader.incrementState();
 

Reply via email to