# ignite-57 async support for removeAll()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9140a087 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9140a087 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9140a087 Branch: refs/heads/ignite-sql-tests Commit: 9140a08747366186778382a09674c0e95b03d53a Parents: ed55c48 Author: sboikov <semen.boi...@inria.fr> Authored: Fri Feb 6 21:43:11 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Fri Feb 6 21:43:11 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/CacheProjection.java | 5 ++ .../cache/GridCacheProjectionImpl.java | 5 ++ .../processors/cache/GridCacheProxyImpl.java | 12 ++++ .../processors/cache/IgniteCacheProxy.java | 5 +- .../GridDistributedCacheAdapter.java | 60 ++++++++++++++++-- .../distributed/near/GridNearAtomicCache.java | 5 ++ .../processors/cache/local/GridLocalCache.java | 12 ++++ .../local/atomic/GridLocalAtomicCache.java | 11 ++++ .../cache/GridCacheAbstractFullApiSelfTest.java | 64 +++++++++++++++----- 9 files changed, 159 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index 84c2839..bca1ae4 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -1678,6 +1678,11 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> { public void removeAll() throws IgniteCheckedException; /** + * @return Remove future. + */ + public IgniteInternalFuture<?> removeAllAsync(); + + /** * Asynchronously removes mappings from cache for entries for which the optionally passed in filters do * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled * into transaction. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 80b7341..b4ac195 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -1206,6 +1206,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + return cache.removeAllAsync(); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { return cache.removeAllAsync(and(filter, false)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 8fd125e..ee8a89f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1614,6 +1614,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.removeAllAsync(); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index c6166af..a733ed2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -736,7 +736,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.removeAll(); + if (isAsync()) + setFuture(delegate.removeAllAsync()); + else + delegate.removeAll(); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 5365ec3..c0ec962 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; @@ -141,7 +142,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter do { topVer = ctx.affinity().affinityTopologyVersion(); - // Send job to all nodes. + // Send job to all data nodes. Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { @@ -155,12 +156,61 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (log.isDebugEnabled()) log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(); + + long topVer = ctx.affinity().affinityTopologyVersion(); + + removeAllAsync(opFut, topVer); + + return opFut; + } + + /** + * @param opFut Future. + * @param topVer Topology version. + */ + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) { + Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); - throw e; + if (!nodes.isEmpty()) { + IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, + new GlobalRemoveAllCallable<>(name(), topVer), nodes, true); + + rmvFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + long topVer0 = ctx.affinity().affinityTopologyVersion(); + + if (topVer0 == topVer) + opFut.onDone(); + else + removeAllAsync(opFut, topVer0); + } + catch (ClusterGroupEmptyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); + + opFut.onDone(); + } + catch (IgniteCheckedException e) { + opFut.onDone(e); + } + catch (Error e) { + opFut.onDone(e); + + throw e; + } + } + }); } + else + opFut.onDone(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 335f268..62a59a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -635,6 +635,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + return dht.removeAllAsync(); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) { return dht.removeAllAsync(keySet(filter)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index b4bc3a5..845f697 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -31,6 +31,7 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import java.util.concurrent.*; /** * Local cache implementation. @@ -197,6 +198,17 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + return ctx.closures().callLocalSafe(new Callable<Void>() { + @Override public Void call() throws Exception { + removeAll(); + + return null; + } + }); + } + + /** {@inheritDoc} */ @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) { assert false : "Should not be called"; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 4a268d4..109667e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -472,6 +472,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> removeAllAsync() { + return ctx.closures().callLocalSafe(new Callable<Void>() { + @Override public Void call() throws Exception { + removeAll(); + + return null; + } + }); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) { return removeAllAsync(keySet(filter), filter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9140a087/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 0d66b15..ce06330 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -2098,14 +2098,36 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** * @throws Exception In case of error. */ - public void testRemoveAll() throws Exception { - cache().put("key1", 1); - cache().put("key2", 2); - cache().put("key3", 3); + public void testGlobalRemoveAll() throws Exception { + globalRemoveAll(false); + } + + /** + * @throws Exception In case of error. + */ + public void testGlobalRemoveAllAsync() throws Exception { + globalRemoveAll(true); + } + + /** + * @throws Exception In case of error. + */ + private void globalRemoveAll(boolean async) throws Exception { + jcache().put("key1", 1); + jcache().put("key2", 2); + jcache().put("key3", 3); checkSize(F.asSet("key1", "key2", "key3")); - cache().removeAll(F.asList("key1", "key2")); + IgniteCache<String, Integer> asyncCache = jcache().withAsync(); + + if (async) { + asyncCache.removeAll(F.asSet("key1", "key2")); + + asyncCache.future().get(); + } + else + jcache().removeAll(F.asSet("key1", "key2")); checkSize(F.asSet("key3")); @@ -2114,24 +2136,38 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract checkContainsKey(true, "key3"); // Put values again. - cache().put("key1", 1); - cache().put("key2", 2); - cache().put("key3", 3); + jcache().put("key1", 1); + jcache().put("key2", 2); + jcache().put("key3", 3); + + if (async) { + IgniteCache<String, Integer> asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync(); - cache(gridCount() > 1 ? 1 : 0).removeAll(); + asyncCache0.removeAll(); + + asyncCache0.future().get(); + } + else + jcache(gridCount() > 1 ? 1 : 0).removeAll(); assert cache().isEmpty(); - long entryCount = hugeRemoveAllEntryCount(); + long entryCnt = hugeRemoveAllEntryCount(); - for (int i = 0; i < entryCount; i++) + for (int i = 0; i < entryCnt; i++) cache().put(String.valueOf(i), i); - for (int i = 0; i < entryCount; i++) + for (int i = 0; i < entryCnt; i++) assertEquals(Integer.valueOf(i), cache().get(String.valueOf(i))); - cache().removeAll(); + if (async) { + asyncCache.removeAll(); + + asyncCache.future().get(); + } + else + cache().removeAll(); - for (int i = 0; i < entryCount; i++) + for (int i = 0; i < entryCnt; i++) assertNull(cache().get(String.valueOf(i))); }