Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-625-1 [created] 542a92d12


# IGNITE-625: WIP on file batches rework.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5ea4b42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5ea4b42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5ea4b42

Branch: refs/heads/ignite-625-1
Commit: a5ea4b4263a6377ae6efc95057aa78ab34ba9103
Parents: bed567f
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Tue Apr 7 15:30:11 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Tue Apr 7 15:30:11 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsFileWorkerBatch.java    | 196 ++++++-------------
 .../processors/igfs/IgfsFileWorkerTask.java     |  32 ---
 .../internal/processors/igfs/IgfsImpl.java      |   3 +-
 3 files changed, 61 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
index 27f9b7d..782e1d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -19,29 +19,24 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
 
 /**
  * Work batch is an abstraction of the logically grouped tasks.
  */
 public class IgfsFileWorkerBatch {
-    /** Completion latch. */
-    private final CountDownLatch completeLatch = new CountDownLatch(1);
-
-    /** Finish guard. */
-    private final AtomicBoolean finishGuard = new AtomicBoolean();
-
-    /** Lock for finish operation. */
-    private final ReadWriteLock finishLock = new ReentrantReadWriteLock();
+    /** Stop marker. */
+    private static final byte[] STOP_MARKER = new byte[0];
 
     /** Tasks queue. */
-    private final BlockingDeque<IgfsFileWorkerTask> queue = new 
LinkedBlockingDeque<>();
+    private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>();
+
+    /** Future which completes when batch is finished. */
+    private final GridFutureAdapter fut = new GridFutureAdapter();
 
     /** Path to the file in the primary file system. */
     private final IgfsPath path;
@@ -49,11 +44,8 @@ public class IgfsFileWorkerBatch {
     /** Output stream to the file. */
     private final OutputStream out;
 
-    /** Caught exception. */
-    private volatile IgniteCheckedException err;
-
-    /** Last task marker. */
-    private boolean lastTask;
+    /** Finishing flag. */
+    private volatile boolean finishing;
 
     /**
      * Constructor.
@@ -70,23 +62,37 @@ public class IgfsFileWorkerBatch {
     }
 
     /**
-     * Perform write.
+     * Perform write if batch is not finishing yet.
      *
      * @param data Data to be written.
-     * @return {@code True} in case operation was enqueued.
+     * @return {@code True} in case write was enqueued.
      */
-    boolean write(final byte[] data) {
-        return addTask(new IgfsFileWorkerTask() {
-            @Override public void execute() throws IgniteCheckedException {
-                try {
-                    out.write(data);
-                }
-                catch (IOException e) {
-                    throw new IgniteCheckedException("Failed to write data to 
the file due to secondary file system " +
-                        "exception: " + path, e);
-                }
-            }
-        });
+    synchronized boolean write(final byte[] data) {
+        if (!finishing) {
+            queue.add(data);
+
+            return true;
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Add the last task to that batch which will release all the resources.
+     */
+    synchronized void finish() {
+        if (!finishing) {
+            finishing = true;
+
+            queue.add(STOP_MARKER);
+        }
+    }
+
+    /**
+     * @return {@code True} if finish was called on this batch.
+     */
+    boolean finishing() {
+        return finishing;
     }
 
     /**
@@ -94,64 +100,41 @@ public class IgfsFileWorkerBatch {
      */
     void process() {
         try {
-            boolean cancelled = false;
-
-            while (!cancelled) {
+            while (!fut.isDone()) {
                 try {
-                    IgfsFileWorkerTask task = queue.poll(1000, 
TimeUnit.MILLISECONDS);
+                    byte[] data = queue.poll(1000, TimeUnit.MILLISECONDS);
 
-                    if (task == null)
+                    if (data == null)
                         continue;
 
-                    task.execute();
+                    if (data == STOP_MARKER) {
+                        assert queue.isEmpty();
 
-                    if (lastTask)
-                        cancelled = true;
-                }
-                catch (IgniteCheckedException e) {
-                    err = e;
+                        fut.onDone();
+                    }
 
-                    cancelled = true;
+                    try {
+                        out.write(data);
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException("Failed to write data 
to the file due to secondary " +
+                            "file system exception: " + path, e);
+                    }
                 }
-                catch (InterruptedException ignore) {
+                catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
 
-                    cancelled = true;
+                    fut.onDone(e);
+                }
+                catch (Exception e) {
+                    fut.onDone(e);
                 }
             }
         }
         finally {
-            try {
-                onComplete();
-            }
-            finally {
-                U.closeQuiet(out);
+            assert fut.isDone();
 
-                completeLatch.countDown();
-            }
-        }
-    }
-
-    /**
-     * Add the last task to that batch which will release all the resources.
-     */
-    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    void finish() {
-        if (finishGuard.compareAndSet(false, true)) {
-            finishLock.writeLock().lock();
-
-            try {
-                queue.add(new IgfsFileWorkerTask() {
-                    @Override public void execute() {
-                        assert queue.isEmpty();
-
-                        lastTask = true;
-                    }
-                });
-            }
-            finally {
-                finishLock.writeLock().unlock();
-            }
+            U.closeQuiet(out);
         }
     }
 
@@ -161,29 +144,7 @@ public class IgfsFileWorkerBatch {
      * @throws IgniteCheckedException In case any exception has occurred 
during batch tasks processing.
      */
     void await() throws IgniteCheckedException {
-        try {
-            completeLatch.await();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-
-        IgniteCheckedException err0 = err;
-
-        if (err0 != null)
-            throw err0;
-    }
-
-    /**
-     * Await for that worker batch to complete in case it was marked as 
finished.
-     *
-     * @throws IgniteCheckedException In case any exception has occurred 
during batch tasks processing.
-     */
-    void awaitIfFinished() throws IgniteCheckedException {
-        if (finishGuard.get())
-            await();
+        fut.get();
     }
 
     /**
@@ -194,43 +155,4 @@ public class IgfsFileWorkerBatch {
     IgfsPath path() {
         return path;
     }
-
-    /**
-     * Callback invoked when all the tasks within the batch are completed.
-     */
-    protected void onComplete() {
-        // No-op.
-    }
-
-    /**
-     * Add task to the queue.
-     *
-     * @param task Task to add.
-     * @return {@code True} in case the task was added to the queue.
-     */
-    private boolean addTask(IgfsFileWorkerTask task) {
-        finishLock.readLock().lock();
-
-        try {
-            if (!finishGuard.get()) {
-                try {
-                    queue.put(task);
-
-                    return true;
-                }
-                catch (InterruptedException ignore) {
-                    // Task was not enqueued due to interruption.
-                    Thread.currentThread().interrupt();
-
-                    return false;
-                }
-            }
-            else
-                return false;
-
-        }
-        finally {
-            finishLock.readLock().unlock();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
deleted file mode 100644
index ba788b4..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.*;
-
-/**
- * Generic IGFS worker task which could potentially throw an exception.
- */
-public interface IgfsFileWorkerTask {
-    /**
-     * Execute task logic.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void execute() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 824f178..4b4203e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -351,7 +351,8 @@ public final class IgfsImpl implements IgfsEx {
 
                 if (batch != null) {
                     try {
-                        batch.awaitIfFinished();
+                        if (batch.finishing())
+                            batch.await();
                     }
                     catch (IgniteCheckedException ignore) {
                         // No-op.

Reply via email to