Ignite-176 fixed GridClosureProcessor routing

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46d4d260
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46d4d260
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46d4d260

Branch: refs/heads/ignite-194
Commit: 46d4d2608e1fa96081535beda6b4493572895523
Parents: f5901bc
Author: avinogradov <avinogra...@gridgain.com>
Authored: Mon Feb 9 14:04:50 2015 +0300
Committer: avinogradov <avinogra...@gridgain.com>
Committed: Mon Feb 9 14:04:50 2015 +0300

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           | 286 ++-----------------
 1 file changed, 18 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46d4d260/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index c5dda9a..0ca6355 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -416,9 +416,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
 
+            final ClusterNode node = ctx.affinity().mapKeyToNode(affKey0);
+
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false);
+            return ctx.task().execute(new T5<>(node, job), null, false);
         }
         catch (IgniteCheckedException e) {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
@@ -446,9 +448,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
 
+            final ClusterNode node = ctx.affinity().mapKeyToNode(affKey0);
+
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T4(cacheName, affKey0, job), null, false);
+            return ctx.task().execute(new T4(node, job), null, false);
         }
         catch (IgniteCheckedException e) {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
@@ -1006,21 +1010,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      * Converts given closure to a grid job.
      *
-     * @param c Closure to convert to grid job.
-     * @param cacheName Cache name.
-     * @param affKey Affinity key.
-     * @return Grid job made out of closure.
-     */
-    private static <R> ComputeJob job(final Callable<R> c, @Nullable final String cacheName, final Object affKey) {
-        A.notNull(c, "job");
-
-        return c instanceof ComputeJobMasterLeaveAware ? new C3MLA<>(c, cacheName, affKey) :
-            new C3<>(c, cacheName, affKey);
-    }
-
-    /**
-     * Converts given closure to a grid job.
-     *
      * @param r Closure to convert to grid job.
      * @return Grid job made out of closure.
      */
@@ -1030,20 +1019,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
        return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r);
     }
 
-    /**
-     * Converts given closure to a grid job.
-     *
-     * @param r Closure to convert to grid job.
-     * @param cacheName Cache name.
-     * @param affKey Affinity key.
-     * @return Grid job made out of closure.
-     */
-    private static ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) {
-        A.notNull(r, "job");
-
-        return r instanceof ComputeJobMasterLeaveAware ? new C5MLA(r, cacheName, affKey) : new C5(r, cacheName, affKey);
-    }
-
     /** */
     private class JobMapper {
         /** */
@@ -1232,36 +1207,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private static final long serialVersionUID = 0L;
 
         /** */
-        private final String cacheName;
-
-        /** */
-        private Object affKey;
+        private ClusterNode node;
 
         /** */
         private Runnable job;
 
-        /** */
-        @LoadBalancerResource
-        private ComputeLoadBalancer lb;
-
         /**
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
+         * @param node Cluster node.
          * @param job Job.
          */
-        private T4(@Nullable String cacheName, Object affKey, Runnable job) {
+        private T4(ClusterNode node, Runnable job) {
             super(U.peerDeployAware0(job));
 
-            this.cacheName = cacheName;
-            this.affKey = affKey;
+            this.node = node;
             this.job = job;
         }
 
         /** {@inheritDoc} */
         @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
-            ComputeJob job = job(this.job, cacheName, affKey);
+            ComputeJob job = job(this.job);
 
-            return Collections.singletonMap(job, lb.getBalancedNode(job, null));
+            return Collections.singletonMap(job, node);
         }
     }
 
@@ -1272,36 +1238,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private static final long serialVersionUID = 0L;
 
         /** */
-        private final String cacheName;
-
-        /** */
-        private Object affKey;
+        private ClusterNode node;
 
         /** */
         private Callable<R> job;
 
-        /** */
-        @LoadBalancerResource
-        private ComputeLoadBalancer lb;
-
         /**
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
+         * @param node Cluster node.
          * @param job Job.
          */
-        private T5(@Nullable String cacheName, Object affKey, Callable<R> job) {
+        private T5(ClusterNode node, Callable<R> job) {
             super(U.peerDeployAware0(job));
 
-            this.cacheName = cacheName;
-            this.affKey = affKey;
+            this.node = node;
             this.job = job;
         }
 
         /** {@inheritDoc} */
         @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
-            ComputeJob job = job(this.job, cacheName, affKey);
+            ComputeJob job = job(this.job);
 
-            return Collections.singletonMap(job, lb.getBalancedNode(job, null));
+            return Collections.singletonMap(job, node);
         }
 
         /** {@inheritDoc} */
@@ -1792,112 +1749,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     *
-     */
-    private static class C3<R> implements ComputeJob, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        @CacheName
-        private String cn;
-
-        /** */
-        @CacheAffinityKeyMapped
-        private Object ak;
-
-
-        /** */
-        protected Callable<R> c;
-
-        /**
-         *
-         */
-        public C3(){
-            // No-op.
-        }
-
-        /**
-         * @param c Callable.
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
-         */
-        public C3(Callable<R> c, @Nullable String cacheName, Object affKey) {
-            this.cn = cacheName;
-            this.ak = affKey;
-            this.c = c;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object execute() {
-            try {
-                return c.call();
-            }
-            catch (Exception e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(cn);
-            out.writeObject(ak);
-            out.writeObject(c);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cn = (String)in.readObject();
-            ak = in.readObject();
-            c = (Callable<R>)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(C3.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class C3MLA<R> extends C3<R> implements ComputeJobMasterLeaveAware{
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         *
-         */
-        public C3MLA() {
-            super();
-        }
-
-        /**
-         * @param c Callable.
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
-         */
-        public C3MLA(Callable<R> c, @Nullable String cacheName, Object affKey) {
-            super(c, cacheName, affKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
-            ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(C3MLA.class, this, super.toString());
-        }
-    }
-
-    /**
      */
     private static class C4 implements ComputeJob, Externalizable {
         /** */
@@ -1979,105 +1830,4 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return S.toString(C4MLA.class, this, super.toString());
         }
     }
-
-    /**
-     */
-    private static class C5 implements ComputeJob, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        @CacheName
-        private String cn;
-
-        /** */
-        @CacheAffinityKeyMapped
-        private Object ak;
-
-        /** */
-        protected Runnable r;
-
-        /**
-         *
-         */
-        public C5(){
-            // No-op.
-        }
-
-        /**
-         * @param r Runnable.
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
-         */
-        public C5(Runnable r, @Nullable String cacheName, Object affKey) {
-            this.cn = cacheName;
-            this.ak = affKey;
-            this.r = r;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object execute() {
-            r.run();
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(cn);
-            out.writeObject(ak);
-            out.writeObject(r);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cn = (String)in.readObject();
-            ak = in.readObject();
-            r = (Runnable)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(C5.class, this, super.toString());
-        }
-    }
-
-    /**
-     *
-     */
-    private static class C5MLA extends C5 implements ComputeJobMasterLeaveAware{
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         *
-         */
-        public C5MLA() {
-            super();
-        }
-
-        /**
-         * @param r Runnable.
-         * @param cacheName Cache name.
-         * @param affKey Affinity key.
-         */
-        public C5MLA(Runnable r, @Nullable String cacheName, Object affKey) {
-            super(r, cacheName, affKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
-            ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(C5MLA.class, this, super.toString());
-        }
-    }
 }

Reply via email to