# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3eba2e1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3eba2e1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3eba2e1d Branch: refs/heads/ignite-54 Commit: 3eba2e1db19c173a748c8d73b087fbd0e6fe4781 Parents: 090bf07 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 26 13:41:33 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 26 16:44:03 2014 +0300 ---------------------------------------------------------------------- .../examples/datagrid/CacheApiExample.java | 62 ++--- .../datagrid/CachePopularNumbersExample.java | 17 +- .../ScalarCachePopularNumbersExample.scala | 20 +- .../java/org/apache/ignite/IgniteCache.java | 16 ++ .../dataload/IgniteDataLoadCacheUpdater.java | 3 +- .../processors/cache/IgniteCacheProxy.java | 68 +++++- .../org/gridgain/grid/cache/GridCacheEntry.java | 21 -- .../org/gridgain/grid/cache/GridCacheFlag.java | 5 +- .../grid/cache/GridCacheProjection.java | 186 --------------- .../processors/cache/CacheInvokeResult.java | 4 +- .../processors/cache/GridCacheAdapter.java | 187 +++------------ .../processors/cache/GridCacheEntryImpl.java | 10 - .../cache/GridCacheEvictionEntry.java | 10 - .../cache/GridCacheFilterEvaluationEntry.java | 10 - .../processors/cache/GridCacheMapEntry.java | 5 +- .../processors/cache/GridCacheProcessor.java | 14 +- .../processors/cache/GridCacheProjectionEx.java | 35 ++- .../cache/GridCacheProjectionImpl.java | 69 +----- .../processors/cache/GridCacheProxyImpl.java | 111 ++------- .../cache/GridCacheTransformComputeClosure.java | 68 ------ .../GridAtomicCacheQueueImpl.java | 42 ++-- .../GridCacheDataStructuresManager.java | 7 +- .../datastructures/GridCacheQueueAdapter.java | 230 +++++++++++-------- .../GridTransactionalCacheQueueImpl.java | 31 +-- .../dht/atomic/GridDhtAtomicCache.java | 103 ++++----- .../dht/atomic/GridNearAtomicUpdateFuture.java | 20 +- .../distributed/near/GridNearAtomicCache.java | 36 +-- .../local/atomic/GridLocalAtomicCache.java | 152 ++++-------- .../cache/query/GridCacheQueryManager.java | 10 - .../GridCacheContinuousQueryEntry.java | 12 - .../transactions/IgniteTxLocalAdapter.java | 12 +- .../cache/transactions/IgniteTxLocalEx.java | 4 +- .../dataload/GridDataLoadCacheUpdaters.java | 28 ++- .../dataload/GridDataLoadUpdateJob.java | 20 +- .../dr/GridDrDataLoadCacheUpdater.java | 6 +- .../cache/IgniteCacheInvokeAbstractTest.java | 76 +++++- .../cache/GridCacheAbstractFullApiSelfTest.java | 2 +- .../GridCacheOffHeapTieredAbstractSelfTest.java | 12 +- .../cache/eviction/GridCacheMockEntry.java | 11 - ...idCachePartitionedHitsAndMissesSelfTest.java | 23 +- .../cache/websession/GridWebSessionFilter.java | 25 +- .../websession/GridWebSessionListener.java | 39 ++-- 42 files changed, 696 insertions(+), 1126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java index a30c20f..f477a89 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/CacheApiExample.java @@ -14,6 +14,7 @@ import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; +import javax.cache.processor.*; import java.util.concurrent.*; /** @@ -61,55 +62,62 @@ public class CacheApiExample { System.out.println(); System.out.println(">>> Cache atomic map operation examples."); - GridCache<Integer, String> cache = Ignition.ignite().cache(CACHE_NAME); + IgniteCache<Integer, String> cache = Ignition.ignite().jcache(CACHE_NAME); // Put and return previous value. - String v = cache.put(1, "1"); + String v = cache.getAndPut(1, "1"); assert v == null; // Put and do not return previous value (all methods ending with 'x' return boolean). // Performs better when previous value is not needed. - cache.putx(2, "2"); + cache.put(2, "2"); - // Put asynchronously (every cache operation has async counterpart). - IgniteFuture<String> fut = cache.putAsync(3, "3"); - // Asynchronously wait for result. - fut.listenAsync(new IgniteInClosure<IgniteFuture<String>>() { - @Override public void apply(IgniteFuture<String> fut) { - try { - System.out.println("Put operation completed [previous-value=" + fut.get() + ']'); - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - } - }); + // Put asynchronously (every cache operation has async counterpart). + // TODO IGNITE-60: uncomment when implemented. +// IgniteFuture<String> fut = cache.putAsync(3, "3"); +// +// // Asynchronously wait for result. +// fut.listenAsync(new IgniteInClosure<IgniteFuture<String>>() { +// @Override public void apply(IgniteFuture<String> fut) { +// try { +// System.out.println("Put operation completed [previous-value=" + fut.get() + ']'); +// } +// catch (IgniteCheckedException e) { +// e.printStackTrace(); +// } +// } +// }); // Put-if-absent. - boolean b1 = cache.putxIfAbsent(4, "4"); - boolean b2 = cache.putxIfAbsent(4, "44"); + boolean b1 = cache.putIfAbsent(4, "4"); + boolean b2 = cache.putIfAbsent(4, "44"); assert b1 && !b2; // Put-with-predicate, will succeed if predicate evaluates to true. - cache.putx(5, "5"); - cache.putx(5, "55", new IgnitePredicate<GridCacheEntry<Integer, String>>() { - @Override public boolean apply(GridCacheEntry<Integer, String> e) { + cache.put(5, "5"); + cache.putIf(5, "55", new IgnitePredicate<GridCacheEntry<Integer, String>>() { + @Override + public boolean apply(GridCacheEntry<Integer, String> e) { return "5".equals(e.peek()); // Update only if previous value is "5". } }); - // Transform - assign new value based on previous value. - cache.putx(6, "6"); - cache.transform(6, new IgniteClosure<String, String>() { - @Override public String apply(String v) { - return v + "6"; // Set new value based on previous value. + // Invoke - assign new value based on previous value. + cache.put(6, "6"); + cache.invoke(6, new EntryProcessor<Integer, String, Void>() { + @Override public Void process(MutableEntry<Integer, String> entry, Object... args) { + String v = entry.getValue(); + + entry.setValue(v + "6"); // Set new value based on previous value. + + return null; } }); // Replace. - cache.putx(7, "7"); + cache.put(7, "7"); b1 = cache.replace(7, "7", "77"); b2 = cache.replace(7, "7", "777"); assert b1 & !b2; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java index 69a1216..678a5e2 100644 --- a/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/gridgain/examples/datagrid/CachePopularNumbersExample.java @@ -12,11 +12,10 @@ package org.gridgain.examples.datagrid; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.dataload.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; +import javax.cache.processor.*; import java.util.*; /** @@ -153,16 +152,20 @@ public class CachePopularNumbersExample { */ private static class IncrementingUpdater implements IgniteDataLoadCacheUpdater<Integer, Long> { /** */ - private static final IgniteClosure<Long, Long> INC = new IgniteClosure<Long, Long>() { - @Override public Long apply(Long e) { - return e == null ? 1L : e + 1; + private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() { + @Override public Void process(MutableEntry<Integer, Long> e, Object... args) { + Long val = e.getValue(); + + e.setValue(val == null ? 1 : val + 1); + + return null; } }; /** {@inheritDoc} */ - @Override public void update(GridCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) throws IgniteCheckedException { + @Override public void update(IgniteCache<Integer, Long> cache, Collection<Map.Entry<Integer, Long>> entries) { for (Map.Entry<Integer, Long> entry : entries) - cache.transform(entry.getKey(), INC); + cache.invoke(entry.getKey(), INC); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala index 8546ab8..0d87f19 100644 --- a/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala +++ b/examples/src/main/scala/org/gridgain/scalar/examples/ScalarCachePopularNumbersExample.scala @@ -11,9 +11,10 @@ package org.gridgain.scalar.examples +import javax.cache.processor.{MutableEntry, EntryProcessor} + import org.apache.ignite.dataload.IgniteDataLoadCacheUpdater -import org.apache.ignite.IgniteCheckedException -import org.gridgain.grid.cache.GridCache +import org.apache.ignite.{IgniteCache, IgniteCheckedException} import java.util import java.util.Timer @@ -88,15 +89,24 @@ object ScalarCachePopularNumbersExample extends App { // Reduce parallel operations since we running the whole grid locally under heavy load. val ldr = dataLoader$[Int, Long](CACHE_NAME, 2048) - val f = (i: Long) => i + 1 + val f = new EntryProcessor[Int, Long, Void] { + override def process(e: MutableEntry[Int, Long], arguments: AnyRef*): Void = { + if (e.exists()) + e.setValue(e.getValue + 1) + else + e.setValue(1) + + null + } + } // Set custom updater to increment value for each key. ldr.updater(new IgniteDataLoadCacheUpdater[Int, Long] { - def update(cache: GridCache[Int, Long], entries: util.Collection[Entry[Int, Long]]) = { + def update(cache: IgniteCache[Int, Long], entries: util.Collection[Entry[Int, Long]]) = { import scala.collection.JavaConversions._ for (e <- entries) - cache.transform(e.getKey, f) + cache.invoke(e.getKey, f) } }) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index f51c237..f7b2c34 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -19,6 +19,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.locks.*; @@ -427,6 +428,16 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter); /** + * @param map Map containing keys and entry processors to be applied to values. + * @param args Additional arguments to pass to the {@link EntryProcessor}. + * @return The map of {@link EntryProcessorResult}s of the processing per key, + * if any, defined by the {@link EntryProcessor} implementation. No mappings + * will be returned for {@link EntryProcessor}s that return a + * <code>null</code> value for a key. + */ + <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); + + /** * Creates projection that will operate with portable objects. * <p> * Projection returned by this method will force cache not to deserialize portable objects, @@ -474,4 +485,9 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @return New projection based on this one, but with the specified flags turned on. */ public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags); + + /** + * @return Ignite instance. + */ + public Ignite ignite(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java index 88483b6..2e1f92c 100644 --- a/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/dataload/IgniteDataLoadCacheUpdater.java @@ -10,7 +10,6 @@ package org.apache.ignite.dataload; import org.apache.ignite.*; -import org.gridgain.grid.cache.*; import java.io.*; import java.util.*; @@ -31,5 +30,5 @@ public interface IgniteDataLoadCacheUpdater<K, V> extends Serializable { * @param entries Collection of entries. * @throws IgniteCheckedException If failed. */ - public void update(GridCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException; + public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f7c157f..d2bad0f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -14,6 +14,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; @@ -71,10 +72,26 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements gate = ctx.gate(); } + /** + * @return Context. + */ + public GridCacheContext<K, V> context() { + return ctx; + } + + /** + * @return Ignite instance. + */ + public GridEx ignite() { + return ctx.grid(); + } + /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + if (!clazz.equals(GridCacheConfiguration.class)) + throw new IllegalArgumentException(); + + return (C)ctx.config(); } /** {@inheritDoc} */ @@ -379,7 +396,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @param filter Filter. * @return Entry set. */ - public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>> filter) { + public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -390,6 +407,23 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } } + /** + * @param filter Filter. + */ + public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + delegate.removeAll(filter); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + finally { + gate.leave(prev); + } + } + /** {@inheritDoc} */ @Override public boolean containsKey(K key) { // TODO IGNITE-1. @@ -576,6 +610,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { + removeAll(keys); + } + + /** + * @param keys Keys to remove. + */ + public void removeAll(Collection<? extends K> keys) { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -629,7 +670,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements else { EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); - return res.get(); + return res != null ? res.get() : null; } } finally { @@ -661,6 +702,25 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return saveOrGet(delegate.invokeAllAsync(map, args)); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public String getName() { return delegate.name(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheEntry.java index 6e61dc1..d494903 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheEntry.java @@ -315,27 +315,6 @@ public interface GridCacheEntry<K, V> extends Map.Entry<K, V>, GridMetadataAware /** * This method has the same semantic as - * {@link GridCacheProjection#transform(Object, org.apache.ignite.lang.IgniteClosure)} method. - * - * @param transformer Closure to be applied to the previous value in cache. If this closure returns - * {@code null}, the associated value will be removed from cache. - * @throws IgniteCheckedException If cache update failed. - * @see GridCacheProjection#transform(Object, org.apache.ignite.lang.IgniteClosure) - */ - public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException; - - /** - * This method has the same semantic as - * {@link GridCacheProjection#transformAsync(Object, org.apache.ignite.lang.IgniteClosure)} method. - * - * @param transformer Closure to be applied to the previous value in cache. If this closure returns - * {@code null}, the associated value will be removed from cache. - * @return Transform operation future. - */ - public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer); - - /** - * This method has the same semantic as * {@link GridCacheProjection#replace(Object, Object)} method. * * @param val See {@link GridCacheProjection#replace(Object, Object)} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheFlag.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheFlag.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheFlag.java index d57d7f7..fdff068 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheFlag.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheFlag.java @@ -9,9 +9,12 @@ package org.gridgain.grid.cache; +import org.apache.ignite.IgniteCache; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; + /** * Cache projection flags that specify projection behaviour. This flags can be explicitly passed into * the following methods on {@link GridCacheProjection}: @@ -76,7 +79,7 @@ public enum GridCacheFlag { INVALIDATE, /** - * Skips version check during {@link GridCacheProjection#transform(Object, org.apache.ignite.lang.IgniteClosure)} writes in + * Skips version check during {@link IgniteCache#invoke(Object, EntryProcessor, Object[])} writes in * {@link GridCacheAtomicityMode#ATOMIC} mode. By default, in {@code ATOMIC} mode, whenever * {@code transform(...)} is called, cache values (and not the {@code transform} closure) are sent from primary * node to backup nodes to ensure proper update ordering. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java index 965f31e..2d14721 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java @@ -708,82 +708,6 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V> public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter); /** - * Stores result of applying {@code valTransform} closure to the previous value associated with - * given key in cache. Result of closure application is guaranteed to be atomic, however, closure - * itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putx(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} or {@link #put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} - * methods, this method will not transfer the whole updated value over the network, but instead will - * transfer the transforming closure that will be applied on each remote node involved in transaction. - * It may add significant performance gain when dealing with large values as the value is much larger - * than the closure itself. If write-through is enabled, the stored value will be persisted to - * {@link GridCacheStore} via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key to store in cache. - * @param transformer Closure to be applied to the previous value in cache. If this closure returns - * {@code null}, the associated value will be removed from cache. - * @throws NullPointerException If either key or transform closure is {@code null}. - * @throws IgniteCheckedException On any error occurred while storing value in cache. - */ - public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException; - - /** - * Applies {@code transformer} closure to the previous value associated with given key in cache, - * closure should return {@link org.apache.ignite.lang.IgniteBiTuple} instance where first value is new value stored in cache - * and second value is returned as result of this method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key to store in cache. - * @param transformer Closure to be applied to the previous value in cache. - * @return Value computed by the closure. - * @throws IgniteCheckedException On any error occurred while storing value in cache. - */ - public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) throws IgniteCheckedException; - - /** - * Stores result of applying {@code transformer} closure to the previous value associated with - * given key in cache. Result of closure application is guaranteed to be atomic, however, closure - * itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putx(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} method, this method will not transfer - * the whole updated value over the network, but instead will transfer the transforming closure - * that will be applied on each remote node involved in transaction. It may add significant performance - * gain when dealing with large values as the value is much larger than the closure itself. - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param key Key to store in cache. - * @param transformer Closure to be applied to the previous value in cache. If this closure returns - * {@code null}, the associated value will be removed from cache. - * @return Future for the transform operation. - * @throws NullPointerException If either key or transform closure is {@code null}. - */ - public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer); - - /** * Stores given key-value pair in cache only if cache had no previous mapping for it. If cache * previously contained value for the given key, then this value is returned. * In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} caches, @@ -1079,60 +1003,6 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V> @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException; /** - * Stores result of applying transform closures from the given map to previous values associated - * with corresponding keys in cache. Execution of closure is guaranteed to be atomic, - * however, closure itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} method, this method will not transfer - * the whole updated value over the network, but instead will transfer the transforming closures - * that will be applied on each remote node involved in transaction. It may add significant - * performance gain when dealing with large values as the value is much larger than the closure itself. - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param m Map containing keys and closures to be applied to values. - * @throws IgniteCheckedException On any error occurred while storing value in cache. - */ - public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException; - - /** - * Stores result of applying the specified transform closure to previous values associated - * with the specified keys in cache. Execution of closure is guaranteed to be atomic, - * however, closure itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} method, this method will not transfer - * the whole updated value over the network, but instead will transfer the transforming closure - * that will be applied on each remote node involved in transaction. It may add significant - * performance gain when dealing with large values as the value is much larger than the closure itself. - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param keys Keys for entries, to which the transformation closure will be applied. - * If the collection is {@code null} or empty, this method is no-op. - * @param transformer Transformation closure to be applied to each value. - * @throws IgniteCheckedException On any error occurred while storing value in cache. - */ - public void transformAll(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) throws IgniteCheckedException; - - /** * Asynchronously stores given key-value pairs in cache. If filters are provided, then entries will * be stored in cache only if they pass the filter. Note that filter check is atomic, * so value stored in cache is guaranteed to be consistent with the filters. @@ -1156,62 +1026,6 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V> @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter); /** - * Stores result of applying transform closures from the given map to previous values associated - * with corresponding keys in cache. Result of closure application is guaranteed to be atomic, - * however, closure itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} method, this method will not transfer - * the whole updated value over the network, but instead will transfer the transforming closures - * that will be applied on each remote node involved in transaction. It may add significant performance - * gain when dealing with large values as the value is much larger than the closure itself. - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param m Map containing keys and closures to be applied to values. - * @return Future for operation. - */ - public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m); - - /** - * Stores result of applying the specified transform closure to previous values associated - * with the specified keys in cache. Result of closure application is guaranteed to be atomic, - * however, closure itself can be applied more than once. - * <p> - * Note that transform closure must not throw any exceptions. If exception is thrown from {@code apply} - * method, the transaction will be invalidated and entries participating in transaction will be nullified. - * <p> - * Unlike {@link #putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} method, this method will not transfer - * the whole updated value over the network, but instead will transfer the transforming closure - * that will be applied on each remote node involved in transaction. It may add significant - * performance gain when dealing with large values as the value is much larger than the closure itself. - * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore} - * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}. - * - * @param keys Keys for entries, to which the transformation closure will be applied. - * If the collection is {@code null} or empty, this method is no-op. - * @param transformer Transformation closure to be applied to each value. - * @return Future for operation. - * @throws IgniteCheckedException On any error occurred while storing value in cache. - */ - public IgniteFuture<?> transformAllAsync(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) - throws IgniteCheckedException; - - /** * Set of keys cached on this node. You can remove elements from this set, but you cannot add elements * to this set. All removal operation will be reflected on the cache itself. * <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java index 5f472d7..19b4141 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java @@ -45,7 +45,9 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz /** * @param res Computed result. */ - public CacheInvokeResult(@Nullable T res) { + public CacheInvokeResult(T res) { + assert res != null; + this.res = res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index d4028be..6abe9fd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -2218,14 +2218,17 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = - tx.invokeAsync(ctx, false, invokeMap, args); + tx.invokeAsync(ctx, invokeMap, args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); - assert resMap != null; - assert resMap.size() == 1 : resMap.size(); + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); - return resMap.values().iterator().next(); + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } + + return null; } }); } @@ -2251,7 +2254,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im }); IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = - tx.invokeAsync(ctx, false, invokeMap, args); + tx.invokeAsync(ctx, invokeMap, args); return fut.get().value(); } @@ -2276,7 +2279,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); - return tx.invokeAsync(ctx, false, invokeMap, args); + return tx.invokeAsync(ctx, invokeMap, args); } @Override public String toString() { @@ -2294,10 +2297,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im Map<K, EntryProcessorResult<T>> resMap = ret.value(); - assert resMap != null; - assert resMap.size() == 1 : resMap.size(); + if (resMap != null) { + assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); - return resMap.values().iterator().next(); + return resMap.isEmpty() ? null : resMap.values().iterator().next(); + } + + return null; } }); } @@ -2322,7 +2328,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } }); - return tx.invokeAsync(ctx, false, invokeMap, args); + return tx.invokeAsync(ctx, invokeMap, args); } @Override public String toString() { @@ -2340,61 +2346,45 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im assert ret != null; - return ret.value(); + return ret.value() != null ? ret.value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); } }); } /** {@inheritDoc} */ - @Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException { - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - /* - A.notNull(key, "key", transformer, "valTransform"); + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + final Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + final Object... args) { + A.notNull(map, "map"); if (keyCheck) - validateCacheKey(key); + validateCacheKeys(map.keySet()); ctx.denyOnLocalRead(); - syncOp(new SyncInOp(true) { - @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { - tx.transformAllAsync(ctx, Collections.singletonMap(key, transformer), false, null, -1).get(); + IgniteFuture<?> fut = asyncOp(new AsyncInOp(map.keySet()) { + @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { + return tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); } @Override public String toString() { - return "transform [key=" + key + ", valTransform=" + transformer + ']'; + return "invokeAllAsync [map=" + map + ']'; } }); - */ - } - /** {@inheritDoc} */ - @Override public <R> R transformAndCompute(final K key, final IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - /* - A.notNull(key, "key", transformer, "transformer"); - - if (keyCheck) - validateCacheKey(key); - - ctx.denyOnLocalRead(); + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 = + (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut; - return syncOp(new SyncOp<R>(true) { - @Override public R op(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { - IgniteFuture<GridCacheReturn<V>> ret = tx.transformAllAsync(ctx, - F.t(key, new GridCacheTransformComputeClosure<>(transformer)), true, null, -1); + return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() { + @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) + throws IgniteCheckedException { + GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); - return transformer.apply(ret.get().value()).get2(); - } + assert ret != null; - @Override public String toString() { - return "transformAndCompute [key=" + key + ", valTransform=" + transformer + ']'; + return ret.value() != null ? ret.value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); } }); - */ } /** {@inheritDoc} */ @@ -2429,36 +2419,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(final K key, final IgniteClosure<V, V> transformer) { - return transformAsync(key, transformer, null, -1); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(final K key, final IgniteClosure<V, V> transformer, - @Nullable final GridCacheEntryEx<K, V> entry, final long ttl) { - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - /* - A.notNull(key, "key", transformer, "transformer"); - - if (keyCheck) - validateCacheKey(key); - - ctx.denyOnLocalRead(); - - return asyncOp(new AsyncInOp(key) { - @Override public IgniteFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx) { - return tx.transformAllAsync(ctx, F.t(key, transformer), false, entry, ttl); - } - - @Override public String toString() { - return "transformAsync [key=" + key + ", valTransform=" + transformer + ']'; - } - }); - */ - } - - /** {@inheritDoc} */ @Nullable @Override public V putIfAbsent(final K key, final V val) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); @@ -2728,46 +2688,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public void transformAll(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) - throws IgniteCheckedException { - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - /* - if (F.isEmpty(m)) - return; - - if (keyCheck) - validateCacheKeys(m.keySet()); - - ctx.denyOnLocalRead(); - - syncOp(new SyncInOp(m.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { - tx.transformAllAsync(ctx, m, false, null, -1).get(); - } - - @Override public String toString() { - return "transformAll [map=" + m + ']'; - } - }); - */ - } - - /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Set<? extends K> keys, final IgniteClosure<V, V> transformer) - throws IgniteCheckedException { - if (F.isEmpty(keys)) - return; - - // Reuse transformAll(Map), mapping all keys to a transformer closure. - transformAll(F.viewAsMap(keys, new C1<K, IgniteClosure<V, V>>() { - @Override public IgniteClosure<V, V> apply(K k) { - return transformer; - } - })); - } - - /** {@inheritDoc} */ @Override public IgniteFuture<?> putAllAsync(final Map<? extends K, ? extends V> m, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter) { if (F.isEmpty(m)) @@ -2792,45 +2712,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) { - // TODO IGNITE-44. - throw new UnsupportedOperationException(); - /* - if (F.isEmpty(m)) - return new GridFinishedFuture<>(ctx.kernalContext()); - - if (keyCheck) - validateCacheKeys(m.keySet()); - - ctx.denyOnLocalRead(); - - return asyncOp(new AsyncInOp(m.keySet()) { - @Override public IgniteFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx) { - return tx.transformAllAsync(ctx, m, false, null, -1); - } - - @Override public String toString() { - return "transformAllAsync [map=" + m + ']'; - } - }); - */ - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Set<? extends K> keys, - final IgniteClosure<V, V> transformer) throws IgniteCheckedException { - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext()); - - // Reuse transformAllAsync(Map), mapping all keys to a transformer closure. - return transformAllAsync(F.viewAsMap(keys, new C1<K, IgniteClosure<V, V>>() { - @Override public IgniteClosure<V, V> apply(K k) { - return transformer; - } - })); - } - - /** {@inheritDoc} */ @Nullable @Override public V remove(K key, IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException { return remove(key, null, filter); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java index f7545b0..c910df2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java @@ -507,16 +507,6 @@ public class GridCacheEntryImpl<K, V> implements GridCacheEntry<K, V>, Externali } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException { - transformAsync(transformer).get(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - return proxy.transformAsync(key, transformer, isNearEnabled(ctx) ? null : cached, ttl); - } - - /** {@inheritDoc} */ @Override public boolean replacex(V val) throws IgniteCheckedException { return setx(val, ctx.hasPeekArray()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEvictionEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEvictionEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEvictionEntry.java index 04fe0a9..e81f637 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEvictionEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEvictionEntry.java @@ -267,16 +267,6 @@ public class GridCacheEvictionEntry<K, V> implements GridCacheEntry<K, V>, Exter } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException { - throw unsupported(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - throw unsupported(); - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(V val) throws IgniteCheckedException { throw unsupported(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheFilterEvaluationEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheFilterEvaluationEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheFilterEvaluationEntry.java index 6862a5e..9e5644a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheFilterEvaluationEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheFilterEvaluationEntry.java @@ -239,16 +239,6 @@ public class GridCacheFilterEvaluationEntry<K, V> implements GridCacheEntry<K, V } /** {@inheritDoc} */ - @Override public void transform(IgniteClosure<V, V> transformer) throws IgniteCheckedException { - throw new UnsupportedOperationException("transform"); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(IgniteClosure<V, V> transformer) { - throw new UnsupportedOperationException("transformAsync"); - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(V val) throws IgniteCheckedException { throw new UnsupportedOperationException("replace"); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 1b9ea20..110d8b1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1458,7 +1458,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = cctx.unwrapTemporary(entry.getValue()); - opRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); + opRes = computed != null ? new CacheInvokeResult<>(cctx.unwrapTemporary(computed)) : null; } catch (Exception e) { updated = old; @@ -1808,7 +1808,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = cctx.unwrapTemporary(entry.getValue()); - invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); + if (computed != null) + invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); valBytes = null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 70798ff..8e9cea0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -72,6 +72,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Map of proxies. */ private final Map<String, GridCache<?, ?>> proxies; + /** Map of proxies. */ + private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; + /** Map of public proxies, i.e. proxies which could be returned to the user. */ private final Map<String, GridCache<?, ?>> publicProxies; @@ -105,6 +108,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches = new LinkedHashMap<>(); proxies = new HashMap<>(); publicProxies = new HashMap<>(); + jCacheProxies = new HashMap<>(); preloadFuts = new TreeMap<>(); sysCaches = new HashSet<>(); @@ -816,6 +820,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheAdapter cache = e.getValue(); proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + + jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); } for (GridCacheAdapter<?, ?> cache : caches.values()) { @@ -1594,12 +1600,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (sysCaches.contains(name)) throw new IllegalStateException("Failed to get cache because it is system cache: " + name); - GridCacheAdapter<K, V> cache = (GridCacheAdapter<K, V>)caches.get(name); + IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); - return new IgniteCacheProxy<>(cache.context(), cache, null, false); + return cache; } /** @@ -1608,12 +1614,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) { - GridCacheAdapter<K, V> cache = (GridCacheAdapter<K, V>)caches.get(name); + IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); - return new IgniteCacheProxy<>(cache.context(), cache, null, false); + return cache; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java index 8df7d10..9831c59 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java @@ -117,18 +117,6 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { /** * Internal method that is called from {@link GridCacheEntryImpl}. * - * @param key Key. - * @param transformer Transformer closure. - * @param entry Cached entry. - * @param ttl Optional time-to-lve. - * @return Transform operation future. - */ - public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, @Nullable GridCacheEntryEx<K, V> entry, - long ttl); - - /** - * Internal method that is called from {@link GridCacheEntryImpl}. - * * @param key Key to remove. * @param entry Cached entry. If not provided, equivalent to {GridCacheProjection#put}. * @param filter Optional filter. @@ -402,11 +390,21 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * @return Invoke result. * @throws IgniteCheckedException If failed. */ - public <T> EntryProcessorResult<T> invoke(K key, + @Nullable public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException; /** + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Future. + */ + public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args); + + /** * @param keys Keys. * @param entryProcessor Entry processor. * @param args Arguments. @@ -418,22 +416,21 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { Object... args) throws IgniteCheckedException; /** - * @param key Key. + * @param keys Keys. * @param entryProcessor Entry processor. * @param args Arguments. * @return Future. */ - public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args); /** - * @param keys Keys. - * @param entryProcessor Entry processor. + * @param map Map containing keys and entry processors to be applied to values. * @param args Arguments. * @return Future. */ - public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java index 62b6b72..765875f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java @@ -774,21 +774,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - A.notNull(key, "key", transformer, "valTransform"); - - cache.transform(key, transformer); - } - - /** {@inheritDoc} */ - @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { - A.notNull(key, "key", transformer, "transformer"); - - return cache.transformAndCompute(key, transformer); - } - - /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { return cache.invoke(key, entryProcessor, args); @@ -816,6 +801,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + return cache.invokeAllAsync(map, args); + } + + /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return putxAsync(key, val, null, -1, filter); @@ -834,13 +826,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer) { - A.notNull(key, "key", transformer, "valTransform"); - - return cache.transformAsync(key, transformer); - } - - /** {@inheritDoc} */ @Override public V putIfAbsent(K key, V val) throws IgniteCheckedException { return putIfAbsentAsync(key, val).get(); } @@ -861,12 +846,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, - @Nullable GridCacheEntryEx<K, V> entry, long ttl) { - return cache.transformAsync(key, transformer, entry, ttl); - } - - /** {@inheritDoc} */ @Override public V replace(K key, V val) throws IgniteCheckedException { return replaceAsync(key, val).get(); } @@ -905,23 +884,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { - if (F.isEmpty(m)) - return; - - cache.transformAll(m); - } - - /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) - throws IgniteCheckedException { - if (F.isEmpty(keys)) - return; - - cache.transformAll(keys, transformer); - } - - /** {@inheritDoc} */ @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { m = isAll(m, true); @@ -933,23 +895,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { - if (F.isEmpty(m)) - return new GridFinishedFuture<>(cctx.kernalContext()); - - return cache.transformAllAsync(m); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) - throws IgniteCheckedException { - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(cctx.kernalContext()); - - return cache.transformAllAsync(keys, transformer); - } - - /** {@inheritDoc} */ @Override public Set<K> keySet() { return cache.keySet(entryFilter(true)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 66f8626..80a776a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -713,24 +713,13 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.transform(key, transformer); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) - throws IgniteCheckedException { + @Override public <T> EntryProcessorResult<T> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.transformAndCompute(key, transformer); + return delegate.invoke(key, entryProcessor, args); } finally { gate.leave(prev); @@ -738,13 +727,13 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public <T> EntryProcessorResult<T> invoke(K key, + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, - Object... args) throws IgniteCheckedException { + Object... args) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.invoke(key, entryProcessor, args); + return delegate.invokeAsync(key, entryProcessor, args); } finally { gate.leave(prev); @@ -766,13 +755,14 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.invokeAsync(key, entryProcessor, args); + return delegate.invokeAllAsync(keys, entryProcessor, args); } finally { gate.leave(prev); @@ -780,13 +770,13 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.invokeAllAsync(keys, entryProcessor, args); + return delegate.invokeAllAsync(map, args); } finally { gate.leave(prev); @@ -820,18 +810,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.transformAsync(key, transformer); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public V putIfAbsent(K key, V val) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -880,19 +858,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, - @Nullable GridCacheEntryEx<K, V> entry, long ttl) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.transformAsync(key, transformer); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public V replace(K key, V val) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -978,31 +943,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.transformAll(m); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public void transformAll(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) - throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.transformAll(keys, transformer); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public IgniteFuture<?> putAllAsync(@Nullable Map<? extends K, ? extends V> m, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1016,31 +956,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.transformAllAsync(m); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> transformAllAsync(@Nullable Set<? extends K> keys, IgniteClosure<V, V> transformer) - throws IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.transformAllAsync(keys, transformer); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public Set<K> keySet() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTransformComputeClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTransformComputeClosure.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTransformComputeClosure.java deleted file mode 100644 index bf06bd8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTransformComputeClosure.java +++ /dev/null @@ -1,68 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.GridCacheProjection; - -import java.io.*; - -/** - */ -public final class GridCacheTransformComputeClosure<V, R> implements IgniteClosure<V, V>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteClosure<V, IgniteBiTuple<V, R>> transformer; - - /** */ - private R retVal; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheTransformComputeClosure() { - // No-op. - } - - /** - * @param transformer Transformer closure. - */ - public GridCacheTransformComputeClosure(IgniteClosure<V, IgniteBiTuple<V, R>> transformer) { - this.transformer = transformer; - } - - /** - * @return Return value for {@link GridCacheProjection#transformAndCompute(Object, org.apache.ignite.lang.IgniteClosure)} - */ - public R returnValue() { - return retVal; - } - - /** {@inheritDoc} */ - @Override public V apply(V v) { - IgniteBiTuple<V, R> t = transformer.apply(v); - - retVal = t.get2(); - - return t.get1(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(transformer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - transformer = (IgniteClosure<V, IgniteBiTuple<V, R>>)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java index 9ef3b7b..090fb94 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java @@ -10,13 +10,13 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.util.*; /** @@ -39,7 +39,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { @SuppressWarnings("unchecked") @Override public boolean offer(T item) throws IgniteException { try { - Long idx = transformHeader(new AddClosure(id, 1)); + Long idx = transformHeader(new AddProcessor(id, 1)); if (idx == null) return false; @@ -52,13 +52,11 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - boolean putx = cache.putx(key, item, null); - - assert putx; + cache.put(key, item); break; } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -81,7 +79,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { @Nullable @Override public T poll() throws IgniteException { try { while (true) { - Long idx = transformHeader(new PollClosure(id)); + Long idx = transformHeader(new PollProcessor(id)); if (idx == null) return null; @@ -96,7 +94,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - T data = (T)cache.remove(key, null); + T data = (T)cache.getAndRemove(key); if (data != null) return data; @@ -105,7 +103,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { stop = U.currentTimeMillis() + RETRY_TIMEOUT; while (U.currentTimeMillis() < stop ) { - data = (T)cache.remove(key, null); + data = (T)cache.getAndRemove(key); if (data != null) return data; @@ -113,7 +111,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { break; } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -138,7 +136,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { A.notNull(items, "items"); try { - Long idx = transformHeader(new AddClosure(id, items.size())); + Long idx = transformHeader(new AddProcessor(id, items.size())); if (idx == null) return false; @@ -157,11 +155,11 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - cache.putAll(putMap, null); + cache.putAll(putMap); break; } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -182,7 +180,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void removeItem(long rmvIdx) throws IgniteCheckedException { - Long idx = (Long)cache.transformAndCompute(queueKey, new RemoveClosure(id, rmvIdx)); + Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); if (idx != null) { checkRemoved(idx); @@ -195,20 +193,20 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - if (cache.removex(key)) + if (cache.remove(key)) return; if (stop == 0) stop = U.currentTimeMillis() + RETRY_TIMEOUT; while (U.currentTimeMillis() < stop ) { - if (cache.removex(key)) + if (cache.remove(key)) return; } break; } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -224,20 +222,20 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { } /** - * @param c Transform closure to be applied for queue header. + * @param c EntryProcessor to be applied for queue header. * @return Value computed by the transform closure. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - @Nullable private Long transformHeader(IgniteClosure<GridCacheQueueHeader, IgniteBiTuple<GridCacheQueueHeader, Long>> c) + @Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c) throws IgniteCheckedException { int cnt = 0; while (true) { try { - return (Long)cache.transformAndCompute(queueKey, c); + return (Long)cache.invoke(queueKey, c); } - catch (GridCachePartialUpdateException e) { + catch (CachePartialUpdateException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3eba2e1d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java index b4fc6d2..35b10b7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheDataStructuresManager.java @@ -735,7 +735,12 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager if (hdr.empty()) return true; - GridCacheQueueAdapter.removeKeys(cctx.cache(), hdr.id(), name, hdr.collocated(), hdr.head(), hdr.tail(), + GridCacheQueueAdapter.removeKeys(cctx.kernalContext().cache().jcache(cctx.cache().name()), + hdr.id(), + name, + hdr.collocated(), + hdr.head(), + hdr.tail(), batchSize); return true;