# ignite-57 async support for removeAll()

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9140a087
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9140a087
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9140a087

Branch: refs/heads/ignite-sql-tests
Commit: 9140a08747366186778382a09674c0e95b03d53a
Parents: ed55c48
Author: sboikov <semen.boi...@inria.fr>
Authored: Fri Feb 6 21:43:11 2015 +0300
Committer: sboikov <semen.boi...@inria.fr>
Committed: Fri Feb 6 21:43:11 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheProjection.java    |  5 ++
 .../cache/GridCacheProjectionImpl.java          |  5 ++
 .../processors/cache/GridCacheProxyImpl.java    | 12 ++++
 .../processors/cache/IgniteCacheProxy.java      |  5 +-
 .../GridDistributedCacheAdapter.java            | 60 ++++++++++++++++--
 .../distributed/near/GridNearAtomicCache.java   |  5 ++
 .../processors/cache/local/GridLocalCache.java  | 12 ++++
 .../local/atomic/GridLocalAtomicCache.java      | 11 ++++
 .../cache/GridCacheAbstractFullApiSelfTest.java | 64 +++++++++++++++-----
 9 files changed, 159 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
index 84c2839..bca1ae4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
@@ -1678,6 +1678,11 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> {
     public void removeAll() throws IgniteCheckedException;
 
     /**
+     * @return Remove future.
+     */
+    public IgniteInternalFuture<?> removeAllAsync();
+
+    /**
      * Asynchronously removes mappings from cache for entries for which the optionally passed in filters do
      * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled
      * into transaction.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 80b7341..b4ac195 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -1206,6 +1206,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        return cache.removeAllAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
         return cache.removeAllAsync(and(filter, false));
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 8fd125e..ee8a89f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1614,6 +1614,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.removeAllAsync();
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index c6166af..a733ed2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -736,7 +736,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            delegate.removeAll();
+            if (isAsync())
+                setFuture(delegate.removeAllAsync());
+            else
+                delegate.removeAll();
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5365ec3..c0ec962 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
@@ -141,7 +142,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             do {
                 topVer = ctx.affinity().affinityTopologyVersion();
 
-                // Send job to all nodes.
+                // Send job to all data nodes.
                 Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
 
                 if (!nodes.isEmpty()) {
@@ -155,12 +156,61 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             if (log.isDebugEnabled())
                 log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
         }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        GridFutureAdapter<Void> opFut = new GridFutureAdapter<>();
+
+        long topVer = ctx.affinity().affinityTopologyVersion();
+
+        removeAllAsync(opFut, topVer);
+
+        return opFut;
+    }
+
+    /**
+     * @param opFut Future.
+     * @param topVer Topology version.
+     */
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) {
+        Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes();
 
-            throw e;
+        if (!nodes.isEmpty()) {
+            IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
+                    new GlobalRemoveAllCallable<>(name(), topVer), nodes, true);
+
+            rmvFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> fut) {
+                    try {
+                        fut.get();
+
+                        long topVer0 = ctx.affinity().affinityTopologyVersion();
+
+                        if (topVer0 == topVer)
+                            opFut.onDone();
+                        else
+                            removeAllAsync(opFut, topVer0);
+                    }
+                    catch (ClusterGroupEmptyCheckedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
+
+                        opFut.onDone();
+                    }
+                    catch (IgniteCheckedException e) {
+                        opFut.onDone(e);
+                    }
+                    catch (Error e) {
+                        opFut.onDone(e);
+
+                        throw e;
+                    }
+                }
+            });
         }
+        else
+            opFut.onDone();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 335f268..62a59a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -635,6 +635,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        return dht.removeAllAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) {
         return dht.removeAllAsync(keySet(filter));
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index b4bc3a5..845f697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Local cache implementation.
@@ -197,6 +198,17 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        return ctx.closures().callLocalSafe(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                removeAll();
+
+                return null;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
     @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) {
         assert false : "Should not be called";
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 4a268d4..109667e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -472,6 +472,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> removeAllAsync() {
+        return ctx.closures().callLocalSafe(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                removeAll();
+
+                return null;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) {
         return removeAllAsync(keySet(filter), filter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0d66b15..ce06330 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -2098,14 +2098,36 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /**
      * @throws Exception In case of error.
      */
-    public void testRemoveAll() throws Exception {
-        cache().put("key1", 1);
-        cache().put("key2", 2);
-        cache().put("key3", 3);
+    public void testGlobalRemoveAll() throws Exception {
+        globalRemoveAll(false);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testGlobalRemoveAllAsync() throws Exception {
+        globalRemoveAll(true);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    private void globalRemoveAll(boolean async) throws Exception {
+        jcache().put("key1", 1);
+        jcache().put("key2", 2);
+        jcache().put("key3", 3);
 
         checkSize(F.asSet("key1", "key2", "key3"));
 
-        cache().removeAll(F.asList("key1", "key2"));
+        IgniteCache<String, Integer> asyncCache = jcache().withAsync();
+
+        if (async) {
+            asyncCache.removeAll(F.asSet("key1", "key2"));
+
+            asyncCache.future().get();
+        }
+        else
+            jcache().removeAll(F.asSet("key1", "key2"));
 
         checkSize(F.asSet("key3"));
 
@@ -2114,24 +2136,38 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         checkContainsKey(true, "key3");
 
         // Put values again.
-        cache().put("key1", 1);
-        cache().put("key2", 2);
-        cache().put("key3", 3);
+        jcache().put("key1", 1);
+        jcache().put("key2", 2);
+        jcache().put("key3", 3);
+
+        if (async) {
+            IgniteCache<String, Integer> asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync();
 
-        cache(gridCount() > 1 ? 1 : 0).removeAll();
+            asyncCache0.removeAll();
+
+            asyncCache0.future().get();
+        }
+        else
+            jcache(gridCount() > 1 ? 1 : 0).removeAll();
 
         assert cache().isEmpty();
-        long entryCount = hugeRemoveAllEntryCount();
+        long entryCnt = hugeRemoveAllEntryCount();
 
-        for (int i = 0; i < entryCount; i++)
+        for (int i = 0; i < entryCnt; i++)
             cache().put(String.valueOf(i), i);
 
-        for (int i = 0; i < entryCount; i++)
+        for (int i = 0; i < entryCnt; i++)
             assertEquals(Integer.valueOf(i), cache().get(String.valueOf(i)));
 
-        cache().removeAll();
+        if (async) {
+            asyncCache.removeAll();
+
+            asyncCache.future().get();
+        }
+        else
+            cache().removeAll();
 
-        for (int i = 0; i < entryCount; i++)
+        for (int i = 0; i < entryCnt; i++)
             assertNull(cache().get(String.valueOf(i)));
     }
 

Reply via email to