# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/71ee2ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/71ee2ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/71ee2ee1 Branch: refs/heads/ignite-44 Commit: 71ee2ee194aea90f81a7b4299bb9e7163a59f143 Parents: 982d441 Author: sboikov <sboi...@gridgain.com> Authored: Tue Dec 23 17:05:07 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 23 17:30:04 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 32 +- .../processors/cache/CacheInvokeEntry.java | 72 ++++ .../processors/cache/CacheInvokeResult.java | 95 +++++ .../processors/cache/GridCacheAdapter.java | 18 +- .../processors/cache/GridCacheEntryEx.java | 2 + .../processors/cache/GridCacheMapEntry.java | 58 ++- .../processors/cache/GridCacheMessage.java | 54 +++ .../processors/cache/GridCacheProjectionEx.java | 21 ++ .../cache/GridCacheProjectionImpl.java | 15 + .../processors/cache/GridCacheProxyImpl.java | 29 ++ .../cache/GridCacheUpdateAtomicResult.java | 18 +- .../dht/atomic/GridDhtAtomicCache.java | 279 +++++++++++--- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 19 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 121 +++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 36 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 185 +++++++--- .../distributed/near/GridNearAtomicCache.java | 9 +- ...eCacheAtomicPrimaryWriteOrderInvokeTest.java | 47 +++ ...micPrimaryWriteOrderWithStoreInvokeTest.java | 23 ++ .../cache/IgniteCacheInvokeAbstractTest.java | 367 +++++++++++++++++++ .../processors/cache/GridCacheTestEntryEx.java | 11 +- .../junits/common/GridCommonAbstractTest.java | 43 +++ 22 files changed, 1383 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3945234..410fb9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -494,16 +494,40 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get(); + + return res.get(); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } } /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + try { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.invokeAll(keys, entryProcessor, args).get(); + } + finally { + gate.leave(prev); + } + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java new file mode 100644 index 0000000..1f3900d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java @@ -0,0 +1,72 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.internal.*; + +import javax.cache.processor.*; + +/** + * Implementation of {@link MutableEntry} passed to the {@link EntryProcessor#process(MutableEntry, Object...)}. + */ +public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> { + /** */ + @GridToStringInclude + private final K key; + + /** */ + @GridToStringInclude + private V val; + + /** + * @param key Key. + * @param val Value. + */ + public CacheInvokeEntry(K key, V val) { + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean exists() { + return val != null; + } + + /** {@inheritDoc} */ + @Override public void remove() { + val = null; + } + + /** {@inheritDoc} */ + @Override public void setValue(V val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return val; + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> clazz) { + throw new IllegalArgumentException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheInvokeEntry.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java new file mode 100644 index 0000000..50af119 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java @@ -0,0 +1,95 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.marshaller.optimized.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; + +/** + * Implementation of {@link EntryProcessorResult}. + */ +public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externalizable, IgniteOptimizedMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + private static Object GG_CLASS_ID; + + /** */ + @GridToStringInclude + private T res; + + /** */ + private Exception err; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public CacheInvokeResult() { + // No-op. + } + + /** + * @param res Computed result. + */ + public CacheInvokeResult(@Nullable T res) { + this.res = res; + } + + /** + * @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}. + */ + public CacheInvokeResult(Exception err) { + this.err = err; + } + + /** {@inheritDoc} */ + @Override public Object ggClassId() { + return GG_CLASS_ID; + } + + /** {@inheritDoc} */ + @Override public T get() throws EntryProcessorException { + if (err != null) { + if (err instanceof EntryProcessorException) + throw (EntryProcessorException)err; + + throw new EntryProcessorException(err); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(res); + + out.writeObject(err); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + res = (T)in.readObject(); + + err = (Exception)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheInvokeResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index f55ee0e..b3e567c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -43,6 +43,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -2193,6 +2194,21 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + // TODO IGNITE-44. + return null; + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + // TODO IGNITE-44. + return null; + } + + /** {@inheritDoc} */ @Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException { A.notNull(key, "key", transformer, "valTransform"); @@ -4516,7 +4532,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im * @param key Cache key. * @throws IllegalArgumentException If validation fails. */ - private void validateCacheKey(Object key) { + protected void validateCacheKey(Object key) { if (keyCheck) { CU.validateCacheKey(log, key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 5fb0b95..1b71eec 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -392,6 +392,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param op Update operation. * @param val Value. Type depends on operation. * @param valBytes Value bytes. Can be non-null only if operation is UPDATE. + * @param invokeArgs Optional arguments for entry processor. * @param writeThrough Write through flag. * @param retval Return value flag. * @param expiryPlc Expiry policy. @@ -424,6 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { GridCacheOperation op, @Nullable Object val, @Nullable byte[] valBytes, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable IgniteCacheExpiryPolicy expiryPlc, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 0fd64de..f14bba5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -29,6 +29,7 @@ import org.jetbrains.annotations.*; import sun.misc.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.nio.*; import java.util.*; @@ -1586,6 +1587,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheOperation op, @Nullable Object writeObj, @Nullable byte[] valBytes, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable IgniteCacheExpiryPolicy expiryPlc, @@ -1615,6 +1617,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridDrResolveResult<V> drRes = null; + EntryProcessorResult<?> invokeRes = null; + long newTtl = -1L; long newExpireTime = 0L; long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node. @@ -1644,7 +1648,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (drRes.isUseOld()) { old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false); + return new GridCacheUpdateAtomicResult<>(false, + old, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); } newTtl = drRes.newTtl(); @@ -1692,7 +1704,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false); + return new GridCacheUpdateAtomicResult<>(false, + old, + null, + invokeRes, + -1L, + -1L, + null, + null, + false); } } else @@ -1744,6 +1764,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return new GridCacheUpdateAtomicResult<>(false, retval ? old : null, null, + invokeRes, -1L, -1L, null, @@ -1760,11 +1781,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (op == GridCacheOperation.TRANSFORM) { transformClo = writeObj; - IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj; + EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj; - updated = cctx.unwrapTemporary(transform.apply(old)); + CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + updated = cctx.unwrapTemporary(entry.getValue()); + + invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); - valBytes = null; + valBytes = null; + } + catch (Exception e) { + invokeRes = new CacheInvokeResult<>(e); + + updated = old; + + valBytes = oldBytes.getIfMarshaled(); + } } else updated = (V)writeObj; @@ -1794,6 +1830,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return new GridCacheUpdateAtomicResult<>(false, retval ? old : null, null, + invokeRes, -1L, -1L, null, @@ -1899,6 +1936,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return new GridCacheUpdateAtomicResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2()), null, + invokeRes, -1L, -1L, null, @@ -2001,7 +2039,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (log.isDebugEnabled()) log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']'); - return new GridCacheUpdateAtomicResult<>(res, old, updated, newTtl, newDrExpireTime, enqueueVer, drRes, true); + return new GridCacheUpdateAtomicResult<>(res, + old, + updated, + invokeRes, + newTtl, + newDrExpireTime, + enqueueVer, + drRes, + true); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java index ab98dcb..45eda5f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java @@ -372,6 +372,60 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage } /** + * @param args Arguments to marshal. + * @param ctx Context. + * @return Marshalled collection. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args, + GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + assert ctx != null; + + if (args == null || args.length == 0) + return null; + + byte[][] argsBytes = new byte[args.length][]; + + for (int i = 0; i < args.length; i++) { + Object arg = args[i]; + + if (ctx.deploymentEnabled()) + prepareObject(arg, ctx); + + argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg); + } + + return argsBytes; + } + + + /** + * @param byteCol Collection to unmarshal. + * @param ctx Context. + * @param ldr Loader. + * @return Unmarshalled collection. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol, + GridCacheSharedContext<K, V> ctx, + ClassLoader ldr) throws IgniteCheckedException { + assert ldr != null; + assert ctx != null; + + if (byteCol == null) + return null; + + Object[] args = new Object[byteCol.length]; + + IgniteMarshaller marsh = ctx.marshaller(); + + for (int i = 0; i < byteCol.length; i++) + args[i] = byteCol[i] == null ? null : marsh.unmarshal(byteCol[i], ldr); + + return args; + } + + /** * @param filter Collection to marshal. * @param ctx Context. * @return Marshalled collection. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java index 2362f57..1a98192 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java @@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.cache.dr.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.util.*; /** @@ -393,4 +394,24 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * @return New projection based on this one, but with the specified expiry policy. */ public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc); + + /** + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Future. + */ + public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args); + + /** + * @param keys Keys. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Future. + */ + public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java index 5bd973c..ad5cde3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java @@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -788,6 +789,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + return cache.invoke(key, entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + return cache.invokeAll(keys, entryProcessor, args); + } + + /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return putxAsync(key, val, null, -1, filter); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 136e078..90aeb0b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -737,6 +738,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.invoke(key, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java index 43ca819..34dbe52 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java @@ -9,11 +9,12 @@ package org.gridgain.grid.kernal.processors.cache; -import org.gridgain.grid.kernal.processors.dr.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.tostring.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; + /** * Cache entry atomic update result. */ @@ -46,14 +47,18 @@ public class GridCacheUpdateAtomicResult<K, V> { /** Whether update should be propagated to DHT node. */ private final boolean sndToDht; + /** Value computed by entry processor. */ + private EntryProcessorResult<?> res; + /** * Constructor. * * @param success Success flag. * @param oldVal Old value. * @param newVal New value. + * @param res Value computed by the {@link EntryProcessor}. * @param newTtl New TTL. - * @param drExpireTime Explict DR expire time (if any). + * @param drExpireTime Explicit DR expire time (if any). * @param rmvVer Version for deferred delete. * @param drRes DR resolution result. * @param sndToDht Whether update should be propagated to DHT node. @@ -61,6 +66,7 @@ public class GridCacheUpdateAtomicResult<K, V> { public GridCacheUpdateAtomicResult(boolean success, @Nullable V oldVal, @Nullable V newVal, + @Nullable EntryProcessorResult<?> res, long newTtl, long drExpireTime, @Nullable GridCacheVersion rmvVer, @@ -69,6 +75,7 @@ public class GridCacheUpdateAtomicResult<K, V> { this.success = success; this.oldVal = oldVal; this.newVal = newVal; + this.res = res; this.newTtl = newTtl; this.drExpireTime = drExpireTime; this.rmvVer = rmvVer; @@ -77,6 +84,13 @@ public class GridCacheUpdateAtomicResult<K, V> { } /** + * @return Value computed by the {@link EntryProcessor}. + */ + @Nullable public EntryProcessorResult<?> computedResult() { + return res; + } + + /** * @return Success flag. */ public boolean success() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2c988e7..fec59b2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -34,6 +34,7 @@ import org.jetbrains.annotations.*; import sun.misc.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -306,14 +307,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("unchecked") @Override public IgniteFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { - return updateAllAsync0(F0.asMap(key, val), null, null, null, true, false, entry, ttl, filter); + return updateAllAsync0(F0.asMap(key, val), + null, + null, + null, + null, + true, + false, + entry, + ttl, + filter); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) { - return updateAllAsync0(F0.asMap(key, val), null, null, null, false, false, entry, ttl, filter); + return updateAllAsync0(F0.asMap(key, val), + null, + null, + null, + null, + false, + false, + entry, + ttl, + filter); } /** {@inheritDoc} */ @@ -385,7 +404,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { - return updateAllAsync0(F.asMap(key, newVal), null, null, null, true, true, null, 0, + return updateAllAsync0(F.asMap(key, newVal), + null, + null, + null, + null, + true, + true, + null, + 0, ctx.equalsPeekArray(oldVal)); } @@ -398,7 +425,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - return updateAllAsync0(m, null, null, null, false, false, null, 0, filter); + return updateAllAsync0(m, + null, + null, + null, + null, + false, + false, + null, + 0, + filter); } /** {@inheritDoc} */ @@ -410,7 +446,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) { ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); - return updateAllAsync0(null, null, drMap, null, false, false, null, 0, null); + return updateAllAsync0(null, + null, + null, + drMap, + null, + false, + false, + null, + 0, + null); } /** {@inheritDoc} */ @@ -421,16 +466,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer) throws IgniteCheckedException { + /* return (R)updateAllAsync0(null, Collections.singletonMap(key, new GridCacheTransformComputeClosure<>(transformer)), null, null, true, false, null, 0, null).get(); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer, @Nullable GridCacheEntryEx<K, V> entry, long ttl) { + /* return updateAllAsync0(null, Collections.singletonMap(key, transformer), null, null, false, false, entry, ttl, null); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -440,10 +493,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) { + /* if (F.isEmpty(m)) return new GridFinishedFuture<Object>(ctx.kernalContext()); return updateAllAsync0(null, m, null, null, false, false, null, 0, null); + */ + // TODO IGNITE-44. + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -579,11 +636,83 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)")); } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + A.notNull(key, "key", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKey(key); + + ctx.denyOnLocalRead(); + + Map<? extends K, EntryProcessor> transformMap = + Collections.singletonMap(key, (EntryProcessor)entryProcessor); + + IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, + transformMap, + args, + null, + null, + true, + false, + null, + -1L, + null); + + return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { + @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut) + throws IgniteCheckedException { + Map<K, EntryProcessorResult<T>> resMap = fut.get(); + + assert resMap != null; + assert resMap.size() == 1 : resMap.size(); + + return resMap.values().iterator().next(); + } + }); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys, + final EntryProcessor<K, V, T> entryProcessor, + Object... args) { + A.notNull(keys, "keys", entryProcessor, "entryProcessor"); + + if (keyCheck) + validateCacheKeys(keys); + + ctx.denyOnLocalRead(); + + Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { + @Override public EntryProcessor apply(K k) { + return entryProcessor; + } + }); + + IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, + transformMap, + args, + null, + null, + true, + false, + null, + -1L, + null); + + return fut; + } + /** * Entry point for all public API put/transform methods. * * @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed. * @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed. + * @param invokeArgs Optional arguments for EntryProcessor. * @param drPutMap DR put map. * @param drRmvMap DR remove map. * @param retval Return value required flag. @@ -595,7 +724,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private IgniteFuture updateAllAsync0( @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap, + @Nullable final Map<? extends K, EntryProcessor> transformMap, + @Nullable Object[] invokeArgs, @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap, @Nullable final Map<? extends K, GridCacheVersion> drRmvMap, final boolean retval, @@ -623,6 +753,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ? drPutMap.keySet() : drRmvMap.keySet(), map != null ? map.values() : transformMap != null ? transformMap.values() : null, + invokeArgs, drPutMap != null ? drPutMap.values() : null, drRmvMap != null ? drRmvMap.values() : null, retval, @@ -682,6 +813,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { keys != null ? keys : drMap.keySet(), null, null, + null, keys != null ? null : drMap.values(), retval, rawRetval, @@ -692,8 +824,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskNameHash); return asyncOp(new CO<IgniteFuture<Object>>() { - @Override - public IgniteFuture<Object> apply() { + @Override public IgniteFuture<Object> apply() { updateFut.map(); return updateFut; @@ -858,11 +989,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); if (forceFut.isDone()) - updateAllAsyncInternal0(nodeId, req, cached, completionCb); + updateAllAsyncInternal0(nodeId, req, completionCb); else { forceFut.listenAsync(new CI1<IgniteFuture<Object>>() { @Override public void apply(IgniteFuture<Object> t) { - updateAllAsyncInternal0(nodeId, req, cached, completionCb); + updateAllAsyncInternal0(nodeId, req, completionCb); } }); } @@ -873,13 +1004,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * * @param nodeId Node ID. * @param req Update request. - * @param cached Cached entry if updating single local entry. * @param completionCb Completion callback. */ public void updateAllAsyncInternal0( UUID nodeId, GridNearAtomicUpdateRequest<K, V> req, - @Nullable GridCacheEntryEx<K, V> cached, CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb ) { GridNearAtomicUpdateResponse<K, V> res = new GridNearAtomicUpdateResponse<>(ctx.cacheId(), nodeId, @@ -887,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<K> keys = req.keys(); - assert !req.returnValue() || keys.size() == 1; + assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1); GridDhtAtomicUpdateFuture<K, V> dhtFut = null; @@ -966,6 +1095,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { deleted = updRes.deleted(); dhtFut = updRes.dhtFuture(); + + if (req.operation() == TRANSFORM) + retVal = new GridCacheReturn<>((Object)updRes.invokeResults(), true); } else { UpdateSingleResult<K, V> updRes = updateSingle(node, @@ -1075,8 +1207,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName, @Nullable IgniteCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { - // Cannot update in batches during DR due to possible conflicts. - assert !req.returnValue(); // Should not request return values for putAll. + assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. + assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. if (!F.isEmpty(req.filter())) { try { @@ -1092,11 +1224,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int size = req.keys().size(); Map<K, V> putMap = null; - Map<K, IgniteClosure<V, V>> transformMap = null; + Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null; Collection<K> rmvKeys = null; UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>(); List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size); GridCacheOperation op = req.operation(); + Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null; int firstEntryIdx = 0; @@ -1136,7 +1269,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (op == TRANSFORM) { - IgniteClosure<V, V> transform = req.transformClosure(i); + EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i); V old = entry.innerGet( null, @@ -1148,17 +1281,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*event*/true, /*temporary*/true, req.subjectId(), - transform, + entryProcessor, taskName, CU.<K, V>empty(), null); - if (transformMap == null) - transformMap = new HashMap<>(); + if (entryProcessorMap == null) + entryProcessorMap = new HashMap<>(); + + entryProcessorMap.put(entry.key(), entryProcessor); + + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); - transformMap.put(entry.key(), transform); + V updated; + CacheInvokeResult invokeRes; - V updated = transform.apply(old); + try { + Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); + + updated = ctx.unwrapTemporary(invokeEntry.getValue()); + + invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); + } + catch (Exception e) { + invokeRes = new CacheInvokeResult<>(e); + + updated = old; + } + + invokeResMap.put(entry.key(), invokeRes); if (updated == null) { if (intercept) { @@ -1179,7 +1330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { node, putMap, null, - transformMap, + entryProcessorMap, dhtFut, completionCb, req, @@ -1192,7 +1343,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { firstEntryIdx = i + 1; putMap = null; - transformMap = null; + entryProcessorMap = null; filtered = new ArrayList<>(); } @@ -1221,7 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { node, null, rmvKeys, - transformMap, + entryProcessorMap, dhtFut, completionCb, req, @@ -1234,7 +1385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { firstEntryIdx = i + 1; rmvKeys = null; - transformMap = null; + entryProcessorMap = null; filtered = new ArrayList<>(); } @@ -1331,7 +1482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { node, putMap, rmvKeys, - transformMap, + entryProcessorMap, dhtFut, completionCb, req, @@ -1346,6 +1497,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.dhtFuture(dhtFut); + updRes.invokeResult(invokeResMap); + return updRes; } @@ -1442,6 +1595,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + Map<K, EntryProcessorResult<?>> computedMap = null; + // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { K k = keys.get(i); @@ -1487,6 +1642,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, writeVal, newValBytes, + req.invokeArguments(), primary && storeEnabled(), req.returnValue(), expiry, @@ -1524,16 +1680,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newValBytes = null; // Value has been changed. } - IgniteClosure<V, V> transformC = null; + EntryProcessor<K, V, ?> entryProcessor = null; if (req.forceTransformBackups() && op == TRANSFORM) - transformC = (IgniteClosure<V, V>)writeVal; + entryProcessor = (EntryProcessor<K, V, ?>)writeVal; if (!readersOnly) { dhtFut.addWriteEntry(entry, updRes.newValue(), newValBytes, - transformC, + entryProcessor, updRes.newTtl(), expireTime, newDrVer); @@ -1544,7 +1700,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry, updRes.newValue(), newValBytes, - transformC, + entryProcessor, ttl, expireTime); } @@ -1599,17 +1755,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { deleted.add(F.t(entry, updRes.removeVersion())); } - // Create only once. - if (retVal == null) { - Object ret = updRes.oldValue(); + if (op == TRANSFORM) { + assert req.returnValue(); - if (op == TRANSFORM && writeVal instanceof GridCacheTransformComputeClosure) { - assert req.returnValue(); + if (retVal == null) { + computedMap = U.newHashMap(keys.size()); - ret = ((GridCacheTransformComputeClosure<V, ?>)writeVal).returnValue(); + retVal = new GridCacheReturn<>((Object)computedMap, updRes.success()); } - retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success()); + computedMap.put(k, updRes.computedResult()); + } + else { + // Create only once. + if (retVal == null) { + Object ret = updRes.oldValue(); + + retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success()); + } } } catch (IgniteCheckedException e) { @@ -1628,7 +1791,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param node Originating node. * @param putMap Values to put. * @param rmvKeys Keys to remove. - * @param transformMap Transform closures. + * @param entryProcessorMap Entry processors. * @param dhtFut DHT update future if has backups. * @param completionCb Completion callback to invoke when DHT future is completed. * @param req Request. @@ -1648,7 +1811,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ClusterNode node, @Nullable Map<K, V> putMap, @Nullable Collection<K> rmvKeys, - @Nullable Map<K, IgniteClosure<V, V>> transformMap, + @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, final GridNearAtomicUpdateRequest<K, V> req, @@ -1740,6 +1903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, writeVal, null, + null, false, false, expiry, @@ -1784,13 +1948,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { byte[] valBytes = valBytesTuple.getIfMarshaled(); - IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key()); + EntryProcessor<K, V, ?> entryProcessor = + entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); if (!batchRes.readersOnly()) dhtFut.addWriteEntry(entry, writeVal, valBytes, - transformC, + entryProcessor, updRes.newTtl(), -1, null); @@ -1800,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry, writeVal, valBytes, - transformC, + entryProcessor, updRes.newTtl(), -1); } @@ -2086,6 +2251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.operation(), req.keys(), vals, + req.invokeArguments(), drPutVals, drRmvVals, req.returnValue(), @@ -2229,9 +2395,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { V val = req.value(i); byte[] valBytes = req.valueBytes(i); - IgniteClosure<V, V> transform = req.transformClosure(i); + EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i); - GridCacheOperation op = transform != null ? TRANSFORM : + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null || valBytes != null) ? UPDATE : DELETE; @@ -2247,8 +2413,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { nodeId, nodeId, op, - op == TRANSFORM ? transform : val, + op == TRANSFORM ? entryProcessor : val, valBytes, + op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*retval*/false, /*expiry policy*/null, @@ -2487,12 +2654,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** */ private boolean readersOnly; + /** */ + private Map<Object, Object> invokeRes; + /** * @param entry Entry. * @param updRes Entry update result. * @param entries All entries. */ - private void addDeleted(GridDhtCacheEntry<K, V> entry, GridCacheUpdateAtomicResult<K, V> updRes, + private void addDeleted(GridDhtCacheEntry<K, V> entry, + GridCacheUpdateAtomicResult<K, V> updRes, Collection<GridDhtCacheEntry<K, V>> entries) { if (updRes.removeVersion() != null) { if (deleted == null) @@ -2517,6 +2688,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * @param invokeRes Result for invoke operation. + */ + private void invokeResult(Map<Object, Object> invokeRes) { + this.invokeRes = invokeRes; + } + + /** + * @return Result for invoke operation. + */ + Map<Object, Object> invokeResults() { + return invokeRes; + } + + /** * @param dhtFut DHT future. */ private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 8602ae3..88cc8f6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -197,7 +198,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param entry Entry to map. * @param val Value to write. * @param valBytes Value bytes. - * @param transformC Transform closure. + * @param entryProcessor Entry processor. * @param ttl TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). @@ -205,7 +206,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> public void addWriteEntry(GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure<V, V> transformC, + EntryProcessor<K, V, ?> entryProcessor, long ttl, long drExpireTime, @Nullable GridCacheVersion drVer) { @@ -236,7 +237,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> topVer, forceTransformBackups, this.updateReq.subjectId(), - this.updateReq.taskNameHash()); + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null); mappings.put(nodeId, updateReq); } @@ -245,7 +247,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> entry.keyBytes(), val, valBytes, - transformC, + entryProcessor, ttl, drExpireTime, drVer); @@ -258,7 +260,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param entry Entry. * @param val Value. * @param valBytes Value bytes. - * @param transformC Transform closure. + * @param entryProcessor Entry processor.. * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ @@ -266,7 +268,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure<V, V> transformC, + EntryProcessor<K, V, ?> entryProcessor, long ttl, long expireTime) { GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); @@ -294,7 +296,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> topVer, forceTransformBackups, this.updateReq.subjectId(), - this.updateReq.taskNameHash()); + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null); mappings.put(nodeId, updateReq); } @@ -308,7 +311,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> entry.keyBytes(), val, valBytes, - transformC, + entryProcessor, ttl, expireTime); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 2b68734..9441bd3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -10,7 +10,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; -import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -20,6 +19,7 @@ import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.nio.*; import java.util.*; @@ -111,23 +111,30 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @GridDirectVersion(2) private boolean forceTransformBackups; - /** Transform closures. */ + /** Entry processors. */ @GridDirectTransient - private List<IgniteClosure<V, V>> transformClos; + private List<EntryProcessor<K, V, ?>> entryProcessors; - /** Transform closure bytes. */ + /** Entry processors bytes. */ @GridDirectCollection(byte[].class) @GridDirectVersion(2) - private List<byte[]> transformClosBytes; + private List<byte[]> entryProcessorsBytes; /** Near transform closures. */ @GridDirectTransient - private List<IgniteClosure<V, V>> nearTransformClos; + private List<EntryProcessor<K, V, ?>> nearEntryProcessors; /** Near transform closures bytes. */ @GridDirectCollection(byte[].class) @GridDirectVersion(2) - private List<byte[]> nearTransformClosBytes; + private List<byte[]> nearEntryProcessorsBytes; + + /** Optional arguments for entry processor. */ + @GridDirectTransient + private Object[] invokeArgs; + + /** Filter bytes. */ + private byte[][] invokeArgsBytes; /** Subject ID. */ @GridDirectVersion(3) @@ -151,6 +158,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param nodeId Node ID. * @param futVer Future version. * @param writeVer Write version for cache values. + * @param invokeArgs Optional arguments for entry processor. * @param syncMode Cache write synchronization mode. * @param topVer Topology version. * @param forceTransformBackups Force transform backups flag. @@ -166,8 +174,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp long topVer, boolean forceTransformBackups, UUID subjId, - int taskNameHash + int taskNameHash, + Object[] invokeArgs ) { + assert invokeArgs == null || forceTransformBackups; + this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; @@ -177,13 +188,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp this.forceTransformBackups = forceTransformBackups; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.invokeArgs = invokeArgs; keys = new ArrayList<>(); keyBytes = new ArrayList<>(); if (forceTransformBackups) { - transformClos = new ArrayList<>(); - transformClosBytes = new ArrayList<>(); + entryProcessors = new ArrayList<>(); + entryProcessorsBytes = new ArrayList<>(); } else { vals = new ArrayList<>(); @@ -203,7 +215,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param keyBytes Key bytes, if key was already serialized. * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. - * @param transformC Transform closure. + * @param entryProcessor Entry processor. * @param ttl TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). @@ -212,15 +224,15 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure<V, V> transformC, + EntryProcessor<K, V, ?> entryProcessor, long ttl, long drExpireTime, @Nullable GridCacheVersion drVer) { keys.add(key); this.keyBytes.add(keyBytes); - if (forceTransformBackups && transformC != null) - transformClos.add(transformC); + if (forceTransformBackups && entryProcessor != null) + entryProcessors.add(entryProcessor); else { vals.add(val); this.valBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null); @@ -270,7 +282,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param keyBytes Key bytes, if key was already serialized. * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. - * @param transformC Transform closure. + * @param entryProcessor Entry processor. * @param ttl TTL. * @param expireTime Expire time. */ @@ -278,7 +290,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure<V, V> transformC, + EntryProcessor<K, V, ?> entryProcessor, long ttl, long expireTime) { @@ -287,8 +299,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp nearKeyBytes = new ArrayList<>(); if (forceTransformBackups) { - nearTransformClos = new ArrayList<>(); - nearTransformClosBytes = new ArrayList<>(); + nearEntryProcessors = new ArrayList<>(); + nearEntryProcessorsBytes = new ArrayList<>(); } else { nearVals = new ArrayList<>(); @@ -300,9 +312,9 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp nearKeyBytes.add(keyBytes); if (forceTransformBackups) { - assert transformC != null; + assert entryProcessor != null; - nearTransformClos.add(transformC); + nearEntryProcessors.add(entryProcessor); } else { nearVals.add(val); @@ -465,10 +477,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp /** * @param idx Key index. - * @return Transform closure. + * @return Entry processor. */ - @Nullable public IgniteClosure<V, V> transformClosure(int idx) { - return transformClos == null ? null : transformClos.get(idx); + @Nullable public EntryProcessor<K, V, ?> entryProcessor(int idx) { + return entryProcessors == null ? null : entryProcessors.get(idx); } /** @@ -497,8 +509,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param idx Key index. * @return Transform closure. */ - @Nullable public IgniteClosure<V, V> nearTransformClosure(int idx) { - return nearTransformClos == null ? null : nearTransformClos.get(idx); + @Nullable public EntryProcessor<K, V, ?> nearEntryProcessor(int idx) { + return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx); } /** @@ -615,6 +627,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp return -1L; } + /** + * @return Optional arguments for entry processor. + */ + @Nullable public Object[] invokeArguments() { + return invokeArgs; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { @@ -623,14 +642,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp keyBytes = marshalCollection(keys, ctx); valBytes = marshalValuesCollection(vals, ctx); - if (forceTransformBackups) - transformClosBytes = marshalCollection(transformClos, ctx); + if (forceTransformBackups) { + invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx); + + entryProcessorsBytes = marshalCollection(entryProcessors, ctx); + } nearKeyBytes = marshalCollection(nearKeys, ctx); nearValBytes = marshalValuesCollection(nearVals, ctx); if (forceTransformBackups) - nearTransformClosBytes = marshalCollection(nearTransformClos, ctx); + nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, ctx); } /** {@inheritDoc} */ @@ -640,14 +662,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp keys = unmarshalCollection(keyBytes, ctx, ldr); vals = unmarshalValueBytesCollection(valBytes, ctx, ldr); - if (forceTransformBackups) - transformClos = unmarshalCollection(transformClosBytes, ctx, ldr); + if (forceTransformBackups) { + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + } nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr); nearVals = unmarshalValueBytesCollection(nearValBytes, ctx, ldr); if (forceTransformBackups) - nearTransformClos = unmarshalCollection(nearTransformClosBytes, ctx, ldr); + nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); } /** {@inheritDoc} */ @@ -683,10 +708,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp _clone.nearVals = nearVals; _clone.nearValBytes = nearValBytes; _clone.forceTransformBackups = forceTransformBackups; - _clone.transformClos = transformClos; - _clone.transformClosBytes = transformClosBytes; - _clone.nearTransformClos = nearTransformClos; - _clone.nearTransformClosBytes = nearTransformClosBytes; + _clone.entryProcessors = entryProcessors; + _clone.entryProcessorsBytes = entryProcessorsBytes; + _clone.nearEntryProcessors = nearEntryProcessors; + _clone.nearEntryProcessorsBytes = nearEntryProcessorsBytes; _clone.nearExpireTimes = nearExpireTimes; _clone.nearTtls = nearTtls; _clone.subjId = subjId; @@ -872,7 +897,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp if (commState.cur == NULL) commState.cur = commState.it.next(); - if (!commState.putValueBytes((GridCacheValueBytes)commState.cur)) + if (!commState.putValueBytes((GridCacheValueBytes) commState.cur)) return false; commState.cur = NULL; @@ -893,12 +918,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp commState.idx++; case 16: - if (nearTransformClosBytes != null) { + if (nearEntryProcessorsBytes != null) { if (commState.it == null) { - if (!commState.putInt(nearTransformClosBytes.size())) + if (!commState.putInt(nearEntryProcessorsBytes.size())) return false; - commState.it = nearTransformClosBytes.iterator(); + commState.it = nearEntryProcessorsBytes.iterator(); } while (commState.it.hasNext() || commState.cur != NULL) { @@ -920,12 +945,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp commState.idx++; case 17: - if (transformClosBytes != null) { + if (entryProcessorsBytes != null) { if (commState.it == null) { - if (!commState.putInt(transformClosBytes.size())) + if (!commState.putInt(entryProcessorsBytes.size())) return false; - commState.it = transformClosBytes.iterator(); + commState.it = entryProcessorsBytes.iterator(); } while (commState.it.hasNext() || commState.cur != NULL) { @@ -1213,8 +1238,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp } if (commState.readSize >= 0) { - if (nearTransformClosBytes == null) - nearTransformClosBytes = new ArrayList<>(commState.readSize); + if (nearEntryProcessorsBytes == null) + nearEntryProcessorsBytes = new ArrayList<>(commState.readSize); for (int i = commState.readItems; i < commState.readSize; i++) { byte[] _val = commState.getByteArray(); @@ -1222,7 +1247,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp if (_val == BYTE_ARR_NOT_READ) return false; - nearTransformClosBytes.add((byte[])_val); + nearEntryProcessorsBytes.add((byte[]) _val); commState.readItems++; } @@ -1242,8 +1267,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp } if (commState.readSize >= 0) { - if (transformClosBytes == null) - transformClosBytes = new ArrayList<>(commState.readSize); + if (entryProcessorsBytes == null) + entryProcessorsBytes = new ArrayList<>(commState.readSize); for (int i = commState.readItems; i < commState.readSize; i++) { byte[] _val = commState.getByteArray(); @@ -1251,7 +1276,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp if (_val == BYTE_ARR_NOT_READ) return false; - transformClosBytes.add((byte[])_val); + entryProcessorsBytes.add((byte[])_val); commState.readItems++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 6b233e7..1bc013a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -70,6 +70,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<?> vals; + /** Optional arguments for entry processor. */ + private Object[] invokeArgs; + /** DR put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheDrInfo<V>> drPutVals; @@ -158,6 +161,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param syncMode Write synchronization mode. * @param keys Keys to update. * @param vals Values or transform closure. + * @param invokeArgs Optional arguments for entry processor. * @param drPutVals DR put values (optional). * @param drRmvVals DR remove values (optional). * @param retval Return value require flag. @@ -175,6 +179,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> GridCacheOperation op, Collection<? extends K> keys, @Nullable Collection<?> vals, + @Nullable Object[] invokeArgs, @Nullable Collection<GridCacheDrInfo<V>> drPutVals, @Nullable Collection<GridCacheVersion> drRmvVals, final boolean retval, @@ -186,6 +191,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> int taskNameHash ) { super(cctx.kernalContext()); + this.rawRetval = rawRetval; assert vals == null || vals.size() == keys.size(); @@ -200,6 +206,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.op = op; this.keys = keys; this.vals = vals; + this.invokeArgs = invokeArgs; this.drPutVals = drPutVals; this.drRmvVals = drRmvVals; this.retval = retval; @@ -366,7 +373,12 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> if (res.error() != null) addFailedKeys(req.keys(), res.error()); else { - if (req.fastMap() && req.hasPrimary()) + if (op == TRANSFORM) { + assert !req.fastMap(); + + addInvokeResults(res.returnValue()); + } + else if (req.fastMap() && req.hasPrimary()) opRes = res.returnValue(); } @@ -464,7 +476,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ - private void map0(GridDiscoveryTopologySnapshot topSnapshot, Collection<? extends K> keys, boolean remap, + private void map0(GridDiscoveryTopologySnapshot topSnapshot, + Collection<? extends K> keys, + boolean remap, @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; @@ -560,6 +574,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> retval, op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP), expiryPlc, + invokeArgs, filter, subjId, taskNameHash); @@ -666,6 +681,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> retval, op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP), expiryPlc, + invokeArgs, filter, subjId, taskNameHash); @@ -820,6 +836,22 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> } /** + * @param ret Result from single node. + */ + private synchronized void addInvokeResults(GridCacheReturn<Object> ret) { + assert op == TRANSFORM : op; + assert ret.value() instanceof Map : ret.value(); + + if (opRes != null) { + Map<Object, Object> map = (Map<Object, Object>)opRes.value(); + + map.putAll((Map<Object, Object>)ret.value()); + } + else + opRes = ret; + } + + /** * @param failedKeys Failed keys. * @param err Error cause. * @return Root {@link GridCachePartialUpdateException}.