# ignite-1087
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2770d55 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2770d55 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2770d55 Branch: refs/heads/ignite-1087 Commit: b2770d55d33b9b836030e73869875cc834ed100a Parents: b50067c Author: sboikov <sboi...@gridgain.com> Authored: Fri Jul 10 10:30:35 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jul 10 10:30:35 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeJobResultPolicy.java | 3 +- .../failover/GridFailoverContextImpl.java | 12 ++-- .../managers/failover/GridFailoverManager.java | 10 ++- .../processors/closure/AffinityTask.java | 35 ++++++++++ .../closure/GridClosureProcessor.java | 71 ++++++++++++++++---- .../task/GridTaskThreadContextKey.java | 6 -- .../processors/task/GridTaskWorker.java | 20 ++++-- .../ignite/spi/failover/FailoverContext.java | 5 +- .../spi/failover/always/AlwaysFailoverSpi.java | 4 +- 9 files changed, 127 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java index 37aba91..26eb542 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java @@ -50,8 +50,7 @@ public enum ComputeJobResultPolicy { * @param ord Ordinal value. * @return Enumerated value. */ - @Nullable - public static ComputeJobResultPolicy fromOrdinal(byte ord) { + @Nullable public static ComputeJobResultPolicy fromOrdinal(byte ord) { return ord >= 0 && ord < VALS.length ? VALS[ord] : null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java index 0c1e7d8..c2b104e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.loadbalancer.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.failover.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -56,8 +57,11 @@ public class GridFailoverContextImpl implements FailoverContext { * @param affKey Affinity key. * @param affCacheName Affinity cache name. */ - public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, - GridLoadBalancerManager loadMgr, Object affKey, String affCacheName) { + public GridFailoverContextImpl(GridTaskSessionImpl taskSes, + ComputeJobResult jobRes, + GridLoadBalancerManager loadMgr, + @Nullable Object affKey, + @Nullable String affCacheName) { assert taskSes != null; assert jobRes != null; assert loadMgr != null; @@ -85,12 +89,12 @@ public class GridFailoverContextImpl implements FailoverContext { } /** {@inheritDoc} */ - @Override public Object affinityKey() { + @Nullable @Override public Object affinityKey() { return affKey; } /** {@inheritDoc} */ - @Override public String affinityCacheName() { + @Nullable @Override public String affinityCacheName() { return affCacheName; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java index 4102514..dffc965 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.spi.failover.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -56,13 +57,16 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> { /** * @param taskSes Task session. * @param jobRes Job result. - * @param top Collection of all top nodes that does not include the failed node. + * @param top Collection of all topology nodes. * @param affKey Affinity key. * @param affCacheName Affinity cache name. * @return New node to route this job to. */ - public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List<ClusterNode> top, - Object affKey, String affCacheName) { + public ClusterNode failover(GridTaskSessionImpl taskSes, + ComputeJobResult jobRes, + List<ClusterNode> top, + @Nullable Object affKey, + @Nullable String affCacheName) { return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes, ctx.loadBalancing(), affKey, affCacheName), top); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java new file mode 100644 index 0000000..1b32444 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import org.jetbrains.annotations.*; + +/** + * Affinity mapped task. + */ +public interface AffinityTask { + /** + * @return Affinity key. + */ + public Object affinityKey(); + + /** + * @return Affinity cache name. + */ + @Nullable public String affinityCacheName(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 02b14a1..21bfc11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -413,13 +413,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); - ctx.task().setThreadContext(TC_SUBGRID, nodes); - ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0); + if (node == null) + return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException()); - if (cacheName != null) - ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T5(node, job), null, false); + return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); @@ -449,13 +448,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); - ctx.task().setThreadContext(TC_SUBGRID, nodes); - ctx.task().setThreadContext(TC_AFFINITY_KEY, affKey0); + if (node == null) + return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException()); - if (cacheName != null) - ctx.task().setThreadContext(TC_AFFINITY_CACHE, cacheName); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T4(node, job), null, false); + return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); @@ -1231,7 +1229,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ - private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection { + private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection, AffinityTask { /** */ private static final long serialVersionUID = 0L; @@ -1241,15 +1239,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ private Runnable job; + /** */ + private Object affKey; + + /** */ + private String affCacheName; + /** * @param node Cluster node. * @param job Job. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ - private T4(ClusterNode node, Runnable job) { + private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) { super(U.peerDeployAware0(job)); + assert affKey != null; + this.node = node; this.job = job; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -1258,11 +1268,22 @@ public class GridClosureProcessor extends GridProcessorAdapter { return Collections.singletonMap(job, node); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Nullable @Override public String affinityCacheName() { + return affCacheName; + } } /** */ - private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection { + private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements + GridNoImplicitInjection, AffinityTask { /** */ private static final long serialVersionUID = 0L; @@ -1272,15 +1293,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ private Callable<R> job; + /** */ + private Object affKey; + + /** */ + private String affCacheName; + /** * @param node Cluster node. * @param job Job. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ - private T5(ClusterNode node, Callable<R> job) { + private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) { super(U.peerDeployAware0(job)); + assert affKey != null; + this.node = node; this.job = job; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -1299,6 +1332,16 @@ public class GridClosureProcessor extends GridProcessorAdapter { throw new IgniteException("Failed to find successful job result: " + res); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Nullable @Override public String affinityCacheName() { + return affCacheName; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java index 0bfac62..df706cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java @@ -27,12 +27,6 @@ public enum GridTaskThreadContextKey { /** No failover flag. */ TC_NO_FAILOVER, - /** Affinity key. */ - TC_AFFINITY_KEY, - - /** Affinity cache. */ - TC_AFFINITY_CACHE, - /** Projection for the task. */ TC_SUBGRID, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index ed9958f..ee62df8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.closure.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -252,8 +253,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { this.noFailover = noFailover != null ? noFailover : false; - this.affKey = getThreadContext(TC_AFFINITY_KEY); - this.affCache = getThreadContext(TC_AFFINITY_CACHE); + if (task instanceof AffinityTask) { + AffinityTask affTask = (AffinityTask)task; + + affKey = affTask.affinityKey(); + affCache = affTask.affinityCacheName(); + } + else { + affKey = null; + affCache = null; + } } /** @@ -406,7 +415,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { ses.setClassLoader(dep.classLoader()); - final List<ClusterNode> shuffledNodes = getTaskTopology(); + // Nodes are ignored by affinity tasks. + final List<ClusterNode> shuffledNodes = + affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList(); // Load balancer. ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes); @@ -568,9 +579,6 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes(); - if (affKey != null) - return Collections.singletonList(ctx.affinity().mapKeyToNode(affCache, affKey)); - int size = subgrid.size(); if (size == 0) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index eddd9ea..865f1a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -60,7 +61,7 @@ public interface FailoverContext { * * @return Affinity key. */ - public Object affinityKey(); + @Nullable public Object affinityKey(); /** * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} @@ -68,5 +69,5 @@ public interface FailoverContext { * * @return Cache name. */ - public String affinityCacheName(); + @Nullable public String affinityCacheName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2770d55/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index eeaf18e..e925995 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -178,8 +178,6 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, return null; } - Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); - if (ctx.affinityKey() != null) { Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT); @@ -200,6 +198,8 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, } } + Collection<UUID> failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); + if (failedNodes == null) failedNodes = U.newHashSet(1);