# IGNITE-625: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/542a92d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/542a92d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/542a92d1 Branch: refs/heads/ignite-625-1 Commit: 542a92d122a998c8cb6aa0be982a99ec6b6c23be Parents: 0defad0 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Apr 8 10:19:21 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Apr 8 10:19:21 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsFileWorkerBatch.java | 12 ++++++++++-- .../ignite/internal/processors/igfs/IgfsImpl.java | 16 +++------------- .../processors/igfs/IgfsAbstractSelfTest.java | 14 ++++++-------- 3 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java index 352bf06..4d91432 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java @@ -92,6 +92,13 @@ public abstract class IgfsFileWorkerBatch implements Runnable { } /** + * Cancel batch processing. + */ + synchronized void cancel() { + queue.addFirst(CANCEL_MARKER); + } + + /** * @return {@code True} if finish was called on this batch. */ boolean finishing() { @@ -109,12 +116,13 @@ public abstract class IgfsFileWorkerBatch implements Runnable { if (data == null) continue; - - if (data == STOP_MARKER) { + else if (data == STOP_MARKER) { assert queue.isEmpty(); fut.onDone(); } + else if (data == CANCEL_MARKER) + throw new IgniteCheckedException("Write to file was cancelled due to node stop."); try { out.write(data); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 3dac09d..6eefde5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -243,17 +243,7 @@ public final class IgfsImpl implements IgfsEx { if (secondaryFs != null) { // Force all workers to finish their batches. for (IgfsFileWorkerBatch batch : workerMap.values()) - batch.finish(); - - // Wait for all writers to finish their execution. - for (IgfsFileWorker w : workerMap.values()) { - try { - w.join(); - } - catch (InterruptedException e) { - U.error(log, e.getMessage(), e); - } - } + batch.cancel(); if (secondaryFs instanceof AutoCloseable) U.closeQuiet((AutoCloseable)secondaryFs); @@ -333,7 +323,7 @@ public final class IgfsImpl implements IgfsEx { void await(IgfsPath... paths) { assert paths != null; - for (Map.Entry<IgfsPath, IgfsFileWorker> workerEntry : workerMap.entrySet()) { + for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> workerEntry : workerMap.entrySet()) { IgfsPath workerPath = workerEntry.getKey(); boolean await = false; @@ -347,7 +337,7 @@ public final class IgfsImpl implements IgfsEx { } if (await) { - IgfsFileWorkerBatch batch = workerEntry.getValue().currentBatch(); + IgfsFileWorkerBatch batch = workerEntry.getValue(); if (batch != null) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 81fbce8..1ebbf9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -2589,12 +2589,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { workerMapFld.setAccessible(true); // Wait for all workers to finish. - Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, IgfsFileWorker>)workerMapFld.get(igfs); + Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, IgfsFileWorkerBatch>)workerMapFld.get(igfs); - for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) { + for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : workerMap.entrySet()) { entry.getValue().cancel(); - - U.join(entry.getValue()); + entry.getValue().await(); } // Clear igfs. @@ -2617,12 +2616,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { workerMapFld.setAccessible(true); // Wait for all workers to finish. - Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, IgfsFileWorker>)workerMapFld.get(igfsEx); + Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, IgfsFileWorkerBatch>)workerMapFld.get(igfs); - for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) { + for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : workerMap.entrySet()) { entry.getValue().cancel(); - - U.join(entry.getValue()); + entry.getValue().await(); } }