#ignite-333: Add methods clear(K key), clear(Set<K> keys), localClear(K key), localClearAll(Set<K> keys), localClear()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a094c880 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a094c880 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a094c880 Branch: refs/heads/ignite-gg-9858 Commit: a094c8803e4cf6a53c95cffd0d5e27385ec67021 Parents: 0d008db Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Mar 12 15:29:23 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Mar 12 15:29:23 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 48 ++++- .../processors/cache/CacheProjection.java | 64 ++++++ .../processors/cache/GridCacheAdapter.java | 196 +++++++++++++++++-- .../cache/GridCacheProjectionImpl.java | 25 +++ .../processors/cache/IgniteCacheProxy.java | 53 ++++- 5 files changed, 368 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a094c880/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 119b952..4476670 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -380,6 +380,28 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @Override public void clear(); /** + * Clear key, without notifying listeners or + * {@link javax.cache.integration.CacheWriter}s. + * + * @param key Key to clear. 
+ * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem during the clear + */ + @IgniteAsyncSupported + public void clear(K key); + + /** + * Clear keys, without notifying listeners or + * {@link javax.cache.integration.CacheWriter}s. + * + * @param keys Keys to clear. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem during the clear + */ + @IgniteAsyncSupported + public void clearAll(Set<K> keys); + + /** * Clears an entry from this cache and swap storage only if the entry * is not currently locked, and is not participating in a transaction. * <p/> @@ -400,10 +422,30 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * if entry was in use at the time of this method invocation and could not be * cleared. */ - public boolean clearLocally(K key); + public boolean localClear(K key); + + /** + * Clears entries from this cache and swap storage only if the entry + * is not currently locked, and is not participating in a transaction. + * <p/> + * If {@link CacheConfiguration#isSwapEnabled()} is set to {@code true} and + * {@link CacheFlag#SKIP_SWAP} is not enabled, the evicted entries will + * also be cleared from swap. + * <p/> + * Note that this operation is local as it merely clears + * an entry from local cache. It does not remove entries from + * remote caches or from underlying persistent storage. + * This method is not transactionally consistent. + * Transactional semantics must be guaranteed outside of Ignite. + * <h2 class="header">Cache Flags</h2> + * This method is not available if flag {@link CacheFlag#READ} are set on projection. + * + * @param keys Set of keys to clear. 
+ */ + public void localClearAll(Set<K> keys); /** - * Clears all entry from this cache and swap storage only if the entry + * Clears all entries from this cache and swap storage only if the entry * is not currently locked, and is not participating in a transaction. * <p/> * If {@link CacheConfiguration#isSwapEnabled()} is set to {@code true} and @@ -418,7 +460,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * <h2 class="header">Cache Flags</h2> * This method is not available if flag {@link CacheFlag#READ} are set on projection. */ - public void clearLocally(); + public void localClear(); /** {@inheritDoc} */ @IgniteAsyncSupported http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a094c880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index 23fd0ee..d027aec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -1255,6 +1255,58 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public boolean clearLocally(K key); /** + * Clears entries from this cache and swap storage only if the entry + * is not currently locked, and is not participating in a transaction. + * <p> + * If {@link org.apache.ignite.configuration.CacheConfiguration#isSwapEnabled()} is set to {@code true} and + * {@link CacheFlag#SKIP_SWAP} is not enabled, the evicted entries will + * also be cleared from swap. + * <p> + * Note that this operation is local as it merely clears + * an entry from local cache. 
It does not remove entries from + * remote caches or from underlying persistent storage. + * <h2 class="header">Cache Flags</h2> + * This method is not available if any of the following flags are set on projection: + * {@link CacheFlag#READ}. + * + * @param keys Keys to clear locally. + * @return {@code True} if entry was successfully cleared from cache, {@code false} + * if entry was in use at the time of this method invocation and could not be + * cleared. + */ + public void clearLocally(Set<K> keys); + + /** + * Clears key on all nodes that store its data. That is, caches are cleared on remote + * nodes and local node, as opposed to {@link CacheProjection#clearLocally()} method which only + * clears local node's cache. + * <p> + * Ignite will make the best attempt to clear caches on all nodes. If some caches + * could not be cleared, then exception will be thrown. + * <p> + * This method is identical to calling {@link #clear(long) clear(0)}. + * + * @param key Key to clear. + * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. + */ + public void clear(K key) throws IgniteCheckedException; + + /** + * Clears keys on all nodes that store its data. That is, caches are cleared on remote + * nodes and local node, as opposed to {@link CacheProjection#clearLocally()} method which only + * clears local node's cache. + * <p> + * Ignite will make the best attempt to clear caches on all nodes. If some caches + * could not be cleared, then exception will be thrown. + * <p> + * This method is identical to calling {@link #clear(long) clear(0)}. + * + * @param keys Keys to clear. + * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. + */ + public void clear(Set<K> keys) throws IgniteCheckedException; + + /** * Clears cache on all nodes that store it's data. 
That is, caches are cleared on remote * nodes and local node, as opposed to {@link CacheProjection#clearLocally()} method which only * clears local node's cache. @@ -1275,6 +1327,18 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<?> clearAsync(); /** + * @param key Key to clear. + * @return Clear future. + */ + public IgniteInternalFuture<?> clearAsync(K key); + + /** + * @param keys Keys to clear. + * @return Clear future. + */ + public IgniteInternalFuture<?> clearAsync(Set<K> keys); + + /** * Clears cache on all nodes that store it's data. That is, caches are cleared on remote * nodes and local node, as opposed to {@link CacheProjection#clearLocally()} method which only * clears local node's cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a094c880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index de3f5b7..c6732ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1339,6 +1339,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @Override public void clearLocally(Set<K> keys) { + clearLocally0(keys); + } + + /** {@inheritDoc} */ @Override public void clearLocally() { ctx.denyOnFlag(READ); ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); @@ -1431,7 +1436,46 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @Override public void clear(K key) throws IgniteCheckedException { + // Clear local cache synchronously. 
+ clearLocally(key); + + clearRemotes(0, new GlobalClearKeyCallable<K>(name(), key)); + } + + /** {@inheritDoc} */ + @Override public void clear(Set<K> keys) throws IgniteCheckedException { + // Clear local cache synchronously. + clearLocally(keys); + + clearRemotes(0, new GlobalClearKeySetCallable<K>(name(), keys)); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> clearAsync(K key) { + return clearAsync(new GlobalClearKeyCallable<K>(name(), key)); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) { + return clearAsync(new GlobalClearKeySetCallable<K>(name(), keys)); + } + + /** {@inheritDoc} */ @Override public void clear(long timeout) throws IgniteCheckedException { + // Clear local cache synchronously. + clearLocally(); + + clearRemotes(timeout, new GlobalClearAllCallable(name())); + } + + /** + * @param timeout Timeout for clearLocally all task in milliseconds (0 for never). + * Set it to larger value for large caches. + * @param clearCallable Global clear callable object. + * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. + */ + private void clearRemotes(long timeout, GlobalClearCallable clearCallable) throws IgniteCheckedException { try { // Send job to remote nodes only. Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); @@ -1441,12 +1485,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalClearAllCallable(name()), nodes, true); + fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCallable, nodes, true); } - // Clear local cache synchronously. 
- clearLocally(); - if (fut != null) fut.get(); } @@ -1464,11 +1505,19 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { + return clearAsync(new GlobalClearAllCallable(name())); + } + + /** + * @param clearCallable Global clear callable object. + * @return Future. + */ + private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCallable) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).nodes(); if (!nodes.isEmpty()) { IgniteInternalFuture<Object> fut = - ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalClearAllCallable(name()), nodes, true); + ctx.closures().callAsyncNoFailover(BROADCAST, clearCallable, nodes, true); return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { @@ -5457,16 +5506,47 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * operation on a cache with the given name. */ @GridInternal - private static class GlobalClearAllCallable implements Callable<Object>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; + private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { /** Cache name. */ - private String cacheName; + protected String cacheName; /** Injected grid instance. */ @IgniteInstanceResource - private Ignite ignite; + protected Ignite ignite; + + /** + * Empty constructor for serialization. + */ + public GlobalClearCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. 
+ */ + protected GlobalClearCallable(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + } + } + + /** + * Global clear all. + */ + @GridInternal + private static class GlobalClearAllCallable extends GlobalClearCallable { + /** */ + private static final long serialVersionUID = 0L; /** * Empty constructor for serialization. @@ -5479,7 +5559,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param cacheName Cache name. */ private GlobalClearAllCallable(String cacheName) { - this.cacheName = cacheName; + super(cacheName); } /** {@inheritDoc} */ @@ -5488,15 +5568,105 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return null; } + } + + /** + * Global clear key. + */ + @GridInternal + private static class GlobalClearKeyCallable<K> extends GlobalClearCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Key to remove. */ + private K key; + + /** + * Empty constructor for serialization. + */ + public GlobalClearKeyCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param key Key to clear. 
+ */ + private GlobalClearKeyCallable(String cacheName, K key) { + super(cacheName); + + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + ((IgniteEx)ignite).cachex(cacheName).clearLocally(key); + + return null; + } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); + + out.writeObject(key); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); + + key = (K)in.readObject(); + } + } + + /** + * Global clear keys. + */ + @GridInternal + private static class GlobalClearKeySetCallable<K> extends GlobalClearCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Keys to remove. */ + private Set<K> keys; + + /** + * Empty constructor for serialization. + */ + public GlobalClearKeySetCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param keys Keys to clear. 
+ */ + private GlobalClearKeySetCallable(String cacheName, Set<K> keys) { + super(cacheName); + + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + ((IgniteEx)ignite).cachex(cacheName).clearLocally(keys); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(keys); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keys = (Set<K>) in.readObject(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a094c880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 73bcc64..191c2a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -875,11 +875,36 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public void clearLocally(Set<K> keys) { + cache.clearLocally(keys); + } + + /** {@inheritDoc} */ @Override public void clear() throws IgniteCheckedException { cache.clear(); } /** {@inheritDoc} */ + @Override public void clear(K key) throws IgniteCheckedException { + cache.clear(key); + } + + /** {@inheritDoc} */ + @Override public void clear(Set<K> keys) throws IgniteCheckedException { + cache.clear(keys); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> clearAsync(K key) { + return cache.clearAsync(key); 
+ } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) { + return cache.clearAsync(keys); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { return cache.clearAsync(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a094c880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index d7549b5..5b9d801 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1043,6 +1043,42 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public void clear(K key) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + if (isAsync()) + setFuture(delegate.clearAsync(key)); + else + delegate.clear(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public void clearAll(Set<K> keys) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + if (isAsync()) + setFuture(delegate.clearAsync(keys)); + else + delegate.clear(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void clear() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1061,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public boolean clearLocally(K key) { + @Override public boolean 
localClear(K key) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1075,7 +1111,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public void clearLocally() { + @Override public void localClearAll(Set<K> keys) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + for (K key : keys) + delegate.clearLocally(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public void localClear() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try {