Repository: incubator-ignite Updated Branches: refs/heads/ignite-104 [created] 4f14522ab
IGNITE-104 - Ordered ATOMIC updates Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f14522a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f14522a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f14522a Branch: refs/heads/ignite-104 Commit: 4f14522ab7b92b38810fd24ec15bcb094f480d08 Parents: 73a2b14 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Jul 21 16:55:32 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Jul 21 16:55:32 2015 -0700 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 21 ++ .../org/apache/ignite/internal/GridTopic.java | 73 +++++++ .../processors/cache/GridCacheIoManager.java | 32 ++- .../processors/cache/GridCacheUtils.java | 12 +- .../dht/atomic/GridDhtAtomicCache.java | 60 +++-- .../GridDhtAtomicDeferredUpdateResponse.java | 16 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 129 +++++++++-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 15 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 17 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 219 ++++++++++++++----- .../dht/atomic/GridNearAtomicUpdateRequest.java | 15 +- .../atomic/GridNearAtomicUpdateResponse.java | 14 +- 12 files changed, 519 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 3ad0f01..83847dc 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -219,6 +219,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Write ordering mode. */ private CacheAtomicWriteOrderMode atomicWriteOrderMode; + /** Ordered updates mode. */ + private boolean atomicOrderedUpdates; + /** Number of backups for cache. */ private int backups = DFLT_BACKUPS; @@ -345,6 +348,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { aff = cc.getAffinity(); affMapper = cc.getAffinityMapper(); atomicityMode = cc.getAtomicityMode(); + atomicOrderedUpdates = cc.isAtomicOrderedUpdates(); atomicWriteOrderMode = cc.getAtomicWriteOrderMode(); backups = cc.getBackups(); cacheLoaderFactory = cc.getCacheLoaderFactory(); @@ -896,6 +900,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * @return Ordered updates mode. + */ + public boolean isAtomicOrderedUpdates() { + return atomicOrderedUpdates; + } + + /** + * @param atomicOrderedUpdates Ordered updates mode. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) { + this.atomicOrderedUpdates = atomicOrderedUpdates; + + return this; + } + + /** * Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache. * <p> * If not set, default value is {@link #DFLT_BACKUPS}. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index ba3b8b2..1ed8725 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -183,6 +183,15 @@ public enum GridTopic { } /** + * @param id1 ID1. + * @param id2 ID2. + * @return Grid message topic with specified IDs. + */ + public Object topic(int id1, int id2) { + return new T9(this, id1, id2); + } + + /** * */ private static class T1 implements Externalizable { @@ -756,4 +765,68 @@ public enum GridTopic { return S.toString(T8.class, this); } } + + /** + */ + private static class T9 implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private GridTopic topic; + + /** */ + private int id1; + + /** */ + private int id2; + + /** + * No-arg constructor needed for {@link Externalizable}. + */ + public T9() { + // No-op. + } + + /** + * @param topic Topic. + * @param id1 ID1. + * @param id2 ID2. 
+ */ + private T9(GridTopic topic, int id1, int id2) { + this.topic = topic; + this.id1 = id1; + this.id2 = id2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * (31 * topic.ordinal() + id1) + id2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj != null && obj.getClass() == T9.class) { + T9 that = (T9)obj; + + return topic == that.topic && id1 == that.id1 && id2 == that.id2; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte(topic.ordinal()); + out.writeInt(id1); + out.writeInt(id2); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + topic = fromOrdinal(in.readByte()); + id1 = in.readInt(); // Both IDs are written as ints in writeExternal(). + id2 = in.readInt(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 84e4dc2..dec6aef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -378,7 +378,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( ctx.cacheId(), - req.futureVersion()); + req.futureVersion(), + req.partition()); res.onError(req.classError()); @@ -393,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, 
req.futureVersion()); + req.futureVersion(), + req.partition()); res.error(req.classError()); @@ -745,13 +747,32 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ClusterNode n = cctx.discovery().node(nodeId); if (n == null) - throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId + - ", msg=" + msg + ']'); + throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + + nodeId + ", msg=" + msg + ']'); send(n, msg, plc); } /** + * @param nodeId Destination node ID. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc IO policy. + * @param timeout Timeout to keep a message on receiving queue. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc, long timeout) + throws IgniteCheckedException { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n == null) + throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + + nodeId + ", msg=" + msg + ']'); + + sendOrderedMessage(n, topic, msg, plc, timeout); + } + + /** * @param node Destination node. * @param topic Topic to send the message to. * @param msg Message to send. 
@@ -779,7 +800,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } catch (IgniteCheckedException e) { if (cctx.discovery().node(node.id()) == null) - throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e); + throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + + node.id(), e); if (cnt == retryCnt) throw e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e16e30d..b0edc3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -57,7 +57,6 @@ import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; /** @@ -1750,4 +1749,15 @@ public class GridCacheUtils { } }; } + + /** + * @param ctx Cache context. + * @param part Partition. + * @return Per-partition message topic. 
+ */ + public static Object partitionMassageTopic(GridCacheContext ctx, int part) { + assert part >= 0; + + return TOPIC_CACHE.topic(ctx.cacheId(), part); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0a21979..38073f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -181,11 +181,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); + if (ctx.config().isAtomicOrderedUpdates()) { + for (int part = 0; part < ctx.affinity().partitions(); part++) { + ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); } - }); + } + else { + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + } ctx.io().addHandler(ctx.cacheId(), 
GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) { @@ -193,11 +204,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { - processDhtAtomicUpdateRequest(nodeId, req); + if (ctx.config().isAtomicOrderedUpdates()) { + for (int part = 0; part < ctx.affinity().partitions(); part++) { + ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { + processDhtAtomicUpdateRequest(nodeId, req); + } + }); } - }); + } + else { + ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { + processDhtAtomicUpdateRequest(nodeId, req); + } + }); + } ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) { @@ -1017,7 +1039,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, - req.futureVersion()); + req.futureVersion(), + req.partition()); List<KeyCacheObject> keys = req.keys(); @@ -2389,7 +2412,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion ver = req.writeVersion(); // Always send update reply. 
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( + ctx.cacheId(), req.futureVersion(), req.partition()); Boolean replicate = ctx.isDrEnabled(); @@ -2477,7 +2501,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().send(nodeId, res, ctx.ioPolicy()); else { // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. - sendDeferredUpdateResponse(nodeId, req.futureVersion()); + sendDeferredUpdateResponse(nodeId, req.futureVersion(), req.partition()); } } catch (ClusterTopologyCheckedException ignored) { @@ -2494,7 +2518,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Node ID to send message to. * @param ver Version to ack. */ - private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) { + private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver, int part) { while (true) { DeferredResponseBuffer buf = pendingResponses.get(nodeId); @@ -2511,7 +2535,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { buf = old; } - if (!buf.addResponse(ver)) + if (!buf.addResponse(ver, part)) // Some thread is sending filled up buffer, we can remove it. pendingResponses.remove(nodeId, buf); else @@ -2551,7 +2575,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); if (updateFut != null) - updateFut.onResult(nodeId); + updateFut.onResult(nodeId, res); else U.warn(log, "Failed to find DHT update future for deferred update response [nodeId=" + nodeId + ", ver=" + ver + ", res=" + res + ']'); @@ -2751,6 +2775,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** Response versions. 
*/ private Collection<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>(); + /** Response partitions. */ + private Collection<Integer> respParts = new GridConcurrentHashSet<>(); + /** Node ID. */ private final UUID nodeId; @@ -2805,7 +2832,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param ver Version to send. * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used. */ - public boolean addResponse(GridCacheVersion ver) { + public boolean addResponse(GridCacheVersion ver, int part) { readLock().lock(); boolean snd = false; @@ -2815,6 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return false; respVers.add(ver); + respParts.add(part); if (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) snd = true; @@ -2845,7 +2873,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private void finish() { GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), - respVers); + respVers, respParts); try { ctx.kernalContext().gateway().readLock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 1163761..e203b75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -41,6 +41,10 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> futVers; + /** Partitions. */ + @GridDirectCollection(int.class) + private Collection<Integer> parts; + /** {@inheritDoc} */ @Override public int lookupIndex() { return CACHE_MSG_IDX; @@ -57,12 +61,15 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem * Constructor. * * @param futVers Future versions. + * @param parts Partitions. */ - public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers) { + public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers, + Collection<Integer> parts) { assert !F.isEmpty(futVers); this.cacheId = cacheId; this.futVers = futVers; + this.parts = parts; } /** @@ -72,6 +79,13 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem return futVers; } + /** + * @return Partitions. 
+ */ + public Collection<Integer> partitions() { + return parts; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4b1a58f..f7e574d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; @@ -135,7 +135,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public Collection<? 
extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); + return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { + @Override public ClusterNode apply(MappingKey mappingKey) { + return cctx.kernalContext().discovery().node(mappingKey.nodeId); + } + }), F.notNull()); } /** {@inheritDoc} */ @@ -143,11 +147,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - GridDhtAtomicUpdateRequest req = mappings.get(nodeId); + Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); + + for (MappingKey mappingKey : mappings.keySet()) { + if (mappingKey.nodeId.equals(nodeId)) + mappingKeys.add(mappingKey); + } - if (req != null) { - // Remove only after added keys to failed set. - mappings.remove(nodeId); + if (!mappingKeys.isEmpty()) { + for (MappingKey mappingKey : mappingKeys) + mappings.remove(mappingKey); checkComplete(); @@ -201,7 +210,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> @Nullable GridCacheVersion conflictVer) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); - Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); + int part = entry.partition(); + + Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer); + + if (!cctx.config().isAtomicOrderedUpdates()) + part = -1; if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); @@ -213,8 +227,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); + MappingKey mappingKey = new MappingKey(nodeId, part); + if (!nodeId.equals(cctx.localNodeId())) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); + 
GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); if (updateReq == null) { updateReq = new GridDhtAtomicUpdateRequest( @@ -227,9 +243,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null); + forceTransformBackups ? this.updateReq.invokeArguments() : null, + part); - mappings.put(nodeId, updateReq); + mappings.put(mappingKey, updateReq); } updateReq.addWriteValue(entry.key(), @@ -262,8 +279,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> AffinityTopologyVersion topVer = updateReq.topologyVersion(); + int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1; + for (UUID nodeId : readers) { - GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); + MappingKey mappingKey = new MappingKey(nodeId, part); + + GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); if (updateReq == null) { ClusterNode node = cctx.discovery().node(nodeId); @@ -282,9 +303,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null); + forceTransformBackups ? 
this.updateReq.invokeArguments() : null, + part); - mappings.put(nodeId, updateReq); + mappings.put(mappingKey, updateReq); } if (nearReadersEntries == null) @@ -319,24 +341,36 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public void map() { if (!mappings.isEmpty()) { - for (GridDhtAtomicUpdateRequest req : mappings.values()) { + for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { + MappingKey mappingKey = e.getKey(); + GridDhtAtomicUpdateRequest req = e.getValue(); + try { if (log.isDebugEnabled()) log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + if (mappingKey.part >= 0) { + Object topic = CU.partitionMassageTopic(cctx, mappingKey.part); + + cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0); + } + else { + assert mappingKey.part == -1; + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send update request to backup node because it left grid: " + req.nodeId()); - mappings.remove(req.nodeId()); + mappings.remove(mappingKey); } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException ex) { U.error(log, "Failed to send update request to backup node (did node leave the grid?): " - + req.nodeId(), e); + + req.nodeId(), ex); - mappings.remove(req.nodeId()); + mappings.remove(mappingKey); } } } @@ -376,7 +410,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } - mappings.remove(nodeId); + mappings.remove(new MappingKey(nodeId, updateRes.partition())); checkComplete(); } @@ -385,12 +419,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * Deferred update response. * * @param nodeId Backup node ID. + * @param res Response. 
*/ - public void onResult(UUID nodeId) { + public void onResult(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { if (log.isDebugEnabled()) log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); - mappings.remove(nodeId); + for (Integer part : res.partitions()) + mappings.remove(new MappingKey(nodeId, part)); checkComplete(); } @@ -412,4 +448,53 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> @Override public String toString() { return S.toString(GridDhtAtomicUpdateFuture.class, this); } + + /** + */ + private static class MappingKey { + /** Node ID. */ + private final UUID nodeId; + + /** Partition. */ + private final int part; + + /** + * @param nodeId Node ID. + * @param part Partition. + */ + private MappingKey(UUID nodeId, int part) { + assert nodeId != null; + assert part >= -1 : part; + + this.nodeId = nodeId; + this.part = part; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MappingKey key = (MappingKey)o; + + return nodeId.equals(key.nodeId) && part == key.part; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + part; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MappingKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index f83b8fa..031edb0 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -127,6 +127,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. */ @@ -147,6 +150,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param forceTransformBackups Force transform backups flag. * @param subjId Subject ID. * @param taskNameHash Task name hash code. + * @param part Partition. */ public GridDhtAtomicUpdateRequest( int cacheId, @@ -158,7 +162,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean forceTransformBackups, UUID subjId, int taskNameHash, - Object[] invokeArgs + Object[] invokeArgs, + int part ) { assert invokeArgs == null || forceTransformBackups; @@ -172,6 +177,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.subjId = subjId; this.taskNameHash = taskNameHash; this.invokeArgs = invokeArgs; + this.part = part; keys = new ArrayList<>(); @@ -318,6 +324,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** * @return Node ID. 
*/ public UUID nodeId() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index c5b5a37..509a918 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -59,6 +59,9 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> nearEvicted; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. */ @@ -69,10 +72,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** * @param cacheId Cache ID. * @param futVer Future version. + * @param part Partition. */ - public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer) { + public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, int part) { this.cacheId = cacheId; this.futVer = futVer; + this.part = part; } /** {@inheritDoc} */ @@ -89,7 +94,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** * Sets update error. - * @param err + * + * @param err Error. */ public void onError(IgniteCheckedException err){ this.err = err; @@ -110,6 +116,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** + * @return Partition. 
+ */ + public int partition() { + return part; + } + + /** * Adds key to collection of failed keys. * * @param key Key to add. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 4c8a161..63818f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; + private ConcurrentMap<MappingKey, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public Collection<? 
extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); + return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { + @Override public ClusterNode apply(MappingKey mappingKey) { + return cctx.kernalContext().discovery().node(mappingKey.nodeId); + } + }), F.notNull()); } /** @@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return false; } - GridNearAtomicUpdateRequest req = mappings.get(nodeId); + Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<KeyCacheObject> failedKeys = new ArrayList<>(); + + for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId.equals(nodeId)) { + mappingKeys.add(e.getKey()); - if (req != null) { - addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + - "received: " + nodeId)); + failedKeys.addAll(e.getValue().keys()); + } + } - mappings.remove(nodeId); + if (!mappingKeys.isEmpty()) { + if (!failedKeys.isEmpty()) + addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " + + "response is received: " + nodeId)); + + for (MappingKey key : mappingKeys) + mappings.remove(key); checkComplete(); @@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } else { - GridNearAtomicUpdateRequest req = mappings.get(nodeId); + MappingKey mappingKey = new MappingKey(nodeId, res.partition()); + + GridNearAtomicUpdateRequest req = mappings.get(mappingKey); if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. 
updateNear(req, res); @@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> opRes = ret; } - mappings.remove(nodeId); + mappings.remove(mappingKey); } checkComplete(); @@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + int part = cctx.affinity().partition(cacheKey); + ClusterNode primary = cctx.affinity().primary(part, topVer); + + if (!ccfg.isAtomicOrderedUpdates()) + part = -1; if (primary == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + @@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode()); + cctx.kernalContext().clientNode(), + part); req.addUpdateEntry(cacheKey, val, @@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } // Optimize mapping for single key. - mapSingle(primary.id(), req); + mapSingle(new MappingKey(primary.id(), part), req); return; } @@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + Map<MappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (oldNodeId != null) - removeMapping(oldNodeId); + if (oldNodeId != null) { + // TODO: IGNITE-104 - Try to avoid iteration. 
+ for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId.equals(oldNodeId)) + mappings.remove(e.getKey()); + } + } // For fastMap mode wait for all responses before remapping. if (remap && fastMap && !mappings.isEmpty()) { @@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap); + + int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1; + Collection<ClusterNode> affNodes = t.get2(); if (affNodes.isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> UUID nodeId = affNode.id(); - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); + MappingKey mappingKey = new MappingKey(nodeId, part); + + GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey); if (mapped == null) { mapped = new GridNearAtomicUpdateRequest( @@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode()); + cctx.kernalContext().clientNode(), + part); - pendingMappings.put(nodeId, mapped); + pendingMappings.put(mappingKey, mapped); - GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped); + GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped); assert old == null || (old != null && remap) : "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; @@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); + Map.Entry<MappingKey, 
GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); single = true; @@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private Collection<ClusterNode> mapKey( + private T2<Integer, Collection<ClusterNode>> mapKey( KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap ) { GridCacheAffinityManager affMgr = cctx.affinity(); + int part = affMgr.partition(key); + // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); + Collection<ClusterNode> nodes = fastMap ? + cctx.topology().nodes(part, topVer) : + Collections.singletonList(affMgr.primary(part, topVer)); + + return new T2<>(part, nodes); } /** * Maps future to single node. * - * @param nodeId Node ID. + * @param mappingKey Mapping key. * @param req Request. 
*/ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - singleNodeId = nodeId; + private void mapSingle(MappingKey mappingKey, GridNearAtomicUpdateRequest req) { + singleNodeId = mappingKey.nodeId; singleReq = req; - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, + if (cctx.localNodeId().equals(mappingKey.nodeId)) { + cache.updateAllAsyncInternal(mappingKey.nodeId, req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { @@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + sendRequest(mappingKey, req); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) onDone(new GridCacheReturn(cctx, true, null, true)); @@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param mappings Mappings to send. */ - private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { + private void doUpdate(Map<MappingKey, GridNearAtomicUpdateRequest> mappings) { UUID locNodeId = cctx.localNodeId(); - GridNearAtomicUpdateRequest locUpdate = null; + Collection<GridNearAtomicUpdateRequest> locUpdates = null; // Send messages to remote nodes first, then run local update. 
- for (GridNearAtomicUpdateRequest req : mappings.values()) { + for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + MappingKey mappingKey = e.getKey(); + GridNearAtomicUpdateRequest req = e.getValue(); + if (locNodeId.equals(req.nodeId())) { - assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + - ", req=" + req + ']'; + if (locUpdates == null) + locUpdates = new ArrayList<>(mappings.size()); - locUpdate = req; + locUpdates.add(req); } else { try { if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + sendRequest(mappingKey, req); } - catch (IgniteCheckedException e) { - addFailedKeys(req.keys(), e); + catch (IgniteCheckedException ex) { + addFailedKeys(req.keys(), ex); - removeMapping(req.nodeId()); + removeMapping(mappingKey); } if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) - removeMapping(req.nodeId()); + removeMapping(mappingKey); } } @@ -1077,28 +1117,50 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // In FULL_ASYNC mode always return (null, true). 
opRes = new GridCacheReturn(cctx, true, null, true); - if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; + if (locUpdates != null) { + for (GridNearAtomicUpdateRequest locUpdate : locUpdates) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { + assert res.futureVersion().equals(futVer) : futVer; - onResult(res.nodeId(), res); - } - }); + onResult(res.nodeId(), res); + } + }); + } } checkComplete(); } /** + * Sends request. + * + * @param mappingKey Mapping key. + * @param req Update request. + * @throws IgniteCheckedException In case of error. + */ + private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException { + if (mappingKey.part >= 0) { + Object topic = CU.partitionMassageTopic(cctx, mappingKey.part); + + cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0); + } + else { + assert mappingKey.part == -1; + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + } + + /** * Removes mapping from future mappings map. * - * @param nodeId Node ID to remove mapping for. + * @param mappingKey Mapping key. */ - private void removeMapping(UUID nodeId) { - mappings.remove(nodeId); + private void removeMapping(MappingKey mappingKey) { + mappings.remove(mappingKey); } /** @@ -1142,4 +1204,53 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> public String toString() { return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); } + + /** + */ + private static class MappingKey { + /** Node ID. 
*/ + private final UUID nodeId; + + /** Partition. */ + private final int part; + + /** + * @param nodeId Node ID. + * @param part Partition. + */ + private MappingKey(UUID nodeId, int part) { + assert nodeId != null; + assert part >= -1 : part; + + this.nodeId = nodeId; + this.part = part; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MappingKey key = (MappingKey)o; + + return nodeId.equals(key.nodeId) && part == key.part; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + part; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MappingKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 86c5ab8..93429c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** */ private boolean clientReq; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. 
*/ @@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param clientReq Client node request flag. + * @param part Partition. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable UUID subjId, int taskNameHash, boolean skipStore, - boolean clientReq + boolean clientReq, + int part ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.taskNameHash = taskNameHash; this.skipStore = skipStore; this.clientReq = clientReq; + this.part = part; keys = new ArrayList<>(); } @@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 330e43c..404670a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Near expire times. */ private GridLongList nearExpireTimes; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. */ @@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param cacheId Cache ID. * @param nodeId Node ID this reply should be sent to. * @param futVer Future version. + * @param part Partition. */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) { + public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) { this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; + this.part = part; } /** {@inheritDoc} */ @@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** * Adds value to be put in near cache on originating node. * * @param keyIdx Key index.