IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc394436 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc394436 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc394436 Branch: refs/heads/ignite-426 Commit: bc394436a89c945cbca1c4f158b00f19951cf47e Parents: 6c7358d Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Aug 5 17:05:35 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Aug 5 17:05:35 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridAtomicRequestTopic.java | 44 ++++++++++++++++---- .../dht/atomic/GridDhtAtomicCache.java | 8 ++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- 4 files changed, 42 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java index 9feb409..709f739 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java @@ -28,13 +28,41 @@ class GridAtomicRequestTopic implements Externalizable { private static final long serialVersionUID = 0L; /** */ + private static final byte NEAR_UPDATE_REQ = 1; + + /** */ + private static final byte DHT_UPDATE_REQ = 2; + + /** */ private int cacheId; /** */ private int part; /** */ - private boolean near; + private byte type; + + /** + * Near request topic. + * + * @param cacheId Cache ID. + * @param part Partition. + * @return Topic. + */ + static GridAtomicRequestTopic nearUpdateRequest(int cacheId, int part) { + return new GridAtomicRequestTopic(cacheId, part, NEAR_UPDATE_REQ); + } + + /** + * DHT request topic. + * + * @param cacheId Cache ID. + * @param part Partition. + * @return Topic. + */ + static GridAtomicRequestTopic dhtUpdateRequest(int cacheId, int part) { + return new GridAtomicRequestTopic(cacheId, part, DHT_UPDATE_REQ); + } /** * For {@link Externalizable}. @@ -46,12 +74,12 @@ class GridAtomicRequestTopic implements Externalizable { /** * @param cacheId Cache ID. * @param part Partition. - * @param near Near flag. + * @param type Type. */ - GridAtomicRequestTopic(int cacheId, int part, boolean near) { + private GridAtomicRequestTopic(int cacheId, int part, byte type) { this.cacheId = cacheId; this.part = part; - this.near = near; + this.type = type; } /** {@inheritDoc} */ @@ -62,7 +90,7 @@ class GridAtomicRequestTopic implements Externalizable { GridAtomicRequestTopic topic = (GridAtomicRequestTopic)o; - return cacheId == topic.cacheId && part == topic.part && near == topic.near; + return cacheId == topic.cacheId && part == topic.part && type == topic.type; } /** {@inheritDoc} */ @@ -70,7 +98,7 @@ class GridAtomicRequestTopic implements Externalizable { int res = cacheId; res = 31 * res + part; - res = 31 * res + (near ? 1 : 0); + res = 31 * res + type; return res; } @@ -79,14 +107,14 @@ class GridAtomicRequestTopic implements Externalizable { @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(cacheId); out.writeInt(part); - out.writeBoolean(near); + out.writeByte(type); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cacheId = in.readInt(); part = in.readInt(); - near = in.readBoolean(); + type = in.readByte(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 01694d7..6949ae2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -188,7 +188,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); for (int part = 0; part < ctx.affinity().partitions(); part++) { - Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true); + Object nearTopic = GridAtomicRequestTopic.nearUpdateRequest(ctx.cacheId(), part); ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { @@ -196,7 +196,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false); + Object dhtTopic = GridAtomicRequestTopic.dhtUpdateRequest(ctx.cacheId(), part); ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { @@ -239,8 +239,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { buf.finish(); for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true)); - ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false)); + ctx.io().removePerTopicHandler(GridAtomicRequestTopic.nearUpdateRequest(ctx.cacheId(), part)); + ctx.io().removePerTopicHandler(GridAtomicRequestTopic.dhtUpdateRequest(ctx.cacheId(), part)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 7823a52..63edcaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -352,7 +352,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); if (part >= 0) { - Object topic = new GridAtomicRequestTopic(cctx.cacheId(), part, false); + Object topic = GridAtomicRequestTopic.dhtUpdateRequest(cctx.cacheId(), part); cctx.io().sendSequentialMessage(nodeId, topic, req, cctx.ioPolicy()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc394436/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 5b364a5..66f0300 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -1091,7 +1091,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException { if (mappingKey.partition() >= 0) { - Object topic = new GridAtomicRequestTopic(cctx.cacheId(), mappingKey.partition(), true); + Object topic = GridAtomicRequestTopic.nearUpdateRequest(cctx.cacheId(), mappingKey.partition()); cctx.io().sendSequentialMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy()); }