#ignite-1087: add test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d52186b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d52186b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d52186b8 Branch: refs/heads/ignite-1087 Commit: d52186b8663c2e8d30aa06ad26aefad5be9a16a9 Parents: c9f9460 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Jul 9 15:50:31 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Jul 9 15:50:31 2015 +0300 ---------------------------------------------------------------------- .../processors/task/GridTaskWorker.java | 3 + .../cache/GridCacheAffinityRoutingSelfTest.java | 74 ++++++++++++++++---- 2 files changed, 63 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d52186b8/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index c725a42..ed9958f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -568,6 +568,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes(); + if (affKey != null) + return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey)); + int size = subgrid.size(); if (size == 0) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d52186b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java index 80e558b..bb99406 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java @@ -22,12 +22,15 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.failover.always.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; @@ -50,6 +53,9 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { private static final int KEY_CNT = 50; /** */ + private static final int MAX_FAILOVER_ATTEMPTS = 5; + + /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** @@ -69,6 +75,10 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(spi); + AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi(); + failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS); + cfg.setFailoverSpi(failSpi); + if (!gridName.equals(getTestGridName(GRID_CNT))) { // Default cache configuration. CacheConfiguration dfltCacheCfg = defaultCacheConfiguration(); @@ -134,16 +144,45 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testAffinityRunRestart() throws Exception { + public void testAffinityCallRestartFails() throws Exception { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key")); + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key", + new FailedCallable("key", MAX_FAILOVER_ATTEMPTS + 1)); return null; } }, ClusterTopologyException.class, "Failed to failover a job to another node"); } /** + * @throws Exception If failed. + */ + public void testAffinityCallRestart() throws Exception { + assertEquals(MAX_FAILOVER_ATTEMPTS, + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key", + new FailedCallable("key", MAX_FAILOVER_ATTEMPTS))); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCallRestartNode() throws Exception { + Integer key = primaryKey(grid(0).cache(NON_DFLT_CACHE_NAME)); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.sleep(500); + stopGrid(0); + + return null; + } + }); + + while (!fut.isDone()) + grid(1).compute().affinityCall(NON_DFLT_CACHE_NAME, key, new CheckCallable(key, key)); + } + + /** * JUnit. * * @throws Exception If failed. @@ -241,7 +280,10 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { /** * Test runnable. */ - private static class FailedRunnable extends CAX { + private static class FailedCallable implements IgniteCallable<Object> { + /** */ + private static final String ATTR_ATTEMPT = "Attempt"; + /** */ @IgniteInstanceResource private Ignite ignite; @@ -253,29 +295,33 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { /** Key. */ private final Object key; + /** Call attempts. */ + private final Integer callAttempt; + /** * @param key Key. + * @param callAttempt Call attempts. */ - public FailedRunnable(Object key) { + public FailedCallable(Object key, Integer callAttempt) { this.key = key; + this.callAttempt = callAttempt; } /** {@inheritDoc} */ - @Override public void applyx() throws IgniteCheckedException { - Integer attempt = jobCtx.getAttribute("Attempt"); + @Override public Object call() throws IgniteCheckedException { + Integer attempt = jobCtx.getAttribute(ATTR_ATTEMPT); if (attempt == null) - attempt = 0; - - System.out.println("Attempt=" + attempt); - - System.out.println("RUN ON NODE: " + ignite.cluster().localNode().id()); + attempt = 1; assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode()); - jobCtx.setAttribute("Attempt", attempt + 1); + jobCtx.setAttribute(ATTR_ATTEMPT, attempt + 1); - throw new ComputeJobFailoverException("Failover exception."); + if (attempt < callAttempt) + throw new ComputeJobFailoverException("Failover exception."); + else + return attempt; } } @@ -310,7 +356,7 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { @Override public Object call() throws IgniteCheckedException { assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id()); assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id()); - + System.out.println("CALL ON NODE=" + ignite.name()); return null; } }