# IGNITE-22 Refactoring: unmarshal job in job thread.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/effcf9e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/effcf9e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/effcf9e1 Branch: refs/heads/ignite-22 Commit: effcf9e1c18250e9851c838da348acad21d4bbbf Parents: 6ea50c7 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue Feb 17 14:28:15 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue Feb 17 14:28:15 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 98 +++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/effcf9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 891b71b..75830d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -1041,7 +1041,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { if (hadLocNode) { Marshaller marsh = ctx.config().getMarshaller(); - job = marsh.unmarshal(marsh.marshal(job), null); + byte[] jobBytes = marsh.marshal(job); + + if (job instanceof ComputeJobMasterLeaveAware) + job = new MarshaledLocalJob(marsh, jobBytes); + else + job = new NoLAJob(new MarshaledLocalJob(marsh, jobBytes)); } else hadLocNode = true; @@ -1050,6 +1055,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { map.put(job, node); } + /** + * + */ public Map<ComputeJob, ClusterNode> map() { return map; } @@ -1819,4 +1827,92 @@ public class GridClosureProcessor extends GridProcessorAdapter { return S.toString(C4MLA.class, this, super.toString()); } } + + /** + * + */ + private static class NoLAJob implements ComputeJob { + /** */ + private final ComputeJob delegate; + + /** + * @param delegate Delegate. + */ + private NoLAJob(ComputeJob delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + delegate.cancel(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() throws IgniteException { + return delegate.execute(); + } + } + /** + * + */ + private static class MarshaledLocalJob implements ComputeJob, ComputeJobMasterLeaveAware { + /** */ + private final Marshaller marshaller; + + /** */ + private final byte[] jobBytes; + + /** */ + private volatile ComputeJob job; + + /** + * @param marshaller Marshaller. + * @param jobBytes Job bytes. + */ + private MarshaledLocalJob(Marshaller marshaller, byte[] jobBytes) { + this.marshaller = marshaller; + this.jobBytes = jobBytes; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + job().cancel(); + } + + /** + * + */ + private ComputeJob job() throws IgniteException { + ComputeJob res = job; + + if (res == null) { + synchronized (this) { + res = job; + + if (res == null) { + try { + res = marshaller.unmarshal(this.jobBytes, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal job", e); + } + + job = res; + } + } + } + + return res; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() throws IgniteException { + return job().execute(); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + ((ComputeJobMasterLeaveAware)job()).onMasterNodeLeft(ses); + } + } }