Repository: incubator-ignite Updated Branches: refs/heads/ignite-1 d3d678709 -> 180720f15
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index fde03bd..391a5b2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.nio.*; import java.util.*; @@ -77,6 +78,13 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im @GridDirectCollection(GridCacheValueBytes.class) private List<GridCacheValueBytes> valBytes; + /** Optional arguments for entry processor. */ + @GridDirectTransient + private Object[] invokeArgs; + + /** Filter bytes. */ + private byte[][] invokeArgsBytes; + /** DR versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> drVers; @@ -91,6 +99,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im private boolean retval; /** Expiry policy. */ + @GridDirectTransient private ExpiryPolicy expiryPlc; /** Expiry policy bytes. */ @@ -137,8 +146,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im * @param syncMode Synchronization mode. * @param op Cache update operation. * @param retval Return value required flag. + * @param forceTransformBackups Force transform backups flag. * @param expiryPlc Expiry policy. + * @param invokeArgs Optional arguments for entry processor. * @param filter Optional filter for atomic check. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -151,7 +164,8 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im GridCacheOperation op, boolean retval, boolean forceTransformBackups, - ExpiryPolicy expiryPlc, + @Nullable ExpiryPolicy expiryPlc, + @Nullable Object[] invokeArgs, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, int taskNameHash @@ -168,6 +182,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im this.retval = retval; this.forceTransformBackups = forceTransformBackups; this.expiryPlc = expiryPlc; + this.invokeArgs = invokeArgs; this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -273,10 +288,14 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im * @param drVer DR version (optional). * @param primary If given key is primary on this mapping. */ - public void addUpdateEntry(K key, @Nullable Object val, long drTtl, long drExpireTime, - @Nullable GridCacheVersion drVer, boolean primary) { + public void addUpdateEntry(K key, + @Nullable Object val, + long drTtl, + long drExpireTime, + @Nullable GridCacheVersion drVer, + boolean primary) { assert val != null || op == DELETE; - assert op != TRANSFORM || val instanceof IgniteClosure; + assert op != TRANSFORM || val instanceof EntryProcessor; keys.add(key); vals.add(val); @@ -342,6 +361,13 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im } /** + * @return Optional arguments for entry processor. + */ + @Nullable public Object[] invokeArguments() { + return invokeArgs; + } + + /** * @param idx Key index. * @return Value. */ @@ -353,12 +379,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im /** * @param idx Key index. - * @return Transform closure. + * @return Entry processor. */ - public IgniteClosure<V, V> transformClosure(int idx) { + public EntryProcessor<K, V, ?> entryProcessor(int idx) { assert op == TRANSFORM : op; - return (IgniteClosure<V, V>)vals.get(idx); + return (EntryProcessor<K, V, ?>)vals.get(idx); } /** @@ -490,6 +516,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im keyBytes = marshalCollection(keys, ctx); valBytes = marshalValuesCollection(vals, ctx); filterBytes = marshalFilter(filter, ctx); + invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx); if (expiryPlc != null) expiryPlcBytes = CU.marshal(ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); @@ -502,6 +529,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im keys = unmarshalCollection(keyBytes, ctx, ldr); vals = unmarshalValueBytesCollection(valBytes, ctx, ldr); filter = unmarshalFilter(filterBytes, ctx, ldr); + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); if (expiryPlcBytes != null) expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr); @@ -534,11 +562,14 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im _clone.keyBytes = keyBytes; _clone.vals = vals; _clone.valBytes = valBytes; + _clone.invokeArgs = invokeArgs; + _clone.invokeArgsBytes = invokeArgsBytes; _clone.drVers = drVers; _clone.drTtls = drTtls; _clone.drExpireTimes = drExpireTimes; _clone.retval = retval; _clone.expiryPlc = expiryPlc; + _clone.expiryPlcBytes = expiryPlcBytes; _clone.filter = filter; _clone.filterBytes = filterBytes; _clone.hasPrimary = hasPrimary; @@ -603,12 +634,18 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; case 6: - if (!commState.putBoolean(fastMap)) + if (!commState.putByteArray(expiryPlcBytes)) return false; commState.idx++; case 7: + if (!commState.putBoolean(fastMap)) + return false; + + commState.idx++; + + case 8: if (filterBytes != null) { if (commState.it == null) { if (!commState.putInt(filterBytes.length)) @@ -635,19 +672,46 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 8: + case 9: if (!commState.putCacheVersion(futVer)) return false; commState.idx++; - case 9: + case 10: if (!commState.putBoolean(hasPrimary)) return false; commState.idx++; - case 10: + case 11: + if (invokeArgsBytes != null) { + if (commState.it == null) { + if (!commState.putInt(invokeArgsBytes.length)) + return false; + + commState.it = arrayIterator(invokeArgsBytes); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 12: if (keyBytes != null) { if (commState.it == null) { if (!commState.putInt(keyBytes.size())) @@ -674,43 +738,37 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 11: + case 13: if (!commState.putEnum(op)) return false; commState.idx++; - case 12: + case 14: if (!commState.putBoolean(retval)) return false; commState.idx++; - case 13: + case 15: if (!commState.putEnum(syncMode)) return false; commState.idx++; - case 14: + case 16: if (!commState.putLong(topVer)) return false; commState.idx++; - case 15: - if (!commState.putByteArray(expiryPlcBytes)) - return false; - - commState.idx++; - - case 16: + case 17: if (!commState.putCacheVersion(updateVer)) return false; commState.idx++; - case 17: + case 18: if (valBytes != null) { if (commState.it == null) { if (!commState.putInt(valBytes.size())) @@ -737,19 +795,19 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 18: + case 19: if (!commState.putBoolean(forceTransformBackups)) return false; commState.idx++; - case 19: + case 20: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 20: + case 21: if (!commState.putInt(taskNameHash)) return false; @@ -819,6 +877,16 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; case 6: + byte[] expiryPlcBytes0 = commState.getByteArray(); + + if (expiryPlcBytes0 == BYTE_ARR_NOT_READ) + return false; + + expiryPlcBytes = expiryPlcBytes0; + + commState.idx++; + + case 7: if (buf.remaining() < 1) return false; @@ -826,7 +894,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 7: + case 8: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -855,7 +923,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 8: + case 9: GridCacheVersion futVer0 = commState.getCacheVersion(); if (futVer0 == CACHE_VER_NOT_READ) @@ -865,7 +933,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 9: + case 10: if (buf.remaining() < 1) return false; @@ -873,7 +941,36 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 10: + case 11: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (invokeArgsBytes == null) + invokeArgsBytes = new byte[commState.readSize][]; + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + invokeArgsBytes[i] = (byte[])_val; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 12: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -902,7 +999,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 11: + case 13: if (buf.remaining() < 1) return false; @@ -912,7 +1009,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 12: + case 14: if (buf.remaining() < 1) return false; @@ -920,7 +1017,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 13: + case 15: if (buf.remaining() < 1) return false; @@ -930,7 +1027,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 14: + case 16: if (buf.remaining() < 8) return false; @@ -938,17 +1035,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 15: - byte[] expiryPlcBytes0 = commState.getByteArray(); - - if (expiryPlcBytes0 == BYTE_ARR_NOT_READ) - return false; - - expiryPlcBytes = expiryPlcBytes0; - - commState.idx++; - - case 16: + case 17: GridCacheVersion updateVer0 = commState.getCacheVersion(); if (updateVer0 == CACHE_VER_NOT_READ) @@ -958,7 +1045,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 17: + case 18: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -987,7 +1074,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 18: + case 19: if (buf.remaining() < 1) return false; @@ -995,7 +1082,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 19: + case 20: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -1005,7 +1092,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; - case 20: + case 21: if (buf.remaining() < 4) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index d7e32b3..07e9785 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -25,6 +25,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -213,6 +214,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { op, val, valBytes, + null, /*write-through*/false, /*retval*/false, /**expiry policy*/null, @@ -295,9 +297,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { V val = req.nearValue(i); byte[] valBytes = req.nearValueBytes(i); - IgniteClosure<V, V> transform = req.nearTransformClosure(i); + EntryProcessor<K, V, ?> entryProcessor = req.nearEntryProcessor(i); - GridCacheOperation op = transform != null ? TRANSFORM : + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null || valBytes != null) ? UPDATE : DELETE; @@ -313,8 +315,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { nodeId, nodeId, op, - op == TRANSFORM ? transform : val, + op == TRANSFORM ? entryProcessor : val, valBytes, + op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*retval*/false, null, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java new file mode 100644 index 0000000..8f8b479 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderInvokeTest.java @@ -0,0 +1,47 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public class IgniteCacheAtomicPrimaryWriteOrderInvokeTest extends IgniteCacheInvokeAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java new file mode 100644 index 0000000..a674d76 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.java @@ -0,0 +1,23 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.gridgain.grid.cache.store.*; + +/** + * + */ +public class IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest extends + IgniteCacheAtomicPrimaryWriteOrderInvokeTest { + /** {@inheritDoc} */ + @Override protected GridCacheStore<?, ?> cacheStore() { + return new TestStore(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java new file mode 100644 index 0000000..1da627b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -0,0 +1,367 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.gridgain.grid.cache.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * + */ +public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractTest { + /** */ + private Integer lastKey = 0; + + /** + * @throws Exception If failed. + */ + public void testInvoke() throws Exception { + // TODO IGNITE41 test with forceTransformBackups. + + final IgniteCache<Integer, Integer> cache = jcache(); + + IncrementProcessor incProcessor = new IncrementProcessor(); + + for (final Integer key : keys()) { + log.info("Test invoke [key=" + key + ']'); + + cache.remove(key); + + Integer res = cache.invoke(key, incProcessor); + + assertEquals(-1, (int)res); + + checkValue(key, 1); + + res = cache.invoke(key, incProcessor); + + assertEquals(1, (int)res); + + checkValue(key, 2); + + res = cache.invoke(key, incProcessor); + + assertEquals(2, (int)res); + + checkValue(key, 3); + + res = cache.invoke(key, new ArgumentsSumProcessor(), 10, 20, 30); + + assertEquals(3, (int)res); + + checkValue(key, 63); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invoke(key, new ExceptionProcessor(63)); + + return null; + } + }, EntryProcessorException.class, "Test processor exception."); + + checkValue(key, 63); + + assertNull(cache.invoke(key, new RemoveProcessor(63))); + + checkValue(key, null); + } + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAll() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0))); + + invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0))); + + invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0))); + + Set<Integer> keys = new HashSet<>(); + + keys.addAll(primaryKeys(jcache(0), 3, 0)); + keys.addAll(primaryKeys(jcache(1), 3, 0)); + keys.addAll(primaryKeys(jcache(2), 3, 0)); + + invokeAll(cache, keys); + + keys = new HashSet<>(); + + for (int i = 0; i < 1000; i++) + keys.add(i); + + invokeAll(cache, keys); + } + + /** + * @param cache Cache. + * @param keys Keys. + */ + private void invokeAll(IgniteCache<Integer, Integer> cache, Set<Integer> keys) { + cache.removeAll(keys); + + log.info("Test invokeAll [keys=" + keys + ']'); + + IncrementProcessor incProcessor = new IncrementProcessor(); + + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor); + + Map<Object, Object> exp = new HashMap<>(); + + for (Integer key : keys) + exp.put(key, -1); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 1); + + resMap = cache.invokeAll(keys, incProcessor); + + exp = new HashMap<>(); + + for (Integer key : keys) + exp.put(key, 1); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 2); + + resMap = cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30); + + for (Integer key : keys) + exp.put(key, 3); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 62); + + resMap = cache.invokeAll(keys, new ExceptionProcessor(null)); + + for (Integer key : keys) { + final EntryProcessorResult<Integer> res = resMap.get(key); + + assertNotNull("No result for " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + res.get(); + + return null; + } + }, EntryProcessorException.class, "Test processor exception."); + } + + for (Integer key : keys) + checkValue(key, 62); + + resMap = cache.invokeAll(keys, new RemoveProcessor(null)); + + for (Integer key : keys) { + final EntryProcessorResult<Integer> res = resMap.get(key); + + assertNotNull("No result for " + key); + + assertNull(res.get()); + } + + for (Integer key : keys) + checkValue(key, null); + } + + /** + * @param resMap Result map. + * @param exp Expected results. + */ + private void checkResult(Map<Integer, EntryProcessorResult<Integer>> resMap, Map<Object, Object> exp) { + assertNotNull(resMap); + + assertEquals(exp.size(), resMap.size()); + + for (Map.Entry<Object, Object> expVal : exp.entrySet()) { + EntryProcessorResult<Integer> res = resMap.get(expVal.getKey()); + + assertNotNull("No result for " + expVal.getKey()); + + assertEquals("Unexpected result for " + expVal.getKey(), res.get(), expVal.getValue()); + } + } + + /** + * @param key Key. + * @param expVal Expected value. + */ + protected void checkValue(Object key, @Nullable Object expVal) { + if (expVal != null) { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> cache = jcache(i); + + Object val = cache.localPeek(key); + + if (val == null) + assertFalse(cache(0).affinity().isPrimaryOrBackup(ignite(i).cluster().localNode(), key)); + else + assertEquals("Unexpected value for grid " + i, expVal, val); + } + } + else { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> cache = jcache(i); + + assertNull("Unexpected non null value for grid " + i, cache.localPeek(key)); + } + } + } + + /** + * @return Test keys. + * @throws Exception If failed. + */ + private Collection<Integer> keys() throws Exception { + GridCache<Integer, Object> cache = cache(0); + + ArrayList<Integer> keys = new ArrayList<>(); + + keys.add(primaryKeys(cache, 1, lastKey).get(0)); + + if (gridCount() > 1) { + keys.add(backupKeys(cache, 1, lastKey).get(0)); + + if (cache.configuration().getCacheMode() != REPLICATED) + keys.add(nearKeys(cache, 1, lastKey).get(0)); + } + + lastKey = Collections.max(keys) + 1; + + return keys; + } + + /** + * + */ + private static class ArgumentsSumProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) + throws EntryProcessorException { + assertEquals(3, args.length); + assertEquals(10, args[0]); + assertEquals(20, args[1]); + assertEquals(30, args[2]); + + assertTrue(e.exists()); + + Integer res = e.getValue(); + + for (Object arg : args) + res += (Integer)arg; + + e.setValue(res); + + return args.length; + } + } + + /** + * + */ + private static class IncrementProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, + Object... arguments) throws EntryProcessorException { + if (e.exists()) { + Integer val = e.getValue(); + + assertNotNull(val); + + e.setValue(val + 1); + + assertTrue(e.exists()); + + assertEquals(val + 1, (int) e.getValue()); + + return val; + } + else { + e.setValue(1); + + return -1; + } + } + } + + /** + * + */ + private static class RemoveProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** */ + private Integer expVal; + + /** + * @param expVal Expected value. + */ + RemoveProcessor(@Nullable Integer expVal) { + this.expVal = expVal; + } + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, + Object... arguments) throws EntryProcessorException { + assertTrue(e.exists()); + + if (expVal != null) + assertEquals(expVal, e.getValue()); + + e.remove(); + + assertFalse(e.exists()); + + return null; + } + } + + /** + * + */ + private static class ExceptionProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** */ + private Integer expVal; + + /** + * @param expVal Expected value. + */ + ExceptionProcessor(@Nullable Integer expVal) { + this.expVal = expVal; + } + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, + Object... arguments) throws EntryProcessorException { + assertTrue(e.exists()); + + if (expVal != null) + assertEquals(expVal, e.getValue()); + + throw new EntryProcessorException("Test processor exception."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index 64218d2..6f9d32c 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -447,6 +447,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme GridCacheOperation op, @Nullable Object val, @Nullable byte[] valBytes, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable IgniteCacheExpiryPolicy expiryPlc, @@ -464,7 +465,15 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme UUID subjId, String taskName) throws IgniteCheckedException, GridCacheEntryRemovedException { - return new GridCacheUpdateAtomicResult<>(true, rawPut((V)val, 0), (V)val, 0L, 0L, null, null, true); + return new GridCacheUpdateAtomicResult<>(true, + rawPut((V)val, 0), + (V)val, + null, + 0L, + 0L, + null, + null, + true); } /** @inheritDoc */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java index 8e858da..9fc3ff7 100644 --- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java @@ -24,6 +24,7 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import org.gridgain.grid.kernal.processors.cache.local.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; import org.gridgain.testframework.junits.*; import org.jetbrains.annotations.*; @@ -429,6 +430,48 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** + * @param cache Cache. + * @param cnt Keys count. + * @param startFrom Start value for keys search. + * @return Collection of keys for which given cache is primary. + * @throws IgniteCheckedException If failed. + */ + protected List<Integer> primaryKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return primaryKeys(prj, cnt, startFrom); + } + + /** + * @param cache Cache. + * @param cnt Keys count. + * @param startFrom Start value for keys search. + * @return Collection of keys for which given cache is backup. + * @throws IgniteCheckedException If failed. + */ + protected List<Integer> backupKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return backupKeys(prj, cnt, startFrom); + } + + /** + * @param cache Cache. + * @param cnt Keys count. + * @param startFrom Start value for keys search. + * @return Collection of keys for which given cache is neither primary nor backup. + * @throws IgniteCheckedException If failed. + */ + protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return nearKeys(prj, cnt, startFrom); + } + + /** * @param comp Compute. * @param task Task. * @param arg Task argument.