IGNITE-176: Fixed GridClosureProcessor routing
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46d4d260 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46d4d260 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46d4d260 Branch: refs/heads/ignite-194 Commit: 46d4d2608e1fa96081535beda6b4493572895523 Parents: f5901bc Author: avinogradov <avinogra...@gridgain.com> Authored: Mon Feb 9 14:04:50 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Mon Feb 9 14:04:50 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 286 ++----------------- 1 file changed, 18 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46d4d260/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index c5dda9a..0ca6355 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -416,9 +416,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { // In case cache key is passed instead of affinity key. 
final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + final ClusterNode node = ctx.affinity().mapKeyToNode(affKey0); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false); + return ctx.task().execute(new T5<>(node, job), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); @@ -446,9 +448,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { // In case cache key is passed instead of affinity key. final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + final ClusterNode node = ctx.affinity().mapKeyToNode(affKey0); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T4(cacheName, affKey0, job), null, false); + return ctx.task().execute(new T4(node, job), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); @@ -1006,21 +1010,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * Converts given closure to a grid job. * - * @param c Closure to convert to grid job. - * @param cacheName Cache name. - * @param affKey Affinity key. - * @return Grid job made out of closure. - */ - private static <R> ComputeJob job(final Callable<R> c, @Nullable final String cacheName, final Object affKey) { - A.notNull(c, "job"); - - return c instanceof ComputeJobMasterLeaveAware ? new C3MLA<>(c, cacheName, affKey) : - new C3<>(c, cacheName, affKey); - } - - /** - * Converts given closure to a grid job. - * * @param r Closure to convert to grid job. * @return Grid job made out of closure. */ @@ -1030,20 +1019,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r); } - /** - * Converts given closure to a grid job. - * - * @param r Closure to convert to grid job. - * @param cacheName Cache name. 
- * @param affKey Affinity key. - * @return Grid job made out of closure. - */ - private static ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) { - A.notNull(r, "job"); - - return r instanceof ComputeJobMasterLeaveAware ? new C5MLA(r, cacheName, affKey) : new C5(r, cacheName, affKey); - } - /** */ private class JobMapper { /** */ @@ -1232,36 +1207,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static final long serialVersionUID = 0L; /** */ - private final String cacheName; - - /** */ - private Object affKey; + private ClusterNode node; /** */ private Runnable job; - /** */ - @LoadBalancerResource - private ComputeLoadBalancer lb; - /** - * @param cacheName Cache name. - * @param affKey Affinity key. + * @param node Cluster node. * @param job Job. */ - private T4(@Nullable String cacheName, Object affKey, Runnable job) { + private T4(ClusterNode node, Runnable job) { super(U.peerDeployAware0(job)); - this.cacheName = cacheName; - this.affKey = affKey; + this.node = node; this.job = job; } /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - ComputeJob job = job(this.job, cacheName, affKey); + ComputeJob job = job(this.job); - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + return Collections.singletonMap(job, node); } } @@ -1272,36 +1238,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static final long serialVersionUID = 0L; /** */ - private final String cacheName; - - /** */ - private Object affKey; + private ClusterNode node; /** */ private Callable<R> job; - /** */ - @LoadBalancerResource - private ComputeLoadBalancer lb; - /** - * @param cacheName Cache name. - * @param affKey Affinity key. + * @param node Cluster node. * @param job Job. 
*/ - private T5(@Nullable String cacheName, Object affKey, Callable<R> job) { + private T5(ClusterNode node, Callable<R> job) { super(U.peerDeployAware0(job)); - this.cacheName = cacheName; - this.affKey = affKey; + this.node = node; this.job = job; } /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - ComputeJob job = job(this.job, cacheName, affKey); + ComputeJob job = job(this.job); - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + return Collections.singletonMap(job, node); } /** {@inheritDoc} */ @@ -1792,112 +1749,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * - */ - private static class C3<R> implements ComputeJob, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @CacheName - private String cn; - - /** */ - @CacheAffinityKeyMapped - private Object ak; - - - /** */ - protected Callable<R> c; - - /** - * - */ - public C3(){ - // No-op. - } - - /** - * @param c Callable. - * @param cacheName Cache name. - * @param affKey Affinity key. - */ - public C3(Callable<R> c, @Nullable String cacheName, Object affKey) { - this.cn = cacheName; - this.ak = affKey; - this.c = c; - } - - /** {@inheritDoc} */ - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(cn); - out.writeObject(ak); - out.writeObject(c); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cn = (String)in.readObject(); - ak = in.readObject(); - c = (Callable<R>)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(C3.class, this); - } - } - - /** - * - */ - private static class C3MLA<R> extends C3<R> implements ComputeJobMasterLeaveAware{ - /** */ - private static final long serialVersionUID = 0L; - - /** - * - */ - public C3MLA() { - super(); - } - - /** - * @param c Callable. - * @param cacheName Cache name. - * @param affKey Affinity key. - */ - public C3MLA(Callable<R> c, @Nullable String cacheName, Object affKey) { - super(c, cacheName, affKey); - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(C3MLA.class, this, super.toString()); - } - } - - /** */ private static class C4 implements ComputeJob, Externalizable { /** */ @@ -1979,105 +1830,4 @@ public class GridClosureProcessor extends GridProcessorAdapter { return S.toString(C4MLA.class, this, super.toString()); } } - - /** - */ - private static class C5 implements ComputeJob, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @CacheName - private String cn; - - /** */ - @CacheAffinityKeyMapped - private Object ak; - - /** */ - protected Runnable r; - - /** - * - */ - public C5(){ - // No-op. - } - - /** - * @param r Runnable. - * @param cacheName Cache name. - * @param affKey Affinity key. 
- */ - public C5(Runnable r, @Nullable String cacheName, Object affKey) { - this.cn = cacheName; - this.ak = affKey; - this.r = r; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(cn); - out.writeObject(ak); - out.writeObject(r); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cn = (String)in.readObject(); - ak = in.readObject(); - r = (Runnable)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(C5.class, this, super.toString()); - } - } - - /** - * - */ - private static class C5MLA extends C5 implements ComputeJobMasterLeaveAware{ - /** */ - private static final long serialVersionUID = 0L; - - /** - * - */ - public C5MLA() { - super(); - } - - /** - * @param r Runnable. - * @param cacheName Cache name. - * @param affKey Affinity key. - */ - public C5MLA(Runnable r, @Nullable String cacheName, Object affKey) { - super(r, cacheName, affKey); - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(C5MLA.class, this, super.toString()); - } - } }