#ignite-1087: move affinity key and affinity cache to FailoverContext.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f9460a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f9460a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f9460a Branch: refs/heads/ignite-1087 Commit: c9f9460a5be54baafa6cc2d8cc4de53831a0cbb6 Parents: 585043d Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Jul 9 15:07:34 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Jul 9 15:07:34 2015 +0300 ---------------------------------------------------------------------- .../failover/GridFailoverContextImpl.java | 22 +++++++++++++++++++- .../managers/failover/GridFailoverManager.java | 7 +++++-- .../processors/task/GridTaskWorker.java | 9 +------- .../ignite/spi/failover/FailoverContext.java | 17 +++++++++++++++ .../spi/failover/always/AlwaysFailoverSpi.java | 14 ++----------- .../cache/GridCacheAffinityRoutingSelfTest.java | 2 ++ .../spi/failover/GridFailoverTestContext.java | 10 +++++++++ 7 files changed, 58 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java index a3f8e44..0c1e7d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java @@ -41,15 +41,23 @@ public class GridFailoverContextImpl implements FailoverContext { @GridToStringExclude private final GridLoadBalancerManager loadMgr; + /** Affinity key for affinityCall. */ + private final Object affKey; + + /** Affinity cache name for affinityCall. */ + private final String affCacheName; + /** * Initializes failover context. * * @param taskSes Grid task session. * @param jobRes Failed job result. * @param loadMgr Load manager. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, - GridLoadBalancerManager loadMgr) { + GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) { assert taskSes != null; assert jobRes != null; assert loadMgr != null; @@ -57,6 +65,8 @@ public class GridFailoverContextImpl implements FailoverContext { this.taskSes = taskSes; this.jobRes = jobRes; this.loadMgr = loadMgr; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -75,6 +85,16 @@ public class GridFailoverContextImpl implements FailoverContext { } /** {@inheritDoc} */ + @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public String affinityCacheName() { + return affCacheName; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFailoverContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java index 714cccb..4102514 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java @@ -57,10 +57,13 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> { * @param taskSes Task session. * @param jobRes Job result. * @param top Collection of all top nodes that does not include the failed node. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. * @return New node to route this job to. */ - public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top) { + public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top, + Object affKey, String affCacheName) { return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes, - ctx.loadBalancing()), top); + ctx.loadBalancing(), affKey, affCacheName), top); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 61bb62a..c725a42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.visor.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; -import org.apache.ignite.spi.failover.always.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -569,9 +568,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes(); - if (affKey != null) - return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey)); - int size = subgrid.size(); if (size == 0) @@ -782,9 +778,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } case FAILOVER: { - if (jobRes != null && affKey != null) - jobRes.getJobContext().setAttribute(AlwaysFailoverSpi.AFFINITY_CALL_FLAG, true); - if (!failover(res, jobRes, getTaskTopology())) plc = null; @@ -984,7 +977,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); // Map to a new node. - ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top)); + ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache); if (node == null) { String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index b0cae92..eddd9ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.failover; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; import java.util.*; @@ -52,4 +53,20 @@ public interface FailoverContext { * @throws IgniteException If anything failed. */ public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException; + + /** + * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} + * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. + * + * @return Affinity key. + */ + public Object affinityKey(); + + /** + * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} + * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. + * + * @return Cache name. + */ + public String affinityCacheName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index c5c0f18..ec07abb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -99,14 +99,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, */ public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt"; - /** - * Name of job context attribute containing affinity call flag for affinity call. - * - * @see org.apache.ignite.compute.ComputeJobContext - */ - public static final String AFFINITY_CALL_FLAG = "ignite:failover:affinitycall"; - - /** Maximum attempts attribute key should be the same on all nodes. */ public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts"; @@ -190,9 +182,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); - Boolean affCall = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_FLAG); - - if (affCall != null && affCall) { + if (ctx.affinityKey() != null) { Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT); if (affCallAttempt == null) @@ -208,7 +198,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, else { ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1); - return top.get(0); + return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java index 38de9a1..80e558b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java @@ -269,6 +269,8 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { System.out.println("Attempt=" + attempt); + System.out.println("RUN ON NODE: " + ignite.cluster().localNode().id()); + assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode()); jobCtx.setAttribute("Attempt", attempt + 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f9460a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java index db64475..bfca83d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java @@ -66,4 +66,14 @@ public class GridFailoverTestContext implements FailoverContext { @Override public ClusterNode getBalancedNode(List<ClusterNode> grid) { return grid.get(RAND.nextInt(grid.size())); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return null; + } + + /** {@inheritDoc} */ + @Override public String affinityCacheName() { + return null; + } }