ignite-656: skipStore is fixed for get and putAll operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/53345b6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/53345b6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/53345b6b Branch: refs/heads/ignite-286 Commit: 53345b6ba3e30e6cb2df30d87c0ece4bbdacc77c Parents: e914bc8 Author: Denis Magda <dma...@gridgain.com> Authored: Mon Apr 13 16:09:39 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Mon Apr 13 16:09:39 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 2 + .../dht/GridPartitionedGetFuture.java | 3 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../distributed/near/GridNearGetFuture.java | 3 +- .../distributed/near/GridNearGetRequest.java | 7 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 83 +++++++++++++++++++- 6 files changed, 95 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index b78e7e0..b76dce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -291,6 +291,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter (DataStreamerImpl)ignite.dataStreamer(cacheName)) { ((DataStreamerImpl)dataLdr).maxRemapCount(0); + //dataLdr.skipStore(); //Pass skip store flag here (needed for removeAll operation) + dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index da2105d..ec58075 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -372,7 +372,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M subjId, taskName == null ? 0 : taskName.hashCode(), expiryPlc != null ? expiryPlc.forAccess() : -1L, - skipVals); + skipVals, + cctx.skipStore()); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 18a1c25..b47c284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1117,7 +1117,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { //TODO: support skipStore for putAll if (keys.size() > 1 && // Several keys ... - writeThrough() && // and store is enabled ... + writeThrough() && !req.skipStore() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. ) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index b1017d8..ece64a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -372,7 +372,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma subjId, taskName == null ? 0 : taskName.hashCode(), expiryPlc != null ? expiryPlc.forAccess() : -1L, - skipVals); + skipVals, + cctx.skipStore()); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 93c869d..28be254 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -105,6 +105,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep * @param subjId Subject ID. * @param taskNameHash Task name hash. * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. + * @param skipStore Skipe store flag. Used to skip read-through from a persistent storage. */ public GridNearGetRequest( int cacheId, @@ -118,7 +119,8 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep UUID subjId, int taskNameHash, long accessTtl, - boolean skipVals + boolean skipVals, + boolean skipStore ) { assert futId != null; assert miniId != null; @@ -138,6 +140,9 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.taskNameHash = taskNameHash; this.accessTtl = accessTtl; this.skipVals = skipVals; + + if (readThrough) + this.readThrough = !skipStore; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53345b6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index d0309c3..bc48b40 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4293,6 +4293,87 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertNotNull(cache.get(key)); } + cache.removeAll(); + + for (String key : keys) + assertNull(cache.get(key)); + + // Put/remove data from multiple nodes. + + keys = new ArrayList<>(1000); + + for (int i = 0; i < 1000; i++) + keys.add("key_" + i); + + for (int i = 0; i < keys.size(); ++i) + cache.put(keys.get(i), i); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + for (String key : keys) { + cacheSkipStore.remove(key); + + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + for (String key : keys) { + cache.remove(key); + + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + assertFalse(cacheSkipStore.iterator().hasNext()); + assertTrue(map.size() == 0); + + // putAll/removeAll from multiple nodes. + + Map<String, Integer> data = new HashMap<>(); + + for (int i = 0; i < keys.size(); i++) + data.put(keys.get(i), i); + + cacheSkipStore.putAll(data); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + cache.putAll(data); + + for (String key : keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cacheSkipStore.removeAll(); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertTrue(map.containsKey(key)); + } + + cache.removeAll(); + + for (String key : keys) { + assertNull(cacheSkipStore.get(key)); + assertNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + // Final checks + String newKey = "New key"; assertFalse(map.containsKey(newKey)); @@ -4311,7 +4392,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract Cache.Entry<String, Integer> entry = it.next(); - String rmvKey = entry.getKey(); + String rmvKey = entry.getKey(); assertTrue(map.containsKey(rmvKey));