# IGNITE-22 Refactoring: unmarshal job in job thread.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/effcf9e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/effcf9e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/effcf9e1

Branch: refs/heads/ignite-22
Commit: effcf9e1c18250e9851c838da348acad21d4bbbf
Parents: 6ea50c7
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue Feb 17 14:28:15 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue Feb 17 14:28:15 2015 +0300

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           | 98 +++++++++++++++++++-
 1 file changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/effcf9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 891b71b..75830d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -1041,7 +1041,12 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
                 if (hadLocNode) {
                     Marshaller marsh = ctx.config().getMarshaller();
 
-                    job = marsh.unmarshal(marsh.marshal(job), null);
+                    byte[] jobBytes = marsh.marshal(job);
+
+                    if (job instanceof ComputeJobMasterLeaveAware)
+                        job = new MarshaledLocalJob(marsh, jobBytes);
+                    else
+                        job = new NoLAJob(new MarshaledLocalJob(marsh, 
jobBytes));
                 }
                 else
                     hadLocNode = true;
@@ -1050,6 +1055,9 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             map.put(job, node);
         }
 
+        /**
+         *
+         */
         public Map<ComputeJob, ClusterNode> map() {
             return map;
         }
@@ -1819,4 +1827,92 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             return S.toString(C4MLA.class, this, super.toString());
         }
     }
+
+    /**
+     *
+     */
+    private static class NoLAJob implements ComputeJob {
+        /** */
+        private final ComputeJob delegate;
+
+        /**
+         * @param delegate Delegate.
+         */
+        private NoLAJob(ComputeJob delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            delegate.cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() throws IgniteException {
+            return delegate.execute();
+        }
+    }
+    /**
+     *
+     */
+    private static class MarshaledLocalJob implements ComputeJob, 
ComputeJobMasterLeaveAware {
+        /** */
+        private final Marshaller marshaller;
+
+        /** */
+        private final byte[] jobBytes;
+
+        /** */
+        private volatile ComputeJob job;
+
+        /**
+         * @param marshaller Marshaller.
+         * @param jobBytes Job bytes.
+         */
+        private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) {
+            this.marshaller = marshaller;
+            this.jobBytes = jobBytes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            job().cancel();
+        }
+
+        /**
+         *
+         */
+        private ComputeJob job() throws IgniteException {
+            ComputeJob res = job;
+
+            if (res == null) {
+                synchronized (this) {
+                    res = job;
+
+                    if (res == null) {
+                        try {
+                            res = marshaller.unmarshal(this.jobBytes, null);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException("Failed to unmarshal 
job", e);
+                        }
+
+                        job = res;
+                    }
+                }
+            }
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() throws IgniteException {
+            return job().execute();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteException {
+            ((ComputeJobMasterLeaveAware)job()).onMasterNodeLeft(ses);
+        }
+    }
 }

Reply via email to