# IGNITE-22 Reuse serialized closure for each argument.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e33254c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e33254c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e33254c Branch: refs/heads/ignite-22 Commit: 5e33254c4daf17705dc2bbc8de9d8c26ff1090a2 Parents: effcf9e Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue Feb 17 17:12:53 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue Feb 17 17:12:53 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 147 +++++++++++++++---- 1 file changed, 117 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e33254c/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 75830d3..d45149a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -1021,6 +1021,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ private boolean hadLocNode; + /** */ + private IgniteClosure<?, ?> c; + + /** */ + private byte[] cBytes; + /** * @param expJobCnt Expected Jobs count. */ @@ -1041,12 +1047,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { if (hadLocNode) { Marshaller marsh = ctx.config().getMarshaller(); - byte[] jobBytes = marsh.marshal(job); + ComputeJob smartJob; + + if (job instanceof C1) { + if (cBytes == null) { + c = ((C1)job).job; + + cBytes = marsh.marshal(c); + } - if (job instanceof ComputeJobMasterLeaveAware) - job = new MarshaledLocalJob(marsh, jobBytes); + if (c == ((C1)job).job) + smartJob = new MarshaledLocalClosure<>(marsh, cBytes, ((C1)job).arg); + else + smartJob = new MarshaledLocalJob(marsh, marsh.marshal(job)); + } else - job = new NoLAJob(new MarshaledLocalJob(marsh, jobBytes)); + smartJob = new MarshaledLocalJob(marsh, marsh.marshal(job)); + + if (!(job instanceof ComputeJobMasterLeaveAware)) + smartJob = new NoLAJob(smartJob); + + job = smartJob; } else hadLocNode = true; @@ -1851,68 +1872,134 @@ public class GridClosureProcessor extends GridProcessorAdapter { @Nullable @Override public Object execute() throws IgniteException { return delegate.execute(); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NoLAJob.class, this); + } } + /** * */ - private static class MarshaledLocalJob implements ComputeJob, ComputeJobMasterLeaveAware { - /** */ - private final Marshaller marshaller; + private static class MarshaledLocalJob extends MarshaledLazyValue<ComputeJob> + implements ComputeJob, ComputeJobMasterLeaveAware { + /** + * @param marshaller Marshaller. + * @param jobBytes Job bytes. + */ + private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) { + super(marshaller, jobBytes); + } - /** */ - private final byte[] jobBytes; + /** {@inheritDoc} */ + @Override public void cancel() { + value().cancel(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() throws IgniteException { + return value().execute(); + } + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + ((ComputeJobMasterLeaveAware)value()).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MarshaledLocalJob.class, this); + } + } + + /** + * + */ + private static class MarshaledLocalClosure<T, R> extends MarshaledLazyValue<IgniteClosure<T, R>> + implements ComputeJob, ComputeJobMasterLeaveAware { /** */ - private volatile ComputeJob job; + private final T arg; /** * @param marshaller Marshaller. - * @param jobBytes Job bytes. + * @param cBytes Closure bytes. + * @param arg Argument. */ - private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) { - this.marshaller = marshaller; - this.jobBytes = jobBytes; + private MarshaledLocalClosure(Marshaller marshaller, byte[] cBytes, @Nullable T arg) { + super(marshaller, cBytes); + + this.arg = arg; } /** {@inheritDoc} */ @Override public void cancel() { - job().cancel(); + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return value().apply(arg); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MarshaledLocalClosure.class, this); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + ((ComputeJobMasterLeaveAware)value()).onMasterNodeLeft(ses); + } + } + + /** + * + * @param <T> + */ + private abstract static class MarshaledLazyValue<T> { + /** */ + private final Marshaller marshaller; + + /** */ + private final byte[] bytes; + + /** */ + private T val; + + /** + * @param marshaller Marshaller. + * @param bytes Bytes. + */ + MarshaledLazyValue(Marshaller marshaller, byte[] bytes) { + this.marshaller = marshaller; + this.bytes = bytes; } /** * */ - private ComputeJob job() throws IgniteException { - ComputeJob res = job; + protected T value() { + T res = val; if (res == null) { synchronized (this) { - res = job; + res = val; if (res == null) { try { - res = marshaller.unmarshal(this.jobBytes, null); + res = marshaller.unmarshal(bytes, null); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to unmarshal job", e); } - job = res; + val = res; } } } return res; } - - /** {@inheritDoc} */ - @Nullable @Override public Object execute() throws IgniteException { - return job().execute(); - } - - /** {@inheritDoc} */ - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { - ((ComputeJobMasterLeaveAware)job()).onMasterNodeLeft(ses); - } } }