Repository: incubator-ignite Updated Branches: refs/heads/ignite-104 4864d4c00 -> 825fbc9e4
IGNITE-104 - Ordered ATOMIC updates Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e18872c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e18872c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e18872c7 Branch: refs/heads/ignite-104 Commit: e18872c7e1ee85bc5cc04df705458d5758975afe Parents: 4864d4c Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Jul 21 19:15:56 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Jul 21 19:15:56 2015 -0700 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 3 +- .../org/apache/ignite/internal/GridTopic.java | 35 +++++++++++++++----- .../processors/cache/GridCacheUtils.java | 4 +-- .../dht/atomic/GridDhtAtomicCache.java | 4 +-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- 6 files changed, 35 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 83847dc..ff32551 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -220,7 +220,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private CacheAtomicWriteOrderMode atomicWriteOrderMode; /** Ordered updates mode. */ - private boolean atomicOrderedUpdates; + // TODO: IGNITE-104 - Switch default to false + private boolean atomicOrderedUpdates = true; /** Number of backups for cache. */ private int backups = DFLT_BACKUPS; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 1ed8725..3cf92f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -185,10 +185,11 @@ public enum GridTopic { /** * @param id1 ID1. * @param id2 ID2. + * @param id3 ID3. * @return Grid message topic with specified IDs. */ - public Object topic(int id1, int id2) { - return new T9(this, id1, id2); + public Object topic(int id1, int id2, byte id3) { + return new T9(this, id1, id2, id3); } /** @@ -577,7 +578,7 @@ public enum GridTopic { /** {@inheritDoc} */ @Override public int hashCode() { - return topic.ordinal() + id1.hashCode () + (int)(id2 ^ (id2 >>> 32)); + return topic.ordinal() + id1.hashCode() + (int)(id2 ^ (id2 >>> 32)); } /** {@inheritDoc} */ @@ -781,6 +782,9 @@ public enum GridTopic { /** */ private int id2; + /** */ + private int id3; + /** * No-arg constructor needed for {@link Serializable}. */ @@ -792,16 +796,24 @@ public enum GridTopic { * @param topic Topic. * @param id1 ID1. * @param id2 ID2. + * @param id3 ID3. */ - private T9(GridTopic topic, int id1, int id2) { + private T9(GridTopic topic, int id1, int id2, byte id3) { this.topic = topic; this.id1 = id1; this.id2 = id2; + this.id3 = id3; } /** {@inheritDoc} */ @Override public int hashCode() { - return topic.ordinal() + 31 * id1 + 31 * id2; + int res = topic.ordinal(); + + res += 31 * res + id1; + res += 31 * res + id2; + res += 31 * res + id3; + + return res; } /** {@inheritDoc} */ @@ -809,7 +821,7 @@ public enum GridTopic { if (obj.getClass() == T9.class) { T9 that = (T9)obj; - return topic == that.topic && id1 == that.id1 && id2 == that.id2; + return topic == that.topic && id1 == that.id1 && id2 == that.id2 && id3 == that.id3; } return false; @@ -820,13 +832,20 @@ public enum GridTopic { out.writeByte(topic.ordinal()); out.writeInt(id1); out.writeInt(id2); + out.writeByte(id3); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { topic = fromOrdinal(in.readByte()); - id1 = in.readByte(); - id2 = in.readByte(); + id1 = in.readInt(); + id2 = in.readInt(); + id3 = in.readByte(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(T9.class, this); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index b0edc3c..53ec67a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1755,9 +1755,9 @@ public class GridCacheUtils { * @param part Partition. * @return Per-partition message topic. */ - public static Object partitionMassageTopic(GridCacheContext ctx, int part) { + public static Object partitionMessageTopic(GridCacheContext ctx, int part, boolean nearMsg) { assert part >= 0; - return TOPIC_CACHE.topic(ctx.cacheId(), part); + return TOPIC_CACHE.topic(ctx.cacheId(), part, (byte)(nearMsg ? 1 : 0)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 38073f3..bdaa994 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -183,7 +183,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ctx.config().isAtomicOrderedUpdates()) { for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridNearAtomicUpdateRequest>() { + ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, true), new CI2<UUID, GridNearAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { processNearAtomicUpdateRequest(nodeId, req); } @@ -206,7 +206,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ctx.config().isAtomicOrderedUpdates()) { for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() { + ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index f7e574d..80c97d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -350,7 +350,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); if (mappingKey.part >= 0) { - Object topic = CU.partitionMassageTopic(cctx, mappingKey.part); + Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, false); cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e18872c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 63818f2..9e46806 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -1143,7 +1143,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException { if (mappingKey.part >= 0) { - Object topic = CU.partitionMassageTopic(cctx, mappingKey.part); + Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, true); cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0); }