Repository: incubator-ignite Updated Branches: refs/heads/ignite-54 [created] 3344e7271
IGNITE-54 Implemented removal of primary entries on each node Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3344e727 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3344e727 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3344e727 Branch: refs/heads/ignite-54 Commit: 3344e7271f20ec91a8b17d26a3d6455f054e6a3d Parents: 6efcfb2 Author: avinogradov <avinogra...@gridgain.com> Authored: Fri Jan 16 17:49:46 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Fri Jan 16 17:49:46 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 15 ++- .../grid/cache/GridCacheProjection.java | 17 +++ .../processors/cache/GridCacheAdapter.java | 134 +++++++++++++++++-- .../cache/GridCacheProjectionImpl.java | 5 + .../processors/cache/GridCacheProxyImpl.java | 12 ++ .../processors/cache/IgniteCacheTest.java | 20 +++ 6 files changed, 189 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index ab45aea..2533d34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -458,8 +458,19 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void removeAll() { - // TODO IGNITE-1. 
- throw new UnsupportedOperationException(); + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(null); + + try { + delegate.removeAll(); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java index d188b71..2a84fbe 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java @@ -1235,6 +1235,23 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V> public Set<K> keySet(); /** + * Set of keys cached on this node. You can remove elements from this set, but you cannot add elements + * to this set. All removal operations will be reflected on the cache itself. + * <p> + * Iterator over this set will not fail if set was concurrently updated + * by another thread. This means that iterator may or may not return latest + * keys depending on whether they were added before or after current + * iterator position. + * <p> + * NOTE: this operation is not distributed and returns only the keys cached on this node. + * + * @param filter Optional filter to check prior to getting key from cache. Note + * that filter is checked atomically together with get operation. + * @return Key set for this cache projection. + */ + public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) ; + + /** * Set of keys for which this node is primary. * This set is dynamic and may change with grid topology changes. 
* Note that this set will contain mappings for all keys, even if their values are http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 97d914a..ffd015f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** clearAll() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; + /** removeAll() batch size. */ + private static final long REMOVE_ALL_BATCH_SIZE = 100L; + /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -2994,22 +2997,38 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException { - ctx.denyOnLocalRead(); + try { + if (F.isEmptyOrNulls(filter)) + filter = ctx.trueArray(); - if (F.isEmptyOrNulls(filter)) - filter = ctx.trueArray(); + long topVer; - final IgnitePredicate<GridCacheEntry<K, V>>[] p = filter; + do { + topVer = ctx.affinity().affinityTopologyVersion(); - syncOp(new SyncInOp(false) { - @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { - tx.removeAllAsync(ctx, keySet(p), null, false, null).get(); - } + // Send job to all nodes. 
+ Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes(); - @Override public String toString() { - return "removeAll [filter=" + Arrays.toString(p) + ']'; - } - }); + IgniteFuture<Object> fut = null; + + if (!nodes.isEmpty()) + fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true); + + if (fut != null) + fut.get(); + + } while (ctx.affinity().affinityTopologyVersion() > topVer); + } + catch (ClusterGroupEmptyException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); + } + catch (ComputeTaskTimeoutException e) { + U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " + + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + + throw e; + } } /** {@inheritDoc} */ @@ -4670,6 +4689,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** + * Internal callable which performs remove all primary key mappings + * operation on a cache with the given name. + */ + @GridInternal + private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topology version. */ + private long topVer; + + /** Remove batch size. */ + private long rmvBatchSz; + + /** Filters. */ + private IgnitePredicate<GridCacheEntry<K, V>>[] filter; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Empty constructor for serialization. + */ + public GlobalRemoveAllCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @param rmvBatchSz Remove batch size. + * @param filter Filter. 
+ */ + private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<GridCacheEntry<K, V>> ... filter) { + this.cacheName = cacheName; + this.topVer = topVer; + this.rmvBatchSz = rmvBatchSz; + this.filter = filter; + } + + /** + * {@inheritDoc} + */ + @Override public Object call() throws Exception { + Set<K> keys = new HashSet<>(); + + final GridKernal grid = ((GridKernal) ignite); + + final GridCache<K,V> cache = grid.cachex(cacheName); + + final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context(); + + assert cache != null; + + for (K k : cache.keySet(filter)) { + if (ctx.affinity().primary(ctx.localNode(), k, topVer)) + keys.add(k); + if (keys.size() >= rmvBatchSz) { + cache.removeAll(keys); + + keys.clear(); + } + } + cache.removeAll(keys); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeLong(topVer); + out.writeLong(rmvBatchSz); + out.writeObject(filter); + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topVer = in.readLong(); + rmvBatchSz = in.readLong(); + filter = (IgnitePredicate<GridCacheEntry<K, V>>[]) in.readObject(); + } + } + + /** * Internal callable which performs {@link GridCacheProjection#size()} or {@link GridCacheProjection#primarySize()} * operation on a cache with the given name. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java index a9896e5..ccb045b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java @@ -896,6 +896,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { + return cache.keySet(filter); + } + + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { return cache.primaryKeySet(entryFilter(true)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index c88183e..b0a4713 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -1002,6 +1002,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } } + /** {@inheritDoc} */ + @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... 
filter) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.keySet(filter); + } + finally { + gate.leave(prev); + } + } + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java index fed75ec..6099928 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java @@ -30,6 +30,8 @@ import org.gridgain.testframework.junits.common.*; * */ public class IgniteCacheTest extends GridCommonAbstractTest { + private static long ENTRY_COUNT = 1000; + /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); @@ -105,6 +107,24 @@ public class IgniteCacheTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testPutGetRemoveAll() throws Exception { + IgniteCache<Integer, String> cache = jcache(); + + for (int i = 0; i < ENTRY_COUNT; i++) + cache.put(i, String.valueOf(i)); + + for (int i = 0; i < ENTRY_COUNT; i++) + assertEquals(String.valueOf(i), cache.get(i)); + + cache.removeAll(); + + for (int i = 0; i < ENTRY_COUNT; i++) + assertNull(cache.get(i)); + } + + /** * @return Cache. */ protected <K, V> IgniteCache<K, V> jcache() {