http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index eaf0173..650f0ab 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -25,6 +25,7 @@ import org.jetbrains.annotations.*; import sun.misc.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -106,6 +107,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (V)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), + null, expiryPerCall(), true, false, @@ -127,6 +129,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), + null, expiryPerCall(), false, false, @@ -145,6 +148,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), + null, expiryPerCall(), false, false, @@ -163,7 +167,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); - return updateAllAsync0(F0.asMap(key, val), null, true, false, ttl, filter); + return updateAllAsync0(F0.asMap(key, val), + null, + null, + true, + false, + filter); } /** {@inheritDoc} */ @@ -177,7 +186,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); - return updateAllAsync0(F0.asMap(key, val), null, false, false, ttl, filter); + return updateAllAsync0(F0.asMap(key, val), + null, + null, + false, + false, + filter); } /** {@inheritDoc} */ @@ -242,6 +256,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (GridCacheReturn<V>)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(newVal), + null, expiryPerCall(), true, true, @@ -259,6 +274,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (GridCacheReturn<V>)updateAllInternal(DELETE, Collections.singleton(key), null, + null, expiryPerCall(), true, true, @@ -283,7 +299,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); - return updateAllAsync0(F.asMap(key, newVal), null, true, true, 0, + return updateAllAsync0(F.asMap(key, newVal), + null, + null, + true, + true, ctx.equalsPeekArray(oldVal)); } @@ -295,6 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(UPDATE, m.keySet(), m.values(), + null, expiryPerCall(), false, false, @@ -307,11 +328,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { ctx.denyOnLocalRead(); - return updateAllAsync0(m, null, false, false, 0, filter); + return updateAllAsync0(m, + null, + null, + false, + false, + filter); } /** {@inheritDoc} */ @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException { + /* ctx.denyOnLocalRead(); updateAllInternal(TRANSFORM, @@ -322,12 +349,16 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { false, null, ctx.isStoreEnabled()); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) throws IgniteCheckedException { + /* return (R)updateAllInternal(TRANSFORM, Collections.singleton(key), Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)), @@ -336,6 +367,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { false, null, ctx.isStoreEnabled()); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -343,14 +377,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { IgniteClosure<V, V> transformer, @Nullable GridCacheEntryEx<K, V> entry, long ttl) { + /* ctx.denyOnLocalRead(); return updateAllAsync0(null, Collections.singletonMap(key, transformer), false, false, ttl, null); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { + /* ctx.denyOnLocalRead(); if (F.isEmpty(m)) @@ -364,16 +403,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { false, null, ctx.isStoreEnabled()); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { + /* ctx.denyOnLocalRead(); if (F.isEmpty(m)) return new GridFinishedFuture<Object>(ctx.kernalContext()); return updateAllAsync0(null, m, false, false, 0, null); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -386,6 +432,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (V)updateAllInternal(DELETE, Collections.singleton(key), null, + null, expiryPerCall(), true, false, @@ -412,6 +459,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(DELETE, keys, null, + null, expiryPerCall(), false, false, @@ -439,6 +487,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, + null, expiryPerCall(), false, false, @@ -467,6 +516,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, + null, expiryPerCall(), false, false, @@ -669,31 +719,107 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get(); } + /** {@inheritDoc} */ + @Override public <T> EntryProcessorResult<T> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return invokeAsync(key, entryProcessor, args).get(); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return invokeAllAsync(keys, entryProcessor, args).get(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws EntryProcessorException { + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + Map<? extends K, EntryProcessor> invokeMap = + Collections.singletonMap(key, (EntryProcessor)entryProcessor); + + IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, + invokeMap, + args, + true, + false, + null); + + return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut) + throws IgniteCheckedException { + Map<K, EntryProcessorResult<T>> resMap = fut.get(); + + assert resMap != null; + assert resMap.size() == 1 : resMap.size(); + + return resMap.values().iterator().next(); + } + }); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + Object... args) { + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.denyOnLocalRead(); + + Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { + @Override public EntryProcessor apply(K k) { + return entryProcessor; + } + }); + + return updateAllAsync0(null, + invokeMap, + args, + true, + false, + null); + } + /** * Entry point for public API update methods. * - * @param map Put map. Either {@code map} or {@code transformMap} should be passed. - * @param transformMap Transform map. Either {@code map} or {@code transformMap} should be passed. + * @param map Put map. Either {@code map} or {@code invokeMap} should be passed. + * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed. + * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. - * @param ttl Entry time-to-live. * @param filter Cache entry filter for atomic updates. * @return Completion future. */ private IgniteFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, + @Nullable final Map<? extends K, EntryProcessor> invokeMap, + @Nullable final Object[] invokeArgs, final boolean retval, final boolean rawRetval, - final long ttl, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { - final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE; + final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE; final Collection<? extends K> keys = - map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null; + map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null; - final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null; + final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null; final boolean storeEnabled = ctx.isStoreEnabled(); @@ -704,6 +830,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return updateAllInternal(op, keys, vals, + invokeArgs, expiry, retval, rawRetval, @@ -737,6 +864,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return updateAllInternal(DELETE, keys, null, + null, expiryPlc, retval, rawRetval, @@ -747,11 +875,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** - * Entry point for all public update methods (put, remove, transform). + * Entry point for all public update methods (put, remove, invoke). * * @param op Operation. * @param keys Keys. * @param vals Values. + * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Expiry policy. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. @@ -764,6 +893,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { private Object updateAllInternal(GridCacheOperation op, Collection<? extends K> keys, @Nullable Iterable<?> vals, + @Nullable Object[] invokeArgs, @Nullable ExpiryPolicy expiryPlc, boolean retval, boolean rawRetval, @@ -784,9 +914,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { UUID subjId = ctx.subjectIdPerCall(null); if (storeEnabled && keys.size() > 1) { - updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName); - - return null; + return updateWithBatch(op, + keys, + vals, + invokeArgs, + expiryPlc, + ver, + filter, + subjId, + taskName); } Iterator<?> valsIter = vals != null ? vals.iterator() : null; @@ -809,10 +945,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { try { entry = entryEx(key); - IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal( + IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal( ver, val == null ? DELETE : op, val, + invokeArgs, storeEnabled, retval, expiryPlc, @@ -823,16 +960,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { subjId, taskName); - if (res == null) { - if (op == TRANSFORM && val instanceof GridCacheTransformComputeClosure) { - assert retval; + if (op == TRANSFORM) { + assert t.get2() instanceof EntryProcessorResult : t.get2(); + + Map<K, EntryProcessorResult> computedMap; - res = new IgniteBiTuple<>(t.get1(), - ((GridCacheTransformComputeClosure<V, ?>)val).returnValue()); + if (res == null) { + computedMap = U.newHashMap(keys.size()); + + res = new IgniteBiTuple<>(true, computedMap); } else - res = t; + computedMap = (Map<K, EntryProcessorResult>)res.get2(); + + computedMap.put(key, (EntryProcessorResult)t.getValue()); } + else if (res == null) + res = t; break; // While. } @@ -872,18 +1016,21 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param op Operation. * @param keys Keys. * @param vals Values. + * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Expiry policy. * @param ver Cache version. * @param filter Optional filter. * @param subjId Subject ID. * @param taskName Task name. * @throws GridCachePartialUpdateException If update failed. + * @return Results map for invoke operation. */ @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"}) - private void updateWithBatch( + private Map<K, EntryProcessorResult> updateWithBatch( GridCacheOperation op, Collection<? extends K> keys, @Nullable Iterable<?> vals, + @Nullable Object[] invokeArgs, @Nullable ExpiryPolicy expiryPlc, GridCacheVersion ver, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -896,7 +1043,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { int size = locked.size(); Map<K, V> putMap = null; + Collection<K> rmvKeys = null; + + Map<K, EntryProcessorResult> invokeResMap = + op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null; + List<GridCacheEntryEx<K, V>> filtered = new ArrayList<>(size); GridCachePartialUpdateException err = null; @@ -933,7 +1085,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } if (op == TRANSFORM) { - IgniteClosure<V, V> transform = (IgniteClosure<V, V>)val; + EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)val; V old = entry.innerGet(null, /*swap*/true, @@ -944,12 +1096,30 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /**event*/true, /**temporary*/true, subjId, - transform, + entryProcessor, taskName, CU.<K, V>empty(), null); - V updated = transform.apply(old); + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); + + V updated; + CacheInvokeResult invokeRes; + + try { + Object computed = entryProcessor.process(invokeEntry, invokeArgs); + + updated = ctx.unwrapTemporary(invokeEntry.getValue()); + + invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); + } + catch (Exception e) { + invokeRes = new CacheInvokeResult<>(e); + + updated = old; + } + + invokeResMap.put(entry.key(), invokeRes); if (updated == null) { if (intercept) { @@ -1107,6 +1277,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (err != null) throw err; + + return invokeResMap; } finally { unlockEntries(locked); @@ -1179,10 +1351,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal( + IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal( ver, op, writeVal, + null, false, false, expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java index 6fb77a1..bfd9359 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java @@ -1194,7 +1194,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /*event*/recordEvt, /*temporary*/true, /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null, + /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, resolveTaskName(), CU.<K, V>empty(), null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 34938d5..1680724 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1768,7 +1768,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, boolean retval, - @Nullable Map<? extends K, EntryProcessor> map, + @Nullable Map<? extends K, EntryProcessor<K, V, Object>> map, Object... invokeArgs ) { return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx, @@ -1829,7 +1829,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Nullable ExpiryPolicy expiryPlc, boolean implicit, @Nullable Map<? extends K, ? extends V> lookup, - @Nullable Map<? extends K, EntryProcessor> invokeMap, + @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap, @Nullable Object[] invokeArgs, boolean retval, boolean lockOnly, @@ -1992,7 +1992,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> break; // While. } - GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; txEntry = addEntry(op, @@ -2019,7 +2019,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry.markValid(); if (old == null) { - if (retval && !readThrough) { + boolean load = retval && !readThrough; + + // Check for transform here to avoid map creation. + load |= (op == TRANSFORM && keys.size() == 1); + + if (load) { // If return value is required, then we know for sure that there is only // one key in the keys collection. assert keys.size() == 1; @@ -2035,7 +2040,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']'); - ret.set(v, true); + if (op == TRANSFORM) { + IgniteTxEntry<K, V> e = + entry(new IgniteTxKey<>(k, cacheCtx.cacheId())); + + assert e != null && e.op() == TRANSFORM : e; + + addInvokeResult(e, v, ret); + } + else + ret.set(v, true); } }); @@ -2130,6 +2144,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> drVer); enlisted.add(key); + + if (txEntry.op() == TRANSFORM) + addInvokeResult(txEntry, txEntry.value(), ret); } if (!pessimistic()) { @@ -2137,6 +2154,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (retval) ret.set(v, true); + else + ret.success(true); } } } @@ -2155,11 +2174,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> missedForInvoke, deserializePortables(cacheCtx), new CI2<K, V>() { - @Override public void apply(K k, V v) { + @Override public void apply(K key, V val) { if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']'); + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + + IgniteTxEntry<K, V> e = entry(new IgniteTxKey<>(key, cacheCtx.cacheId())); - addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret); + assert e != null && e.op() == TRANSFORM : e; + + addInvokeResult(e, val, ret); } }); @@ -2373,17 +2396,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> private IgniteFuture putAllAsync0( final GridCacheContext<K, V> cacheCtx, @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, EntryProcessor> invokeMap, + @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap, @Nullable final Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, final boolean retval, @Nullable GridCacheEntryEx<K, V> cached, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + assert filter == null || invokeMap == null; + cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT); // Cached entry may be passed only from entry wrapper. final Map<K, V> map0; - final Map<K, EntryProcessor> invokeMap0; + final Map<K, EntryProcessor<K, V, Object>> invokeMap0; if (drMap != null) { assert map == null; @@ -2419,7 +2444,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> invokeMap0 = U.newHashMap(invokeMap.size()); try { - for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) { + for (Map.Entry<? extends K, EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) { K key = (K)cacheCtx.marshalToPortable(e.getKey()); invokeMap0.put(key, e.getValue()); @@ -2434,7 +2459,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } else { map0 = (Map<K, V>)map; - invokeMap0 = (Map<K, EntryProcessor>)invokeMap; + invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; } if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java index 8a485b6..12680f3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java @@ -96,7 +96,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, boolean retval, - Map<? extends K, EntryProcessor> map, + Map<? extends K, EntryProcessor<K, V, Object>> map, Object... invokeArgs); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java index a21c8fc..89bcbe6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java @@ -34,6 +34,7 @@ import org.gridgain.grid.util.worker.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.nio.*; import java.util.*; @@ -1101,7 +1102,7 @@ public class GridGgfsDataManager extends GridGgfsManager { // No affinity key present, just concat and return. if (colocatedKey.affinityKey() == null) { - dataCachePrj.transform(colocatedKey, new UpdateClosure(startOff, data)); + dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data)); return; } @@ -1125,16 +1126,16 @@ public class GridGgfsDataManager extends GridGgfsManager { boolean hasVal = false; - UpdateClosure transformClos = new UpdateClosure(startOff, data); + UpdateProcessor transformClos = new UpdateProcessor(startOff, data); if (vals.get(colocatedKey) != null) { - dataCachePrj.transform(colocatedKey, transformClos); + dataCachePrj.invoke(colocatedKey, transformClos); hasVal = true; } if (vals.get(key) != null) { - dataCachePrj.transform(key, transformClos); + dataCachePrj.invoke(key, transformClos); hasVal = true; } @@ -1570,7 +1571,8 @@ public class GridGgfsDataManager extends GridGgfsManager { * Helper closure to update data in cache. */ @GridInternal - private static final class UpdateClosure implements IgniteClosure<byte[], byte[]>, Externalizable { + private static final class UpdateProcessor implements EntryProcessor<GridGgfsBlockKey, byte[], Void>, + Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -1584,7 +1586,7 @@ public class GridGgfsDataManager extends GridGgfsManager { * Empty constructor required for {@link Externalizable}. * */ - public UpdateClosure() { + public UpdateProcessor() { // No-op. } @@ -1594,7 +1596,7 @@ public class GridGgfsDataManager extends GridGgfsManager { * @param start Start position in the block to write new data from. * @param data Data block to write into cache. */ - private UpdateClosure(int start, byte[] data) { + private UpdateProcessor(int start, byte[] data) { assert start >= 0; assert data != null; assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']'; @@ -1604,7 +1606,9 @@ public class GridGgfsDataManager extends GridGgfsManager { } /** {@inheritDoc} */ - @Override public byte[] apply(byte[] e) { + @Override public Void process(MutableEntry<GridGgfsBlockKey, byte[]> entry, Object... args) { + byte[] e = entry.getValue(); + final int size = data.length; if (e == null || e.length == 0) @@ -1621,7 +1625,9 @@ public class GridGgfsDataManager extends GridGgfsManager { // Copy data into entry. U.arrayCopy(data, 0, e, start, size); - return e; + entry.setValue(e); + + return null; } /** {@inheritDoc} */ @@ -1638,7 +1644,7 @@ public class GridGgfsDataManager extends GridGgfsManager { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(UpdateClosure.class, this, "start", start, "data.length", data.length); + return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java index eb0a728..87e09b9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java @@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.lang.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -751,7 +752,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { assert metaCache.get(parentId) != null; - id2InfoPrj.transform(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false)); + id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false)); return null; } @@ -868,10 +869,10 @@ public class GridGgfsMetaManager extends GridGgfsManager { assert metaCache.get(destParentId) != null; // Remove listing entry from the source parent listing. - id2InfoPrj.transform(srcParentId, new UpdateListing(srcFileName, srcEntry, true)); + id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true)); // Add listing entry into the destination parent listing. - id2InfoPrj.transform(destParentId, new UpdateListing(destFileName, srcEntry, false)); + id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false)); } /** @@ -987,7 +988,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { // Update a file info of the removed file with a file path, // which will be used by delete worker for event notifications. - id2InfoPrj.transform(fileId, new UpdatePath(path)); + id2InfoPrj.invoke(fileId, new UpdatePath(path)); return GridGgfsFileInfo.builder(fileInfo).path(path).build(); } @@ -1086,12 +1087,12 @@ public class GridGgfsMetaManager extends GridGgfsManager { id2InfoPrj.put(newInfo.id(), newInfo); // Add new info to trash listing. - id2InfoPrj.transform(TRASH_ID, new UpdateListing(newInfo.id().toString(), + id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(), new GridGgfsListingEntry(newInfo), false)); // Remove listing entries from root. for (Map.Entry<String, GridGgfsListingEntry> entry : transferListing.entrySet()) - id2InfoPrj.transform(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true)); + id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true)); resId = newInfo.id(); } @@ -1228,7 +1229,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { GridGgfsListingEntry listingEntry = parentInfo.listing().get(name); if (listingEntry != null) - id2InfoPrj.transform(parentId, new UpdateListing(name, listingEntry, true)); + id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true)); id2InfoPrj.remove(id); @@ -1359,7 +1360,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { assert metaCache.get(parentId) != null; - id2InfoPrj.transform(parentId, new UpdateListing(fileName, entry, false)); + id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false)); } return newInfo; @@ -1424,7 +1425,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { assert validTxState(false); - id2InfoPrj.transformAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0, + id2InfoPrj.invokeAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0, modificationTime)); } finally { @@ -1659,9 +1660,9 @@ public class GridGgfsMetaManager extends GridGgfsManager { id2InfoPrj.removex(oldId); // Remove the old one. id2InfoPrj.putx(newInfo.id(), newInfo); // Put the new one. - id2InfoPrj.transform(parentInfo.id(), + id2InfoPrj.invoke(parentInfo.id(), new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true)); - id2InfoPrj.transform(parentInfo.id(), + id2InfoPrj.invoke(parentInfo.id(), new UpdateListing(path.name(), new GridGgfsListingEntry(newInfo), false)); IgniteFuture<?> delFut = ggfsCtx.data().delete(oldInfo); @@ -2150,7 +2151,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { } // Update the deleted file info with path information for delete worker. - id2InfoPrj.transform(info.id(), new UpdatePath(path)); + id2InfoPrj.invoke(info.id(), new UpdatePath(path)); return true; // No additional handling is required. } @@ -2606,7 +2607,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { id2InfoPrj.putx(fileId, updated); - id2InfoPrj.transform(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime, + id2InfoPrj.invoke(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime, modificationTime)); tx.commit(); @@ -2741,7 +2742,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { /** * Updates file length information in parent listing. */ - private static final class UpdateListingEntry implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>, + private static final class UpdateListingEntry implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -2775,8 +2776,11 @@ public class GridGgfsMetaManager extends GridGgfsManager { * @param accessTime Last access time. * @param modificationTime Last modification time. */ - private UpdateListingEntry(IgniteUuid fileId, String fileName, long lenDelta, - long accessTime, long modificationTime) { + private UpdateListingEntry(IgniteUuid fileId, + String fileName, + long lenDelta, + long accessTime, + long modificationTime) { this.fileId = fileId; this.fileName = fileName; this.lenDelta = lenDelta; @@ -2785,13 +2789,18 @@ public class GridGgfsMetaManager extends GridGgfsManager { } /** {@inheritDoc} */ - @Override public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) { + @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) { + GridGgfsFileInfo fileInfo = e.getValue(); + Map<String, GridGgfsListingEntry> listing = fileInfo.listing(); GridGgfsListingEntry entry = listing.get(fileName); - if (entry == null || !entry.fileId().equals(fileId)) - return fileInfo; + if (entry == null || !entry.fileId().equals(fileId)) { + e.setValue(fileInfo); + + return null; + } entry = new GridGgfsListingEntry(entry, entry.length() + lenDelta, accessTime == -1 ? entry.accessTime() : accessTime, @@ -2803,7 +2812,9 @@ public class GridGgfsMetaManager extends GridGgfsManager { // Modify listing map in-place since map is serialization-safe. listing.put(fileName, entry); - return new GridGgfsFileInfo(listing, fileInfo); + e.setValue(new GridGgfsFileInfo(listing, fileInfo)); + + return null; } /** {@inheritDoc} */ @@ -2829,7 +2840,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { * Update directory listing closure. */ @GridInternal - private static final class UpdateListing implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>, + private static final class UpdateListing implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -2868,7 +2879,9 @@ public class GridGgfsMetaManager extends GridGgfsManager { } /** {@inheritDoc} */ - @Override @Nullable public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) { + @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) { + GridGgfsFileInfo fileInfo = e.getValue(); + assert fileInfo != null : "File info not found for the child: " + entry.fileId(); assert fileInfo.isDirectory(); @@ -2897,7 +2910,9 @@ public class GridGgfsMetaManager extends GridGgfsManager { ", oldEntry=" + oldEntry + ']'); } - return new GridGgfsFileInfo(listing, fileInfo); + e.setValue(new GridGgfsFileInfo(listing, fileInfo)); + + return null; } /** {@inheritDoc} */ @@ -2924,7 +2939,7 @@ public class GridGgfsMetaManager extends GridGgfsManager { * Update path closure. */ @GridInternal - private static final class UpdatePath implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>, + private static final class UpdatePath implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -2943,11 +2958,16 @@ public class GridGgfsMetaManager extends GridGgfsManager { * Default constructor (required by Externalizable). */ public UpdatePath() { + // No-op. } /** {@inheritDoc} */ - @Override public GridGgfsFileInfo apply(GridGgfsFileInfo info) { - return GridGgfsFileInfo.builder(info).path(path).build(); + @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) { + GridGgfsFileInfo info = e.getValue(); + + e.setValue(GridGgfsFileInfo.builder(info).path(path).build()); + + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java new file mode 100644 index 0000000..6a97b19 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java @@ -0,0 +1,47 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicInvokeTest extends IgniteCacheInvokeAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return CLOCK; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java new file mode 100644 index 0000000..7048f18 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicLocalInvokeTest extends IgniteCacheInvokeAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java new file mode 100644 index 0000000..2ff0468 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java @@ -0,0 +1,22 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.store.*; + +/** + * + */ +public class IgniteCacheAtomicLocalWithStoreInvokeTest extends IgniteCacheAtomicLocalInvokeTest { + /** {@inheritDoc} */ + @Override protected GridCacheStore<?, ?> cacheStore() { + return new TestStore(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java new file mode 100644 index 0000000..8f4d71c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java @@ -0,0 +1,24 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; + +/** + * + */ +public class IgniteCacheAtomicNearEnabledInvokeTest extends IgniteCacheAtomicInvokeTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java index 8731306..068476c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -39,7 +39,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT public void testInvoke() throws Exception { // TODO IGNITE41 test with forceTransformBackups. - invoke(null); + invoke(null); if (atomicityMode() == TRANSACTIONAL) { invoke(PESSIMISTIC); @@ -165,19 +165,21 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode); - invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode); + if (gridCount() > 1) { + invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode); - invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode); + invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode); - Set<Integer> keys = new HashSet<>(); + Set<Integer> keys = new HashSet<>(); - keys.addAll(primaryKeys(jcache(0), 3, 0)); - keys.addAll(primaryKeys(jcache(1), 3, 0)); - keys.addAll(primaryKeys(jcache(2), 3, 0)); + keys.addAll(primaryKeys(jcache(0), 3, 0)); + keys.addAll(primaryKeys(jcache(1), 3, 0)); + keys.addAll(primaryKeys(jcache(2), 3, 0)); - invokeAll(cache, keys, txMode); + invokeAll(cache, keys, txMode); + } - keys = new HashSet<>(); + Set<Integer> keys = new HashSet<>(); for (int i = 0; i < 1000; i++) keys.add(i); @@ -415,7 +417,6 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT /** {@inheritDoc} */ @Override public Integer process(MutableEntry<Integer, Integer> e, Object... arguments) throws EntryProcessorException { - System.out.println(Thread.currentThread() + " compute, old=" + e.getValue()); if (e.exists()) { Integer val = e.getValue(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java new file mode 100644 index 0000000..20576ab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java @@ -0,0 +1,41 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheTxLocalInvokeTest extends IgniteCacheInvokeAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java new file mode 100644 index 0000000..ffc50ff --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java @@ -0,0 +1,24 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; + +/** + * + */ +public class IgniteCacheTxNearEnabledInvokeTest extends IgniteCacheTxInvokeTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index a57da71..0d3b54f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -363,12 +363,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs createUpdatePutAll(null); if (atomicityMode() == TRANSACTIONAL) { - IgniteTxConcurrency[] txModes; - - if (cacheMode() == LOCAL) - txModes= new IgniteTxConcurrency[]{PESSIMISTIC}; - else - txModes= new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC}; + IgniteTxConcurrency[] txModes = new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC}; for (IgniteTxConcurrency tx : txModes) { for (final Integer key : keys()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java index fd3751c..9d56c7f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -891,6 +891,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertEquals("3", res.get("key3").get()); assertEquals(3, res.size()); + + cache.remove("key1"); + cache.put("key2", 1); + cache.put("key3", 3); } Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR); @@ -901,9 +905,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertNull(cache(i).peek("key3")); } - assertEquals("1", res.get("key1").get()); - assertEquals("2", res.get("key2").get()); - assertEquals("4", res.get("key3").get()); + assertEquals("null", res.get("key1").get()); + assertEquals("1", res.get("key2").get()); + assertEquals("3", res.get("key3").get()); assertEquals(3, res.size()); @@ -928,9 +932,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testTransformAllWithNulls() throws Exception { - final GridCacheProjection<String, Integer> cache = cache(); + final IgniteCache<String, Integer> cache = jcache(); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll(null, INCR_PROCESSOR); + + return null; + } + }, NullPointerException.class, null); - cache.transformAll(null); // This should be no-op. + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll(F.asSet("key1"), null); + + return null; + } + }, NullPointerException.class, null); { Map<String, Integer> m = new HashMap<>(2); @@ -944,38 +962,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } { - Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2); - - tm.put("key1", INCR_PROCESSOR); - tm.put(null, INCR_PROCESSOR); - - // WARN: F.asMap() doesn't work here, because it will throw NPE. - - cache.transformAll(tm); - } - - { - Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2); - - tm.put("key1", INCR_PROCESSOR); - tm.put("key2", null); - - // WARN: F.asMap() doesn't work here, because it will throw NPE. - - cache.transformAll(tm); - } - - cache.transformAll(null, INCR_PROCESSOR); // This should be no-op. - - { - Set<String> ts = new HashSet<>(3); + Set<String> keys = new HashSet<>(2); - ts.add("key1"); - ts.add(null); + keys.add("key1"); + keys.add(null); // WARN: F.asSet() doesn't work here, because it will throw NPE. - cache.transformAll(ts, INCR_PROCESSOR); + cache.invokeAll(keys, INCR_PROCESSOR); } } @@ -1014,17 +1008,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract */ private void checkTransformSequential0(boolean startVal, IgniteTxConcurrency concurrency) throws Exception { - GridCacheProjection<String, Integer> cache = cache(); + IgniteCache<String, Integer> cache = jcache(); - IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null; + IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; try { if (startVal) cache.put("key", 2); - cache.transform("key", INCR_PROCESSOR); - cache.transform("key", INCR_PROCESSOR); - cache.transform("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); if (tx != null) tx.commit(); @@ -1063,18 +1057,18 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ private void checkTransformAfterRemove(IgniteTxConcurrency concurrency) throws Exception { - GridCacheProjection<String, Integer> cache = cache(); + IgniteCache<String, Integer> cache = jcache(); cache.put("key", 4); - IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null; + IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; try { cache.remove("key"); - cache.transform("key", INCR_PROCESSOR); - cache.transform("key", INCR_PROCESSOR); - cache.transform("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); if (tx != null) tx.commit(); @@ -1128,20 +1122,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @param isolation Isolation. * @throws Exception If failed. */ - private void checkTransformReturnValue(boolean put, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation) throws Exception { - GridCacheProjection<String, Integer> cache = cache(); + private void checkTransformReturnValue(boolean put, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) + throws Exception + { + IgniteCache<String, Integer> cache = jcache(); if (!put) cache.put("key", 1); - IgniteTx tx = txEnabled() ? cache.txStart(concurrency, isolation) : null; + IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, isolation) : null; try { if (put) cache.put("key", 1); - cache.transform("key", INCR_PROCESSOR); + cache.invoke("key", INCR_PROCESSOR); assertEquals((Integer)2, cache.get("key")); @@ -1211,33 +1208,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** * @throws Exception If failed. */ - public void testTransformEntry() throws Exception { - GridCacheEntry<String, Integer> entry = cache().entry("test"); + public void testInvokeAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); - entry.setValue(1); + cache.put("key2", 1); + cache.put("key3", 3); - // Make user entry capture cache entry. - entry.version(); + cache = cache.enableAsync(); - assertEquals((Integer)1, entry.getValue()); + assertNull(cache.invoke("key1", INCR_PROCESSOR)); - entry.transform(INCR_PROCESSOR); + IgniteFuture<?> fut0 = cache.future(); - assertEquals((Integer)2, entry.getValue()); - } + assertNull(cache.invoke("key2", INCR_PROCESSOR)); - /** - * @throws Exception If failed. - */ - public void testTransformAsync() throws Exception { - GridCacheProjection<String, Integer> cache = cache(); + IgniteFuture<?> fut1 = cache.future(); - cache.put("key2", 1); - cache.put("key3", 3); + assertNull(cache.invoke("key3", RMV_PROCESSOR)); - IgniteFuture<?> fut0 = cache.transformAsync("key1", INCR_PROCESSOR); - IgniteFuture<?> fut1 = cache.transformAsync("key2", INCR_PROCESSOR); - IgniteFuture<?> fut2 = cache.transformAsync("key3", RMV_PROCESSOR); + IgniteFuture<?> fut2 = cache.future(); fut0.get(); fut1.get(); @@ -1254,46 +1243,54 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** * @throws Exception If failed. */ - public void testTransformCompute() throws Exception { - GridCacheProjection<String, Integer> cache = cache(); - - IgniteClosure<Integer, IgniteBiTuple<Integer, String>> c; - - c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() { - @Override public IgniteBiTuple<Integer, String> apply(Integer val) { - return val == null ? new IgniteBiTuple<>(0, "null") : new IgniteBiTuple<>(val + 1, String.valueOf(val)); - } - }; + public void testInvoke() throws Exception { + final IgniteCache<String, Integer> cache = jcache(); - assertEquals("null", cache.transformAndCompute("k0", c)); + assertEquals("null", cache.invoke("k0", INCR_PROCESSOR)); - assertEquals((Integer)0, cache.get("k0")); + assertEquals((Integer)1, cache.get("k0")); - assertEquals("0", cache.transformAndCompute("k0", c)); + assertEquals("1", cache.invoke("k0", INCR_PROCESSOR)); - assertEquals((Integer)1, cache.get("k0")); + assertEquals((Integer)2, cache.get("k0")); cache.put("k1", 1); - assertEquals("1", cache.transformAndCompute("k1", c)); + assertEquals("1", cache.invoke("k1", INCR_PROCESSOR)); assertEquals((Integer)2, cache.get("k1")); - assertEquals("2", cache.transformAndCompute("k1", c)); + assertEquals("2", cache.invoke("k1", INCR_PROCESSOR)); assertEquals((Integer)3, cache.get("k1")); - c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() { - @Override public IgniteBiTuple<Integer, String> apply(Integer integer) { - return new IgniteBiTuple<>(null, null); + EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() { + @Override public Integer process(MutableEntry<String, Integer> e, Object... args) { + e.remove(); + + return null; } }; - assertNull(cache.transformAndCompute("k1", c)); + assertNull(cache.invoke("k1", c)); assertNull(cache.get("k1")); for (int i = 0; i < gridCount(); i++) assertNull(cache(i).peek("k1")); + + final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() { + @Override public Integer process(MutableEntry<String, Integer> e, Object... args) { + throw new EntryProcessorException("Test entry processor exception."); + } + }; + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invoke("k1", errProcessor); + + return null; + } + }, EntryProcessorException.class, "Test entry processor exception."); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java index 8a929bf..c204e00 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; @@ -17,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; +import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -108,19 +111,21 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm startGrid(1); startGrid(2); - final IgniteClosure<String, String> trans = new TransformClosure(); + final Processor entryProcessor = new Processor(); IgniteFuture<?> fut = multithreadedAsync( new Callable<Object>() { @Override public Object call() throws Exception { - GridCache<Integer, String> c = cache(ThreadLocalRandom.current().nextInt(3)); + IgniteCache<Integer, String> c = jcache(ThreadLocalRandom.current().nextInt(3)); while (!finish.get() && !Thread.currentThread().isInterrupted()) { c.get(ThreadLocalRandom.current().nextInt(100)); + c.put(ThreadLocalRandom.current().nextInt(100), "s"); - c.transform( + + c.invoke( ThreadLocalRandom.current().nextInt(100), - trans); + entryProcessor); } return null; @@ -147,10 +152,12 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm /** * */ - private static class TransformClosure implements IgniteClosure<String, String> { + private static class Processor implements EntryProcessor<Integer, String, Void>, Serializable { /** {@inheritDoc} */ - @Override public String apply(String s) { - return "str"; + @Override public Void process(MutableEntry<Integer, String> e, Object... args) { + e.setValue("str"); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java index 44b8618..0e8602c 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java @@ -10,16 +10,17 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -162,7 +163,7 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest { ignite = restarts ? grids.getAndSet(idx, null) : grid(idx); } - GridCache <String, TestObject> cache = ignite.cache(null); + IgniteCache<String, TestObject> cache = ignite.jcache(null); assertNotNull(cache); @@ -173,11 +174,11 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest { while (true) { try { - cache.transform("key", new Transformer()); + cache.invoke("key", new Processor()); break; } - catch (GridCachePartialUpdateException ignored) { + catch (CachePartialUpdateException ignored) { // Need to re-check if update actually succeeded. TestObject updated = cache.get("key"); @@ -210,12 +211,16 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest { } /** */ - private static class Transformer implements C1<TestObject, TestObject> { + private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable { /** {@inheritDoc} */ - @Override public TestObject apply(TestObject obj) { + @Override public Void process(MutableEntry<String, TestObject> e, Object... args) { + TestObject obj = e.getValue(); + assert obj != null; - return new TestObject(obj.val + 1); + e.setValue(new TestObject(obj.val + 1)); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index b42406d..66abdc6 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; @@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -1172,27 +1174,28 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst private void cacheUpdate(int grid, boolean rmv, Operation op, String key, final Integer val, @Nullable final Integer expOld, @Nullable final Integer expRmvRet) throws Exception { - GridCache<String, Integer> cache = cache(grid); + IgniteCache<String, Integer> cache = jcache(grid); if (rmv) { assertNull(val); switch (op) { case UPDATE: { - assertEquals(expRmvRet, cache.remove(key)); + assertEquals(expRmvRet, cache.getAndRemove(key)); break; } case UPDATEX: { - cache.removex(key); + cache.remove(key); break; } case UPDATE_FILTER: { - Object old = cache.remove(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() { - @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() { + @Override + public boolean apply(GridCacheEntry<String, Integer> entry) { return true; } }); @@ -1203,10 +1206,15 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst } case TRANSFORM: { - cache.transform(key, new IgniteClosure<Integer, Integer>() { - @Nullable @Override public Integer apply(Integer old) { + cache.invoke(key, new EntryProcessor<String, Integer, Void>() { + @Override + public Void process(MutableEntry<String, Integer> e, Object... args) { + Integer old = e.getValue(); + assertEquals(expOld, old); + e.remove(); + return null; } }); @@ -1221,20 +1229,21 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst else { switch (op) { case UPDATE: { - assertEquals(expOld, cache.put(key, val)); + assertEquals(expOld, cache.getAndPut(key, val)); break; } case UPDATEX: { - cache.putx(key, val); + cache.put(key, val); break; } case UPDATE_FILTER: { - Object old = cache.put(key, val, new P1<GridCacheEntry<String, Integer>>() { - @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() { + @Override + public boolean apply(GridCacheEntry<String, Integer> entry) { return true; } }); @@ -1245,11 +1254,16 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst } case TRANSFORM: { - cache.transform(key, new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer old) { + cache.invoke(key, new EntryProcessor<String, Integer, Void>() { + @Override + public Void process(MutableEntry<String, Integer> e, Object... args) { + Integer old = e.getValue(); + assertEquals(expOld, old); - return val; + e.setValue(val); + + return null; } }); @@ -1294,7 +1308,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst @SuppressWarnings("unchecked") private void cacheBatchUpdate(int grid, boolean rmv, Operation op, final Map<String, Integer> map) throws Exception { - GridCache<String, Integer> cache = cache(grid); + IgniteCache<String, Integer> cache = jcache(grid); if (rmv) { switch (op) { @@ -1305,8 +1319,10 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst } case TRANSFORM: { - cache.transformAll(map.keySet(), new IgniteClosure<Integer, Integer>() { - @Nullable @Override public Integer apply(Integer old) { + cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + e.remove(); + return null; } }); @@ -1327,17 +1343,13 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst } case TRANSFORM: { - Map<String, IgniteClosure<Integer, Integer>> m = new HashMap<>(); - - for (final String key : map.keySet()) { - m.put(key, new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer old) { - return map.get(key); - } - }); - } + cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + e.setValue(map.get(e.getKey())); - cache.transformAll(m); + return null; + } + }); break; }