# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcb30d10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcb30d10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcb30d10 Branch: refs/heads/ignite-54 Commit: bcb30d10471e2764b8b6e2928cdb50f04971b618 Parents: 928aa3d Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 25 10:33:52 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 25 17:29:09 2014 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 176 +++++++++++++ .../cache/CachePartialUpdateException.java | 36 +++ .../processors/cache/IgniteCacheProxy.java | 255 ++++++++++++++++--- .../grid/cache/GridCacheInterceptor.java | 8 +- .../cache/GridCachePartialUpdateException.java | 1 + .../processors/cache/GridCacheAdapter.java | 83 +++++- .../processors/cache/GridCacheEntryEx.java | 6 +- .../processors/cache/GridCacheMapEntry.java | 38 ++- .../processors/cache/GridCacheProcessor.java | 16 +- .../processors/cache/GridCacheProjectionEx.java | 26 +- .../cache/GridCacheProjectionImpl.java | 21 +- .../processors/cache/GridCacheProxyImpl.java | 36 ++- .../processors/cache/GridCacheStoreManager.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 56 ++-- .../distributed/near/GridNearAtomicCache.java | 28 ++ .../local/atomic/GridLocalAtomicCache.java | 229 +++++++++++++++-- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 49 +++- .../cache/transactions/IgniteTxLocalEx.java | 2 +- .../processors/ggfs/GridGgfsDataManager.java | 26 +- .../processors/ggfs/GridGgfsMetaManager.java | 72 ++++-- .../cache/IgniteCacheAtomicInvokeTest.java | 47 ++++ .../cache/IgniteCacheAtomicLocalInvokeTest.java | 41 +++ ...niteCacheAtomicLocalWithStoreInvokeTest.java | 22 ++ .../IgniteCacheAtomicNearEnabledInvokeTest.java | 24 ++ 
.../cache/IgniteCacheInvokeAbstractTest.java | 21 +- .../cache/IgniteCacheTxLocalInvokeTest.java | 41 +++ .../IgniteCacheTxNearEnabledInvokeTest.java | 24 ++ .../IgniteCacheExpiryPolicyAbstractTest.java | 7 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 171 ++++++------- ...idCacheGetAndTransformStoreAbstractTest.java | 21 +- .../cache/GridCacheIncrementTransformTest.java | 19 +- .../GridCacheInterceptorAbstractSelfTest.java | 66 +++-- ...ridCacheMultinodeUpdateAbstractSelfTest.java | 28 +- ...HeapMultiThreadedUpdateAbstractSelfTest.java | 21 +- ...CacheOffHeapMultiThreadedUpdateSelfTest.java | 9 +- .../GridCacheOffHeapTieredAbstractSelfTest.java | 45 ++-- ...heOffHeapTieredEvictionAbstractSelfTest.java | 32 ++- .../GridCacheReturnValueTransferSelfTest.java | 33 ++- .../processors/cache/GridCacheTestEntryEx.java | 3 +- .../IgniteTxExceptionAbstractSelfTest.java | 16 +- .../IgniteTxStoreExceptionAbstractSelfTest.java | 25 +- .../GridCacheTransformEventSelfTest.java | 68 +++-- .../IgniteTxPreloadAbstractTest.java | 44 +++- ...heAbstractTransformWriteThroughSelfTest.java | 37 ++- .../dht/GridCacheAtomicNearCacheSelfTest.java | 72 +++--- ...GridCacheValueConsistencyAtomicSelfTest.java | 23 +- .../testframework/junits/GridAbstractTest.java | 3 +- .../junits/common/GridCommonAbstractTest.java | 50 +++- .../bamboo/GridDataGridTestSuite.java | 15 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 124 +++++---- 51 files changed, 1801 insertions(+), 524 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index a59573e..f51c237 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java 
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -13,6 +13,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -298,4 +299,179 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @return Cache size on this node. */ public int localSize(CachePeekMode... peekModes); + + /** + * Stores given key-value pair in cache. If filters are provided, then entries will + * be stored in cache only if they pass the filter. Note that filter check is atomic, + * so value stored in cache is guaranteed to be consistent with the filters. If cache + * previously contained value for the given key, then this value is returned. + * In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} caches, + * the value will be loaded from the primary node, which in its turn may load the value + * from the swap storage, and consecutively, if it's not in swap, + * from the underlying persistent storage. If value has to be loaded from persistent + * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used. + * <p> + * If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should + * always be used instead of this one to avoid the overhead associated with returning of the previous value. + * <p> + * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore} + * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. 
+ * <h2 class="header">Cache Flags</h2> + * This method is not available if any of the following flags are set on projection: + * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. + * + * @param key Key to store in cache. + * @param val Value to be associated with the given key. + * @param filter Optional filter to check prior to putting value in cache. Note + * that filter check is atomic with put operation. + * @return Previous value associated with specified key, or {@code null} + * if entry did not pass the filter, or if there was no mapping for the key in swap + * or in persistent storage. + * @throws NullPointerException If either key or value are {@code null}. + * @throws GridCacheFlagException If projection flags validation failed. + */ + // TODO IGNITE-1 fix entry type. + @Nullable public V getAndPutIf(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter); + + /** + * Stores given key-value pair in cache. If filters are provided, then entries will + * be stored in cache only if they pass the filter. Note that filter check is atomic, + * so value stored in cache is guaranteed to be consistent with the filters. + * <p> + * This method will return {@code true} if value is stored in cache and {@code false} otherwise. + * Unlike {@link #getAndPutIf(Object, Object, IgnitePredicate)} method, it does not return previous + * value and, therefore, does not have any overhead associated with returning a value. It + * should be used whenever return value is not required. + * <p> + * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore} + * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. 
+ * <h2 class="header">Cache Flags</h2> + * This method is not available if any of the following flags are set on projection: + * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. + * + * @param key Key to store in cache. + * @param val Value to be associated with the given key. + * @param filter Optional filter to check prior to putting value in cache. Note + * that filter check is atomic with put operation. + * @return {@code True} if optional filter passed and value was stored in cache, + * {@code false} otherwise. Note that this method will return {@code true} if filter is not + * specified. + * @throws NullPointerException If either key or value are {@code null}. + * @throws GridCacheFlagException If projection flags validation failed. + */ + // TODO IGNITE-1 fix entry type. + public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter); + + /** + * Removes given key mapping from cache. If cache previously contained value for the given key, + * then this value is returned. In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} + * caches, the value will be loaded from the primary node, which in its turn may load the value + * from the disk-based swap storage, and consecutively, if it's not in swap, + * from the underlying persistent storage. If value has to be loaded from persistent + * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used. + * <p> + * If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should + * always be used instead of this one to avoid the overhead associated with returning of the + * previous value. + * <p> + * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore} + * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method. 
+ * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * <h2 class="header">Cache Flags</h2> + * This method is not available if any of the following flags are set on projection: + * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. + * + * @param key Key whose mapping is to be removed from cache. + * @param filter Optional filter to check prior to removing value from cache. Note + * that filter is checked atomically together with remove operation. + * @return Previous value associated with specified key, or {@code null} + * if there was no value for this key. + * @throws NullPointerException If key is {@code null}. + * @throws GridCacheFlagException If projection flags validation failed. + */ + // TODO IGNITE-1 fix entry type. + public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter); + + /** + * Removes given key mapping from cache. + * <p> + * This method will return {@code true} if remove did occur, which means that all optionally + * provided filters have passed and there was something to remove, {@code false} otherwise. + * <p> + * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore} + * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * <h2 class="header">Cache Flags</h2> + * This method is not available if any of the following flags are set on projection: + * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. + * + * @param key Key whose mapping is to be removed from cache. + * @param filter Optional filter to check prior to removing value from cache. Note + * that filter is checked atomically together with remove operation. 
+ * @return {@code True} if filter passed validation and entry was removed, {@code false} otherwise. + * Note that if filter is not specified, this method will return {@code true}. + * @throws NullPointerException if the key is {@code null}. + * @throws GridCacheFlagException If projection flags validation failed. + */ + // TODO IGNITE-1 fix entry type. + public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter); + + /** + * Creates projection that will operate with portable objects. + * <p> + * Projection returned by this method will force cache not to deserialize portable objects, + * so keys and values will be returned from cache API methods without changes. Therefore, + * signature of the projection can contain only following types: + * <ul> + * <li>{@link org.apache.ignite.portables.PortableObject} for portable classes</li> + * <li>All primitives (byte, int, ...) and their boxed versions (Byte, Integer, ...)</li> + * <li>Arrays of primitives (byte[], int[], ...)</li> + * <li>{@link String} and array of {@link String}s</li> + * <li>{@link UUID} and array of {@link UUID}s</li> + * <li>{@link Date} and array of {@link Date}s</li> + * <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> + * <li>Enums and array of enums</li> + * <li> + * Maps, collections and array of objects (but objects inside + * them will still be converted if they are portable) + * </li> + * </ul> + * <p> + * For example, if you use {@link Integer} as a key and {@code Value} class as a value + * (which will be stored in portable format), you should acquire following projection + * to avoid deserialization: + * <pre> + * GridCacheProjection<Integer, GridPortableObject> prj = cache.keepPortable(); + * + * // Value is not deserialized and returned in portable format. 
+ * GridPortableObject po = prj.get(1); + * </pre> + * <p> + * Note that this method makes sense only if cache is working in portable mode + * ({@link org.gridgain.grid.cache.GridCacheConfiguration#isPortableEnabled()} returns {@code true}). If not, + * this method is no-op and will return current projection. + * + * @return Projection for portable objects. + */ + public <K1, V1> IgniteCache<K1, V1> keepPortable(); + + /** + * Gets cache projection based on this one, but with the specified flags turned on. + * <h1 class="header">Cache Flags</h1> + * The resulting projection will inherit all the flags from this projection. + * + * @param flags Flags to turn on (if empty, then no-op). + * @return New projection based on this one, but with the specified flags turned on. + */ + public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java new file mode 100644 index 0000000..08ce72e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java @@ -0,0 +1,36 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.cache; + +import org.gridgain.grid.cache.*; + +import javax.cache.*; +import java.util.*; + +/** + * Exception thrown from non-transactional cache in case when update succeeded only partially. 
+ * One can get list of keys for which update failed with method {@link #failedKeys()}. + */ +public class CachePartialUpdateException extends CacheException { + /** + * @param e Cause. + */ + public CachePartialUpdateException(GridCachePartialUpdateException e) { + super(e.getMessage(), e); + } + + /** + * Gets collection of failed keys. + * @return Collection of failed keys. + */ + public <K> Collection<K> failedKeys() { + return ((GridCachePartialUpdateException)getCause()).failedKeys(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 410fb9a..f7c157f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -13,8 +13,10 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; +import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -30,7 +32,7 @@ import java.util.concurrent.locks.*; /** * Cache proxy. 
*/ -public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable { +public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements IgniteCache<K, V>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -51,10 +53,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * @param ctx Context. * @param delegate Delegate. * @param prj Projection. + * @param async Async support flag. */ public IgniteCacheProxy(GridCacheContext<K, V> ctx, GridCacheProjectionEx<K, V> delegate, - @Nullable GridCacheProjectionImpl<K, V> prj) { + @Nullable GridCacheProjectionImpl<K, V> prj, + boolean async) { + super(async); + assert ctx != null; assert delegate != null; @@ -84,7 +90,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable try { GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); - return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0); + return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync()); } finally { gate.leave(prev); @@ -98,12 +104,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } /** {@inheritDoc} */ - @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws CacheException { + @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) + throws CacheException { // TODO IGNITE-1. 
throw new UnsupportedOperationException(); } /** {@inheritDoc} */ + @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.put(key, val, filter); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.putx(key, val, filter); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.remove(key, filter); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.removex(key, filter); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -116,7 +191,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -215,7 +290,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw 
cacheException(e); } } @@ -257,7 +332,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -274,7 +349,44 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); + } + } + + /** + * @param keys Keys. + * @return Values map. + */ + public Map<K, V> getAll(Collection<? extends K> keys) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.getAll(keys); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** + * Gets entry set containing internal entries. + * + * @param filter Filter. + * @return Entry set. + */ + public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>> filter) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.entrySetx(filter); + } + finally { + gate.leave(prev); } } @@ -305,7 +417,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -322,7 +434,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -339,7 +451,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -356,7 +468,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -373,7 +485,7 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -390,7 +502,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -407,7 +519,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -424,7 +536,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -441,7 +553,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -458,7 +570,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -475,7 +587,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -498,16 +610,34 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get(); + if (isAsync()) { + IgniteFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteFuture<T> fut0 = fut.chain(new CX1<IgniteFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res.get(); + } + }); + + curFut.set(fut0); 
- return res.get(); + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res.get(); + } } finally { gate.leave(prev); } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -519,14 +649,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.invokeAll(keys, entryProcessor, args).get(); + return saveOrGet(delegate.invokeAllAsync(keys, entryProcessor, args)); } finally { gate.leave(prev); } } catch (IgniteCheckedException e) { - throw new CacheException(e); + throw cacheException(e); } } @@ -618,20 +748,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public IgniteCache<K, V> enableAsync() { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + if (isAsync()) + return this; + + return new IgniteCacheProxy<>(ctx, delegate, prj, true); + } + + /** + * @param e Checked exception. + * @return Cache exception. + */ + private CacheException cacheException(IgniteCheckedException e) { + if (e instanceof GridCachePartialUpdateException) + return new CachePartialUpdateException((GridCachePartialUpdateException)e); + + return new CacheException(e); } /** {@inheritDoc} */ - @Override public boolean isAsync() { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + @SuppressWarnings("unchecked") + @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>( + (GridCacheProjection<K1, V1>)(prj != null ? prj : delegate), + (GridCacheContext<K1, V1>)ctx, + null, + null, + prj != null ? prj.flags() : null, + prj != null ? prj.subjectId() : null, + true, + prj != null ? 
prj.expiry() : null); + + return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, + prj0, + prj0, + isAsync()); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + Set<GridCacheFlag> res = EnumSet.noneOf(GridCacheFlag.class); + + Set<GridCacheFlag> flags0 = prj !=null ? prj.flags() : null; + + if (flags0 != null && !flags0.isEmpty()) + res.addAll(flags0); + + res.addAll(EnumSet.copyOf(F.asList(flags))); + + GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>( + (prj != null ? prj : delegate), + ctx, + null, + null, + res, + prj != null ? prj.subjectId() : null, + true, + prj != null ? prj.expiry() : null); + + return new IgniteCacheProxy<>(ctx, + prj0, + prj0, + isAsync()); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java index cb0192c..b1030bd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java @@ -42,7 +42,7 @@ public interface GridCacheInterceptor<K, V> { @Nullable public V onGet(K key, @Nullable V val); /** - * This method is called within {@link GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} + * This method is called within {@link GridCacheProjection#put(Object, Object, IgnitePredicate[])} * and similar operations before 
new value is stored in cache. * <p> * Implementations should not execute any complex logic, @@ -56,7 +56,7 @@ public interface GridCacheInterceptor<K, V> { * @param oldVal Old value. * @param newVal New value. * @return Value to be put to cache. Returning {@code null} cancels the update. - * @see GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[]) + * @see GridCacheProjection#put(Object, Object, IgnitePredicate[]) */ @Nullable public V onBeforePut(K key, @Nullable V oldVal, V newVal); @@ -76,7 +76,7 @@ public interface GridCacheInterceptor<K, V> { public void onAfterPut(K key, V val); /** - * This method is called within {@link GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} + * This method is called within {@link GridCacheProjection#remove(Object, IgnitePredicate[])} * and similar operations to provide control over returned value. * <p> * Implementations should not execute any complex logic, @@ -91,7 +91,7 @@ public interface GridCacheInterceptor<K, V> { * @return Tuple. The first value is the flag whether remove should be cancelled or not. * The second is the value to be returned as result of {@code remove()} operation, * may be {@code null}. 
- * @see GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[]) + * @see GridCacheProjection#remove(Object, IgnitePredicate[]) */ @Nullable public IgniteBiTuple<Boolean, V> onBeforeRemove(K key, @Nullable V val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java index dd41e55..015a5a7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java @@ -49,6 +49,7 @@ public class GridCachePartialUpdateException extends IgniteCheckedException { addSuppressed(err); } + /** {@inheritDoc} */ @Override public String getMessage() { return super.getMessage() + ": " + failedKeys; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 62daeb9..d4028be 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1420,12 +1420,18 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im * @param reload Reload flag. * @param tx Transaction. * @param filter Filter. + * @param subjId Subject ID. + * @param taskName Task name. 
* @param vis Visitor. * @return Future. */ - public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, boolean reload, - @Nullable final IgniteTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, - String taskName, final IgniteBiInClosure<K, V> vis) { + public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, + boolean reload, + @Nullable final IgniteTxEx<K, V> tx, + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable UUID subjId, + String taskName, + final IgniteBiInClosure<K, V> vis) { return ctx.closures().callLocalSafe(new GPC<Object>() { @Nullable @Override public Object call() { try { @@ -2194,7 +2200,66 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke( + @Override public <T> EntryProcessorResult<T> invoke(final K key, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) + throws IgniteCheckedException { + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { + @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter<K, V> tx) + throws IgniteCheckedException { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = + Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = + tx.invokeAsync(ctx, false, invokeMap, args); + + Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); + + assert resMap != null; + assert resMap.size() == 1 : resMap.size(); + + return resMap.values().iterator().next(); + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? 
extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) throws IgniteCheckedException { + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.denyOnLocalRead(); + + return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { + @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { + @Override public EntryProcessor apply(K k) { + return entryProcessor; + } + }); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = + tx.invokeAsync(ctx, false, invokeMap, args); + + return fut.get().value(); + } + }); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync( final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... args) @@ -2208,8 +2273,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) { @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { - Map<? extends K, EntryProcessor> invokeMap = - Collections.singletonMap(key, (EntryProcessor)entryProcessor); + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = + Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); return tx.invokeAsync(ctx, false, invokeMap, args); } @@ -2238,11 +2303,11 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll( + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor, final Object... 
args) { - A.notNull(entryProcessor, "entryProcessor"); + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); if (keyCheck) validateCacheKeys(keys); @@ -2251,7 +2316,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) { @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { - Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { @Override public EntryProcessor apply(K k) { return entryProcessor; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 1b71eec..101427f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -450,6 +450,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param ver Cache version. * @param op Operation. * @param writeObj Value. Type depends on operation. + * @param invokeArgs Optional arguments for EntryProcessor. * @param writeThrough Write through flag. * @param retval Return value flag. * @param expiryPlc Expiry policy.. @@ -459,14 +460,15 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. * @param taskName Task name. 
- * @return Tuple containing success flag and old value. + * @return Tuple containing success flag and operation result. * @throws IgniteCheckedException If update failed. * @throws GridCacheEntryRemovedException If entry is obsolete. */ - public IgniteBiTuple<Boolean, V> innerUpdateLocal( + public IgniteBiTuple<Boolean, Object> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable ExpiryPolicy expiryPlc, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index f14bba5..1b9ea20 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1365,10 +1365,11 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal( + @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable ExpiryPolicy expiryPlc, @@ -1381,7 +1382,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.isLocal() && cctx.atomic(); - V old; + Object opRes; boolean res = true; @@ -1397,7 +1398,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, 
V> unswap(true, retval); // Possibly get old value form store. - old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + V old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + + opRes = old; GridCacheValueBytes oldBytes = valueBytesUnlocked(); @@ -1428,7 +1431,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateTtl(ttl); } - return new IgniteBiTuple<>(false, retval ? old : null); + return new IgniteBiTuple<>(false, (Object)(retval ? old : null)); } } @@ -1444,11 +1447,24 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (op == GridCacheOperation.TRANSFORM) { transformCloClsName = writeObj.getClass().getName(); - IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj; + EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj; - assert transform != null; + assert entryProcessor != null; - updated = cctx.unwrapTemporary(transform.apply(old)); + CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + updated = cctx.unwrapTemporary(entry.getValue()); + + opRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); + } + catch (Exception e) { + updated = old; + + opRes = new CacheInvokeResult<>(e); + } } else updated = (V)writeObj; @@ -1460,13 +1476,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); if (updated == null) - return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(old)); + return new IgniteBiTuple<>(false, (Object)cctx.<V>unwrapTemporary(old)); } else { interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old); if (cctx.cancelRemove(interceptorRes)) - return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2())); + return new IgniteBiTuple<>(false, + (Object)cctx.<V>unwrapTemporary(interceptorRes.get2())); } } @@ 
-1576,7 +1593,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old)); + return new IgniteBiTuple<>(res, + (Object)(cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : opRes))); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 0724a58..70798ff 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -1599,7 +1599,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); - return new IgniteCacheProxy<>(cache.context(), cache, null); + return new IgniteCacheProxy<>(cache.context(), cache, null, false); + } + + /** + * @param name Cache name. + * @return Cache instance for given name. 
+ */ + @SuppressWarnings("unchecked") + public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) { + GridCacheAdapter<K, V> cache = (GridCacheAdapter<K, V>)caches.get(name); + + if (cache == null) + throw new IllegalArgumentException("Cache is not configured: " + name); + + return new IgniteCacheProxy<>(cache.context(), cache, null, false); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java index 1a98192..8df7d10 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java @@ -399,9 +399,31 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * @param key Key. * @param entryProcessor Entry processor. * @param args Arguments. + * @return Invoke result. + * @throws IgniteCheckedException If failed. + */ + public <T> EntryProcessorResult<T> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException; + + /** + * @param keys Keys. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke results. + * @throws IgniteCheckedException If failed. + */ + public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException; + + /** + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. * @return Future. 
*/ - public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args); @@ -411,7 +433,7 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * @param args Arguments. * @return Future. */ - public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java index ad5cde3..62b6b72 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java @@ -789,17 +789,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + throws IgniteCheckedException { + return cache.invoke(key, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) throws IgniteCheckedException { + return cache.invokeAll(keys, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { - return cache.invoke(key, entryProcessor, args); + return cache.invokeAsync(key, entryProcessor, args); } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { - return cache.invokeAll(keys, entryProcessor, args); + return cache.invokeAllAsync(keys, entryProcessor, args); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 90aeb0b..66f8626 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -738,9 +738,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, - Object... args) { + Object... 
args) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -752,9 +752,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, - Object... args) { + Object... args) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -766,6 +766,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.invokeAsync(key, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.invokeAllAsync(keys, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index a7e47b0..b6fe4be 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -180,7 +180,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException If data loading failed. */ @SuppressWarnings({"unchecked"}) - public boolean loadAllFromStore(@Nullable IgniteTx tx, Collection<? extends K> keys, + public boolean loadAllFromStore(@Nullable IgniteTx tx, + Collection<? 
extends K> keys, final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException { if (store != null) { if (!keys.isEmpty()) { @@ -230,6 +231,10 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { return true; } + else { + for (K key : keys) + vis.apply(key, null); + } return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 78d92f8..0644821 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -455,7 +455,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - transformAsync(key, transformer).get(); + //transformAsync(key, transformer).get(); + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -482,8 +484,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { - transformAllAsync(m).get(); + @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) + throws IgniteCheckedException { + //transformAllAsync(m).get(); + // TODO IGNITE-44. 
+ throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -632,8 +637,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + throws IgniteCheckedException { + return invokeAsync(key, entryProcessor, args).get(); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) + throws IgniteCheckedException { + return invokeAllAsync(keys, entryProcessor, args).get(); + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { A.notNull(key, "key", entryProcessor, "entryProcessor"); @@ -671,7 +690,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor, Object... args) { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); @@ -701,8 +720,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Entry point for all public API put/transform methods. * - * @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed. - * @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed. + * @param map Put map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed. + * @param invokeMap Invoke map. 
Either {@code map}, {@code invokeMap} or {@code drMap} should be passed. * @param invokeArgs Optional arguments for EntryProcessor. * @param drPutMap DR put map. * @param drRmvMap DR remove map. @@ -714,7 +733,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private IgniteFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, EntryProcessor> transformMap, + @Nullable final Map<? extends K, EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap, @Nullable final Map<? extends K, GridCacheVersion> drRmvMap, @@ -738,10 +757,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx, this, ctx.config().getWriteSynchronizationMode(), - transformMap != null ? TRANSFORM : UPDATE, - map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ? + invokeMap != null ? TRANSFORM : UPDATE, + map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : drPutMap != null ? drPutMap.keySet() : drRmvMap.keySet(), - map != null ? map.values() : transformMap != null ? transformMap.values() : null, + map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, invokeArgs, drPutMap != null ? drPutMap.values() : null, drRmvMap != null ? drRmvMap.values() : null, @@ -1213,12 +1232,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int size = req.keys().size(); Map<K, V> putMap = null; + Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null; + Collection<K> rmvKeys = null; + UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>(); + List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size); + GridCacheOperation op = req.operation(); - Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null; + + Map<K, EntryProcessorResult> invokeResMap = + op == TRANSFORM ? 
U.<K, EntryProcessorResult>newHashMap(size) : null; int firstEntryIdx = 0; @@ -2644,7 +2670,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private boolean readersOnly; /** */ - private Map<Object, Object> invokeRes; + private Map<K, EntryProcessorResult> invokeRes; /** * @param entry Entry. @@ -2679,14 +2705,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param invokeRes Result for invoke operation. */ - private void invokeResult(Map<Object, Object> invokeRes) { + private void invokeResult(Map<K, EntryProcessorResult> invokeRes) { this.invokeRes = invokeRes; } /** * @return Result for invoke operation. */ - Map<Object, Object> invokeResults() { + Map<K, EntryProcessorResult> invokeResults() { return invokeRes; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index 07e9785..5b3055a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -555,6 +555,34 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public <T> EntryProcessorResult<T> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return dht.invoke(key, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return dht.invokeAll(keys, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws EntryProcessorException { + return dht.invokeAsync(key, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + return dht.invokeAllAsync(keys, entryProcessor, args); + } + + /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx<K, V> entry, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {