IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9781ea43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9781ea43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9781ea43 Branch: refs/heads/ignite-104 Commit: 9781ea4384a553e5126b8a7320f7070f6a340809 Parents: 7c73fc5 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Jul 29 17:57:49 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Jul 29 17:57:49 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 17 +- .../processors/cache/GridCacheIoManager.java | 3 +- .../processors/cache/GridCacheUtils.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 29 ++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 172 ++++++------------- .../dht/atomic/GridNearAtomicUpdateRequest.java | 27 +-- .../atomic/GridNearAtomicUpdateResponse.java | 28 +-- 8 files changed, 87 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 3cf92f8..e9da40c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -185,11 +185,10 @@ public enum GridTopic { /** * @param id1 ID1. * @param id2 ID2. - * @param id3 ID3. * @return Grid message topic with specified IDs. */ - public Object topic(int id1, int id2, byte id3) { - return new T9(this, id1, id2, id3); + public Object topic(int id1, int id2) { + return new T9(this, id1, id2); } /** @@ -782,9 +781,6 @@ public enum GridTopic { /** */ private int id2; - /** */ - private int id3; - /** * No-arg constructor needed for {@link Serializable}. */ @@ -796,13 +792,11 @@ public enum GridTopic { * @param topic Topic. * @param id1 ID1. * @param id2 ID2. - * @param id3 ID3. */ - private T9(GridTopic topic, int id1, int id2, byte id3) { + private T9(GridTopic topic, int id1, int id2) { this.topic = topic; this.id1 = id1; this.id2 = id2; - this.id3 = id3; } /** {@inheritDoc} */ @@ -811,7 +805,6 @@ public enum GridTopic { res += 31 * res + id1; res += 31 * res + id2; - res += 31 * res + id3; return res; } @@ -821,7 +814,7 @@ public enum GridTopic { if (obj.getClass() == T9.class) { T9 that = (T9)obj; - return topic == that.topic && id1 == that.id1 && id2 == that.id2 && id3 == that.id3; + return topic == that.topic && id1 == that.id1 && id2 == that.id2; } return false; @@ -832,7 +825,6 @@ public enum GridTopic { out.writeByte(topic.ordinal()); out.writeInt(id1); out.writeInt(id2); - out.writeByte(id3); } /** {@inheritDoc} */ @@ -840,7 +832,6 @@ public enum GridTopic { topic = fromOrdinal(in.readByte()); id1 = in.readInt(); id2 = in.readInt(); - id3 = in.readByte(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index dec6aef..5858424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -394,8 +394,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion(), - req.partition()); + req.futureVersion()); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 96df7c5..d82acca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1739,9 +1739,9 @@ public class GridCacheUtils { * @param part Partition. * @return Per-partition message topic. */ - public static Object partitionMessageTopic(GridCacheContext ctx, int part, boolean nearMsg) { + public static Object partitionMessageTopic(GridCacheContext ctx, int part) { assert part >= 0; - return TOPIC_CACHE.topic(ctx.cacheId(), part, (byte)(nearMsg ? 1 : 0)); + return TOPIC_CACHE.topic(ctx.cacheId(), part); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 31606b2..3084e68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -181,15 +181,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + if (ctx.config().isAtomicOrderedUpdates()) { for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, true), new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); - - ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() { + ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); } @@ -197,12 +197,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } else { - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); @@ -244,10 +238,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { buf.finish(); if (ctx.config().isAtomicOrderedUpdates()) { - for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true)); - ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false)); - } + for (int part = 0; part < ctx.affinity().partitions(); part++) + ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part)); } } @@ -1041,8 +1033,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, - req.futureVersion(), - req.partition()); + req.futureVersion()); List<KeyCacheObject> keys = req.keys(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index ab0c2e1..8595dc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); if (part >= 0) { - Object topic = CU.partitionMessageTopic(cctx, part, false); + Object topic = CU.partitionMessageTopic(cctx, part); cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(), 2 * cctx.gridConfig().getNetworkTimeout()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9b2a5e2..4c8a161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings; + private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -246,11 +246,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { - @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { - return cctx.kernalContext().discovery().node(mappingKey.nodeId()); - } - }), F.notNull()); + return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); } /** @@ -287,24 +283,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return false; } - Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); - Collection<KeyCacheObject> failedKeys = new ArrayList<>(); - - for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - if (e.getKey().nodeId().equals(nodeId)) { - mappingKeys.add(e.getKey()); - - failedKeys.addAll(e.getValue().keys()); - } - } + GridNearAtomicUpdateRequest req = mappings.get(nodeId); - if (!mappingKeys.isEmpty()) { - if (!failedKeys.isEmpty()) - addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " + - "response is received: " + nodeId)); + if (req != null) { + addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + + "received: " + nodeId)); - for (GridAtomicMappingKey key : mappingKeys) - mappings.remove(key); + mappings.remove(nodeId); checkComplete(); @@ -544,9 +529,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } else { - GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition()); - - GridNearAtomicUpdateRequest req = mappings.get(mappingKey); + GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. updateNear(req, res); @@ -564,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> opRes = ret; } - mappings.remove(mappingKey); + mappings.remove(nodeId); } checkComplete(); @@ -780,11 +763,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - int part = cctx.affinity().partition(cacheKey); - ClusterNode primary = cctx.affinity().primary(part, topVer); - - if (!ccfg.isAtomicOrderedUpdates()) - part = -1; + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); if (primary == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + @@ -810,8 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode(), - part); + cctx.kernalContext().clientNode()); req.addUpdateEntry(cacheKey, val, @@ -827,7 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } // Optimize mapping for single key. - mapSingle(new GridAtomicMappingKey(primary.id(), part), req); + mapSingle(primary.id(), req); return; } @@ -847,18 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (oldNodeId != null) { - // TODO: IGNITE-104 - Try to avoid iteration. - for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - if (e.getKey().nodeId().equals(oldNodeId)) - mappings.remove(e.getKey()); - } - } + if (oldNodeId != null) + removeMapping(oldNodeId); // For fastMap mode wait for all responses before remapping. if (remap && fastMap && !mappings.isEmpty()) { @@ -928,10 +901,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap); - - int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1; - Collection<ClusterNode> affNodes = t.get2(); + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); if (affNodes.isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -952,9 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> UUID nodeId = affNode.id(); - GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); - - GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey); + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); if (mapped == null) { mapped = new GridNearAtomicUpdateRequest( @@ -974,12 +942,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode(), - part); + cctx.kernalContext().clientNode()); - pendingMappings.put(mappingKey, mapped); + pendingMappings.put(nodeId, mapped); - GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped); + GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped); assert old == null || (old != null && remap) : "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; @@ -997,7 +964,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); + Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); single = true; @@ -1020,35 +987,31 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private T2<Integer, Collection<ClusterNode>> mapKey( + private Collection<ClusterNode> mapKey( KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap ) { GridCacheAffinityManager affMgr = cctx.affinity(); - int part = affMgr.partition(key); - // If we can send updates in parallel - do it. - Collection<ClusterNode> nodes = fastMap ? - cctx.topology().nodes(part, topVer) : - Collections.singletonList(affMgr.primary(part, topVer)); - - return new T2<>(part, nodes); + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); } /** * Maps future to single node. * - * @param mappingKey Mapping key. + * @param nodeId Node ID. * @param req Request. */ - private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) { - singleNodeId = mappingKey.nodeId(); + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + singleNodeId = nodeId; singleReq = req; - if (cctx.localNodeId().equals(mappingKey.nodeId())) { - cache.updateAllAsyncInternal(mappingKey.nodeId(), req, + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { @@ -1063,7 +1026,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - sendRequest(mappingKey, req); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) onDone(new GridCacheReturn(cctx, true, null, true)); @@ -1079,37 +1042,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param mappings Mappings to send. */ - private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) { + private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { UUID locNodeId = cctx.localNodeId(); - Collection<GridNearAtomicUpdateRequest> locUpdates = null; + GridNearAtomicUpdateRequest locUpdate = null; // Send messages to remote nodes first, then run local update. - for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - GridAtomicMappingKey mappingKey = e.getKey(); - GridNearAtomicUpdateRequest req = e.getValue(); - + for (GridNearAtomicUpdateRequest req : mappings.values()) { if (locNodeId.equals(req.nodeId())) { - if (locUpdates == null) - locUpdates = new ArrayList<>(mappings.size()); + assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + + ", req=" + req + ']'; - locUpdates.add(req); + locUpdate = req; } else { try { if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - sendRequest(mappingKey, req); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } - catch (IgniteCheckedException ex) { - addFailedKeys(req.keys(), ex); + catch (IgniteCheckedException e) { + addFailedKeys(req.keys(), e); - removeMapping(mappingKey); + removeMapping(req.nodeId()); } if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) - removeMapping(mappingKey); + removeMapping(req.nodeId()); } } @@ -1117,52 +1077,28 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // In FULL_ASYNC mode always return (null, true). opRes = new GridCacheReturn(cctx, true, null, true); - if (locUpdates != null) { - for (GridNearAtomicUpdateRequest locUpdate : locUpdates) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; + if (locUpdate != null) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { + assert res.futureVersion().equals(futVer) : futVer; - onResult(res.nodeId(), res); - } - }); - } + onResult(res.nodeId(), res); + } + }); } checkComplete(); } /** - * Sends request. - * - * @param mappingKey Mapping key. - * @param req Update request. - * @throws IgniteCheckedException In case of error. - */ - private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) - throws IgniteCheckedException { - if (mappingKey.partition() >= 0) { - Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), true); - - cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(), - 2 * cctx.gridConfig().getNetworkTimeout()); - } - else { - assert mappingKey.partition() == -1; - - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - } - } - - /** * Removes mapping from future mappings map. * - * @param mappingKey Mapping key. + * @param nodeId Node ID to remove mapping for. */ - private void removeMapping(GridAtomicMappingKey mappingKey) { - mappings.remove(mappingKey); + private void removeMapping(UUID nodeId) { + mappings.remove(nodeId); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index b3075c4..734cf6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -135,9 +135,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** */ private boolean clientReq; - /** Partition. */ - private int part; - /** * Empty constructor required by {@link Externalizable}. */ @@ -165,7 +162,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param clientReq Client node request flag. - * @param part Partition. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -184,8 +180,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable UUID subjId, int taskNameHash, boolean skipStore, - boolean clientReq, - int part + boolean clientReq ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -205,7 +200,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.taskNameHash = taskNameHash; this.skipStore = skipStore; this.clientReq = clientReq; - this.part = part; keys = new ArrayList<>(); } @@ -321,13 +315,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** - * @return Partition. - */ - public int partition() { - return part; - } - - /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). @@ -679,8 +666,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 16: - if (!writer.writeInt("part", part)) - return false; +// if (!writer.writeInt("part", part)) +// return false; writer.incrementState(); @@ -863,10 +850,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 16: - part = reader.readInt("part"); - - if (!reader.isLastRead()) - return false; +// part = reader.readInt("part"); +// +// if (!reader.isLastRead()) +// return false; reader.incrementState(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index e2d33d5..2b30536 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -92,9 +92,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Near expire times. */ private GridLongList nearExpireTimes; - /** Partition. */ - private int part; - /** * Empty constructor required by {@link Externalizable}. */ @@ -106,13 +103,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param cacheId Cache ID. * @param nodeId Node ID this reply should be sent to. * @param futVer Future version. - * @param part Partition. */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) { + public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) { this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; - this.part = part; } /** {@inheritDoc} */ @@ -143,7 +138,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * Sets update error. - * @param err + * @param err Exception. */ public void error(IgniteCheckedException err){ this.err = err; @@ -193,13 +188,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @return Partition. - */ - public int partition() { - return part; - } - - /** * Adds value to be put in near cache on originating node. * * @param keyIdx Key index. @@ -497,8 +485,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 12: - if (!writer.writeInt("part", part)) - return false; +// if (!writer.writeInt("part", part)) +// return false; writer.incrementState(); @@ -603,10 +591,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 12: - part = reader.readInt("part"); - - if (!reader.isLastRead()) - return false; +// part = reader.readInt("part"); +// +// if (!reader.isLastRead()) +// return false; reader.incrementState();