IGNITE-141 - Marshallers refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d4d96c45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d4d96c45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d4d96c45

Branch: refs/heads/ignite-141
Commit: d4d96c4544aad77adcdeb83c0d83534c458faab8
Parents: 41d9e17
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Mar 4 18:46:27 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Mar 4 18:46:27 2015 -0800

----------------------------------------------------------------------
 .../closure/GridClosureProcessor.java           | 126 +++++++++----------
 1 file changed, 58 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4d96c45/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 657539c..287c2eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.resources.*;
-import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -58,8 +57,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /** Lock to control execution after stop. */
     private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
 
-    /** Workers count. */
-    private final LongAdder workersCnt = new LongAdder();
+    /** Stopping flag. */
+    private boolean stopping;
 
     /**
      * @param ctx Kernal context.
@@ -81,42 +80,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("BusyWait")
     @Override public void onKernalStop(boolean cancel) {
-        busyLock.writeLock();
+        boolean interrupted = false;
 
-        boolean interrupted = Thread.interrupted();
-
-        while (workersCnt.sum() != 0) {
+        // Busy wait is intentional.
+        while (true) {
             try {
-                Thread.sleep(200);
+                if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                    break;
+                else
+                    Thread.sleep(200);
             }
-            catch (InterruptedException ignored) {
+            catch (InterruptedException ignore) {
+                // Preserve interrupt status & ignore.
+                // Note that interrupted flag is cleared.
                 interrupted = true;
             }
         }
 
-        if (interrupted)
-            Thread.currentThread().interrupt();
+        try {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+
+            stopping = true;
+        }
+        finally {
+            busyLock.writeUnlock();
+        }
 
         if (log.isDebugEnabled())
             log.debug("Stopped closure processor.");
     }
 
     /**
-     * @throws IllegalStateException If grid is stopped.
-     */
-    private void enterBusy() throws IllegalStateException {
-        if (!busyLock.tryReadLock())
-            throw new IllegalStateException("Closure processor cannot be used on stopped grid: " + ctx.gridName());
-    }
-
-    /**
-     * Unlocks busy lock.
-     */
-    private void leaveBusy() {
-        busyLock.readUnlock();
-    }
-
-    /**
      * @param mode Distribution mode.
      * @param jobs Closures to execute.
      * @param nodes Grid nodes.
@@ -142,9 +137,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         assert mode != null;
         assert !F.isEmpty(jobs) : jobs;
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
+            if (stopping) {
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class,
+                    new IgniteCheckedException("Closure processor cannot be used on stopped grid: " + ctx.gridName()));
+            }
+
             if (F.isEmpty(nodes))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException());
 
@@ -153,7 +153,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T1(mode, jobs), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -183,7 +183,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         assert mode != null;
         assert job != null;
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -194,7 +194,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T2(mode, job), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -319,7 +319,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         assert rdc != null;
         assert !F.isEmpty(jobs);
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -330,7 +330,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T3<>(mode, jobs, rdc), null);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -364,7 +364,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         assert mode != null;
         assert !F.isEmpty(jobs);
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -375,7 +375,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T6<>(mode, jobs), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -401,7 +401,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
         @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -420,7 +420,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -433,7 +433,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
         @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -452,7 +452,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -468,7 +468,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         @Nullable Collection<ClusterNode> nodes, boolean sys) {
         assert mode != null;
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (job == null)
@@ -483,7 +483,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T7<>(mode, job), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -500,7 +500,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         boolean sys) {
         assert mode != null;
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(jobs))
@@ -515,7 +515,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T6<>(mode, jobs), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -535,7 +535,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         assert mode != null;
         assert job != null;
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -546,7 +546,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T7<>(mode, job), null, sys);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -558,7 +558,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
         @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -569,7 +569,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T8<>(job, arg), null, false);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -581,7 +581,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg,
         @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -592,7 +592,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T11<>(job, arg, nodes), null, false);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -604,7 +604,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
         @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -616,7 +616,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T11<>(job, arg, nodes), null, false);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -630,7 +630,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         @Nullable Collection<? extends T> args,
         @Nullable Collection<ClusterNode> nodes)
     {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -641,7 +641,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T9<>(job, args), null, false);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -654,7 +654,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
         Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
-        enterBusy();
+        busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
@@ -665,7 +665,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T10<>(job, args, rdc), null, false);
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -733,7 +733,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         if (c == null)
             return new GridFinishedFuture(ctx);
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             // Inject only if needed.
@@ -744,8 +744,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final GridWorkerFuture fut = new GridWorkerFuture(ctx);
 
-            workersCnt.increment();
-
             GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) {
                 @Override protected void body() {
                     try {
@@ -762,9 +760,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
                         fut.onDone(U.cast(e));
                     }
-                    finally {
-                        workersCnt.decrement();
-                    }
                 }
             };
 
@@ -783,7 +778,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return fut;
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 
@@ -870,7 +865,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         if (c == null)
             return new GridFinishedFuture<>(ctx);
 
-        enterBusy();
+        busyLock.readLock();
 
         try {
             // Inject only if needed.
@@ -881,8 +876,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
             final GridWorkerFuture<R> fut = new GridWorkerFuture<>(ctx);
 
-            workersCnt.increment();
-
             GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) {
                 @Override protected void body() {
                     try {
@@ -897,9 +890,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
                         fut.onDone(U.cast(e));
                     }
-                    finally {
-                        workersCnt.decrement();
-                    }
                 }
             };
 
@@ -918,7 +908,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return fut;
         }
         finally {
-            leaveBusy();
+            busyLock.readUnlock();
         }
     }
 

Reply via email to