# ignite-60
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3598a95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3598a95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3598a95 Branch: refs/heads/ignite-99-2 Commit: e3598a952057752a562f5c6a4ebf95778d2e5fa1 Parents: 7faa36f Author: sboikov <sboi...@gridgain.com> Authored: Mon Jan 26 12:40:45 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jan 26 12:40:45 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 79 +++++++ .../apache/ignite/cache/CacheProjection.java | 6 + .../processors/cache/GridCacheAdapter.java | 117 +++++++++- .../cache/GridCacheProjectionImpl.java | 5 + .../processors/cache/GridCacheProxyImpl.java | 12 + .../processors/cache/IgniteCacheProxy.java | 229 +++++++++++-------- .../dht/GridCacheGlobalLoadTest.java | 78 ++++++- .../GridCachePartitionedLoadCacheSelfTest.java | 25 +- 8 files changed, 443 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index d99ccc6..0655195 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -101,6 +101,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method. * @throws CacheException If loading failed. */ + @IgniteAsyncSupported public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; /** @@ -124,6 +125,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method. * @throws CacheException If loading failed. */ + @IgniteAsyncSupported public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException; /** @@ -155,6 +157,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @throws CacheException If put operation failed. * @throws org.apache.ignite.internal.processors.cache.CacheFlagException If projection flags validation failed. */ + @IgniteAsyncSupported @Nullable public V getAndPutIfAbsent(K key, V val) throws CacheException; /** @@ -295,6 +298,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @param peekModes Optional peek modes. If not provided, then total cache size is returned. * @return Cache size across all nodes. */ + @IgniteAsyncSupported public int size(CachePeekMode... peekModes) throws CacheException; /** @@ -313,6 +317,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * will be returned for {@link EntryProcessor}s that return a * <code>null</code> value for a key. */ + @IgniteAsyncSupported <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); /** @@ -353,4 +358,78 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @return Projection for portable objects. */ public <K1, V1> IgniteCache<K1, V1> keepPortable(); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public V get(K key); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public Map<K, V> getAll(Set<? extends K> keys); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean containsKey(K key); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public void put(K key, V val); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public V getAndPut(K key, V val); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public void putAll(Map<? extends K, ? extends V> map); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean putIfAbsent(K key, V val); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean remove(K key); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean remove(K key, V oldVal); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public V getAndRemove(K key); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean replace(K key, V oldVal, V newVal); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public boolean replace(K key, V val); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public V getAndReplace(K key, V val); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public void removeAll(Set<? extends K> keys); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public void removeAll(); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public void clear(); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); + + /** {@inheritDoc} */ + @IgniteAsyncSupported + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index 13f389d..4d61d1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -344,6 +344,12 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> { public boolean containsKey(K key); /** + * @param key Key. + * @return Future. + */ + public IgniteFuture<Boolean> containsKeyAsync(K key); + + /** * Returns {@code true} if this cache contains given value. * * @param val Value to check. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3c0ed75..474441b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -639,6 +639,20 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + return containsKeyAsync(key, null); + } + + /** + * @param key Key. + * @param filter Filter. + * @return Future. + */ + public IgniteFuture<Boolean> containsKeyAsync(K key, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + return new GridFinishedFuture<>(ctx.kernalContext(), containsKey(key, filter)); + } + + /** {@inheritDoc} */ @Override public boolean containsValue(V val) { return containsValue(val, null); } @@ -3575,6 +3589,37 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } + /** + * @param p Predicate. + * @param args Arguments. + * @throws IgniteCheckedException If failed. + */ + void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { + ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); + + IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); + + comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args)); + } + + /** + * @param p Predicate. + * @param args Arguments. + * @throws IgniteCheckedException If failed. + */ + IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + throws IgniteCheckedException { + ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); + + IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); + + comp = comp.enableAsync(); + + comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args)); + + return comp.future(); + } + /** {@inheritDoc} */ @Nullable @Override public CacheEntry<K, V> randomEntry() { GridCacheMapEntry<K, V> e = map.randomEntry(); @@ -5036,7 +5081,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public Integer apply(Object o) { - GridCache<Object, Object> cache = ((GridEx) ignite).cachex(cacheName); + GridCache<Object, Object> cache = ((GridEx)ignite).cachex(cacheName); return primaryOnly ? cache.primarySize() : cache.size(); } @@ -5367,4 +5412,74 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ldr.addData(col); } } + + /** + * + */ + private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String cacheName; + + /** */ + private IgniteBiPredicate<K, V> p; + + /** */ + private Object[] args; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Required by {@link Externalizable}. + */ + public LoadCacheClosure() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param p Predicate. + * @param args Arguments. + */ + private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args) { + this.cacheName = cacheName; + this.p = p; + this.args = args; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + IgniteCache<K, V> cache = ignite.jcache(cacheName); + + assert cache != null : cacheName; + + cache.localLoadCache(p, args); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(p); + + out.writeObject(args); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + p = (IgniteBiPredicate<K, V>)in.readObject(); + + args = (Object[])in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LoadCacheClosure.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 6e7ce4c..b1e564a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -613,6 +613,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + return cache.containsKeyAsync(key, entryFilter(false)); + } + + /** {@inheritDoc} */ @Override public boolean containsValue(V val) { return cache.containsValue(val, entryFilter(true)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 44bdc3f..504fe7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -340,6 +340,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.containsKeyAsync(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public boolean containsValue(V val) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index eacb5b3..ff40c5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -127,11 +127,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); - - IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); - - comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args)); + if (isAsync()) + curFut.set(ctx.cache().globalLoadCacheAsync(p, args)); + else + ctx.cache().globalLoadCache(p, args); } finally { gate.leave(prev); @@ -148,7 +147,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.<K, V>cache().loadCache(p, 0, args); + if (isAsync()) + curFut.set(delegate.<K, V>cache().loadCacheAsync(p, 0, args)); + else + delegate.<K, V>cache().loadCache(p, 0, args); } finally { gate.leave(prev); @@ -165,7 +167,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.putIfAbsent(key, val); + if (isAsync()) { + curFut.set(delegate.putIfAbsentAsync(key, val)); + + return null; + } + else + return delegate.putIfAbsent(key, val); } finally { gate.leave(prev); @@ -282,7 +290,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.size(); + return ctx.cache().globalSize(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { gate.leave(prev); @@ -308,7 +319,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.get(key); + if (isAsync()) { + curFut.set(delegate.getAsync(key)); + + return null; + } + else + return delegate.get(key); } finally { gate.leave(prev); @@ -325,7 +342,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.getAll(keys); + if (isAsync()) { + curFut.set(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); } finally { gate.leave(prev); @@ -345,7 +368,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.getAll(keys); + if (isAsync()) { + curFut.set(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); } finally { gate.leave(prev); @@ -395,7 +424,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.containsKey(key); + if (isAsync()) { + curFut.set(delegate.containsKeyAsync(key)); + + return false; + } + else + return delegate.containsKey(key); } finally { gate.leave(prev); @@ -439,7 +474,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.putx(key, val); + if (isAsync()) + curFut.set(delegate.putxAsync(key, val)); + else + delegate.putx(key, val); } finally { gate.leave(prev); @@ -456,7 +494,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.put(key, val); + if (isAsync()) { + curFut.set(delegate.putAsync(key, val)); + + return null; + } + else + return delegate.put(key, val); } finally { gate.leave(prev); @@ -473,7 +517,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.putAll(map); + if (isAsync()) + curFut.set(delegate.putAllAsync(map)); + else + delegate.putAll(map); } finally { gate.leave(prev); @@ -490,7 +537,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.putxIfAbsent(key, val); + if (isAsync()) { + curFut.set(delegate.putxIfAbsentAsync(key, val)); + + return false; + } + else + return delegate.putxIfAbsent(key, val); } finally { gate.leave(prev); @@ -507,7 +560,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.removex(key); + if (isAsync()) { + curFut.set(delegate.removexAsync(key)); + + return false; + } + else + return delegate.removex(key); } finally { gate.leave(prev); @@ -524,7 +583,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.remove(key, oldVal); + if (isAsync()) { + curFut.set(delegate.removeAsync(key, oldVal)); + + return false; + } + else + return delegate.remove(key, oldVal); } finally { gate.leave(prev); @@ -541,7 +606,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.remove(key); + if (isAsync()) { + curFut.set(delegate.removeAsync(key)); + + return null; + } + else + return delegate.remove(key); } finally { gate.leave(prev); @@ -558,7 +629,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.replace(key, oldVal, newVal); + if (isAsync()) { + curFut.set(delegate.replaceAsync(key, oldVal, newVal)); + + return false; + } + else + return delegate.replace(key, oldVal, newVal); } finally { gate.leave(prev); @@ -575,7 +652,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.replacex(key, val); + if (isAsync()) { + curFut.set(delegate.replacexAsync(key, val)); + + return false; + } + else + return delegate.replacex(key, val); } finally { gate.leave(prev); @@ -592,7 +675,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.replace(key, val); + if (isAsync()) { + curFut.set(delegate.replaceAsync(key, val)); + + return null; + } + else + return delegate.replace(key, val); } finally { gate.leave(prev); @@ -609,7 +698,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.removeAll(keys); + if (isAsync()) + curFut.set(delegate.removeAllAsync(keys)); + else + delegate.removeAll(keys); } finally { gate.leave(prev); @@ -628,7 +720,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.removeAll(keys); + if (isAsync()) + curFut.set(delegate.removeAllAsync(keys)); + else + delegate.removeAll(keys); } finally { gate.leave(prev); @@ -717,7 +812,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return saveOrGet(delegate.invokeAllAsync(keys, entryProcessor, args)); + if (isAsync()) { + curFut.set(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); } finally { gate.leave(prev); @@ -736,7 +837,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return saveOrGet(delegate.invokeAllAsync(map, args)); + if (isAsync()) { + curFut.set(delegate.invokeAllAsync(map, args)); + + return null; + } + else + return delegate.invokeAll(map, args); } finally { gate.leave(prev); @@ -976,74 +1083,4 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); } - - /** - * - */ - private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private String cacheName; - - /** */ - private IgniteBiPredicate<K, V> p; - - /** */ - private Object[] args; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * Required by {@link Externalizable}. - */ - public LoadCacheClosure() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param p Predicate. - * @param args Arguments. - */ - private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args) { - this.cacheName = cacheName; - this.p = p; - this.args = args; - } - - /** {@inheritDoc} */ - @Override public Void call() throws Exception { - IgniteCache<K, V> cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; - - cache.localLoadCache(p, args); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(p); - - out.writeObject(args); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - p = (IgniteBiPredicate<K, V>)in.readObject(); - - args = (Object[])in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(LoadCacheClosure.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java index 01eb3e7..aabcc3a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java @@ -41,6 +41,9 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { /** */ private static ConcurrentMap<String, Object[]> map; + /** */ + private static volatile boolean failStore; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; @@ -65,11 +68,36 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { * @throws Exception If failed. */ public void testLoadCache() throws Exception { + loadCache(false); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsync() throws Exception { + loadCache(true); + } + + /** + * @param async If {@code true} uses asynchronous method. + * @throws Exception If failed. + */ + private void loadCache(boolean async) throws Exception { IgniteCache<Integer, Integer> cache = jcache(); + IgniteCache<Integer, Integer> asyncCache = cache.enableAsync(); + + assertTrue(asyncCache.isAsync()); + map = new ConcurrentHashMap8<>(); - cache.loadCache(null, 1, 2, 3); + if (async) { + asyncCache.loadCache(null, 1, 2, 3); + + asyncCache.future().get(); + } + else + cache.loadCache(null, 1, 2, 3); assertEquals(3, map.size()); @@ -87,14 +115,28 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { map = new ConcurrentHashMap8<>(); - cache.loadCache(new IgniteBiPredicate<Integer, Integer>() { - @Override public boolean apply(Integer key, Integer val) { - assertNotNull(key); - assertNotNull(val); + if (async) { + asyncCache.loadCache(new IgniteBiPredicate<Integer, Integer>() { + @Override public boolean apply(Integer key, Integer val) { + assertNotNull(key); + assertNotNull(val); - return key % 2 == 0; - } - }, 1, 2, 3, 4, 5, 6); + return key % 2 == 0; + } + }, 1, 2, 3, 4, 5, 6); + + asyncCache.future().get(); + } + else { + cache.loadCache(new IgniteBiPredicate<Integer, Integer>() { + @Override public boolean apply(Integer key, Integer val) { + assertNotNull(key); + assertNotNull(val); + + return key % 2 == 0; + } + }, 1, 2, 3, 4, 5, 6); + } assertEquals(3, map.size()); @@ -115,10 +157,24 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + failStore = true; + } + + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); map = null; + + failStore = false; + + IgniteCache<Integer, Integer> cache = jcache(); + + for (int i = 0; i < 7; i++) + cache.remove(i); } /** {@inheritDoc} */ @@ -153,7 +209,8 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { /** {@inheritDoc} */ @Override public Integer load(Integer key) { - assertEquals((Integer)5, key); + if (failStore) + assertEquals((Integer)5, key); return null; } @@ -165,7 +222,8 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest { /** {@inheritDoc} */ @Override public void delete(Object key) { - fail(); + if (failStore) + fail(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java index bf5271b..79ab0b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java @@ -79,12 +79,35 @@ public class GridCachePartitionedLoadCacheSelfTest extends GridCommonAbstractTes * @throws Exception If failed. */ public void testLocalLoadCache() throws Exception { + loadCache(false); + } + + /** + * @throws Exception If failed. + */ + public void testLocalLoadCacheAsync() throws Exception { + loadCache(true); + } + + /** + * @param async If {@code true} uses asynchronous load. + * @throws Exception If failed. + */ + private void loadCache(boolean async) throws Exception { try { startGridsMultiThreaded(GRID_CNT); IgniteCache<Integer, String> cache = jcache(0); - cache.localLoadCache(null, PUT_CNT); + if (async) { + IgniteCache<Integer, String> asyncCache = cache.enableAsync(); + + asyncCache.localLoadCache(null, PUT_CNT); + + asyncCache.future().get(); + } + else + cache.localLoadCache(null, PUT_CNT); GridCache<Integer, String> cache0 = cache(0);