IGNITE-137 Fix race condition in the lazy initialization of IgniteAsyncSupportAdapter.asyncInstance.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03c1b2a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03c1b2a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03c1b2a2 Branch: refs/heads/ignite-125 Commit: 03c1b2a23c5d997f68fd06a8c210877ff5a9c8f0 Parents: 37b4721 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue Jan 27 17:19:29 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue Jan 27 17:19:57 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteComputeImpl.java | 63 ++++++++------------ .../ignite/internal/IgniteEventsImpl.java | 2 +- .../ignite/internal/IgniteMessagingImpl.java | 2 +- .../ignite/lang/IgniteAsyncSupportAdapter.java | 9 ++- 4 files changed, 35 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java index afd2b54..79f7306 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java @@ -33,9 +33,9 @@ import static org.apache.ignite.internal.GridClosureCallMode.*; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; /** - * {@link org.apache.ignite.IgniteCompute} implementation. + * {@link IgniteCompute} implementation. 
*/ -public class IgniteComputeImpl implements IgniteCompute, Externalizable { +public class IgniteComputeImpl extends IgniteAsyncSupportAdapter<IgniteCompute> implements IgniteCompute, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -48,9 +48,6 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { /** */ private UUID subjId; - /** */ - private IgniteAsyncSupportAdapter<IgniteAsyncSupport> asyncSup; - /** * Required by {@link Externalizable}. */ @@ -65,32 +62,19 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { * @param async Async support flag. */ public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async) { + super(async); + this.ctx = ctx; this.prj = prj; this.subjId = subjId; - - asyncSup = new IgniteAsyncSupportAdapter(async); - } - - /** {@inheritDoc} */ - @Override public <R> ComputeTaskFuture<R> future() { - return (ComputeTaskFuture<R>)asyncSup.future(); } /** {@inheritDoc} */ - @Override public IgniteCompute enableAsync() { - if (asyncSup.isAsync()) - return this; - + @Override protected IgniteCompute createAsyncInstance() { return new IgniteComputeImpl(ctx, prj, subjId, true); } /** {@inheritDoc} */ - @Override public boolean isAsync() { - return asyncSup.isAsync(); - } - - /** {@inheritDoc} */ @Override public ClusterGroup clusterGroup() { return prj; } @@ -103,7 +87,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - asyncSup.saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes())); + saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes())); } finally { unguard(); @@ -119,7 +103,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes())); + return saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes())); } finally { 
unguard(); @@ -136,7 +120,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return (R)asyncSup.saveOrGet(ctx.task().execute(taskName, arg)); + return (R)saveOrGet(ctx.task().execute(taskName, arg)); } finally { unguard(); @@ -154,7 +138,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return asyncSup.saveOrGet(ctx.task().execute(taskCls, arg)); + return saveOrGet(ctx.task().execute(taskCls, arg)); } finally { unguard(); @@ -171,7 +155,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes()); ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId); - return asyncSup.saveOrGet(ctx.task().execute(task, arg)); + return saveOrGet(ctx.task().execute(task, arg)); } finally { unguard(); @@ -185,7 +169,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes())); + saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes())); } finally { unguard(); @@ -199,7 +183,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BROADCAST, Arrays.asList(job), prj.nodes())); + return saveOrGet(ctx.closure().callAsync(BROADCAST, Arrays.asList(job), prj.nodes())); } finally { unguard(); @@ -213,7 +197,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes())); + return saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes())); } finally { unguard(); @@ -227,7 +211,7 @@ public class IgniteComputeImpl 
implements IgniteCompute, Externalizable { guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes())); + saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes())); } finally { unguard(); @@ -241,7 +225,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes())); + saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes())); } finally { unguard(); @@ -255,7 +239,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes())); + return saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes())); } finally { unguard(); @@ -269,7 +253,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, job, prj.nodes())); + return saveOrGet(ctx.closure().callAsync(BALANCE, job, prj.nodes())); } finally { unguard(); @@ -283,7 +267,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, jobs, prj.nodes())); + return saveOrGet(ctx.closure().callAsync(BALANCE, jobs, prj.nodes())); } finally { unguard(); @@ -299,7 +283,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, prj.nodes())); + return saveOrGet(ctx.closure().callAsync(job, args, prj.nodes())); } finally { unguard(); @@ -315,7 +299,7 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes())); + return saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes())); } finally { unguard(); @@ -332,7 +316,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable { guard(); try { - return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, rdc, prj.nodes())); + return saveOrGet(ctx.closure().callAsync(job, args, rdc, prj.nodes())); } finally { unguard(); @@ -473,4 +457,9 @@ public class IgniteComputeImpl implements IgniteCompute, Externalizable { protected Object readResolve() throws ObjectStreamException { return prj.compute(); } + + /** {@inheritDoc} */ + @Override public <R> ComputeTaskFuture<R> future() { + return (ComputeTaskFuture<R>)super.future(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java index 579f1f9..a63dfef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java @@ -29,7 +29,7 @@ import java.io.*; import java.util.*; /** - * {@link org.apache.ignite.IgniteEvents} implementation. + * {@link IgniteEvents} implementation. 
*/ public class IgniteEventsImpl extends IgniteAsyncSupportAdapter<IgniteEvents> implements IgniteEvents, Externalizable { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index a5d9106..84aa687 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -29,7 +29,7 @@ import java.io.*; import java.util.*; /** - * {@link org.apache.ignite.IgniteMessaging} implementation. + * {@link IgniteMessaging} implementation. */ public class IgniteMessagingImpl extends IgniteAsyncSupportAdapter<IgniteMessaging> implements IgniteMessaging, Externalizable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java index 40c0297..b08d646 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java @@ -27,7 +27,7 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements protected ThreadLocal<IgniteFuture<?>> curFut; /** */ - protected volatile T asyncInstance; + private volatile T asyncInstance; /** * Default constructor. 
@@ -54,7 +54,12 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements if (res == null) { res = createAsyncInstance(); - asyncInstance = res; + synchronized (IgniteAsyncSupportAdapter.class) { + if (asyncInstance != null) + return asyncInstance; + + asyncInstance = res; + } } return res;