IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1814118 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1814118 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1814118 Branch: refs/heads/ignite-104 Commit: b18141183bfe7a6b0d566ed00585e4894dec887d Parents: 1fee086 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Jul 22 16:51:41 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Jul 22 16:51:41 2015 -0700 ---------------------------------------------------------------------- .../internal/processors/cache/GridCacheContext.java | 8 ++++++-- .../cache/distributed/dht/GridDhtLocalPartition.java | 15 +++++++++++---- .../cache/distributed/near/GridNearAtomicCache.java | 4 +++- 3 files changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1814118/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 5f17746..05ce183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -537,8 +537,12 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if entries should not be deleted from cache immediately. */ public boolean deferredDelete(GridCacheAdapter<?, ?> cache) { - return cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || - (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC); + boolean nearAtomic = cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC; + boolean orderedUpdates = cache.configuration().isAtomicOrderedUpdates(); + + return cache.isDht() || cache.isColocated() || + (cache.isDhtAtomic() && !orderedUpdates) || + (nearAtomic && !orderedUpdates); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1814118/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 87c7f0e..1d47840 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -90,7 +90,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, private final LongAdder8 mapPubSize = new LongAdder8(); /** Remove queue. */ - private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; + private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; /** Group reservations. */ private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); @@ -120,7 +120,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); + if (cctx.deferredDelete()) + rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); } /** @@ -270,6 +271,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @throws IgniteCheckedException If failed. */ public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { + assert cctx.deferredDelete(); + try { T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); @@ -471,7 +474,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - clearDeferredDeletes(); + if (cctx.deferredDelete()) + clearDeferredDeletes(); return new GridFinishedFuture<>(true); } @@ -522,7 +526,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - clearDeferredDeletes(); + if (cctx.deferredDelete()) + clearDeferredDeletes(); return true; } @@ -681,6 +686,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ private void clearDeferredDeletes() { + assert cctx.deferredDelete(); + rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() { @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) { cctx.dht().removeVersionedEntry(t.get1(), t.get2()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1814118/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 041f83a..ed07f8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -71,7 +71,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { int size = CU.isSystemCache(ctx.name()) ? 100 : Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 1_000_000); - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(size / 10)); + if (ctx.deferredDelete()) + rmvQueue = new GridCircularBuffer<>(U.ceilPow2(size / 10)); } /** {@inheritDoc} */ @@ -642,6 +643,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) { + assert ctx.deferredDelete(); assert entry.isNear(); try {