# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/928aa3d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/928aa3d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/928aa3d4 Branch: refs/heads/ignite-54 Commit: 928aa3d48c7a29dc101c10866a8c6bde66492953 Parents: 71ee2ee Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 24 17:45:42 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 24 17:45:42 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheInvokeResult.java | 2 +- .../processors/cache/GridCacheAdapter.java | 108 +++++- .../processors/cache/GridCacheReturn.java | 57 ++-- .../GridDistributedLockResponse.java | 19 +- .../GridDistributedTxRemoteAdapter.java | 6 +- .../dht/GridDhtTransactionalCacheAdapter.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 9 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 13 +- .../cache/distributed/dht/GridDhtTxRemote.java | 8 +- .../dht/atomic/GridDhtAtomicCache.java | 21 +- .../colocated/GridDhtColocatedLockFuture.java | 68 ++-- .../distributed/near/GridNearLockResponse.java | 1 + .../cache/transactions/IgniteTxAdapter.java | 16 +- .../cache/transactions/IgniteTxEntry.java | 77 +++-- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 339 +++++++++++++------ .../cache/transactions/IgniteTxLocalEx.java | 20 +- .../cache/IgniteCacheInvokeAbstractTest.java | 163 ++++++++- .../cache/IgniteCacheTxInvokeTest.java | 41 +++ .../cache/GridCacheAbstractFullApiSelfTest.java | 115 ++++--- .../cache/GridCacheAbstractSelfTest.java | 15 + 21 files changed, 817 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java index 50af119..5f472d7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java @@ -25,7 +25,7 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz private static final long serialVersionUID = 0L; /** */ - @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"}) private static Object GG_CLASS_ID; /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index b3e567c..62daeb9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -2194,22 +2194,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke( + final K key, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) throws EntryProcessorException { - // TODO IGNITE-44. - return null; + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) { + @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { + Map<? extends K, EntryProcessor> invokeMap = + Collections.singletonMap(key, (EntryProcessor)entryProcessor); + + return tx.invokeAsync(ctx, false, invokeMap, args); + } + + @Override public String toString() { + return "invokeAsync [key=" + key + ", entryProcessor=" + entryProcessor + ']'; + } + }); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 = + (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut; + + return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) + throws IgniteCheckedException { + GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); + + Map<K, EntryProcessorResult<T>> resMap = ret.value(); + + assert resMap != null; + assert resMap.size() == 1 : resMap.size(); + + return resMap.values().iterator().next(); + } + }); } /** {@inheritDoc} */ - @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, - Object... args) { - // TODO IGNITE-44. - return null; + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll( + final Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + final Object... args) { + A.notNull(entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.denyOnLocalRead(); + + IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) { + @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) { + Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { + @Override public EntryProcessor apply(K k) { + return entryProcessor; + } + }); + + return tx.invokeAsync(ctx, false, invokeMap, args); + } + + @Override public String toString() { + return "invokeAllAsync [keys=" + keys + ", entryProcessor=" + entryProcessor + ']'; + } + }); + + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 = + (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut; + + return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() { + @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) + throws IgniteCheckedException { + GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); + + assert ret != null; + + return ret.value(); + } + }); } /** {@inheritDoc} */ @Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException { + // TODO IGNITE-44. + throw new UnsupportedOperationException(); + /* A.notNull(key, "key", transformer, "valTransform"); if (keyCheck) @@ -2226,11 +2301,15 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return "transform [key=" + key + ", valTransform=" + transformer + ']'; } }); + */ } /** {@inheritDoc} */ @Override public <R> R transformAndCompute(final K key, final IgniteClosure<V, IgniteBiTuple<V, R>> transformer) throws IgniteCheckedException { + // TODO IGNITE-44. + throw new UnsupportedOperationException(); + /* A.notNull(key, "key", transformer, "transformer"); if (keyCheck) @@ -2250,6 +2329,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return "transformAndCompute [key=" + key + ", valTransform=" + transformer + ']'; } }); + */ } /** {@inheritDoc} */ @@ -2291,6 +2371,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public IgniteFuture<?> transformAsync(final K key, final IgniteClosure<V, V> transformer, @Nullable final GridCacheEntryEx<K, V> entry, final long ttl) { + // TODO IGNITE-44. + throw new UnsupportedOperationException(); + /* A.notNull(key, "key", transformer, "transformer"); if (keyCheck) @@ -2307,6 +2390,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return "transformAsync [key=" + key + ", valTransform=" + transformer + ']'; } }); + */ } /** {@inheritDoc} */ @@ -2581,6 +2665,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public void transformAll(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException { + // TODO IGNITE-44. + throw new UnsupportedOperationException(); + /* if (F.isEmpty(m)) return; @@ -2598,6 +2685,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return "transformAll [map=" + m + ']'; } }); + */ } /** {@inheritDoc} */ @@ -2640,6 +2728,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public IgniteFuture<?> transformAllAsync(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) { + // TODO IGNITE-44. + throw new UnsupportedOperationException(); + /* if (F.isEmpty(m)) return new GridFinishedFuture<>(ctx.kernalContext()); @@ -2657,6 +2748,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return "transformAllAsync [map=" + m + ']'; } }); + */ } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java index e9c476a..ab05b34 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java @@ -14,7 +14,9 @@ import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.tostring.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; +import java.util.*; /** * Return value for cases where both, value and success flag need to be returned. @@ -24,7 +26,7 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha private static final long serialVersionUID = 0L; /** */ - @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"}) private static Object GG_CLASS_ID; /** Value. */ @@ -42,13 +44,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha } /** - * @param v Value. - */ - public GridCacheReturn(V v) { - this.v = v; - } - - /** * * @param success Success flag. */ @@ -93,17 +88,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha } /** - * @param v Value. - * @return This instance for chaining. - */ - public GridCacheReturn<V> valueIfNull(V v) { - if (this.v == null) - this.v = v; - - return this; - } - - /** * @return Success flag. */ public boolean success() { @@ -123,27 +107,34 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha } /** - * @param v Value. * @param success Success flag. * @return This instance for chaining. */ - public GridCacheReturn<V> setIfNull(V v, boolean success) { - if (this.v == null) { - this.v = v; - this.success = success; - } + public GridCacheReturn<V> success(boolean success) { + this.success = success; return this; } /** - * @param success Success flag. - * @return This instance for chaining. + * @param key Key. + * @param res Result. */ - public GridCacheReturn<V> success(boolean success) { - this.success = success; + @SuppressWarnings("unchecked") + public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<?> res) { + assert v == null || v instanceof Map : v; + assert key != null; + assert res != null; - return this; + HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v; + + if (resMap == null) { + resMap = new HashMap<>(); + + v = (V)resMap; + } + + resMap.put(key, res); } /** {@inheritDoc} */ @@ -157,11 +148,15 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha out.writeObject(v); } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { success = in.readBoolean(); v = (V)in.readObject(); } /** {@inheritDoc} */ - @Override public String toString() { return S.toString(GridCacheReturn.class, this); } + @Override public String toString() { + return S.toString(GridCacheReturn.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java index 8edfc7c..76fd449 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java @@ -57,11 +57,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag } /** + * @param cacheId Cache ID. * @param lockVer Lock version. * @param futId Future ID. * @param cnt Key count. */ - public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt) { + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + int cnt) { super(lockVer, cnt); assert futId != null; @@ -74,11 +78,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag } /** + * @param cacheId Cache ID. * @param lockVer Lock ID. * @param futId Future ID. * @param err Error. */ - public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, Throwable err) { + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + Throwable err) { super(lockVer, 0); assert futId != null; @@ -89,12 +97,17 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag } /** + * @param cacheId Cache ID. * @param lockVer Lock ID. * @param futId Future ID. * @param cnt Count. * @param err Error. */ - public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt, Throwable err) { + public GridDistributedLockResponse(int cacheId, + GridCacheVersion lockVer, + IgniteUuid futId, + int cnt, + Throwable err) { super(lockVer, cnt); assert futId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 4a93646..d507d0d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -83,6 +83,8 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> * @param timeout Timeout. * @param txSize Expected transaction size. * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridDistributedTxRemoteAdapter( GridCacheSharedContext<K, V> ctx, @@ -325,7 +327,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> else { // Copy values. entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.transformClosures(e.transformClosures()); + entry.entryProcessors(e.entryProcessors()); entry.valueBytes(e.valueBytes()); entry.op(e.op()); entry.ttl(e.ttl()); @@ -481,7 +483,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> if (updateNearCache(cacheCtx, txEntry.key(), topVer)) nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters())) + if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) txEntry.cached().unswap(true, false); GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index dc8ddcb..b7ff63e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -215,7 +215,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.keyBytes() != null ? req.keyBytes().get(i) : null, writeEntry == null ? null : writeEntry.value(), writeEntry == null ? null : writeEntry.valueBytes(), - writeEntry == null ? null : writeEntry.transformClosures(), + writeEntry == null ? null : writeEntry.entryProcessors(), drVer, req.accessTtl()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java index 0f11ecc..32e43a7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -340,8 +340,13 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements * @return Future that will be completed when locks are acquired. */ public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, - @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId, - IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) { + @Nullable Iterable<IgniteTxEntry<K, V>> writes, + Map<IgniteTxKey<K>, GridCacheVersion> verMap, + long msgId, + IgniteUuid nearMiniId, + Map<UUID, Collection<UUID>> txNodes, + boolean last, + Collection<UUID> lastBackups) { assert optimistic(); // In optimistic mode prepare still can be called explicitly from salvageTx. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index b752178..55d8f7a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -406,7 +406,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K if (entry != null) { entry.op(e.op()); // Absolutely must set operation, as default is DELETE. entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.transformClosures(e.transformClosures()); + entry.entryProcessors(e.entryProcessors()); entry.valueBytes(e.valueBytes()); entry.ttl(e.ttl()); entry.filters(e.filters()); @@ -525,11 +525,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K cached.unswap(!read, read); - IgniteTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++); + IgniteTxEntry<K, V> + w = writeEntries == null ? null : writeEntries.get(idx++); txEntry = addEntry(NOOP, null, null, + null, cached, null, CU.<K, V>empty(), @@ -545,7 +547,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K txEntry.value(w.value(), w.hasWriteValue(), w.hasReadValue()); txEntry.valueBytes(w.valueBytes()); txEntry.drVersion(w.drVersion()); - txEntry.transformClosures(w.transformClosures()); + txEntry.entryProcessors(w.entryProcessors()); txEntry.ttl(w.ttl()); txEntry.filters(w.filters()); txEntry.drExpireTime(w.drExpireTime()); @@ -635,14 +637,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K postLockWrite(cacheCtx, passedKeys, skipped, - null, - null, ret, /*remove*/false, /*retval*/false, /*read*/read, accessTtl, - filter == null ? CU.<K, V>empty() : filter); + filter == null ? CU.<K, V>empty() : filter, + /**computeInvoke*/false); return ret; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java index 97ec1af..2b4491f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -16,9 +16,11 @@ import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -280,7 +282,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param val Value. * @param valBytes Value bytes. * @param drVer Data center replication version. - * @param clos Transform closures. + * @param entryProcessors Entry processors. * @param ttl TTL. */ public void addWrite(GridCacheContext<K, V> cacheCtx, @@ -289,7 +291,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - @Nullable Collection<IgniteClosure<V, V>> clos, + @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors, @Nullable GridCacheVersion drVer, long ttl) { checkInternal(key); @@ -310,7 +312,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> txEntry.keyBytes(keyBytes); txEntry.valueBytes(valBytes); - txEntry.transformClosures(clos); + txEntry.entryProcessors(entryProcessors); writeMap.put(key, txEntry); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index fec59b2..78d92f8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -315,7 +315,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false, entry, - ttl, filter); } @@ -331,7 +330,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, entry, - ttl, filter); } @@ -412,7 +410,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, true, null, - 0, ctx.equalsPeekArray(oldVal)); } @@ -433,7 +430,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - 0, filter); } @@ -454,7 +450,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - 0, null); } @@ -648,18 +643,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.denyOnLocalRead(); - Map<? extends K, EntryProcessor> transformMap = + Map<? extends K, EntryProcessor> invokeMap = Collections.singletonMap(key, (EntryProcessor)entryProcessor); IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, - transformMap, + invokeMap, args, null, null, true, false, null, - -1L, null); return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @@ -687,24 +681,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.denyOnLocalRead(); - Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { + Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { @Override public EntryProcessor apply(K k) { return entryProcessor; } }); - IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, - transformMap, + return updateAllAsync0(null, + invokeMap, args, null, null, true, false, null, - -1L, null); - - return fut; } /** @@ -718,7 +709,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}. - * @param ttl Entry time-to-live. * @param filter Cache entry filter for atomic updates. * @return Completion future. */ @@ -731,7 +721,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx<K, V> cached, - long ttl, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { if (map != null && keyCheck) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 8c8a8e5..e6a4eb7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -164,15 +164,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); + return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); - return cctx.discovery().localNode(); - } - }); + return cctx.discovery().localNode(); + } + }); } /** {@inheritDoc} */ @@ -272,18 +271,38 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } else { // Check transaction entries (corresponding tx entries must be enlisted in transaction). - cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(), - null, null, threadId, lockVer, timeout, true, tx.entry(entry.txKey()).locked(), inTx(), - inTx() && tx.implicitSingle(), false, false); + cand = new GridCacheMvccCandidate<>(entry, + cctx.localNodeId(), + null, + null, + threadId, + lockVer, + timeout, + true, + tx.entry(entry.txKey()).locked(), + inTx(), + inTx() && tx.implicitSingle(), + false, + false); cand.topologyVersion(topSnapshot.get().topologyVersion()); } } else { if (cand == null) { - cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(), - null, null, threadId, lockVer, timeout, true, false, inTx(), - inTx() && tx.implicitSingle(), false, false); + cand = new GridCacheMvccCandidate<>(entry, + cctx.localNodeId(), + null, + null, + threadId, + lockVer, + timeout, + true, + false, + inTx(), + inTx() && tx.implicitSingle(), + false, + false); cand.topologyVersion(topSnapshot.get().topologyVersion()); } @@ -611,8 +630,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (mapAsPrimary(keys, topVer)) return; - ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = - new ConcurrentLinkedDeque8<>(); + ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping<K, V> map = null; @@ -1270,10 +1288,20 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity else cctx.mvcc().markExplicitOwner(k, threadId); - if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) - cctx.events().addEvent(cctx.affinity().partition(k), k, tx, null, - EVT_CACHE_OBJECT_READ, newVal, newVal != null || newBytes != null, - null, false, CU.subjectId(tx, cctx.shared()), null, tx == null ? null : tx.resolveTaskName()); + if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(cctx.affinity().partition(k), + k, + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null || newBytes != null, + null, + false, + CU.subjectId(tx, cctx.shared()), + null, + tx == null ? null : tx.resolveTaskName()); + } i++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java index 7a0c2fd..7711470 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java @@ -57,6 +57,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V } /** + * @param cacheId Cache ID. * @param lockVer Lock ID. * @param futId Future ID. * @param miniId Mini future ID. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java index 29e33b8..6fb77a1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java @@ -1169,7 +1169,8 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter * @throws IgniteCheckedException If failed to get previous value for transform. * @throws GridCacheEntryRemovedException If entry was concurrently deleted. */ - protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(IgniteTxEntry<K, V> txEntry, + protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures( + IgniteTxEntry<K, V> txEntry, boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException { GridCacheContext cacheCtx = txEntry.context(); @@ -1177,7 +1178,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter if (isSystemInvalidate()) return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null); - if (F.isEmpty(txEntry.transformClosures())) + if (F.isEmpty(txEntry.entryProcessors())) return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes()); else { try { @@ -1193,19 +1194,12 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /*event*/recordEvt, /*temporary*/true, /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null, + /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null, resolveTaskName(), CU.<K, V>empty(), null); - try { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } - catch (Throwable e) { - throw new IgniteException("Transform closure must not throw any exceptions " + - "(transaction will be invalidated)", e); - } + val = txEntry.applyEntryProcessors(val); GridCacheOperation op = val == null ? DELETE : UPDATE; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java index 17b153d..73d17b5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java @@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -71,7 +72,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, /** Transform. */ @GridToStringInclude - private Collection<IgniteClosure<V, V>> transformClosCol; + private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol; /** Transform closure bytes. */ @GridToStringExclude @@ -192,7 +193,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, * @param tx Owning transaction. * @param op Operation. * @param val Value. - * @param transformClos Transform closure. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. * @param ttl Time to live. * @param entry Cache entry. * @param filters Put filters. @@ -202,9 +204,10 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, IgniteTxEx<K, V> tx, GridCacheOperation op, V val, - IgniteClosure<V, V> transformClos, + EntryProcessor<K, V, ?> entryProcessor, + Object[] invokeArgs, long ttl, - GridCacheEntryEx<K,V> entry, + GridCacheEntryEx<K, V> entry, IgnitePredicate<GridCacheEntry<K, V>>[] filters, GridCacheVersion drVer) { assert ctx != null; @@ -220,8 +223,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, this.filters = filters; this.drVer = drVer; - if (transformClos != null) - addTransformClosure(transformClos); + if (entryProcessor != null) + addEntryProcessor(entryProcessor, invokeArgs); key = entry.key(); keyBytes = entry.keyBytes(); @@ -299,7 +302,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, cp.filters = filters; cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); cp.val.valueBytes(val.valueBytes()); - cp.transformClosCol = transformClosCol; + cp.entryProcessorsCol = entryProcessorsCol; cp.ttl = ttl; cp.drExpireTime = drExpireTime; cp.explicitVer = explicitVer; @@ -605,13 +608,14 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** - * @param transformClos Transform closure. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. */ - public void addTransformClosure(IgniteClosure<V, V> transformClos) { - if (transformClosCol == null) - transformClosCol = new LinkedList<>(); + public void addEntryProcessor(EntryProcessor<K, V, ?> entryProcessor, Object[] invokeArgs) { + if (entryProcessorsCol == null) + entryProcessorsCol = new LinkedList<>(); - transformClosCol.add(transformClos); + entryProcessorsCol.add(new T2<EntryProcessor<K, V, ?>, Object[]>(entryProcessor, invokeArgs)); // Must clear transform closure bytes since collection has changed. transformClosBytes = null; @@ -620,17 +624,41 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** - * @return Collection of transform closures. + * @return Collection of entry processors. */ - public Collection<IgniteClosure<V, V>> transformClosures() { - return transformClosCol; + public Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors() { + return entryProcessorsCol; + } + + /** + * @param val Value. + * @return New value. + */ + @SuppressWarnings("unchecked") + public V applyEntryProcessors(V val) { + for (T2<EntryProcessor<K, V, ?>, Object[]> t : entryProcessors()) { + try { + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(key, val); + + EntryProcessor processor = t.get1(); + + processor.process(invokeEntry, t.get2()); + + val = invokeEntry.getValue(); + } + catch (Exception ignore) { + // No-op. + } + } + + return val; } /** - * @param transformClosCol Collection of transform closures. + * @param entryProcessorsCol Collection of entry processors. */ - public void transformClosures(@Nullable Collection<IgniteClosure<V, V>> transformClosCol) { - this.transformClosCol = transformClosCol; + public void entryProcessors(@Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol) { + this.entryProcessorsCol = entryProcessorsCol; // Must clear transform closure bytes since collection has changed. transformClosBytes = null; @@ -740,8 +768,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, if (keyBytes == null) keyBytes = entry.getOrMarshalKeyBytes(); - if (transformClosBytes == null && transformClosCol != null) - transformClosBytes = CU.marshal(ctx, transformClosCol); + if (transformClosBytes == null && entryProcessorsCol != null) + transformClosBytes = CU.marshal(ctx, entryProcessorsCol); if (F.isEmptyOrNulls(filters)) filterBytes = null; @@ -781,8 +809,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, key = ctx.marshaller().unmarshal(keyBytes, clsLdr); // Unmarshal transform closure anyway if it exists. - if (transformClosBytes != null && transformClosCol == null) - transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr); + if (transformClosBytes != null && entryProcessorsCol == null) + entryProcessorsCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr); if (filters == null && filterBytes != null) { filters = ctx.marshaller().unmarshal(filterBytes, clsLdr); @@ -820,7 +848,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } else { out.writeObject(key); - U.writeCollection(out, transformClosCol); + U.writeCollection(out, entryProcessorsCol); U.writeArray(out, filters); } @@ -850,7 +878,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } else { key = (K)in.readObject(); - transformClosCol = U.readCollection(in); + entryProcessorsCol = U.readCollection(in); filters = U.readEntryFilterArray(in); } @@ -1022,6 +1050,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** + * @param sharedCtx Shared cache context. * @param ctx Cache context. * @param depEnabled Deployment enabled flag. * @throws IgniteCheckedException If marshaling failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java index 1d4b5d7..7284161 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java @@ -1155,7 +1155,7 @@ public class IgniteTxHandler<K, V> { txEntry.keyBytes(), txEntry.value(), txEntry.valueBytes(), - txEntry.transformClosures(), + txEntry.entryProcessors(), txEntry.drVersion(), txEntry.ttl()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 84ef5b7..34938d5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -28,6 +28,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -456,7 +457,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> boolean intercept = e.context().config().getInterceptor() != null; - if (intercept || !F.isEmpty(e.transformClosures())) + if (intercept || !F.isEmpty(e.entryProcessors())) e.cached().unswap(true, false); GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false); @@ -645,7 +646,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (cacheCtx.isNear()) ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion()); - if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters())) + if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) txEntry.cached().unswap(true, false); GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry, @@ -702,7 +703,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry.value(val, true, false); txEntry.valueBytes(valBytes); txEntry.op(op); - txEntry.transformClosures(null); + txEntry.entryProcessors(null); txEntry.drVersion(explicitVer); } @@ -1061,10 +1062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> // Read value from locked entry in group-lock transaction as well. if (txEntry.hasValue()) { - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); if (val != null) { V val0 = val; @@ -1082,7 +1081,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> try { Object transformClo = (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.transformClosures()) : null; + F.first(txEntry.entryProcessors()) : null; val = txEntry.cached().innerGet(this, /*swap*/true, @@ -1102,10 +1101,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!readCommitted()) txEntry.readValue(val); - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); V val0 = val; @@ -1195,6 +1192,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry = addEntry(READ, val, null, + null, entry, expiryPlc, filter, @@ -1229,6 +1227,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry = addEntry(READ, val, null, + null, entry, expiryPlc, CU.<K, V>empty(), @@ -1344,10 +1343,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!readCommitted()) txEntry.readValue(val); - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - visibleVal = clos.apply(visibleVal); - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); } // In pessimistic mode we hold the lock, so filter validation @@ -1560,9 +1557,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> try { Object transformClo = - (!F.isEmpty(txEntry.transformClosures()) && + (!F.isEmpty(txEntry.entryProcessors()) && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.transformClosures()) : null; + F.first(txEntry.entryProcessors()) : null; V val = cached.innerGet(IgniteTxLocalAdapter.this, cacheCtx.isSwapOrOffheapEnabled(), @@ -1584,10 +1581,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> txEntry.setAndMarkValid(val); - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); if (cacheCtx.portableEnabled()) val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); @@ -1711,10 +1706,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!readCommitted()) txEntry.readValue(val); - if (!F.isEmpty(txEntry.transformClosures())) { - for (IgniteClosure<V, V> clos : txEntry.transformClosures()) - val = clos.apply(val); - } + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); retMap.put(entry.getKey(), val); } @@ -1736,6 +1729,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync( GridCacheContext<K, V> cacheCtx, Map<? extends K, ? extends V> map, @@ -1744,7 +1738,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> long ttl, IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { - return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter); + return (IgniteFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx, + map, + null, + null, + null, + retval, + cached, + filter); } /** {@inheritDoc} */ @@ -1752,18 +1753,32 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheContext<K, V> cacheCtx, Map<? extends K, GridCacheDrInfo<V>> drMap ) { - return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null); + return putAllAsync0(cacheCtx, + null, + null, + null, + drMap, + false, + null, + null); } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync( + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync( GridCacheContext<K, V> cacheCtx, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map, boolean retval, - @Nullable GridCacheEntryEx<K, V> cached, - long ttl + @Nullable Map<? extends K, EntryProcessor> map, + Object... invokeArgs ) { - return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null); + return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx, + null, + map, + invokeArgs, + null, + retval, + null, + null); } /** {@inheritDoc} */ @@ -1796,7 +1811,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param expiryPlc Explicitly specified expiry policy for entry. * @param implicit Implicit flag. * @param lookup Value lookup map ({@code null} for remove). - * @param transformMap Map with transform closures if this is a transform operation. + * @param invokeMap Map with entry processors for invoke operation. + * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Flag indicating whether a value should be returned. * @param lockOnly If {@code true}, then entry will be enlisted as noop. * @param filter User filters. @@ -1807,13 +1823,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). */ protected IgniteFuture<Set<K>> enlistWrite( - GridCacheContext<K, V> cacheCtx, + final GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, @Nullable GridCacheEntryEx<K, V> cached, @Nullable ExpiryPolicy expiryPlc, boolean implicit, @Nullable Map<? extends K, ? extends V> lookup, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, + @Nullable Map<? extends K, EntryProcessor> invokeMap, + @Nullable Object[] invokeArgs, boolean retval, boolean lockOnly, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -1834,18 +1851,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> Set<K> skipped = null; - boolean rmv = lookup == null && transformMap == null; + Set<K> missedForInvoke = null; + + boolean rmv = lookup == null && invokeMap == null; try { // Set transform flag for transaction. - if (transformMap != null) + if (invokeMap != null) transform = true; groupLockSanityCheck(cacheCtx, keys); for (K key : keys) { V val = rmv || lookup == null ? null : lookup.get(key); - IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key); + EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); GridCacheVersion drVer; long drTtl; @@ -1876,7 +1895,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (key == null) continue; - if (!rmv && val == null && transformClo == null) { + if (!rmv && val == null && entryProcessor == null) { skipped = skip(skipped, key); continue; @@ -1930,7 +1949,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /*events*/retval, /*temporary*/false, CU.subjectId(this, cctx), - transformClo, + entryProcessor, resolveTaskName(), CU.<K, V>empty(), null); @@ -1952,7 +1971,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!readCommitted() && old != null) { // Enlist failed filters as reads for non-read-committed mode, // so future ops will get the same values. - txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L, + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.<K, V>empty(), + false, + -1L, + -1L, null); txEntry.markValid(); @@ -1964,9 +1992,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> break; // While. } - txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM : - old != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl, - drExpireTime, drVer); + GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + val, + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer); if (!implicit() && readCommitted()) cacheCtx.evicts().touch(entry, topologyVersion()); @@ -2013,15 +2052,39 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } ); } - else - ret.set(null, true); + else { + if (retval) + ret.set(null, true); + else { + if (txEntry.op() == TRANSFORM) { + if (missedForInvoke == null) + missedForInvoke = new HashSet<>(); + + missedForInvoke.add(key); + } + else + ret.success(true); + } + } + } + else { + if (retval) + ret.set(old, true); + else { + if (txEntry.op() == TRANSFORM) + addInvokeResult(txEntry, old, ret); + else + ret.success(true); + } } - else - ret.set(old, true); } // Pessimistic. - else - ret.set(old, true); + else { + if (retval) + ret.set(old, true); + else + ret.success(true); + } break; // While. } @@ -2032,7 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } } else { - if (transformClo == null && txEntry.op() == TRANSFORM) + if (entryProcessor == null && txEntry.op() == TRANSFORM) throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + "transaction after transform closure is applied): " + key); @@ -2051,9 +2114,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> continue; } - txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM : - v != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl, - drExpireTime, drVer); + GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + v != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + val, + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer); enlisted.add(key); } @@ -2061,8 +2135,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!pessimistic()) { txEntry.markValid(); - // Set tx entry and return values. - ret.set(v, true); + if (retval) + ret.set(v, true); } } } @@ -2071,6 +2145,38 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> return new GridFinishedFuture<>(cctx.kernalContext(), e); } + if (missedForInvoke != null) { + assert optimistic(); + assert invokeMap != null; + + IgniteFuture<Boolean> fut = loadMissing( + cacheCtx, + true, + missedForInvoke, + deserializePortables(cacheCtx), + new CI2<K, V>() { + @Override public void apply(K k, V v) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']'); + + addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret); + } + }); + + return new GridEmbeddedFuture<>( + cctx.kernalContext(), + fut, + new C2<Boolean, Exception, Set<K>>() { + @Override public Set<K> apply(Boolean b, Exception e) { + if (e != null) + throw new GridClosureException(e); + + return Collections.emptySet(); + } + } + ); + } + return new GridFinishedFuture<>(cctx.kernalContext(), skipped); } @@ -2080,8 +2186,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param cacheCtx Context. * @param keys Keys. * @param failed Collection of potentially failed keys (need to populate in this method). - * @param transformed Output map where transformed values will be placed. - * @param transformMap Transform map. * @param ret Return value. * @param rmv {@code True} if remove. * @param retval Flag to return value or not. @@ -2090,19 +2194,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param filter Filter to check entries. * @return Failed keys. * @throws IgniteCheckedException If error. + * @param computeInvoke If {@code true} computes return value for invoke operation. */ + @SuppressWarnings("unchecked") protected Set<K> postLockWrite( GridCacheContext<K, V> cacheCtx, Iterable<? extends K> keys, Set<K> failed, - @Nullable Map<K, V> transformed, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, - GridCacheReturn<V> ret, + GridCacheReturn ret, boolean rmv, boolean retval, boolean read, long accessTtl, - IgnitePredicate<GridCacheEntry<K, V>>[] filter + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + boolean computeInvoke ) throws IgniteCheckedException { for (K k : keys) { IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k)); @@ -2132,7 +2237,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) retval = true; - if (retval) { + if (retval || txEntry.op() == TRANSFORM) { if (!cacheCtx.isNear()) { try { if (!hasPrevVal) @@ -2161,7 +2266,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet(); } - ret.value(v); + if (txEntry.op() == TRANSFORM) { + if (computeInvoke) + addInvokeResult(txEntry, v, ret); + } + else + ret.value(v); } boolean pass = cacheCtx.isAll(cached, filter); @@ -2185,7 +2295,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> failed = skip(failed, k); // Revert operation to previous. (if no - NOOP, so entry will be unlocked). - txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value()); + txEntry.setAndMarkValid(txEntry.previousOperation(), (V)ret.value()); txEntry.filters(CU.<K, V>empty()); txEntry.filtersSet(false); @@ -2222,33 +2332,58 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } /** + * @param txEntry Entry. + * @param val Value. + * @param ret Return value to update. + */ + private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) { + try { + Object res = null; + + for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val); + + EntryProcessor<K, V, ?> entryProcessor = t.get1(); + + res = entryProcessor.process(invokeEntry, t.get2()); + } + + ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res)); + } + catch (Exception e) { + ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e)); + } + } + + /** * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} * maps must be non-null. * * @param cacheCtx Context. * @param map Key-value map to store. - * @param transformMap Transform map. + * @param invokeMap Invoke map. + * @param invokeArgs Optional arguments for EntryProcessor. * @param drMap DR map. * @param retval Key-transform value map to store. * @param cached Cached entry, if any. - * @param ttl Time to live. * @param filter Filter. * @return Operation future. */ - private IgniteFuture<GridCacheReturn<V>> putAllAsync0( + @SuppressWarnings("unchecked") + private IgniteFuture putAllAsync0( final GridCacheContext<K, V> cacheCtx, @Nullable Map<? extends K, ? extends V> map, - @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, + @Nullable Map<? extends K, EntryProcessor> invokeMap, + @Nullable final Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, final boolean retval, @Nullable GridCacheEntryEx<K, V> cached, - long ttl, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT); // Cached entry may be passed only from entry wrapper. final Map<K, V> map0; - final Map<K, IgniteClosure<V, V>> transformMap0; + final Map<K, EntryProcessor> invokeMap0; if (drMap != null) { assert map == null; @@ -2259,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } }); - transformMap0 = null; + invokeMap0 = null; } else if (cacheCtx.portableEnabled()) { if (map != null) { @@ -2280,14 +2415,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> else map0 = null; - if (transformMap != null) { - transformMap0 = U.newHashMap(transformMap.size()); + if (invokeMap != null) { + invokeMap0 = U.newHashMap(invokeMap.size()); try { - for (Map.Entry<? extends K, ? extends IgniteClosure<V, V>> e : transformMap.entrySet()) { + for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) { K key = (K)cacheCtx.marshalToPortable(e.getKey()); - transformMap0.put(key, e.getValue()); + invokeMap0.put(key, e.getValue()); } } catch (PortableException e) { @@ -2295,19 +2430,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } } else - transformMap0 = null; + invokeMap0 = null; } else { map0 = (Map<K, V>)map; - transformMap0 = (Map<K, IgniteClosure<V, V>>)transformMap; + invokeMap0 = (Map<K, EntryProcessor>)invokeMap; } if (log.isDebugEnabled()) log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); - assert map0 != null || transformMap0 != null; + assert map0 != null || invokeMap0 != null; assert cached == null || - (map0 != null && map0.size() == 1) || (transformMap0 != null && transformMap0.size() == 1); + (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1); try { checkValid(); @@ -2320,7 +2455,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> final GridCacheReturn<V> ret = new GridCacheReturn<>(false); - if (F.isEmpty(map0) && F.isEmpty(transformMap0)) { + if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { if (implicit()) try { commit(); @@ -2333,7 +2468,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } try { - Set<? extends K> keySet = map0 != null ? map0.keySet() : transformMap0.keySet(); + Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); Collection<K> enlisted = new LinkedList<>(); @@ -2346,7 +2481,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> prj != null ? prj.expiry() : null, implicit, map0, - transformMap0, + invokeMap0, + invokeArgs, retval, false, filter, @@ -2390,19 +2526,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (log.isDebugEnabled()) log.debug("Acquired transaction lock for put on keys: " + keys); - Map<K, V> transformed = null; - postLockWrite(cacheCtx, keys, loaded, - transformed, - transformMap0, ret, /*remove*/false, retval, /*read*/false, -1L, - filter); + filter, + /*computeInvoke*/true); return ret; } @@ -2554,7 +2687,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> plc, implicit, /** lookup map */null, - /** transform map */null, + /** invoke map */null, + /** invoke arguments */null, retval, /** lock only */false, filter, @@ -2595,14 +2729,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> postLockWrite(cacheCtx, passedKeys, loadFut.get(), - null, - null, ret, /*remove*/true, retval, /*read*/false, -1L, - filter); + filter, + /*computeInvoke*/false); return ret; } @@ -2650,6 +2783,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** * Checks if portable values should be deserialized. * + * @param cacheCtx Cache context. * @return {@code True} if portables should be deserialized, {@code false} otherwise. */ private boolean deserializePortables(GridCacheContext<K, V> cacheCtx) { @@ -2670,6 +2804,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** * Checks that affinity keys are enlisted in group transaction on start. * + * @param cacheCtx Cache context. * @param keys Keys to check. * @throws IgniteCheckedException If sanity check failed. */ @@ -2721,7 +2856,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** expiry - leave unchanged */null, /** implicit */false, /** lookup map */null, - /** transform map */null, + /** invoke map */null, + /** invoke arguments */null, /** retval */false, /** lock only */true, CU.<K, V>empty(), @@ -2842,7 +2978,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param op Cache operation. * @param val Value. * @param expiryPlc Explicitly specified expiry policy. - * @param transformClos Transform closure. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param entryProcessor Entry processor. * @param entry Cache entry. * @param filter Filter. * @param filtersSet {@code True} if filter should be marked as set. @@ -2853,7 +2990,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> */ protected final IgniteTxEntry<K, V> addEntry(GridCacheOperation op, @Nullable V val, - @Nullable IgniteClosure<V, V> transformClos, + @Nullable EntryProcessor entryProcessor, + Object[] invokeArgs, GridCacheEntryEx<K, V> entry, @Nullable ExpiryPolicy expiryPlc, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -2861,6 +2999,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer) { + assert invokeArgs == null || op == TRANSFORM; + IgniteTxKey<K> key = entry.txKey(); checkInternal(key); @@ -2883,12 +3023,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> IgniteTxEntry<K, V> txEntry; if (old != null) { - if (transformClos != null) { + if (entryProcessor != null) { assert val == null; assert op == TRANSFORM; // Will change the op. - old.addTransformClosure(transformClos); + old.addEntryProcessor(entryProcessor, invokeArgs); } else { assert old.op() != TRANSFORM; @@ -2922,7 +3062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> this, op, val, - transformClos, + entryProcessor, + invokeArgs, hasDrTtl ? drTtl : -1L, entry, filter,