ignite-656: support skipStore for removeAll()

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b291e5fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b291e5fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b291e5fd

Branch: refs/heads/ignite-286
Commit: b291e5fd42870303e246287334da0f83e754fc10
Parents: 53345b6
Author: Denis Magda <dma...@gridgain.com>
Authored: Mon Apr 13 17:22:40 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Mon Apr 13 17:22:40 2015 +0300

----------------------------------------------------------------------
 .../distributed/GridDistributedCacheAdapter.java   | 17 +++++++++++------
 .../cache/GridCacheAbstractFullApiSelfTest.java    | 16 ++++++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b291e5fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index b76dce8..8e689f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -153,10 +153,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 // Send job to all data nodes.
                 Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
-                if (!nodes.isEmpty()) {
+                if (!nodes.isEmpty())
                     ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get();
-                }
+                        new GlobalRemoveAllCallable<>(name(), topVer, ctx.skipStore()), nodes, true).get();
             }
             while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
         }
@@ -186,7 +185,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
         if (!nodes.isEmpty()) {
             IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new GlobalRemoveAllCallable<>(name(), topVer), nodes, true);
+                    new GlobalRemoveAllCallable<>(name(), topVer, ctx.skipStore()), nodes, true);
 
             rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
@@ -241,6 +240,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** Topology version. */
         private AffinityTopologyVersion topVer;
 
+        /** Skip store flag. */
+        private boolean skipStore;
+
         /** Injected grid instance. */
         @IgniteInstanceResource
         private Ignite ignite;
@@ -256,9 +258,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
          * @param cacheName Cache name.
          * @param topVer Topology version.
          */
-        private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer) {
+        private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
             this.cacheName = cacheName;
             this.topVer = topVer;
+            this.skipStore = skipStore;
         }
 
         /**
@@ -291,7 +294,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                          (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
                     ((DataStreamerImpl)dataLdr).maxRemapCount(0);
 
-                    //dataLdr.skipStore(); //Pass skip store flag here (needed for removeAll operation)
+                    dataLdr.skipStore(skipStore);
 
                     dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
@@ -335,12 +338,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeString(out, cacheName);
             out.writeObject(topVer);
+            out.writeBoolean(skipStore);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             cacheName = U.readString(in);
             topVer = (AffinityTopologyVersion)in.readObject();
+            skipStore = in.readBoolean();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b291e5fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index bc48b40..9b0d675 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4356,6 +4356,22 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertTrue(map.containsKey(key));
         }
 
+        cacheSkipStore.removeAll(data.keySet());
+
+        for (String key : keys) {
+            assertNull(cacheSkipStore.get(key));
+            assertNotNull(cache.get(key));
+            assertTrue(map.containsKey(key));
+        }
+
+        cacheSkipStore.putAll(data);
+
+        for (String key : keys) {
+            assertNotNull(cacheSkipStore.get(key));
+            assertNotNull(cache.get(key));
+            assertTrue(map.containsKey(key));
+        }
+
         cacheSkipStore.removeAll();
 
         for (String key : keys) {

Reply via email to