Repository: incubator-ignite Updated Branches: refs/heads/sprint-1 269142161 -> 10b0a09db
Ignite-107 anonymous classes *ComputeJobAdapter became static classes Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/52b8967f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/52b8967f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/52b8967f Branch: refs/heads/sprint-1 Commit: 52b8967f2ec3a1914a6059dcf31f9e3e57bbfe79 Parents: 841ec8d Author: avinogradov <avinogra...@gridgain.com> Authored: Wed Jan 21 12:38:19 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Wed Jan 21 12:38:19 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 326 ++++++++++++++----- .../GridMasterLeaveAwareComputeJobAdapter.java | 5 +- 2 files changed, 244 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52b8967f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java index 9817fd6..47f5f5a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java @@ -36,6 +36,9 @@ import org.gridgain.grid.util.worker.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.*; import java.util.concurrent.*; @@ -971,17 +974,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { private <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) { A.notNull(job, "job"); - if (job instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - return job.apply(arg); - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); - } - }; - } + if (job instanceof ComputeJobMasterLeaveAware) + return new C1<>(job, arg); else { return new ComputeJobAdapter() { @Nullable @Override public Object execute() { @@ -1001,22 +995,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { private ComputeJob job(final Callable<?> c) { A.notNull(c, "job"); - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } + if (c instanceof ComputeJobMasterLeaveAware) + return new C2(c); else { return new ComputeJobAdapter() { @Override public Object execute() { @@ -1043,30 +1023,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { private ComputeJob job(final Callable<?> c, @Nullable final String cacheName, final Object affKey) { A.notNull(c, "job"); - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } + if (c instanceof ComputeJobMasterLeaveAware) + return new C3(c,cacheName,affKey); else { return new ComputeJobAdapter() { /** */ @@ -1099,19 +1057,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static ComputeJob job(final Runnable r) { A.notNull(r, "job"); - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } + if (r instanceof ComputeJobMasterLeaveAware) + return new C4(r); else { return new ComputeJobAdapter() { @Nullable @Override public Object execute() { @@ -1135,27 +1082,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { private ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) { A.notNull(r, "job"); - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } + if (r instanceof ComputeJobMasterLeaveAware) + return new C5(r, cacheName, affKey); else { return new ComputeJobAdapter() { /** */ @@ -1741,4 +1669,232 @@ public class GridClosureProcessor extends GridProcessorAdapter { return F.jobResults(res); } } + + /** + */ + private static class C1<T, R> extends GridMasterLeaveAwareComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteClosure<T, R> job; + + /** */ + private T arg; + + /** + */ + public C1(IgniteClosure<T, R> job, T arg) { + this.job = job; + this.arg = arg; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return job.apply(arg); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(job); + out.writeObject(arg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + job = (IgniteClosure<T, R>) in.readObject(); + arg = (T) in.readObject(); + } + } + + /** + */ + private static class C2 extends GridMasterLeaveAwareComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Callable<?> c; + + /** + */ + public C2(Callable<?> c) { + this.c = c; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(c); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + c = (Callable<?>) in.readObject(); + } + } + + /** + */ + private static class C3 extends GridMasterLeaveAwareComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridCacheName + private String cn; + + /** */ + @GridCacheAffinityKeyMapped + private Object ak; + + /** */ + private Callable<?> c; + + /** + */ + public C3(Callable<?> c, @Nullable String cacheName, Object affKey) { + this.cn = cacheName; + this.ak = affKey; + this.c = c; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cn); + out.writeObject(ak); + out.writeObject(c); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cn = (String) in.readObject(); + ak = in.readObject(); + c = (Callable<?>) in.readObject(); + } + } + + /** + */ + private static class C4 extends GridMasterLeaveAwareComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Runnable r; + + /** + */ + public C4(Runnable r) { + this.r = r; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(r); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + r = (Runnable) in.readObject(); + } + } + + /** + */ + private static class C5 extends GridMasterLeaveAwareComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridCacheName + private String cn; + + /** */ + @GridCacheAffinityKeyMapped + private Object ak; + + /** */ + private Runnable r; + + /** + */ + public C5(Runnable r, @Nullable String cacheName, Object affKey) { + this.cn = cacheName; + this.ak = affKey; + this.r = r; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cn); + out.writeObject(ak); + out.writeObject(r); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cn = (String) in.readObject(); + ak = in.readObject(); + r = (Runnable) in.readObject(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52b8967f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java index d7cb935..005c3ad 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java @@ -19,11 +19,12 @@ package org.gridgain.grid.kernal.processors.closure; import org.apache.ignite.compute.*; +import java.io.Externalizable; + /** * Job adapter implementing {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}. */ -public abstract class GridMasterLeaveAwareComputeJobAdapter extends ComputeJobAdapter - implements ComputeJobMasterLeaveAware { +public abstract class GridMasterLeaveAwareComputeJobAdapter extends ComputeJobAdapter implements Externalizable, ComputeJobMasterLeaveAware { /** */ private static final long serialVersionUID = 0L;