IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3ebcb96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3ebcb96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3ebcb96 Branch: refs/heads/ignite-104 Commit: e3ebcb96efd469dd162b5b0e8cd3904ab7003a67 Parents: 98ad892 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Aug 4 11:08:45 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Aug 4 11:08:45 2015 -0700 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 22 ------------ .../processors/cache/GridCacheContext.java | 8 ++--- .../dht/atomic/GridDhtAtomicCache.java | 35 +++++--------------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 ++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 30 ++++++----------- 5 files changed, 24 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index ff32551..3ad0f01 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -219,10 +219,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Write ordering mode. */ private CacheAtomicWriteOrderMode atomicWriteOrderMode; - /** Ordered updates mode. */ - // TODO: IGNITE-104 - Switch default to false - private boolean atomicOrderedUpdates = true; - /** Number of backups for cache. */ private int backups = DFLT_BACKUPS; @@ -349,7 +345,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { aff = cc.getAffinity(); affMapper = cc.getAffinityMapper(); atomicityMode = cc.getAtomicityMode(); - atomicOrderedUpdates = cc.isAtomicOrderedUpdates(); atomicWriteOrderMode = cc.getAtomicWriteOrderMode(); backups = cc.getBackups(); cacheLoaderFactory = cc.getCacheLoaderFactory(); @@ -901,23 +896,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** - * @return Ordered updates mode. - */ - public boolean isAtomicOrderedUpdates() { - return atomicOrderedUpdates; - } - - /** - * @param atomicOrderedUpdates Ordered updates mode. - * @return {@code this} for chaining. - */ - public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) { - this.atomicOrderedUpdates = atomicOrderedUpdates; - - return this; - } - - /** * Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache. * <p> * If not set, default value is {@link #DFLT_BACKUPS}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 05ce183..db62f20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -537,12 +537,8 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if entries should not be deleted from cache immediately. */ public boolean deferredDelete(GridCacheAdapter<?, ?> cache) { - boolean nearAtomic = cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC; - boolean orderedUpdates = cache.configuration().isAtomicOrderedUpdates(); - - return cache.isDht() || cache.isColocated() || - (cache.isDhtAtomic() && !orderedUpdates) || - (nearAtomic && !orderedUpdates); + // Only TRANSACTIONAL caches. + return cache.isDht() || cache.isColocated(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f5119f6..01694d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -187,33 +187,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - if (ctx.config().isAtomicOrderedUpdates()) { - for (int part = 0; part < ctx.affinity().partitions(); part++) { - Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true); + for (int part = 0; part < ctx.affinity().partitions(); part++) { + Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true); - ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); - - Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false); - - ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { - processDhtAtomicUpdateRequest(nodeId, req); - } - }); - } - } - else { - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { + ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { processNearAtomicUpdateRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { + Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false); + + ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); } @@ -253,11 +238,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { for (DeferredResponseBuffer buf : pendingResponses.values()) buf.finish(); - if (ctx.config().isAtomicOrderedUpdates()) { - for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true)); - ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false)); - } + for (int part = 0; part < ctx.affinity().partitions(); part++) { + ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true)); + ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 5c22b3b..52d59ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -123,8 +123,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !topLocked; // We can send entry processor instead of value to backup if updates are ordered. - forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM && - cctx.config().isAtomicOrderedUpdates(); + forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM; } /** {@inheritDoc} */ @@ -218,9 +217,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer); - if (!cctx.config().isAtomicOrderedUpdates()) - part = -1; - if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); @@ -281,7 +277,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> AffinityTopologyVersion topVer = updateReq.topologyVersion(); - int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1; + int part = entry.partition(); for (UUID nodeId : readers) { GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 5150113..c4704cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -512,7 +512,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); - mapOnTopology(remapKeys, true, nodeId); + mapOnTopology(remapKeys, true, new GridAtomicMappingKey(nodeId, res.partition())); return; } @@ -591,9 +591,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param keys Keys to map. * @param remap Boolean flag indicating if this is partial future remap. - * @param oldNodeId Old node ID if remap. + * @param remapKey Mapping key (if remap). */ - private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) { + private void mapOnTopology(final Collection<?> keys, final boolean remap, final GridAtomicMappingKey remapKey) { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -624,7 +624,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(keys, remap, oldNodeId); + mapOnTopology(keys, remap, remapKey); } }); } @@ -640,7 +640,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.topology().readUnlock(); } - map0(topVer, keys, remap, oldNodeId); + map0(topVer, keys, remap, remapKey); } /** @@ -683,14 +683,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param topVer Topology version. * @param remapKeys Keys to remap or {@code null} to map all keys. * @param remap Flag indicating if this is partial remap for this future. - * @param oldNodeId Old node ID if was remap. + * @param remapKey Mapping key (if remap). */ private void map0( AffinityTopologyVersion topVer, @Nullable Collection<?> remapKeys, boolean remap, - @Nullable UUID oldNodeId) { - assert oldNodeId == null || remap || fastMapRemap; + @Nullable GridAtomicMappingKey remapKey) { + assert remapKey == null || remap || fastMapRemap; Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); @@ -783,9 +783,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> int part = cctx.affinity().partition(cacheKey); ClusterNode primary = cctx.affinity().primary(part, topVer); - if (!ccfg.isAtomicOrderedUpdates()) - part = -1; - if (primary == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + "left the grid).")); @@ -852,13 +849,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (oldNodeId != null) { - // TODO: IGNITE-104 - Try to avoid iteration. - for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - if (e.getKey().nodeId().equals(oldNodeId)) - mappings.remove(e.getKey()); - } - } + if (remapKey != null) + mappings.remove(remapKey); // For fastMap mode wait for all responses before remapping. if (remap && fastMap && !mappings.isEmpty()) { @@ -930,7 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap); - int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1; + int part = t.get1(); Collection<ClusterNode> affNodes = t.get2(); if (affNodes.isEmpty()) {