Repository: incubator-ignite Updated Branches: refs/heads/ignite-44 928aa3d48 -> bcb30d104
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java index 844fc5c..3c2d32c 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java @@ -9,12 +9,14 @@ package org.gridgain.grid.kernal.processors.cache; -import org.apache.ignite.lang.*; +import org.apache.ignite.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.store.*; import org.gridgain.testframework.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; +import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -51,8 +53,8 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache /** * @throws Exception If failed. 
*/ - public void testTransform() throws Exception { - GridCache<Integer, Integer> cache = grid(0).cache(null); + public void testInvoke() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); final Integer key = primaryKey(cache); @@ -72,16 +74,16 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache @Override public Void call() throws Exception { int idx = gridIdx.incrementAndGet() - 1; - final GridCache<Integer, Integer> cache = grid(idx).cache(null); + final IgniteCache<Integer, Integer> cache = grid(idx).jcache(null); for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) - cache.transform(key, new IncClosure()); + cache.invoke(key, new IncProcessor()); return null; } - }, THREADS, "transform"); + }, THREADS, "invoke"); - assertFalse("Got null in transform.", failed); + assertFalse("Got null in processor.", failed); expVal += ITERATIONS_PER_THREAD * THREADS; @@ -103,18 +105,22 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache /** * */ - protected static class IncClosure implements IgniteClosure<Integer, Integer> { + protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable { /** {@inheritDoc} */ - @Override public Integer apply(Integer val) { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... 
args) { + Integer val = e.getValue(); + if (val == null) { failed = true; - System.out.println(Thread.currentThread() + " got null in transform: " + val); + System.out.println(Thread.currentThread() + " got null in processor: " + val); return null; } - return val + 1; + e.setValue(val + 1); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java index e1f5c5a..5a97b7f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java @@ -9,10 +9,13 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.testframework.*; +import javax.cache.processor.*; +import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -76,7 +79,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend * @throws Exception If failed. 
*/ private void testTransform(final Integer key) throws Exception { - final GridCache<Integer, Integer> cache = grid(0).cache(null); + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); cache.put(key, 0); @@ -89,7 +92,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend if (i % 500 == 0) log.info("Iteration " + i); - cache.transform(key, new IncClosure()); + cache.invoke(key, new IncProcessor()); } return null; @@ -339,23 +342,29 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend } /** + * */ - protected static class IncClosure implements IgniteClosure<Integer, Integer> { + protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable { /** {@inheritDoc} */ - @Override public Integer apply(Integer val) { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + if (val == null) { failed = true; - System.out.println(Thread.currentThread() + " got null in transform: " + val); + System.out.println(Thread.currentThread() + " got null in processor: " + val); return null; } - return val + 1; + e.setValue(val + 1); + + return null; } } /** + * */ protected static class TestFilter implements IgnitePredicate<GridCacheEntry<Integer, Integer>> { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java index f84e9c7..baa03c2 100644 --- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.testframework.*; @@ -48,7 +49,7 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea * @throws Exception If failed. */ private void testTransformTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception { - final GridCache<Integer, Integer> cache = grid(0).cache(null); + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); cache.put(key, 0); @@ -57,12 +58,14 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea GridTestUtils.runMultiThreaded(new Callable<Void>() { @Override public Void call() throws Exception { + IgniteTransactions txs = ignite(0).transactions(); + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { if (i % 500 == 0) log.info("Iteration " + i); - try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) { - cache.transform(key, new IncClosure()); + try (IgniteTx tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.invoke(key, new IncProcessor()); tx.commit(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java index 4456346..0799180 100644 --- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.portables.*; @@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; import org.junit.*; +import javax.cache.processor.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; @@ -92,48 +94,61 @@ public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAb * @throws Exception If failed. */ private void checkTransform(Integer key) throws Exception { - GridCache<Integer, Integer> c = grid(0).cache(null); + IgniteCache<Integer, Integer> c = grid(0).jcache(null); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); - c.transform(key, new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { assertNull("Unexpected value: " + val, val); return null; } }); - c.putx(key, 1); + c.put(key, 1); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... 
args) { + Integer val = e.getValue(); - c.transform(key, new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { assertNotNull("Unexpected value: " + val, val); assertEquals((Integer) 1, val); - return val + 1; + e.setValue(val + 1); + + return null; } }); assertEquals((Integer)2, c.get(key)); - c.transform(key, new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + assertNotNull("Unexpected value: " + val, val); assertEquals((Integer)2, val); - return val; + e.setValue(val); + + return null; } }); - assertEquals((Integer) 2, c.get(key)); + assertEquals((Integer)2, c.get(key)); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); - c.transform(key, new C1<Integer, Integer>() { - @Override - public Integer apply(Integer val) { assertNotNull("Unexpected value: " + val, val); - assertEquals((Integer) 2, val); + assertEquals((Integer)2, val); + + e.remove(); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java index b25fcbd..d4eb179 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java +++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java @@ -9,14 +9,16 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; import org.apache.ignite.portables.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; +import javax.cache.processor.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -171,7 +173,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri * @throws Exception If failed. */ public void testTransform() throws Exception { - final GridCache<Integer, Object> cache = grid(0).cache(null); + final IgniteCache<Integer, Object> cache = grid(0).jcache(null); GridTestUtils.runMultiThreaded(new Callable<Void>() { @Override public Void call() throws Exception { @@ -182,9 +184,9 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri final TestValue val = vals.get(key % VAL_SIZE); - TestClosure c = testClosure(val.val, false); + TestProcessor c = testClosure(val.val, false); - cache.transform(key, c); + cache.invoke(key, c); } return null; @@ -208,7 +210,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri * @param acceptNull If {@code true} value can be null; * @return Predicate. */ - private TestClosure testClosure(String expVal, boolean acceptNull) { + private TestProcessor testClosure(String expVal, boolean acceptNull) { return portableEnabled() ? 
new PortableValueClosure(expVal, acceptNull) : new TestValueClosure(expVal, acceptNull); @@ -326,7 +328,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri /** * */ - protected abstract static class TestClosure implements IgniteClosure<Object, Object> { + protected abstract static class TestProcessor implements EntryProcessor<Integer, Object, Void>, Serializable { /** */ protected String expVal; @@ -337,23 +339,29 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri * @param expVal Expected value. * @param acceptNull If {@code true} value can be null; */ - protected TestClosure(String expVal, boolean acceptNull) { + protected TestProcessor(String expVal, boolean acceptNull) { this.expVal = expVal; this.acceptNull = acceptNull; } /** {@inheritDoc} */ - @Override public final Object apply(Object val) { + @Override public Void process(MutableEntry<Integer, Object> e, Object... args) { + Object val = e.getValue(); + if (val == null) { if (!acceptNull) assertNotNull(val); - return true; + e.setValue(true); + + return null; } checkValue(val); - return val; + e.setValue(val); + + return null; } /** @@ -366,7 +374,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri * */ @SuppressWarnings("PackageVisibleInnerClass") - static class PortableValueClosure extends TestClosure { + static class PortableValueClosure extends TestProcessor { /** * @param expVal Expected value. * @param acceptNull If {@code true} value can be null; @@ -387,7 +395,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri * */ @SuppressWarnings("PackageVisibleInnerClass") - static class TestValueClosure extends TestClosure { + static class TestValueClosure extends TestProcessor { /** * @param expVal Expected value. 
* @param acceptNull If {@code true} value can be null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java index 4b88530..b65fcad 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java @@ -9,11 +9,12 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -104,6 +105,9 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest } /** + * @param mode Atomicity mode. + * @param order Atomic cache write order mode. + * @param b Number of backups. * @throws Exception If failed. */ private void checkTransform(GridCacheAtomicityMode mode, GridCacheAtomicWriteOrderMode order, int b) @@ -126,7 +130,7 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest failDeserialization = false; // Get client grid. 
- GridCacheProjection<Integer, TestObject> cache = grid(2).cache(null); + IgniteCache<Integer, TestObject> cache = grid(2).jcache(null); if (backups > 0 && atomicityMode == ATOMIC) cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP); @@ -138,17 +142,17 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest info(">>>>>> Transforming"); - // Transform (check non-existent keys also. + // Transform (check non-existent keys also). for (int i = 0; i < 200; i++) - cache.transform(i, new Transform()); + cache.invoke(i, new Transform()); - Map<Integer, Transform> transformMap = new HashMap<>(); + Set<Integer> keys = new HashSet<>(); // Check transformAll. for (int i = 0; i < 300; i++) - transformMap.put(i, new Transform()); + keys.add(i); - cache.transformAll(transformMap); + cache.invokeAll(keys, new Transform()); // Avoid errors during stop. failDeserialization = false; @@ -158,10 +162,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest } } - private static class Transform implements IgniteClosure<TestObject, TestObject> { + /** + * + */ + private static class Transform implements EntryProcessor<Integer, TestObject, Void>, Serializable { /** {@inheritDoc} */ - @Override public TestObject apply(TestObject testObject) { - return new TestObject(); + @Override public Void process(MutableEntry<Integer, TestObject> entry, Object... args) { + entry.setValue(new TestObject()); + + return null; } } @@ -169,7 +178,11 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest * */ private static class TestObject implements Externalizable { + /** + * + */ public TestObject() { + // No-op. 
} /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index 6f9d32c..af729a1 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -423,9 +423,10 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(GridCacheVersion ver, + @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, + @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @Nullable ExpiryPolicy expiryPlc, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java index 0be91f0..8cf48bf 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import 
org.gridgain.testframework.*; import org.jetbrains.annotations.*; +import javax.cache.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; @@ -417,17 +419,21 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac info("Going to transform: " + key); - GridTestUtils.assertThrows(log, new Callable<Void>() { + Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { - grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() { - @Override public Object apply(Object o) { - return 2; + grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(2); + + return null; } }); return null; } - }, IgniteTxHeuristicException.class, null); + }, CacheException.class, null); + + assertTrue("Unexpected cause: " +e, e.getCause() instanceof IgniteTxHeuristicException); checkEmpty(key); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java index 39eb728..a46353e 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import org.gridgain.testframework.*; import org.jetbrains.annotations.*; +import javax.cache.*; 
+import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; @@ -422,17 +424,21 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb info("Going to transform: " + key); - GridTestUtils.assertThrows(log, new Callable<Void>() { + Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { - grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() { - @Override public Object apply(Object o) { - return 2; + grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(2); + + return null; } }); return null; } - }, IgniteTxRollbackException.class, null); + }, CacheException.class, null); + + assertTrue("Unexpected cause: " + e, e.getCause() instanceof IgniteTxRollbackException); checkValue(key, putBefore); } @@ -588,41 +594,48 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb this.fail = fail; } - + /** {@inheritDoc} */ @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { return null; } + /** {@inheritDoc} */ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... 
args) throws IgniteCheckedException { if (fail) throw new IgniteCheckedException("Store exception"); } + /** {@inheritDoc} */ @Override public void loadAll(@Nullable IgniteTx tx, Collection<?> keys, IgniteBiInClosure<Object, Object> c) throws IgniteCheckedException { } + /** {@inheritDoc} */ @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException { if (fail) throw new IgniteCheckedException("Store exception"); } + /** {@inheritDoc} */ @Override public void putAll(@Nullable IgniteTx tx, Map<?, ?> map) throws IgniteCheckedException { if (fail) throw new IgniteCheckedException("Store exception"); } + /** {@inheritDoc} */ @Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { if (fail) throw new IgniteCheckedException("Store exception"); } + /** {@inheritDoc} */ @Override public void removeAll(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException { if (fail) throw new IgniteCheckedException("Store exception"); } + /** {@inheritDoc} */ @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { if (fail && commit) throw new IgniteCheckedException("Store exception"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java index 5c7ba54..1ced531 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java +++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java @@ -19,9 +19,12 @@ import org.gridgain.grid.cache.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; +import java.io.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; @@ -68,7 +71,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { private UUID[] ids; /** Caches. */ - private GridCache<Integer, Integer>[] caches; + private IgniteCache<Integer, Integer>[] caches; /** Recorded events.*/ private ConcurrentHashSet<IgniteCacheEvent> evts; @@ -157,14 +160,14 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { ignites = new Ignite[GRID_CNT]; ids = new UUID[GRID_CNT]; - caches = new GridCache[GRID_CNT]; + caches = new IgniteCache[GRID_CNT]; for (int i = 0; i < GRID_CNT; i++) { ignites[i] = grid(i); ids[i] = ignites[i].cluster().localNode().id(); - caches[i] = ignites[i].cache(CACHE_NAME); + caches[i] = ignites[i].jcache(CACHE_NAME); ignites[i].events().localListen(new IgnitePredicate<IgniteEvent>() { @Override public boolean apply(IgniteEvent evt) { @@ -184,7 +187,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { int key = 0; while (true) { - if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) { + if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) { key1 = key++; break; @@ -194,7 +197,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { } while (true) { - if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) 
{ + if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) { key2 = key; break; @@ -226,6 +229,28 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { } /** + * @param gridIdx Grid index. + * @param key Key. + * @return {@code True} if grid is primary for given key. + */ + private boolean primary(int gridIdx, Object key) { + GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity(); + + return aff.isPrimary(grid(gridIdx).cluster().localNode(), key); + } + + /** + * @param gridIdx Grid index. + * @param key Key. + * @return {@code True} if grid is primary for given key. + */ + private boolean backup(int gridIdx, Object key) { + GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity(); + + return aff.isBackup(grid(gridIdx).cluster().localNode(), key); + } + + /** * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ transaction. * * @throws Exception If failed. @@ -423,13 +448,13 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { private void checkAtomic(GridCacheMode cacheMode) throws Exception { initialize(cacheMode, ATOMIC, null, null); - caches[0].transform(key1, new Transformer()); + caches[0].invoke(key1, new Transformer()); checkEventNodeIdsStrict(primaryIdsForKeys(key1)); assert evts.isEmpty(); - caches[0].transformAll(keys, new Transformer()); + caches[0].invokeAll(keys, new Transformer()); checkEventNodeIdsStrict(primaryIdsForKeys(key1, key2)); } @@ -449,7 +474,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { System.out.println("BEFORE: " + evts.size()); - caches[0].transform(key1, new Transformer()); + caches[0].invoke(key1, new Transformer()); System.out.println("AFTER: " + evts.size()); @@ -457,7 +482,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { assert evts.isEmpty(); - caches[0].transformAll(keys, new Transformer()); + caches[0].invokeAll(keys, new Transformer()); 
checkEventNodeIdsStrict(idsForKeys(key1, key2)); } @@ -500,9 +525,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { else if (cacheMode == PARTITIONED) { for (int key : keys) { for (int i = 0; i < GRID_CNT; i++) { - GridCacheEntry<Integer, Integer> entry = caches[i].entry(key); - - if (entry.primary() || (!primaryOnly && entry.backup())) + if (primary(i, key) || (!primaryOnly && backup(i, key))) res.add(ids[i]); } } @@ -510,7 +533,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { else if (cacheMode == REPLICATED) { for (int key : keys) { if (primaryOnly) - res.add(caches[0].affinity().mapKeyToNode(key).id()); + res.add(grid(0).cache(CACHE_NAME).affinity().mapKeyToNode(key).id()); else res.addAll(Arrays.asList(ids)); } @@ -544,22 +567,19 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { } if (foundEvt == null) { - GridCache<Integer, Integer> affectedCache = null; + int gridIdx = -1; for (int i = 0; i < GRID_CNT; i++) { if (F.eq(this.ids[i], id)) { - affectedCache = caches[i]; + gridIdx = i; break; } } - GridCacheEntry<Integer, Integer> entry1 = affectedCache.entry(key1); - GridCacheEntry<Integer, Integer> entry2 = affectedCache.entry(key2); - fail("Expected transform event was not triggered on the node [nodeId=" + id + - ", key1Primary=" + entry1.primary() + ", key1Backup=" + entry1.backup() + - ", key2Primary=" + entry2.primary() + ", key2Backup=" + entry2.backup() + ']'); + ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) + + ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']'); } else evts.remove(foundEvt); @@ -570,10 +590,12 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { /** * Transform closure. 
*/ - private static class Transformer implements IgniteClosure<Integer, Integer> { + private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable { /** {@inheritDoc} */ - @Override public Integer apply(Integer val) { - return ++val; + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(e.getValue() + 1); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java index 2227e56..2b1fdff 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java @@ -9,14 +9,15 @@ package org.gridgain.grid.kernal.processors.cache.distributed; +import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -58,7 +59,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT * @throws Exception If failed. 
*/ public void testRemoteTxPreloading() throws Exception { - GridCache<String, Integer> cache = cache(0); + IgniteCache<String, Integer> cache = jcache(0); for (int i = 0; i < 10000; i++) cache.put(String.valueOf(i), 0); @@ -86,12 +87,21 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT for (int i = 0; i < 10; i++) keys.add(String.valueOf(i * 1000)); - cache.transformAll(keys, new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { - if (val == null) + cache.invokeAll(keys, new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + Integer val = e.getValue(); + + if (val == null) { keyNotLoaded = true; - return val + 1; + e.setValue(1); + + return null; + } + + e.setValue(val + 1); + + return null; } }); @@ -135,7 +145,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT for (int i = 0; i < 10000; i++) map.put(String.valueOf(i), 0); - GridCache<String, Integer> cache0 = cache(0); + IgniteCache<String, Integer> cache0 = jcache(0); cache0.putAll(map); @@ -148,18 +158,26 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT startGrid(i); - GridCache<String, Integer> cache = cache(i); + IgniteCache<String, Integer> cache = jcache(i); + + IgniteTransactions txs = ignite(i).transactions(); + + try (IgniteTx tx = txs.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) { + cache.invoke(TX_KEY, new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... 
args) { + Integer val = e.getValue(); - try (IgniteTx tx = cache.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) { - cache.transform(TX_KEY, new C1<Integer, Integer>() { - @Override public Integer apply(Integer val) { if (val == null) { keyNotLoaded = true; - return 1; + e.setValue(1); + + return null; } - return val + 1; + e.setValue(val + 1); + + return null; } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java index 5635ac8..d4d3396 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java @@ -9,8 +9,9 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; +import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -19,6 +20,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.processor.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; @@ -55,16 +57,23 @@ public abstract class 
GridCacheAbstractTransformWriteThroughSelfTest extends Gri /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** Value increment closure. */ - private static final IgniteClosure<Integer, Integer> INCR_CLOS = new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer src) { - return src == null ? 1 : src + 1; + /** Value increment processor. */ + private static final EntryProcessor<String, Integer, Void> INCR_CLOS = new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + if (!e.exists()) + e.setValue(1); + else + e.setValue(e.getValue() + 1); + + return null; } }; - /** Value remove closure. */ - private static final IgniteClosure<Integer, Integer> RMV_CLOS = new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer src) { + /** Value remove processor. */ + private static final EntryProcessor<String, Integer, Void> RMV_CLOS = new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + e.remove(); + return null; } }; @@ -82,6 +91,8 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(IP_FINDER); @@ -221,7 +232,7 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri * @throws Exception If failed. 
*/ protected void checkTransform(IgniteTxConcurrency concurrency, int nodeType, int op) throws Exception { - GridCacheProjection<String, Integer> cache = cache(0); + IgniteCache<String, Integer> cache = jcache(0); Collection<String> keys = keysForType(nodeType); @@ -233,18 +244,18 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri nearStore.reset(); for (String key : keys) - cache.clear(key); + cache(0).clear(key); info(">>> Starting transform transaction"); - try (IgniteTx tx = cache.txStart(concurrency, READ_COMMITTED)) { + try (IgniteTx tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED)) { if (op == OP_UPDATE) { for (String key : keys) - cache.transform(key, INCR_CLOS); + cache.invoke(key, INCR_CLOS); } else { for (String key : keys) - cache.transform(key, RMV_CLOS); + cache.invoke(key, RMV_CLOS); } tx.commit(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java index f84d006..ab2024a 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java @@ -11,8 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.affinity.*; import 
org.gridgain.grid.kernal.*; @@ -23,6 +21,8 @@ import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.junits.common.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; +import java.io.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; @@ -255,9 +255,9 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { Ignite ignite0 = grid(0); - GridCache<Integer, Integer> cache0 = ignite0.cache(null); + IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null); - GridCacheAffinity<Integer> aff = cache0.affinity(); + GridCacheAffinity<Object> aff = cache(0).affinity(); UUID id0 = ignite0.cluster().localNode().id(); @@ -265,7 +265,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { log.info("Transform from primary."); - cache0.transform(primaryKey, new TransformClosure(primaryKey)); + cache0.invoke(primaryKey, new Processor(primaryKey)); for (int i = 0; i < GRID_CNT; i++) checkEntry(grid(i), primaryKey, primaryKey, false); @@ -275,7 +275,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { log.info("Transform from backup."); - cache0.transform(backupKey, new TransformClosure(backupKey)); + cache0.invoke(backupKey, new Processor(backupKey)); for (int i = 0; i < GRID_CNT; i++) checkEntry(grid(i), backupKey, backupKey, false); @@ -285,7 +285,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { log.info("Transform from near."); - cache0.transform(nearKey, new TransformClosure(nearKey)); + cache0.invoke(nearKey, new Processor(nearKey)); for (int i = 0; i < GRID_CNT; i++) { UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? 
new UUID[]{id0} : new UUID[]{}; @@ -302,11 +302,11 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { for (int i = 0; i < GRID_CNT; i++) { delay(); - GridCache<Integer, Integer> cache = grid(i).cache(null); + IgniteCache<Integer, Integer> cache = grid(i).jcache(null); log.info("Transform [grid=" + grid(i).name() + ", val=" + val + ']'); - cache.transform(nearKey, new TransformClosure(val)); + cache.invoke(nearKey, new Processor(val)); if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey)) readers.add(grid(i).localNode().id()); @@ -332,53 +332,53 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { Ignite ignite0 = grid(0); - GridCache<Integer, Integer> cache0 = ignite0.cache(null); + IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null); - GridCacheAffinity<Integer> aff = cache0.affinity(); + GridCacheAffinity<Object> aff = ignite0.cache(null).affinity(); UUID id0 = ignite0.cluster().localNode().id(); - Map<Integer, TransformClosure> primaryKeys = new HashMap<>(); + Set<Integer> primaryKeys = new HashSet<>(); for (int i = 0; i < 10; i++) - primaryKeys.put(key(ignite0, PRIMARY), new TransformClosure(1)); + primaryKeys.add(key(ignite0, PRIMARY)); log.info("TransformAll from primary."); - cache0.transformAll(primaryKeys); + cache0.invokeAll(primaryKeys, new Processor(1)); for (int i = 0; i < GRID_CNT; i++) { - for (Integer primaryKey : primaryKeys.keySet()) + for (Integer primaryKey : primaryKeys) checkEntry(grid(i), primaryKey, 1, false); } if (backups > 0) { - Map<Integer, TransformClosure> backupKeys = new HashMap<>(); + Set<Integer> backupKeys = new HashSet<>(); for (int i = 0; i < 10; i++) - backupKeys.put(key(ignite0, BACKUP), new TransformClosure(2)); + backupKeys.add(key(ignite0, BACKUP)); log.info("TransformAll from backup."); - cache0.transformAll(backupKeys); + cache0.invokeAll(backupKeys, new Processor(2)); for (int i = 0; i < GRID_CNT; i++) { - for (Integer backupKey : 
backupKeys.keySet()) + for (Integer backupKey : backupKeys) checkEntry(grid(i), backupKey, 2, false); } } - Map<Integer, TransformClosure> nearKeys = new HashMap<>(); + Set<Integer> nearKeys = new HashSet<>(); for (int i = 0; i < 30; i++) - nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), new TransformClosure(3)); + nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP)); log.info("TransformAll from near."); - cache0.transformAll(nearKeys); + cache0.invokeAll(nearKeys, new Processor(3)); for (int i = 0; i < GRID_CNT; i++) { - for (Integer nearKey : nearKeys.keySet()) { + for (Integer nearKey : nearKeys) { UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{}; checkEntry(grid(i), nearKey, 3, i == 0, expReaders); @@ -387,7 +387,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { Map<Integer, Collection<UUID>> readersMap = new HashMap<>(); - for (Integer key : nearKeys.keySet()) + for (Integer key : nearKeys) readersMap.put(key, new HashSet<UUID>()); int val = 4; @@ -395,22 +395,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { for (int i = 0; i < GRID_CNT; i++) { delay(); - GridCache<Integer, Integer> cache = grid(i).cache(null); + IgniteCache<Integer, Integer> cache = grid(i).jcache(null); - for (Integer key : nearKeys.keySet()) - nearKeys.put(key, new TransformClosure(val)); + for (Integer key : nearKeys) + nearKeys.add(key); log.info("TransformAll [grid=" + grid(i).name() + ", val=" + val + ']'); - cache.transformAll(nearKeys); + cache.invokeAll(nearKeys, new Processor(val)); - for (Integer key : nearKeys.keySet()) { + for (Integer key : nearKeys) { if (!aff.isPrimaryOrBackup(grid(i).localNode(), key)) readersMap.get(key).add(grid(i).localNode().id()); } for (int j = 0; j < GRID_CNT; j++) { - for (Integer key : nearKeys.keySet()) { + for (Integer key : nearKeys) { boolean primaryNode = aff.isPrimary(grid(j).localNode(), key); Collection<UUID> readers = 
readersMap.get(key); @@ -789,21 +789,23 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { } /** + * */ - private static class TransformClosure implements IgniteClosure<Integer, Integer> { + private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable { /** */ private final Integer newVal; /** * @param newVal New value. */ - private TransformClosure(Integer newVal) { + private Processor(Integer newVal) { this.newVal = newVal; } - /** {@inheritDoc} */ - @Override public Integer apply(Integer val) { - return newVal; + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(newVal); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java index 252ac3a..402cb24 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java @@ -10,13 +10,14 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; -import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; import org.jdk8.backport.*; +import javax.cache.processor.*; +import 
java.io.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; @@ -62,14 +63,14 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi for (int i = rangeStart; i < rangeStart + range; i++) { int idx = ThreadLocalRandom8.current().nextInt(gridCount()); - GridCacheProjection<Integer, Integer> cache = grid(idx).cache(null); + IgniteCache<Integer, Integer> cache = grid(idx).jcache(null); cache = cache.flagsOn(GridCacheFlag.FORCE_TRANSFORM_BACKUP); - cache.transform(i, new Transformer(i)); + cache.invoke(i, new Transformer(i)); } } - catch (IgniteCheckedException e) { + catch (Exception e) { throw new IgniteException(e); } } @@ -102,20 +103,28 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi /** * */ - private static class Transformer implements IgniteClosure<Integer, Integer> { + private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable { + /** */ private int key; + /** + * @param key Key. + */ private Transformer(int key) { this.key = key; } /** {@inheritDoc} */ - @Override public Integer apply(Integer old) { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer old = e.getValue(); + if (key < 5) System.err.println(Thread.currentThread().getName() + " <> Transforming value [key=" + key + ", val=" + old + ']'); - return old == null ? 1 : old + 1; + e.setValue(old == null ? 
1 : old + 1); + + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java index 3c56237..96ab5fb 100644 --- a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java @@ -1402,7 +1402,8 @@ public abstract class GridAbstractTest extends TestCase { int cnt = 0; for (Method m : GridAbstractTest.this.getClass().getMethods()) - if (m.getDeclaringClass().getName().startsWith("org.gridgain")) { + if (m.getDeclaringClass().getName().startsWith("org.gridgain") || + m.getDeclaringClass().getName().startsWith("org.apache.ignite")) { if (m.getName().startsWith("test") && Modifier.isPublic(m.getModifiers())) cnt++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java index 9fc3ff7..f40d941 100644 --- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java @@ -61,6 +61,14 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param idx Grid index. + * @return Cache. 
+ */ + protected <K, V> IgniteCache<K, V> jcache(int idx) { + return grid(idx).jcache(null); + } + + /** + * @param idx Grid index. * @param name Cache name. * @return Cache. */ @@ -275,7 +283,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param cache Cache. - * @return Collection of keys for which given cache is primary. + * @return Key for which given cache is primary. * @throws IgniteCheckedException If failed. */ protected Integer primaryKey(GridCacheProjection<?, ?> cache) @@ -327,7 +335,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param cache Cache. - * @return Collection of keys for which given cache is backup. + * @return Key for which given cache is backup. * @throws IgniteCheckedException If failed. */ protected Integer backupKey(GridCacheProjection<?, ?> cache) @@ -379,7 +387,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param cache Cache. - * @return Collection of keys for which given cache is neither primary nor backup. + * @return Key for which given cache is neither primary nor backup. * @throws IgniteCheckedException If failed. */ protected Integer nearKey(GridCacheProjection<?, ?> cache) @@ -472,6 +480,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** + * @param cache Cache. + * @return Key for which given cache is primary. + * @throws IgniteCheckedException If failed. + */ + protected Integer primaryKey(IgniteCache<?, ?> cache) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return primaryKey(prj); + } + + /** + * @param cache Cache. + * @return Key for which given cache is backup. + * @throws IgniteCheckedException If failed.
+ */ + protected Integer backupKey(IgniteCache<?, ?> cache) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return backupKey(prj); + } + + /** + * @param cache Cache. + * @return Key for which given cache is neither primary nor backup. + * @throws IgniteCheckedException If failed. + */ + protected Integer nearKey(IgniteCache<?, ?> cache) + throws IgniteCheckedException { + GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate"); + + return nearKey(prj); + } + + /** * @param comp Compute. * @param task Task. * @param arg Task argument. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java index e642753..d9a5e22 100644 --- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java +++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java @@ -10,6 +10,7 @@ package org.gridgain.testsuites.bamboo; import junit.framework.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.expiry.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.affinity.fair.*; @@ -31,14 +32,24 @@ import org.gridgain.testsuites.*; */ public class GridDataGridTestSuite extends TestSuite { /** - * @return GridGain TeamCity in-memory data grid test suite. + * @return IgniteCache test suite. * @throws Exception Thrown in case of the failure. 
*/ public static TestSuite suite() throws Exception { - TestSuite suite = new TestSuite("Gridgain In-Memory Data Grid Test Suite"); + TestSuite suite = new TestSuite("IgniteCache Test Suite"); suite.addTest(IgniteCacheExpiryPolicyTestSuite.suite()); + suite.addTestSuite(IgniteCacheAtomicInvokeTest.class); + suite.addTestSuite(IgniteCacheAtomicNearEnabledInvokeTest.class); + suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderInvokeTest.class); + suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.class); + suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class); + suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class); + suite.addTestSuite(IgniteCacheTxInvokeTest.class); + suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class); + suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class); + // Affinity tests. suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class); suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index 75cc266..23d98c1 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -29,6 +29,8 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -46,10 
+48,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { private final GridMutex mux = new GridMutex(); /** */ - private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj; + private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj; /** Projection with expiry policy for finished job updates. */ - private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj; + private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj; /** Map-reduce execution planner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -96,8 +98,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @return Job meta projection. */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() { - GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj; + private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj; if (prj == null) { synchronized (mux) { @@ -118,7 +120,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { throw new IllegalStateException(e); } - jobMetaPrj = prj = sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); + jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>) + sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); if (ctx.configuration().getFinishedJobInfoTtl() > 0) { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( @@ -139,8 +142,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * @return Projection with expiry policy for finished job updates. 
*/ - private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() { - GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj; + private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj; if (prj == null) { jobMetaCache(); @@ -430,10 +433,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)): "Missing local state for finished task [info=" + info + ", status=" + status + ']'; - StackedClosure incrCntrs = null; + StackedProcessor incrCntrs = null; if (status.state() == COMPLETED) - incrCntrs = new IncrementCountersClosure(null, status.counters()); + incrCntrs = new IncrementCountersProcessor(null, status.counters()); switch (info.type()) { case SETUP: { @@ -462,9 +465,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { case COMMIT: case ABORT: { - GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache(); + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache(); - cache.transformAsync(info.jobId(), new UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)). + cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). listenAsync(failsLog); break; @@ -480,8 +483,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param jobId Job id. * @param c Closure of operation. 
*/ - private void transform(GridHadoopJobId jobId, IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> c) { - jobMetaCache().transformAsync(jobId, c).listenAsync(failsLog); + private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) { + jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); } /** @@ -493,7 +496,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { */ public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers, GridHadoopProcessDescriptor desc) { - transform(jobId, new InitializeReducersClosure(null, reducers, desc)); + transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); } /** @@ -601,7 +604,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } if (cancelSplits != null || cancelReducers != null) - jobMetaCache().transform(meta.jobId(), new CancelJobClosure(null, new IgniteCheckedException( + jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException( "One or more nodes participating in map-reduce job execution failed."), cancelSplits, cancelReducers)); } @@ -615,8 +618,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * @param updated Updated cache entries. + * @throws IgniteCheckedException If failed. 
*/ - private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) throws IgniteCheckedException { + private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) + throws IgniteCheckedException { UUID locNodeId = ctx.localNodeId(); for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) { @@ -637,7 +642,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { U.error(log, "Failed to process job state changed callback (will fail the job) " + "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); - transform(jobId, new CancelJobClosure(null, e)); + transform(jobId, new CancelJobProcessor(null, e)); continue; } @@ -780,7 +785,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty()) - transform(jobId, new CancelJobClosure(null, cancelMappers, cancelReducers)); + transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers)); } break; @@ -1017,7 +1022,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { GridHadoopTaskCancelledException err = new GridHadoopTaskCancelledException("Job cancelled."); - jobMetaCache().transform(jobId, new CancelJobClosure(null, err)); + jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err)); } } finally { @@ -1146,13 +1151,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param status Task status. * @param prev Previous closure. 
*/ - private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) { + private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) - transform(jobId, new CancelJobClosure(prev, status.failCause())); + transform(jobId, new CancelJobProcessor(prev, status.failCause())); else - transform(jobId, new UpdatePhaseClosure(prev, PHASE_MAP)); + transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP)); } /** @@ -1161,14 +1166,14 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. */ private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, - final StackedClosure prev) { + final StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); if (status.state() == FAILED || status.state() == CRASHED) { // Fail the whole job. - transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), status.failCause())); + transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause())); return; } @@ -1186,7 +1191,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } } - transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), err)); + transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err)); } }; @@ -1201,13 +1206,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param status Task status. * @param prev Previous closure. 
*/ - private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) { + private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { GridHadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) // Fail the whole job. - transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber(), status.failCause())); + transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); else - transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber())); + transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber())); } /** @@ -1216,12 +1221,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. */ private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, - final StackedClosure prev) { + final StackedProcessor prev) { final GridHadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) // Fail the whole job. - transform(jobId, new RemoveMappersClosure(prev, currMappers, status.failCause())); + transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); else { ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteFuture<?>>() { @Override public void applyx(IgniteFuture<?> f) { @@ -1236,7 +1241,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } } - transform(jobId, new RemoveMappersClosure(prev, currMappers, err)); + transform(jobId, new RemoveMappersProcessor(prev, currMappers, err)); } }); } @@ -1272,7 +1277,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Update job phase transform closure. 
*/ - private static class UpdatePhaseClosure extends StackedClosure { + private static class UpdatePhaseProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1283,7 +1288,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param phase Phase to update. */ - private UpdatePhaseClosure(@Nullable StackedClosure prev, GridHadoopJobPhase phase) { + private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) { super(prev); this.phase = phase; @@ -1298,7 +1303,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Remove mapper transform closure. */ - private static class RemoveMappersClosure extends StackedClosure { + private static class RemoveMappersProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1313,7 +1318,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param split Mapper split to remove. * @param err Error. */ - private RemoveMappersClosure(@Nullable StackedClosure prev, GridHadoopInputSplit split, Throwable err) { + private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) { this(prev, Collections.singletonList(split), err); } @@ -1321,8 +1326,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param splits Mapper splits to remove. */ - private RemoveMappersClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits, - Throwable err) { + private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits, + Throwable err) { super(prev); this.splits = splits; @@ -1354,7 +1359,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Remove reducer transform closure. 
*/ - private static class RemoveReducerClosure extends StackedClosure { + private static class RemoveReducerProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1368,7 +1373,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param rdc Reducer to remove. */ - private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc) { + private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) { super(prev); this.rdc = rdc; @@ -1378,7 +1383,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param rdc Reducer to remove. */ - private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc, Throwable err) { + private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { super(prev); this.rdc = rdc; @@ -1403,7 +1408,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Initialize reducers. */ - private static class InitializeReducersClosure extends StackedClosure { + private static class InitializeReducersProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1418,7 +1423,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param rdc Reducers to initialize. * @param desc External process descriptor. */ - private InitializeReducersClosure(@Nullable StackedClosure prev, Collection<Integer> rdc, + private InitializeReducersProcessor(@Nullable StackedProcessor prev, + Collection<Integer> rdc, GridHadoopProcessDescriptor desc) { super(prev); @@ -1446,7 +1452,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Remove reducer transform closure. 
*/ - private static class CancelJobClosure extends StackedClosure { + private static class CancelJobProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1463,7 +1469,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param err Fail cause. */ - private CancelJobClosure(@Nullable StackedClosure prev, Throwable err) { + private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) { this(prev, err, null, null); } @@ -1472,7 +1478,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param splits Splits to remove. * @param rdc Reducers to remove. */ - private CancelJobClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits, + private CancelJobProcessor(@Nullable StackedProcessor prev, + Collection<GridHadoopInputSplit> splits, Collection<Integer> rdc) { this(prev, null, splits, rdc); } @@ -1483,7 +1490,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param splits Splits to remove. * @param rdc Reducers to remove. */ - private CancelJobClosure(@Nullable StackedClosure prev, Throwable err, Collection<GridHadoopInputSplit> splits, + private CancelJobProcessor(@Nullable StackedProcessor prev, + Throwable err, + Collection<GridHadoopInputSplit> splits, Collection<Integer> rdc) { super(prev); @@ -1522,7 +1531,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Increment counter values closure. */ - private static class IncrementCountersClosure extends StackedClosure { + private static class IncrementCountersProcessor extends StackedProcessor { /** */ private static final long serialVersionUID = 0L; @@ -1533,7 +1542,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { * @param prev Previous closure. * @param counters Task counters to add into job counters. 
*/ - private IncrementCountersClosure(@Nullable StackedClosure prev, GridHadoopCounters counters) { + private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) { super(prev); assert counters != null; @@ -1554,22 +1563,33 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** * Abstract stacked closure. */ - private abstract static class StackedClosure implements IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> { + private abstract static class StackedProcessor implements + EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable { /** */ private static final long serialVersionUID = 0L; /** */ - private final StackedClosure prev; + private final StackedProcessor prev; /** * @param prev Previous closure. */ - private StackedClosure(@Nullable StackedClosure prev) { + private StackedProcessor(@Nullable StackedProcessor prev) { this.prev = prev; } /** {@inheritDoc} */ - @Override public final GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) { + @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) { + e.setValue(apply(e.getValue())); + + return null; + } + + /** + * @param meta Old value. + * @return New value. + */ + private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) { if (meta == null) return null;