IGNITE-141 - Marshallers refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d4d96c45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d4d96c45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d4d96c45 Branch: refs/heads/ignite-329 Commit: d4d96c4544aad77adcdeb83c0d83534c458faab8 Parents: 41d9e17 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Mar 4 18:46:27 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Mar 4 18:46:27 2015 -0800 ---------------------------------------------------------------------- .../closure/GridClosureProcessor.java | 126 +++++++++---------- 1 file changed, 58 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4d96c45/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 657539c..287c2eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; -import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; @@ -58,8 +57,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** Lock to control execution after stop. */ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); - /** Workers count. */ - private final LongAdder workersCnt = new LongAdder(); + /** Stopping flag. */ + private boolean stopping; /** * @param ctx Kernal context. @@ -81,42 +80,38 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("BusyWait") @Override public void onKernalStop(boolean cancel) { - busyLock.writeLock(); + boolean interrupted = false; - boolean interrupted = Thread.interrupted(); - - while (workersCnt.sum() != 0) { + // Busy wait is intentional. + while (true) { try { - Thread.sleep(200); + if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.sleep(200); } - catch (InterruptedException ignored) { + catch (InterruptedException ignore) { + // Preserve interrupt status & ignore. + // Note that interrupted flag is cleared. interrupted = true; } } - if (interrupted) - Thread.currentThread().interrupt(); + try { + if (interrupted) + Thread.currentThread().interrupt(); + + stopping = true; + } + finally { + busyLock.writeUnlock(); + } if (log.isDebugEnabled()) log.debug("Stopped closure processor."); } /** - * @throws IllegalStateException If grid is stopped. - */ - private void enterBusy() throws IllegalStateException { - if (!busyLock.tryReadLock()) - throw new IllegalStateException("Closure processor cannot be used on stopped grid: " + ctx.gridName()); - } - - /** - * Unlocks busy lock. - */ - private void leaveBusy() { - busyLock.readUnlock(); - } - - /** * @param mode Distribution mode. * @param jobs Closures to execute. * @param nodes Grid nodes. @@ -142,9 +137,14 @@ public class GridClosureProcessor extends GridProcessorAdapter { assert mode != null; assert !F.isEmpty(jobs) : jobs; - enterBusy(); + busyLock.readLock(); try { + if (stopping) { + return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, + new IgniteCheckedException("Closure processor cannot be used on stopped grid: " + ctx.gridName())); + } + if (F.isEmpty(nodes)) return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException()); @@ -153,7 +153,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T1(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -183,7 +183,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { assert mode != null; assert job != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -194,7 +194,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T2(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -319,7 +319,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { assert rdc != null; assert !F.isEmpty(jobs); - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -330,7 +330,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T3<>(mode, jobs, rdc), null); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -364,7 +364,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { assert mode != null; assert !F.isEmpty(jobs); - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -375,7 +375,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T6<>(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -401,7 +401,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -420,7 +420,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -433,7 +433,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -452,7 +452,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -468,7 +468,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { @Nullable Collection<ClusterNode> nodes, boolean sys) { assert mode != null; - enterBusy(); + busyLock.readLock(); try { if (job == null) @@ -483,7 +483,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T7<>(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -500,7 +500,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { boolean sys) { assert mode != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(jobs)) @@ -515,7 +515,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T6<>(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -535,7 +535,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { assert mode != null; assert job != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -546,7 +546,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T7<>(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -558,7 +558,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -569,7 +569,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T8<>(job, arg), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -581,7 +581,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -592,7 +592,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T11<>(job, arg, nodes), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -604,7 +604,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -616,7 +616,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T11<>(job, arg, nodes), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -630,7 +630,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { @Nullable Collection<? extends T> args, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -641,7 +641,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T9<>(job, args), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -654,7 +654,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -665,7 +665,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return ctx.task().execute(new T10<>(job, args, rdc), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -733,7 +733,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { if (c == null) return new GridFinishedFuture(ctx); - enterBusy(); + busyLock.readLock(); try { // Inject only if needed. @@ -744,8 +744,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { final GridWorkerFuture fut = new GridWorkerFuture(ctx); - workersCnt.increment(); - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { @Override protected void body() { try { @@ -762,9 +760,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { fut.onDone(U.cast(e)); } - finally { - workersCnt.decrement(); - } } }; @@ -783,7 +778,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return fut; } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -870,7 +865,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { if (c == null) return new GridFinishedFuture<>(ctx); - enterBusy(); + busyLock.readLock(); try { // Inject only if needed. @@ -881,8 +876,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { final GridWorkerFuture<R> fut = new GridWorkerFuture<>(ctx); - workersCnt.increment(); - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { @Override protected void body() { try { @@ -897,9 +890,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { fut.onDone(U.cast(e)); } - finally { - workersCnt.decrement(); - } } }; @@ -918,7 +908,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return fut; } finally { - leaveBusy(); + busyLock.readUnlock(); } }