IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5c69b83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5c69b83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5c69b83 Branch: refs/heads/ignite-426 Commit: e5c69b831a8f564440bd0960cc2a865cd907525a Parents: 424ab07 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Jul 29 14:19:24 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Jul 29 14:19:24 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 9 ++++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 33 +++++++++++++++----- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 7 ++++- 3 files changed, 37 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 7a8cc06..02e48df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1178,6 +1178,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { e.printStackTrace(); } finally { + if (dhtFut != null && !remap) + dhtFut.map(); + if (locked != null) unlockEntries(locked, req.topologyVersion()); @@ -1221,8 +1224,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else { // If there are backups, map backup update future. if (dhtFut != null) - dhtFut.map(); - // Otherwise, complete the call. + dhtFut.onMapped(); + // Otherwise, complete the call. else completionCb.apply(req, res); } @@ -2523,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - req.nodeId()); + nodeId); } catch (IgniteCheckedException e) { U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 3a68263..15ec121 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -90,6 +90,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** */ private boolean waitForExchange; + /** */ + private boolean mapped; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -349,37 +352,51 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> GridAtomicMappingKey mappingKey = e.getKey(); GridDhtAtomicUpdateRequest req = e.getValue(); + UUID nodeId = mappingKey.nodeId(); + int part = mappingKey.partition(); + + assert !nodeId.equals(cctx.localNodeId()); + try { if (log.isDebugEnabled()) - log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); - if (mappingKey.partition() >= 0) { - Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false); + if (part >= 0) { + Object topic = CU.partitionMessageTopic(cctx, part, false); - cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(), + cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(), 2 * cctx.gridConfig().getNetworkTimeout()); } else { - assert mappingKey.partition() == -1; + assert part == -1; - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + cctx.io().send(nodeId, req, cctx.ioPolicy()); } } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send update request to backup node because it left grid: " + - req.nodeId()); + nodeId); mappings.remove(mappingKey); } catch (IgniteCheckedException ex) { U.error(log, "Failed to send update request to backup node (did node leave the grid?): " - + req.nodeId(), ex); + + nodeId, ex); mappings.remove(mappingKey); } } } + mapped = true; + } + + /** + * On mapped callback. + */ + public void onMapped() { + assert mapped; + checkComplete(); // Send response right away if no ACKs from backup is required. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 35c6910..7149dec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -44,7 +44,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); - /** Node ID. */ + /** + * Node ID. + * + * @deprecated Not used anymore, but removal will break compatibility. + */ + @Deprecated private UUID nodeId; /** Future version. */