IGNITE-137 Fix race condition.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03c1b2a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03c1b2a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03c1b2a2

Branch: refs/heads/ignite-32
Commit: 03c1b2a23c5d997f68fd06a8c210877ff5a9c8f0
Parents: 37b4721
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue Jan 27 17:19:29 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue Jan 27 17:19:57 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteComputeImpl.java      | 63 ++++++++------------
 .../ignite/internal/IgniteEventsImpl.java       |  2 +-
 .../ignite/internal/IgniteMessagingImpl.java    |  2 +-
 .../ignite/lang/IgniteAsyncSupportAdapter.java  |  9 ++-
 4 files changed, 35 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index afd2b54..79f7306 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -33,9 +33,9 @@ import static org.apache.ignite.internal.GridClosureCallMode.*;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
 
 /**
- * {@link org.apache.ignite.IgniteCompute} implementation.
+ * {@link IgniteCompute} implementation.
  */
-public class IgniteComputeImpl implements IgniteCompute, Externalizable {
+public class IgniteComputeImpl extends IgniteAsyncSupportAdapter<IgniteCompute> implements IgniteCompute, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -48,9 +48,6 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
     /** */
     private UUID subjId;
 
-    /** */
-    private IgniteAsyncSupportAdapter<IgniteAsyncSupport> asyncSup;
-
     /**
      * Required by {@link Externalizable}.
      */
@@ -65,32 +62,19 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
      * @param async Async support flag.
      */
     public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, 
UUID subjId, boolean async) {
+        super(async);
+
         this.ctx = ctx;
         this.prj = prj;
         this.subjId = subjId;
-
-        asyncSup = new IgniteAsyncSupportAdapter(async);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> ComputeTaskFuture<R> future() {
-        return (ComputeTaskFuture<R>)asyncSup.future();
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCompute enableAsync() {
-        if (asyncSup.isAsync())
-            return this;
-
+    @Override protected IgniteCompute createAsyncInstance() {
         return new IgniteComputeImpl(ctx, prj, subjId, true);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isAsync() {
-        return asyncSup.isAsync();
-    }
-
-    /** {@inheritDoc} */
     @Override public ClusterGroup clusterGroup() {
         return prj;
     }
@@ -103,7 +87,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            asyncSup.saveOrGet(ctx.closure().affinityRun(cacheName, affKey, 
job, prj.nodes()));
+            saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, 
prj.nodes()));
         }
         finally {
             unguard();
@@ -119,7 +103,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().affinityCall(cacheName, 
affKey, job, prj.nodes()));
+            return saveOrGet(ctx.closure().affinityCall(cacheName, affKey, 
job, prj.nodes()));
         }
         finally {
             unguard();
@@ -136,7 +120,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return (R)asyncSup.saveOrGet(ctx.task().execute(taskName, arg));
+            return (R)saveOrGet(ctx.task().execute(taskName, arg));
         }
         finally {
             unguard();
@@ -154,7 +138,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return asyncSup.saveOrGet(ctx.task().execute(taskCls, arg));
+            return saveOrGet(ctx.task().execute(taskCls, arg));
         }
         finally {
             unguard();
@@ -171,7 +155,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
             ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
-            return asyncSup.saveOrGet(ctx.task().execute(task, arg));
+            return saveOrGet(ctx.task().execute(task, arg));
         }
         finally {
             unguard();
@@ -185,7 +169,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            asyncSup.saveOrGet(ctx.closure().runAsync(BROADCAST, job, 
prj.nodes()));
+            saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes()));
         }
         finally {
             unguard();
@@ -199,7 +183,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(BROADCAST, 
Arrays.asList(job), prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(BROADCAST, 
Arrays.asList(job), prj.nodes()));
         }
         finally {
             unguard();
@@ -213,7 +197,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().broadcast(job, arg, 
prj.nodes()));
+            return saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes()));
         }
         finally {
             unguard();
@@ -227,7 +211,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, job, 
prj.nodes()));
+            saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes()));
         }
         finally {
             unguard();
@@ -241,7 +225,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            asyncSup.saveOrGet(ctx.closure().runAsync(BALANCE, jobs, 
prj.nodes()));
+            saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes()));
         }
         finally {
             unguard();
@@ -255,7 +239,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(job, arg, 
prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes()));
         }
         finally {
             unguard();
@@ -269,7 +253,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, job, 
prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(BALANCE, job, 
prj.nodes()));
         }
         finally {
             unguard();
@@ -283,7 +267,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(BALANCE, jobs, 
prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(BALANCE, jobs, 
prj.nodes()));
         }
         finally {
             unguard();
@@ -299,7 +283,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, 
prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(job, args, prj.nodes()));
         }
         finally {
             unguard();
@@ -315,7 +299,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().forkjoinAsync(BALANCE, 
jobs, rdc, prj.nodes()));
+            return saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, 
prj.nodes()));
         }
         finally {
             unguard();
@@ -332,7 +316,7 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
         guard();
 
         try {
-            return asyncSup.saveOrGet(ctx.closure().callAsync(job, args, rdc, 
prj.nodes()));
+            return saveOrGet(ctx.closure().callAsync(job, args, rdc, 
prj.nodes()));
         }
         finally {
             unguard();
@@ -473,4 +457,9 @@ public class IgniteComputeImpl implements IgniteCompute, 
Externalizable {
     protected Object readResolve() throws ObjectStreamException {
         return prj.compute();
     }
+
+    /** {@inheritDoc} */
+    @Override public <R> ComputeTaskFuture<R> future() {
+        return (ComputeTaskFuture<R>)super.future();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 579f1f9..a63dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -29,7 +29,7 @@ import java.io.*;
 import java.util.*;
 
 /**
- * {@link org.apache.ignite.IgniteEvents} implementation.
+ * {@link IgniteEvents} implementation.
  */
 public class IgniteEventsImpl extends IgniteAsyncSupportAdapter<IgniteEvents> implements IgniteEvents, Externalizable {
     /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index a5d9106..84aa687 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -29,7 +29,7 @@ import java.io.*;
 import java.util.*;
 
 /**
- * {@link org.apache.ignite.IgniteMessaging} implementation.
+ * {@link IgniteMessaging} implementation.
  */
 public class IgniteMessagingImpl extends IgniteAsyncSupportAdapter<IgniteMessaging>
     implements IgniteMessaging, Externalizable {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03c1b2a2/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java
index 40c0297..b08d646 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupportAdapter.java
@@ -27,7 +27,7 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements
     protected ThreadLocal<IgniteFuture<?>> curFut;
 
     /** */
-    protected volatile T asyncInstance;
+    private volatile T asyncInstance;
 
     /**
      * Default constructor.
@@ -54,7 +54,12 @@ public class IgniteAsyncSupportAdapter<T extends IgniteAsyncSupport> implements
         if (res == null) {
             res = createAsyncInstance();
 
-            asyncInstance = res;
+            synchronized (IgniteAsyncSupportAdapter.class) {
+                if (asyncInstance != null)
+                    return asyncInstance;
+
+                asyncInstance = res;
+            }
         }
 
         return res;

Reply via email to