Repository: incubator-ignite Updated Branches: refs/heads/ignite-104 9dec3b7b1 -> 2ac79893c
IGNITE-104 - Ordered ATOMIC updates Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2ac79893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2ac79893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2ac79893 Branch: refs/heads/ignite-104 Commit: 2ac79893cc21517c73e85a1bcc18c33b59140fb2 Parents: 9dec3b7 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Jul 21 21:12:23 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Jul 21 21:12:23 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 36 +++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ac79893/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index bdaa994..aaf373d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -188,24 +188,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { processNearAtomicUpdateRequest(nodeId, req); } }); - } - } - else { - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); - } - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) { - processNearAtomicUpdateResponse(nodeId, res); - } - }); - - if (ctx.config().isAtomicOrderedUpdates()) { - for (int part = 0; part < ctx.affinity().partitions(); part++) { ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); @@ -214,6 +197,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } else { + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); @@ -221,6 +210,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); } + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) { + processNearAtomicUpdateResponse(nodeId, res); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) { processDhtAtomicUpdateResponse(nodeId, res); @@ -247,6 +242,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public void stop() { for (DeferredResponseBuffer buf : pendingResponses.values()) buf.finish(); + + if (ctx.config().isAtomicOrderedUpdates()) { + for (int part = 0; part < ctx.affinity().partitions(); part++) { + ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true)); + ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false)); + } + } } /**