Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1 d3d678709 -> 180720f15


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index fde03bd..391a5b2 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -77,6 +78,13 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
     @GridDirectCollection(GridCacheValueBytes.class)
     private List<GridCacheValueBytes> valBytes;
 
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
+
     /** DR versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> drVers;
@@ -91,6 +99,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
     private boolean retval;
 
     /** Expiry policy. */
+    @GridDirectTransient
     private ExpiryPolicy expiryPlc;
 
     /** Expiry policy bytes. */
@@ -137,8 +146,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
      * @param retval Return value required flag.
+     * @param forceTransformBackups Force transform backups flag.
      * @param expiryPlc Expiry policy.
+     * @param invokeArgs Optional arguments for entry processor.
      * @param filter Optional filter for atomic check.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -151,7 +164,8 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
         GridCacheOperation op,
         boolean retval,
         boolean forceTransformBackups,
-        ExpiryPolicy expiryPlc,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable Object[] invokeArgs,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
         @Nullable UUID subjId,
         int taskNameHash
@@ -168,6 +182,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
         this.retval = retval;
         this.forceTransformBackups = forceTransformBackups;
         this.expiryPlc = expiryPlc;
+        this.invokeArgs = invokeArgs;
         this.filter = filter;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
@@ -273,10 +288,14 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
      * @param drVer DR version (optional).
      * @param primary If given key is primary on this mapping.
      */
-    public void addUpdateEntry(K key, @Nullable Object val, long drTtl, long 
drExpireTime,
-        @Nullable GridCacheVersion drVer, boolean primary) {
+    public void addUpdateEntry(K key,
+        @Nullable Object val,
+        long drTtl,
+        long drExpireTime,
+        @Nullable GridCacheVersion drVer,
+        boolean primary) {
         assert val != null || op == DELETE;
-        assert op != TRANSFORM || val instanceof IgniteClosure;
+        assert op != TRANSFORM || val instanceof EntryProcessor;
 
         keys.add(key);
         vals.add(val);
@@ -342,6 +361,13 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
     }
 
     /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
+    /**
      * @param idx Key index.
      * @return Value.
      */
@@ -353,12 +379,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
     /**
      * @param idx Key index.
-     * @return Transform closure.
+     * @return Entry processor.
      */
-    public IgniteClosure<V, V> transformClosure(int idx) {
+    public EntryProcessor<K, V, ?> entryProcessor(int idx) {
         assert op == TRANSFORM : op;
 
-        return (IgniteClosure<V, V>)vals.get(idx);
+        return (EntryProcessor<K, V, ?>)vals.get(idx);
     }
 
     /**
@@ -490,6 +516,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
         keyBytes = marshalCollection(keys, ctx);
         valBytes = marshalValuesCollection(vals, ctx);
         filterBytes = marshalFilter(filter, ctx);
+        invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
 
         if (expiryPlc != null)
             expiryPlcBytes = CU.marshal(ctx, new 
IgniteExternalizableExpiryPolicy(expiryPlc));
@@ -502,6 +529,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
         keys = unmarshalCollection(keyBytes, ctx, ldr);
         vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
         filter = unmarshalFilter(filterBytes, ctx, ldr);
+        invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
 
         if (expiryPlcBytes != null)
             expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
@@ -534,11 +562,14 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
         _clone.keyBytes = keyBytes;
         _clone.vals = vals;
         _clone.valBytes = valBytes;
+        _clone.invokeArgs = invokeArgs;
+        _clone.invokeArgsBytes = invokeArgsBytes;
         _clone.drVers = drVers;
         _clone.drTtls = drTtls;
         _clone.drExpireTimes = drExpireTimes;
         _clone.retval = retval;
         _clone.expiryPlc = expiryPlc;
+        _clone.expiryPlcBytes = expiryPlcBytes;
         _clone.filter = filter;
         _clone.filterBytes = filterBytes;
         _clone.hasPrimary = hasPrimary;
@@ -603,12 +634,18 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
                 commState.idx++;
 
             case 6:
-                if (!commState.putBoolean(fastMap))
+                if (!commState.putByteArray(expiryPlcBytes))
                     return false;
 
                 commState.idx++;
 
             case 7:
+                if (!commState.putBoolean(fastMap))
+                    return false;
+
+                commState.idx++;
+
+            case 8:
                 if (filterBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(filterBytes.length))
@@ -635,19 +672,46 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 8:
+            case 9:
                 if (!commState.putCacheVersion(futVer))
                     return false;
 
                 commState.idx++;
 
-            case 9:
+            case 10:
                 if (!commState.putBoolean(hasPrimary))
                     return false;
 
                 commState.idx++;
 
-            case 10:
+            case 11:
+                if (invokeArgsBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(invokeArgsBytes.length))
+                            return false;
+
+                        commState.it = arrayIterator(invokeArgsBytes);
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 12:
                 if (keyBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(keyBytes.size()))
@@ -674,43 +738,37 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 11:
+            case 13:
                 if (!commState.putEnum(op))
                     return false;
 
                 commState.idx++;
 
-            case 12:
+            case 14:
                 if (!commState.putBoolean(retval))
                     return false;
 
                 commState.idx++;
 
-            case 13:
+            case 15:
                 if (!commState.putEnum(syncMode))
                     return false;
 
                 commState.idx++;
 
-            case 14:
+            case 16:
                 if (!commState.putLong(topVer))
                     return false;
 
                 commState.idx++;
 
-            case 15:
-                if (!commState.putByteArray(expiryPlcBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 16:
+            case 17:
                 if (!commState.putCacheVersion(updateVer))
                     return false;
 
                 commState.idx++;
 
-            case 17:
+            case 18:
                 if (valBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(valBytes.size()))
@@ -737,19 +795,19 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 18:
+            case 19:
                 if (!commState.putBoolean(forceTransformBackups))
                     return false;
 
                 commState.idx++;
 
-            case 19:
+            case 20:
                 if (!commState.putUuid(subjId))
                     return false;
 
                 commState.idx++;
 
-            case 20:
+            case 21:
                 if (!commState.putInt(taskNameHash))
                     return false;
 
@@ -819,6 +877,16 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
                 commState.idx++;
 
             case 6:
+                byte[] expiryPlcBytes0 = commState.getByteArray();
+
+                if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                expiryPlcBytes = expiryPlcBytes0;
+
+                commState.idx++;
+
+            case 7:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -826,7 +894,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 7:
+            case 8:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -855,7 +923,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 8:
+            case 9:
                 GridCacheVersion futVer0 = commState.getCacheVersion();
 
                 if (futVer0 == CACHE_VER_NOT_READ)
@@ -865,7 +933,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 9:
+            case 10:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -873,7 +941,36 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 10:
+            case 11:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (invokeArgsBytes == null)
+                        invokeArgsBytes = new byte[commState.readSize][];
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        invokeArgsBytes[i] = (byte[])_val;
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 12:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -902,7 +999,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 11:
+            case 13:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -912,7 +1009,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 12:
+            case 14:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -920,7 +1017,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 13:
+            case 15:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -930,7 +1027,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 14:
+            case 16:
                 if (buf.remaining() < 8)
                     return false;
 
@@ -938,17 +1035,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 15:
-                byte[] expiryPlcBytes0 = commState.getByteArray();
-
-                if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                expiryPlcBytes = expiryPlcBytes0;
-
-                commState.idx++;
-
-            case 16:
+            case 17:
                 GridCacheVersion updateVer0 = commState.getCacheVersion();
 
                 if (updateVer0 == CACHE_VER_NOT_READ)
@@ -958,7 +1045,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 17:
+            case 18:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -987,7 +1074,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 18:
+            case 19:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -995,7 +1082,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 19:
+            case 20:
                 UUID subjId0 = commState.getUuid();
 
                 if (subjId0 == UUID_NOT_READ)
@@ -1005,7 +1092,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> im
 
                 commState.idx++;
 
-            case 20:
+            case 21:
                 if (buf.remaining() < 4)
                     return false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index d7e32b3..07e9785 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -25,6 +25,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 
@@ -213,6 +214,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                         op,
                         val,
                         valBytes,
+                        null,
                         /*write-through*/false,
                         /*retval*/false,
                         /**expiry policy*/null,
@@ -295,9 +297,9 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
 
                         V val = req.nearValue(i);
                         byte[] valBytes = req.nearValueBytes(i);
-                        IgniteClosure<V, V> transform = 
req.nearTransformClosure(i);
+                        EntryProcessor<K, V, ?> entryProcessor = 
req.nearEntryProcessor(i);
 
-                        GridCacheOperation op = transform != null ? TRANSFORM :
+                        GridCacheOperation op = entryProcessor != null ? 
TRANSFORM :
                             (val != null || valBytes != null) ?
                                 UPDATE :
                                 DELETE;
@@ -313,8 +315,9 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                             nodeId,
                             nodeId,
                             op,
-                            op == TRANSFORM ? transform : val,
+                            op == TRANSFORM ? entryProcessor : val,
                             valBytes,
+                            op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*retval*/false,
                             null,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java
new file mode 100644
index 0000000..8f8b479
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Invoke tests run with 3 grids on a PARTITIONED, ATOMIC cache with PRIMARY write order and PARTITIONED_ONLY distribution.
+ */
+public class IgniteCacheAtomicPrimaryWriteOrderInvokeTest extends 
IgniteCacheInvokeAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java
new file mode 100644
index 0000000..a674d76
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java
@@ -0,0 +1,23 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.store.*;
+
+/**
+ * Same tests as {@link IgniteCacheAtomicPrimaryWriteOrderInvokeTest}, but with a {@code TestStore} returned as the cache store.
+ */
+public class IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest extends
+    IgniteCacheAtomicPrimaryWriteOrderInvokeTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheStore<?, ?> cacheStore() {
+        return new TestStore();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
new file mode 100644
index 0000000..1da627b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -0,0 +1,367 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public abstract class IgniteCacheInvokeAbstractTest extends 
IgniteCacheAbstractTest {
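+    /** Key passed to the key helpers by {@link #keys()}; set to one past the largest returned key after each call. */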
+    private Integer lastKey = 0;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvoke() throws Exception {
+        // TODO IGNITE41 test with forceTransformBackups.
+
+        final IgniteCache<Integer, Integer> cache = jcache();
+
+        IncrementProcessor incProcessor = new IncrementProcessor();
+
+        for (final Integer key : keys()) {
+            log.info("Test invoke [key=" + key + ']');
+
+            cache.remove(key);
+
+            Integer res = cache.invoke(key, incProcessor);
+
+            assertEquals(-1, (int)res); // IncrementProcessor returns -1 when the entry does not exist.
+
+            checkValue(key, 1);
+
+            res = cache.invoke(key, incProcessor);
+
+            assertEquals(1, (int)res); // For an existing entry the previous value is returned.
+
+            checkValue(key, 2);
+
+            res = cache.invoke(key, incProcessor);
+
+            assertEquals(2, (int)res);
+
+            checkValue(key, 3);
+
+            res = cache.invoke(key, new ArgumentsSumProcessor(), 10, 20, 30);
+
+            assertEquals(3, (int)res); // ArgumentsSumProcessor returns the number of arguments.
+
+            checkValue(key, 63); // 3 + 10 + 20 + 30.
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    cache.invoke(key, new ExceptionProcessor(63));
+
+                    return null;
+                }
+            }, EntryProcessorException.class, "Test processor exception.");
+
+            checkValue(key, 63); // Value is expected to be unchanged after the failed processor.
+
+            assertNull(cache.invoke(key, new RemoveProcessor(63)));
+
+            checkValue(key, null);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeAll() throws Exception {
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)));
+
+        invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)));
+
+        invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)));
+
+        Set<Integer> keys = new HashSet<>();
+
+        keys.addAll(primaryKeys(jcache(0), 3, 0)); // Union of the primaryKeys() results for grids 0, 1 and 2.
+        keys.addAll(primaryKeys(jcache(1), 3, 0));
+        keys.addAll(primaryKeys(jcache(2), 3, 0));
+
+        invokeAll(cache, keys);
+
+        keys = new HashSet<>();
+
+        for (int i = 0; i < 1000; i++) // Larger batch: keys 0..999.
+            keys.add(i);
+
+        invokeAll(cache, keys);
+    }
+
+    /**
+     * @param cache Cache on which the invokeAll scenario is run.
+     * @param keys Keys to process; they are removed from the cache before the scenario starts.
+     */
+    private void invokeAll(IgniteCache<Integer, Integer> cache, Set<Integer> keys) {
+        cache.removeAll(keys);
+
+        log.info("Test invokeAll [keys=" + keys + ']');
+
+        IncrementProcessor incProcessor = new IncrementProcessor();
+
+        Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
+
+        Map<Object, Object> exp = new HashMap<>();
+
+        for (Integer key : keys)
+            exp.put(key, -1); // IncrementProcessor returns -1 when the entry does not exist.
+
+        checkResult(resMap, exp);
+
+        for (Integer key : keys)
+            checkValue(key, 1);
+
+        resMap = cache.invokeAll(keys, incProcessor);
+
+        exp = new HashMap<>();
+
+        for (Integer key : keys)
+            exp.put(key, 1); // For an existing entry IncrementProcessor returns the previous value.
+
+        checkResult(resMap, exp);
+
+        for (Integer key : keys)
+            checkValue(key, 2);
+
+        resMap = cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
+
+        for (Integer key : keys)
+            exp.put(key, 3); // ArgumentsSumProcessor returns the number of arguments.
+
+        checkResult(resMap, exp);
+
+        for (Integer key : keys)
+            checkValue(key, 62); // 2 + 10 + 20 + 30.
+
+        resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
+
+        for (Integer key : keys) {
+            final EntryProcessorResult<Integer> res = resMap.get(key);
+
+            assertNotNull("No result for " + key, res); // Assert on the result itself, not on the message string.
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    res.get();
+
+                    return null;
+                }
+            }, EntryProcessorException.class, "Test processor exception.");
+        }
+
+        for (Integer key : keys)
+            checkValue(key, 62); // Value is expected to be unchanged after the failed processor.
+
+        resMap = cache.invokeAll(keys, new RemoveProcessor(null));
+
+        for (Integer key : keys) {
+            final EntryProcessorResult<Integer> res = resMap.get(key);
+
+            assertNotNull("No result for " + key, res); // Fails with a clear message instead of an NPE on res.get().
+
+            assertNull(res.get());
+        }
+
+        for (Integer key : keys)
+            checkValue(key, null);
+    }
+
+    /**
+     * @param resMap Result map returned by invokeAll; must contain exactly one result per expected key.
+     * @param exp Expected processor return values by key.
+     */
+    private void checkResult(Map<Integer, EntryProcessorResult<Integer>> resMap, Map<Object, Object> exp) {
+        assertNotNull(resMap);
+
+        assertEquals(exp.size(), resMap.size());
+
+        for (Map.Entry<Object, Object> expVal : exp.entrySet()) {
+            EntryProcessorResult<Integer> res = resMap.get(expVal.getKey());
+
+            assertNotNull("No result for " + expVal.getKey(), res); // Assert on the result, not on the message.
+
+            assertEquals("Unexpected result for " + expVal.getKey(), expVal.getValue(), res.get()); // Expected first.
+        }
+    }
+
+    /**
+     * @param key Key that is peeked locally on every grid.
+     * @param expVal Expected value; if {@code null}, the local peek must return {@code null} on every grid.
+     */
+    protected void checkValue(Object key, @Nullable Object expVal) {
+        if (expVal != null) {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache = jcache(i);
+
+                Object val = cache.localPeek(key);
+
+                if (val == null) // A missing local value is accepted only on nodes that are neither primary nor backup.
+                    
assertFalse(cache(0).affinity().isPrimaryOrBackup(ignite(i).cluster().localNode(),
 key));
+                else
+                    assertEquals("Unexpected value for grid " + i, expVal, 
val);
+            }
+        }
+        else {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache = jcache(i);
+
+                assertNull("Unexpected non null value for grid " + i, 
cache.localPeek(key));
+            }
+        }
+    }
+
+    /**
+     * @return Test keys: one from primaryKeys(), plus one from backupKeys() and (unless REPLICATED) one from nearKeys() if gridCount() > 1.
+     * @throws Exception If failed.
+     */
+    private Collection<Integer> keys() throws Exception {
+        GridCache<Integer, Object> cache = cache(0);
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        keys.add(primaryKeys(cache, 1, lastKey).get(0));
+
+        if (gridCount() > 1) {
+            keys.add(backupKeys(cache, 1, lastKey).get(0));
+
+            if (cache.configuration().getCacheMode() != REPLICATED)
+                keys.add(nearKeys(cache, 1, lastKey).get(0));
+        }
+
+        lastKey = Collections.max(keys) + 1; // Passed to the key helpers on the next call (presumably a start key - confirm).
+
+        return keys;
+    }
+
+    /**
+     * Processor that requires arguments (10, 20, 30) and an existing entry, adds them to the value and returns the argument count.
+     */
+    private static class ArgumentsSumProcessor implements 
EntryProcessor<Integer, Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e, 
Object... args)
+            throws EntryProcessorException {
+            assertEquals(3, args.length);
+            assertEquals(10, args[0]);
+            assertEquals(20, args[1]);
+            assertEquals(30, args[2]);
+
+            assertTrue(e.exists());
+
+            Integer res = e.getValue();
+
+            for (Object arg : args)
+                res += (Integer)arg;
+
+            e.setValue(res);
+
+            return args.length;
+        }
+    }
+
+    /**
+     * Processor that increments an existing value and returns the previous one; for a missing entry it sets 1 and returns -1.
+     */
+    private static class IncrementProcessor implements EntryProcessor<Integer, 
Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e,
+            Object... arguments) throws EntryProcessorException {
+            if (e.exists()) {
+                Integer val = e.getValue();
+
+                assertNotNull(val);
+
+                e.setValue(val + 1);
+
+                assertTrue(e.exists());
+
+                assertEquals(val + 1, (int) e.getValue()); // The entry must expose the value that was just set.
+
+                return val;
+            }
+            else {
+                e.setValue(1);
+
+                return -1;
+            }
+        }
+    }
+
+    /**
+     * Processor that requires an existing entry, optionally checks its value, removes it and returns {@code null}.
+     */
+    private static class RemoveProcessor implements EntryProcessor<Integer, 
Integer, Integer> {
+        /** Expected current value; the check is skipped if {@code null}. */
+        private Integer expVal;
+
+        /**
+         * @param expVal Expected value.
+         */
+        RemoveProcessor(@Nullable Integer expVal) {
+            this.expVal = expVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e,
+            Object... arguments) throws EntryProcessorException {
+            assertTrue(e.exists());
+
+            if (expVal != null)
+                assertEquals(expVal, e.getValue());
+
+            e.remove();
+
+            assertFalse(e.exists());
+
+            return null;
+        }
+    }
+
+    /**
+     * Processor that requires an existing entry, optionally checks its value and then always throws {@link EntryProcessorException}.
+     */
+    private static class ExceptionProcessor implements EntryProcessor<Integer, 
Integer, Integer> {
+        /** Expected current value; the check is skipped if {@code null}. */
+        private Integer expVal;
+
+        /**
+         * @param expVal Expected value.
+         */
+        ExceptionProcessor(@Nullable Integer expVal) {
+            this.expVal = expVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e,
+            Object... arguments) throws EntryProcessorException {
+            assertTrue(e.exists());
+
+            if (expVal != null)
+                assertEquals(expVal, e.getValue());
+
+            throw new EntryProcessorException("Test processor exception.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 64218d2..6f9d32c 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -447,6 +447,7 @@ public class GridCacheTestEntryEx<K, V> extends 
GridMetadataAwareAdapter impleme
         GridCacheOperation op,
         @Nullable Object val,
         @Nullable byte[] valBytes,
+        @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -464,7 +465,15 @@ public class GridCacheTestEntryEx<K, V> extends 
GridMetadataAwareAdapter impleme
         UUID subjId,
         String taskName) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
-        return new GridCacheUpdateAtomicResult<>(true, rawPut((V)val, 0), 
(V)val, 0L, 0L, null, null, true);
+        return new GridCacheUpdateAtomicResult<>(true,
+            rawPut((V)val, 0),
+            (V)val,
+            null,
+            0L,
+            0L,
+            null,
+            null,
+            true);
     }
 
     /** @inheritDoc */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
index 8e858da..9fc3ff7 100644
--- 
a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
@@ -24,6 +24,7 @@ import 
org.gridgain.grid.kernal.processors.cache.distributed.near.*;
 import org.gridgain.grid.kernal.processors.cache.local.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
 import org.gridgain.testframework.junits.*;
 import org.jetbrains.annotations.*;
 
@@ -429,6 +430,48 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     }
 
     /**
+     * @param cache Cache; the projection is taken from its {@code delegate} field via {@code GridTestUtils.getFieldValue}.
+     * @param cnt Keys count.
+     * @param startFrom Start value for keys search.
+     * @return Collection of keys for which given cache is primary.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected List<Integer> primaryKeys(IgniteCache<?, ?> cache, int cnt, int 
startFrom)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, 
"delegate");
+
+        return primaryKeys(prj, cnt, startFrom); // hand off to the GridCacheProjection overload
+    }
+
+    /**
+     * @param cache Cache; the projection is taken from its {@code delegate} field via {@code GridTestUtils.getFieldValue}.
+     * @param cnt Keys count.
+     * @param startFrom Start value for keys search.
+     * @return Collection of keys for which given cache is backup.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected List<Integer> backupKeys(IgniteCache<?, ?> cache, int cnt, int 
startFrom)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, 
"delegate");
+
+        return backupKeys(prj, cnt, startFrom); // hand off to the GridCacheProjection overload
+    }
+
+    /**
+     * @param cache Cache; the projection is taken from its {@code delegate} field via {@code GridTestUtils.getFieldValue}.
+     * @param cnt Keys count.
+     * @param startFrom Start value for keys search.
+     * @return Collection of keys for which given cache is neither primary nor 
backup.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int 
startFrom)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, 
"delegate");
+
+        return nearKeys(prj, cnt, startFrom); // hand off to the GridCacheProjection overload
+    }
+
+    /**
      * @param comp Compute.
      * @param task Task.
      * @param arg Task argument.

Reply via email to