Repository: incubator-ignite Updated Branches: refs/heads/ignite-104 b27af71d8 -> 8e6b90cf8
IGNITE-104 - Ordered ATOMIC updates Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e6b90cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e6b90cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e6b90cf Branch: refs/heads/ignite-104 Commit: 8e6b90cf835fe67778b961dccec0682f448f8e57 Parents: b27af71 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Aug 3 17:57:25 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Aug 3 17:57:25 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 6 ++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 78 +++++--------------- 2 files changed, 25 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4f1b887..a8dc8ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -134,6 +134,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + if (res.nodeId().equals(locNodeId)) { + processNearAtomicUpdateResponse(res.nodeId(), res); + + return; + } + if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { // Always send reply in CLOCK ordering mode. sendNearUpdateReply(res.nodeId(), res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e6b90cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ff24964..5150113 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -1047,30 +1047,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> singleNodeId = mappingKey.nodeId(); singleReq = req; - if (cctx.localNodeId().equals(mappingKey.nodeId())) { - cache.updateAllAsyncInternal(mappingKey.nodeId(), req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; - - onResult(res.nodeId(), res); - } - }); - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - sendRequest(mappingKey, req); + sendRequest(mappingKey, req); - if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) - onDone(new GridCacheReturn(cctx, true, null, true)); - } - catch (IgniteCheckedException e) { - onDone(addFailedKeys(req.keys(), e)); - } + if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) + onDone(new GridCacheReturn(cctx, true, null, true)); + } + catch (IgniteCheckedException e) { + onDone(addFailedKeys(req.keys(), e)); } } @@ -1080,57 +1067,30 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param mappings Mappings to send. */ private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) { - UUID locNodeId = cctx.localNodeId(); - - Collection<GridNearAtomicUpdateRequest> locUpdates = null; - - // Send messages to remote nodes first, then run local update. for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { GridAtomicMappingKey mappingKey = e.getKey(); GridNearAtomicUpdateRequest req = e.getValue(); - if (locNodeId.equals(req.nodeId())) { - if (locUpdates == null) - locUpdates = new ArrayList<>(mappings.size()); + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - locUpdates.add(req); + sendRequest(mappingKey, req); } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - - sendRequest(mappingKey, req); - } - catch (IgniteCheckedException ex) { - addFailedKeys(req.keys(), ex); - - removeMapping(mappingKey); - } + catch (IgniteCheckedException ex) { + addFailedKeys(req.keys(), ex); - if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) - removeMapping(mappingKey); + removeMapping(mappingKey); } + + if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) + removeMapping(mappingKey); } if (syncMode == FULL_ASYNC) // In FULL_ASYNC mode always return (null, true). opRes = new GridCacheReturn(cctx, true, null, true); - if (locUpdates != null) { - for (GridNearAtomicUpdateRequest locUpdate : locUpdates) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; - - onResult(res.nodeId(), res); - } - }); - } - } - checkComplete(); }