Repository: incubator-ignite Updated Branches: refs/heads/ignite-1087 [created] 355e33db0
#ignite-1087: affinity run runs several times on primary node. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/355e33db Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/355e33db Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/355e33db Branch: refs/heads/ignite-1087 Commit: 355e33db0b0551759a7ce976f6af53bc7ad5a36a Parents: d8f5b6f Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jul 3 14:37:55 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jul 3 14:37:55 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 4 ++ .../task/GridTaskThreadContextKey.java | 6 +++ .../processors/task/GridTaskWorker.java | 16 ++++++ .../spi/failover/always/AlwaysFailoverSpi.java | 37 +++++++++++++ .../cache/GridCacheAffinityRoutingSelfTest.java | 56 +++++++++++++++++++- 5 files changed, 118 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 658557e..1eb53ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -414,6 +414,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); 
ctx.task().setThreadContext(TC_SUBGRID, nodes); + ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0); + ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName); return ctx.task().execute(new T5(node, job), null, false); } @@ -446,6 +448,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); ctx.task().setThreadContext(TC_SUBGRID, nodes); + ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0); + ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName); return ctx.task().execute(new T4(node, job), null, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java index df706cf..0bfac62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java @@ -27,6 +27,12 @@ public enum GridTaskThreadContextKey { /** No failover flag. */ TC_NO_FAILOVER, + /** Affinity key. */ + TC_AFFINITY_KEY, + + /** Affinity cache. */ + TC_AFFINITY_CACHE, + /** Projection for the task. 
*/ TC_SUBGRID, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index eb5fa77..61bb62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.visor.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; +import org.apache.ignite.spi.failover.always.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -136,6 +137,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { private final boolean noFailover; /** */ + private final Object affKey; + + /** */ + private final String affCache; + + /** */ private final UUID subjId; /** Continuous mapper. */ @@ -245,6 +252,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { Boolean noFailover = getThreadContext(TC_NO_FAILOVER); this.noFailover = noFailover != null ? noFailover : false; + + this.affKey = getThreadContext(TC_AFFINITY_KEY); + this.affCache = getThreadContext(TC_AFFINITY_CACHE); } /** @@ -559,6 +569,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { Collection<? extends ClusterNode> subgrid = top != null ? 
ctx.discovery().nodes(top) : ctx.discovery().allNodes(); + if (affKey != null) + return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey)); + int size = subgrid.size(); if (size == 0) @@ -769,6 +782,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } case FAILOVER: { + if (jobRes != null && affKey != null) + jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true); + if (!failover(res, jobRes, getTaskTopology())) plc = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index e075d3e..c5c0f18 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -92,6 +92,21 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, */ public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist"; + /** + * Name of job context attribute containing the number of failover attempts made for an affinity call. + * + * @see org.apache.ignite.compute.ComputeJobContext + */ + public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt"; + + /** + * Name of job context attribute containing affinity call flag for affinity call. + * + * @see org.apache.ignite.compute.ComputeJobContext + */ + public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall"; + + /** Maximum attempts attribute key should be the same on all nodes. 
*/ public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts"; @@ -175,6 +190,28 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); + Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG); + + if (affCall != null && affCall) { + Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT); + + if (affCallAttempt == null) + affCallAttempt = 1; + + if (maxFailoverAttempts <= affCallAttempt) { + U.warn(log, "Job failover failed because number of maximum failover attempts for affinity call" + + " is exceeded [failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + + maxFailoverAttempts + ']'); + + return null; + } + else { + ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1); + + return top.get(0); + } + } + if (failedNodes == null) failedNodes = U.newHashSet(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/355e33db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java index 78ecf08..38de9a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java @@ -19,17 +19,20 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; import 
org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import java.util.concurrent.*; + import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -129,6 +132,18 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testAffinityRunRestart() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key")); + return null; + } + }, ClusterTopologyException.class, "Failed to failover a job to another node"); + } + + /** * JUnit. * * @throws Exception If failed. @@ -224,6 +239,45 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { } /** + * Test runnable. + */ + private static class FailedRunnable extends CAX { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** Key. */ + private final Object key; + + /** + * @param key Key. 
+ */ + public FailedRunnable(Object key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public void applyx() throws IgniteCheckedException { + Integer attempt = jobCtx.getAttribute("Attempt"); + + if (attempt == null) + attempt = 0; + + System.out.println("Attempt=" + attempt); + + assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode()); + + jobCtx.setAttribute("Attempt", attempt + 1); + + throw new ComputeJobFailoverException("Failover exception."); + } + } + + /** * Test callable. */ private static class CheckCallable implements IgniteCallable<Object> {