# IGNITE-625: WIP.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/542a92d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/542a92d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/542a92d1

Branch: refs/heads/ignite-625-1
Commit: 542a92d122a998c8cb6aa0be982a99ec6b6c23be
Parents: 0defad0
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Wed Apr 8 10:19:21 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Apr 8 10:19:21 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsFileWorkerBatch.java        | 12 ++++++++++--
 .../ignite/internal/processors/igfs/IgfsImpl.java   | 16 +++-------------
 .../processors/igfs/IgfsAbstractSelfTest.java       | 14 ++++++--------
 3 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
index 352bf06..4d91432 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -92,6 +92,13 @@ public abstract class IgfsFileWorkerBatch implements 
Runnable {
     }
 
     /**
+     * Cancel batch processing.
+     */
+    synchronized void cancel() {
+        queue.addFirst(CANCEL_MARKER);
+    }
+
+    /**
      * @return {@code True} if finish was called on this batch.
      */
     boolean finishing() {
@@ -109,12 +116,13 @@ public abstract class IgfsFileWorkerBatch implements 
Runnable {
 
                     if (data == null)
                         continue;
-
-                    if (data == STOP_MARKER) {
+                    else if (data == STOP_MARKER) {
                         assert queue.isEmpty();
 
                         fut.onDone();
                     }
+                    else if (data == CANCEL_MARKER)
+                        throw new IgniteCheckedException("Write to file was 
cancelled due to node stop.");
 
                     try {
                         out.write(data);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 3dac09d..6eefde5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -243,17 +243,7 @@ public final class IgfsImpl implements IgfsEx {
         if (secondaryFs != null) {
             // Force all workers to finish their batches.
             for (IgfsFileWorkerBatch batch : workerMap.values())
-                batch.finish();
-
-            // Wait for all writers to finish their execution.
-            for (IgfsFileWorker w : workerMap.values()) {
-                try {
-                    w.join();
-                }
-                catch (InterruptedException e) {
-                    U.error(log, e.getMessage(), e);
-                }
-            }
+                batch.cancel();
 
             if (secondaryFs instanceof AutoCloseable)
                 U.closeQuiet((AutoCloseable)secondaryFs);
@@ -333,7 +323,7 @@ public final class IgfsImpl implements IgfsEx {
     void await(IgfsPath... paths) {
         assert paths != null;
 
-        for (Map.Entry<IgfsPath, IgfsFileWorker> workerEntry : 
workerMap.entrySet()) {
+        for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> workerEntry : 
workerMap.entrySet()) {
             IgfsPath workerPath = workerEntry.getKey();
 
             boolean await = false;
@@ -347,7 +337,7 @@ public final class IgfsImpl implements IgfsEx {
             }
 
             if (await) {
-                IgfsFileWorkerBatch batch = 
workerEntry.getValue().currentBatch();
+                IgfsFileWorkerBatch batch = workerEntry.getValue();
 
                 if (batch != null) {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/542a92d1/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 81fbce8..1ebbf9e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2589,12 +2589,11 @@ public abstract class IgfsAbstractSelfTest extends 
IgfsCommonAbstractTest {
         workerMapFld.setAccessible(true);
 
         // Wait for all workers to finish.
-        Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, 
IgfsFileWorker>)workerMapFld.get(igfs);
+        Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, 
IgfsFileWorkerBatch>)workerMapFld.get(igfs);
 
-        for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) 
{
+        for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : 
workerMap.entrySet()) {
             entry.getValue().cancel();
-
-            U.join(entry.getValue());
+            entry.getValue().await();
         }
 
         // Clear igfs.
@@ -2617,12 +2616,11 @@ public abstract class IgfsAbstractSelfTest extends 
IgfsCommonAbstractTest {
             workerMapFld.setAccessible(true);
 
             // Wait for all workers to finish.
-            Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, 
IgfsFileWorker>)workerMapFld.get(igfsEx);
+            Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, 
IgfsFileWorkerBatch>)workerMapFld.get(igfs);
 
-            for (Map.Entry<IgfsPath, IgfsFileWorker> entry : 
workerMap.entrySet()) {
+            for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : 
workerMap.entrySet()) {
                 entry.getValue().cancel();
-
-                U.join(entry.getValue());
+                entry.getValue().await();
             }
         }
 

Reply via email to