# IGNITE-22 Reuse serialized closure for each argument.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e33254c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e33254c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e33254c

Branch: refs/heads/ignite-22
Commit: 5e33254c4daf17705dc2bbc8de9d8c26ff1090a2
Parents: effcf9e
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue Feb 17 17:12:53 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue Feb 17 17:12:53 2015 +0300

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           | 147 +++++++++++++++----
 1 file changed, 117 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e33254c/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 75830d3..d45149a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -1021,6 +1021,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         /** */
         private boolean hadLocNode;
 
+        /** */
+        private IgniteClosure<?, ?> c;
+        
+        /** */
+        private byte[] cBytes;
+        
         /**
          * @param expJobCnt Expected Jobs count.
          */
@@ -1041,12 +1047,27 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                 if (hadLocNode) {
                     Marshaller marsh = ctx.config().getMarshaller();
 
-                    byte[] jobBytes = marsh.marshal(job);
+                    ComputeJob smartJob;
+                    
+                    if (job instanceof C1) {
+                        if (cBytes == null) {
+                            c = ((C1)job).job;
+
+                            cBytes = marsh.marshal(c);
+                        }
 
-                    if (job instanceof ComputeJobMasterLeaveAware)
-                        job = new MarshaledLocalJob(marsh, jobBytes);
+                        if (c == ((C1)job).job)
+                            smartJob = new MarshaledLocalClosure<>(marsh, 
cBytes, ((C1)job).arg);
+                        else
+                            smartJob = new MarshaledLocalJob(marsh, 
marsh.marshal(job));
+                    }
                     else
-                        job = new NoLAJob(new MarshaledLocalJob(marsh, 
jobBytes));
+                        smartJob = new MarshaledLocalJob(marsh, 
marsh.marshal(job));
+                    
+                    if (!(job instanceof ComputeJobMasterLeaveAware))
+                        smartJob = new NoLAJob(smartJob);
+
+                    job = smartJob;
                 }
                 else
                     hadLocNode = true;
@@ -1851,68 +1872,134 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         @Nullable @Override public Object execute() throws IgniteException {
             return delegate.execute();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NoLAJob.class, this);
+        }
     }
+
     /**
      *
      */
-    private static class MarshaledLocalJob implements ComputeJob, 
ComputeJobMasterLeaveAware {
-        /** */
-        private final Marshaller marshaller;
+    private static class MarshaledLocalJob extends 
MarshaledLazyValue<ComputeJob>
+        implements ComputeJob, ComputeJobMasterLeaveAware {
+        /**
+         * @param marshaller Marshaller.
+         * @param jobBytes Job bytes.
+         */
+        private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) {
+            super(marshaller, jobBytes);
+        }
 
-        /** */
-        private final byte[] jobBytes;
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            value().cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() throws IgniteException {
+            return value().execute();
+        }
 
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteException {
+            ((ComputeJobMasterLeaveAware)value()).onMasterNodeLeft(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MarshaledLocalJob.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class MarshaledLocalClosure<T, R> extends 
MarshaledLazyValue<IgniteClosure<T, R>>
+        implements ComputeJob, ComputeJobMasterLeaveAware {
         /** */
-        private volatile ComputeJob job;
+        private final T arg;
 
         /**
          * @param marshaller Marshaller.
-         * @param jobBytes Job bytes.
+         * @param cBytes Closure bytes.
+         * @param arg Argument.
          */
-        private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) {
-            this.marshaller = marshaller;
-            this.jobBytes = jobBytes;
+        private MarshaledLocalClosure(Marshaller marshaller, byte[] cBytes, 
@Nullable T arg) {
+            super(marshaller, cBytes);
+
+            this.arg = arg;
         }
 
         /** {@inheritDoc} */
         @Override public void cancel() {
-            job().cancel();
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            return value().apply(arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MarshaledLocalClosure.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteException {
+            ((ComputeJobMasterLeaveAware)value()).onMasterNodeLeft(ses);
+        }
+    }
+
+    /**
+     *
+     * @param <T>
+     */
+    private abstract static class MarshaledLazyValue<T> {
+        /** */
+        private final Marshaller marshaller;
+
+        /** */
+        private final byte[] bytes;
+
+        /** */
+        private T val;
+
+        /**
+         * @param marshaller Marshaller.
+         * @param bytes Bytes.
+         */
+        MarshaledLazyValue(Marshaller marshaller, byte[] bytes) {
+            this.marshaller = marshaller;
+            this.bytes = bytes;
         }
 
         /**
          *
          */
-        private ComputeJob job() throws IgniteException {
-            ComputeJob res = job;
+        protected T value() {
+            T res = val;
 
             if (res == null) {
                 synchronized (this) {
-                    res = job;
+                    res = val;
 
                     if (res == null) {
                         try {
-                            res = marshaller.unmarshal(this.jobBytes, null);
+                            res = marshaller.unmarshal(bytes, null);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteException("Failed to unmarshal 
job", e);
                         }
 
-                        job = res;
+                        val = res;
                     }
                 }
             }
 
             return res;
         }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object execute() throws IgniteException {
-            return job().execute();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteException {
-            ((ComputeJobMasterLeaveAware)job()).onMasterNodeLeft(ses);
-        }
     }
 }

Reply via email to