IGNITE-107: Restore code
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3e936528 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3e936528 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3e936528 Branch: refs/heads/sprint-1 Commit: 3e9365289e97dac312ec22d2fb00a63eb3c58fbd Parents: 79a4710 Author: avinogradov <avinogra...@gridgain.com> Authored: Fri Jan 23 18:09:54 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Fri Jan 23 18:09:54 2015 +0300 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 570 ++++++++++++++----- .../processors/resource/GridResourceUtils.java | 4 +- 2 files changed, 421 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e936528/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index a5324b8..2951104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -25,6 +25,7 @@ import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; @@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.worker.*; import 
org.jdk8.backport.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -971,24 +973,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { private <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) { A.notNull(job, "job"); - if (job instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - return job.apply(arg); - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Nullable @Override public Object execute() { - return job.apply(arg); - } - }; - } + if (job instanceof ComputeJobMasterLeaveAware) + return new C1MLA<>(job, arg); + else return new C1<>(job, arg); } /** @@ -998,37 +985,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Grid job made out of closure. 
*/ @SuppressWarnings("IfMayBeConditional") - private ComputeJob job(final Callable<?> c) { + private <R> ComputeJob job(final Callable<R> c) { A.notNull(c, "job"); - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - }; - } + if (c instanceof ComputeJobMasterLeaveAware) + return new C2MLA<>(c); + else return new C2<>(c); } /** @@ -1040,53 +1002,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Grid job made out of closure. */ @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"}) - private ComputeJob job(final Callable<?> c, @Nullable final String cacheName, final Object affKey) { + private <R> ComputeJob job(final Callable<R> c, @Nullable final String cacheName, final Object affKey) { A.notNull(c, "job"); - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @CacheName - private final String cn = cacheName; - - /** */ - @CacheAffinityKeyMapped - private final Object ak = affKey; - - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - /** */ - @CacheName - private final String cn = cacheName; - - /** */ - @CacheAffinityKeyMapped - private final 
Object ak = affKey; - - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - }; - } + if (c instanceof ComputeJobMasterLeaveAware) + return new C3MLA<>(c, cacheName, affKey); + else return new C3<>(c, cacheName, affKey); } /** @@ -1099,28 +1020,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static ComputeJob job(final Runnable r) { A.notNull(r, "job"); - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - }; - } + if (r instanceof ComputeJobMasterLeaveAware) + return new C4MLA(r); + else return new C4(r); } /** @@ -1135,44 +1037,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { private ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) { A.notNull(r, "job"); - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @CacheName - private final String cn = cacheName; - - /** */ - @CacheAffinityKeyMapped - private final Object ak = affKey; - - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - /** */ - @CacheName - private final String cn = cacheName; - - /** */ - @CacheAffinityKeyMapped - private final Object ak = affKey; - - @Nullable @Override public Object execute() { - r.run(); - - return 
null; - } - }; - } + if (r instanceof ComputeJobMasterLeaveAware) + return new C5MLA(r, cacheName, affKey); + else return new C5(r, cacheName, affKey); } /** */ @@ -1741,4 +1608,403 @@ public class GridClosureProcessor extends GridProcessorAdapter { return F.jobResults(res); } } + + /** + * + */ + private static class C1<T, R> implements ComputeJob, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** */ + protected IgniteClosure<T, R> job; + + /** */ + @GridToStringInclude + private T arg; + + /** + * + */ + public C1(){ + // No-op. + } + + /** + * + */ + public C1(IgniteClosure<T, R> job, T arg) { + this.job = job; + this.arg = arg; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return job.apply(arg); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(job); + out.writeObject(arg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + job = (IgniteClosure<T, R>)in.readObject(); + arg = (T)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C1.class, this); + } + } + + /** + * + */ + private static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware{ + /** + * + */ + public C1MLA() { + super(); + } + + /** + * + */ + public C1MLA(IgniteClosure<T, R> job, T arg) { + super(job, arg); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); + } + } + + /** + * + */ + private static class C2<R> implements ComputeJob, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. 
+ } + + /** */ + protected Callable<R> c; + + /** + * + */ + public C2(){ + // No-op. + } + + /** + * + */ + public C2(Callable<R> c) { + this.c = c; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(c); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + c = (Callable<R>)in.readObject(); + } + } + + /** + * + */ + private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware{ + /** + * + */ + public C2MLA() { + super(); + } + + /** + * + */ + public C2MLA(Callable<R> c) { + super(c); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + } + + /** + */ + private static class C3<R> implements ComputeJob, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** */ + @CacheName + private String cn; + + /** */ + @CacheAffinityKeyMapped + private Object ak; + + + /** */ + protected Callable<R> c; + + /** + * + */ + public C3(){ + // No-op. 
+ } + + /** + * + */ + public C3(Callable<R> c, @Nullable String cacheName, Object affKey) { + this.cn = cacheName; + this.ak = affKey; + this.c = c; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cn); + out.writeObject(ak); + out.writeObject(c); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cn = (String)in.readObject(); + ak = in.readObject(); + c = (Callable<R>)in.readObject(); + } + } + + /** + * + */ + private static class C3MLA<R> extends C3<R> implements ComputeJobMasterLeaveAware{ + /** + * + */ + public C3MLA() { + super(); + } + + /** + * + */ + public C3MLA(Callable<R> c, @Nullable String cacheName, Object affKey) { + super(c, cacheName, affKey); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + } + + /** + */ + private static class C4 implements ComputeJob, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** */ + protected Runnable r; + + /** + * + */ + public C4(){ + // No-op. 
+ } + + /** + * + */ + public C4(Runnable r) { + this.r = r; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(r); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + r = (Runnable)in.readObject(); + } + } + + /** + * + */ + private static class C4MLA extends C4 implements ComputeJobMasterLeaveAware{ + /** + * + */ + public C4MLA() { + super(); + } + + /** + * + */ + public C4MLA(Runnable r) { + super(r); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + + } + + /** + */ + private static class C5 implements ComputeJob, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** */ + @CacheName + private String cn; + + /** */ + @CacheAffinityKeyMapped + private Object ak; + + /** */ + protected Runnable r; + + /** + * + */ + public C5(){ + // No-op. 
+ } + + /** + * + */ + public C5(Runnable r, @Nullable String cacheName, Object affKey) { + this.cn = cacheName; + this.ak = affKey; + this.r = r; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(cn); + out.writeObject(ak); + out.writeObject(r); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cn = (String)in.readObject(); + ak = in.readObject(); + r = (Runnable)in.readObject(); + } + } + + /** + * + */ + private static class C5MLA extends C5 implements ComputeJobMasterLeaveAware{ + /** + * + */ + public C5MLA() { + super(); + } + + /** + * + */ + public C5MLA(Runnable r, @Nullable String cacheName, Object affKey) { + super(r, cacheName, affKey); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3e936528/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java index 572fd91..c80b167 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.resource; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import java.lang.reflect.*; import java.util.concurrent.*; @@ -99,6 
+100,7 @@ final class GridResourceUtils { // Need to inspect anonymous classes, callable and runnable instances. return f.getName().startsWith("this$") || f.getName().startsWith("val$") || - Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType()); + Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType()) || + IgniteClosure.class.isAssignableFrom(f.getType()); } }