# IGNITE-625: WIP.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb5eb422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb5eb422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb5eb422

Branch: refs/heads/ignite-625-1
Commit: fb5eb422ddf0ed3e2941e7248907b9b76193b546
Parents: a5ea4b4
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Wed Apr 8 09:19:49 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Apr 8 09:19:49 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsFileWorker.java         | 180 -------------------
 .../processors/igfs/IgfsFileWorkerBatch.java    |  14 +-
 .../internal/processors/igfs/IgfsImpl.java      | 126 ++++++++-----
 3 files changed, 92 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
deleted file mode 100644
index 8b04c41..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-/**
- * IGFS file worker for DUAL modes.
- */
-public class IgfsFileWorker extends IgfsThread {
-    /** Time during which thread remains alive since it's last batch is 
finished. */
-    private static final long THREAD_REUSE_WAIT_TIME = 5000;
-
-    /** Lock */
-    private final Lock lock = new ReentrantLock();
-
-    /** Condition. */
-    private final Condition cond = lock.newCondition();
-
-    /** Next queued batch. */
-    private IgfsFileWorkerBatch nextBatch;
-
-    /** Batch which is currently being processed. */
-    private IgfsFileWorkerBatch curBatch;
-
-    /** Cancellation flag. */
-    private volatile boolean cancelled;
-
-    /**
-     * Creates {@code IGFS} file worker.
-     *
-     * @param name Worker name.
-     */
-    IgfsFileWorker(String name) {
-        super(name);
-    }
-
-    /**
-     * Add worker batch.
-     *
-     * @return {@code True} if the batch was actually added.
-     */
-    boolean addBatch(IgfsFileWorkerBatch batch) {
-        assert batch != null;
-
-        lock.lock();
-
-        try {
-            if (!cancelled) {
-                assert nextBatch == null; // Remember, that write operations 
on a single file are exclusive.
-
-                nextBatch = batch;
-
-                cond.signalAll();
-
-                return true;
-            }
-            else
-                return false;
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException {
-        while (!cancelled) {
-            lock.lock();
-
-            try {
-                // If there are no more new batches, wait for several seconds 
before shutting down the thread.
-                if (!cancelled && nextBatch == null)
-                    cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS);
-
-                curBatch = nextBatch;
-
-                nextBatch = null;
-
-                if (cancelled && curBatch != null)
-                    curBatch.finish(); // Mark the batch as finished if 
cancelled.
-            }
-            finally {
-                lock.unlock();
-            }
-
-            if (curBatch != null)
-                curBatch.process();
-            else {
-                lock.lock();
-
-                try {
-                    // No more new batches, we can safely release the worker 
as it was inactive for too long.
-                    if (nextBatch == null)
-                        cancelled = true;
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void cleanup() {
-        // Clear interrupted flag.
-        boolean interrupted = interrupted();
-
-        // Process the last batch if any.
-        if (nextBatch != null)
-            nextBatch.process();
-
-        onFinish();
-
-        // Reset interrupted flag.
-        if (interrupted)
-            interrupt();
-    }
-
-    /**
-     * Forcefully finish execution of all batches.
-     */
-    void cancel() {
-        lock.lock();
-
-        try {
-            cancelled = true;
-
-            if (curBatch != null)
-                curBatch.finish();
-
-            if (nextBatch != null)
-                nextBatch.finish();
-
-            cond.signalAll(); // Awake the main loop in case it is still 
waiting for the next batch.
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Get current batch.
-     *
-     * @return Current batch.
-     */
-    IgfsFileWorkerBatch currentBatch() {
-        lock.lock();
-
-        try {
-            return nextBatch == null ? curBatch : nextBatch;
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Callback invoked when worker has processed all it's batches.
-     */
-    protected void onFinish() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
index 782e1d9..352bf06 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -28,10 +28,13 @@ import java.util.concurrent.*;
 /**
  * Work batch is an abstraction of the logically grouped tasks.
  */
-public class IgfsFileWorkerBatch {
+public abstract class IgfsFileWorkerBatch implements Runnable {
     /** Stop marker. */
     private static final byte[] STOP_MARKER = new byte[0];
 
+    /** Cancel marker. */
+    private static final byte[] CANCEL_MARKER = new byte[0];
+
     /** Tasks queue. */
     private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>();
 
@@ -98,7 +101,7 @@ public class IgfsFileWorkerBatch {
     /**
      * Process the batch.
      */
-    void process() {
+    public void run() {
         try {
             while (!fut.isDone()) {
                 try {
@@ -135,6 +138,8 @@ public class IgfsFileWorkerBatch {
             assert fut.isDone();
 
             U.closeQuiet(out);
+
+            onDone();
         }
     }
 
@@ -155,4 +160,9 @@ public class IgfsFileWorkerBatch {
     IgfsPath path() {
         return path;
     }
+
+    /**
+     * Callback invoked when execution finishes.
+     */
+    protected abstract void onDone();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 4b4203e..3dac09d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -97,7 +98,7 @@ public final class IgfsImpl implements IgfsEx {
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** Writers map. */
-    private final ConcurrentHashMap8<IgfsPath, IgfsFileWorker> workerMap = new 
ConcurrentHashMap8<>();
+    private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap 
= new ConcurrentHashMap8<>();
 
     /** Delete futures. */
     private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> 
delFuts = new ConcurrentHashMap8<>();
@@ -120,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
+    /** Pool for threads working in DUAL mode. */
+    private final IgniteThreadPoolExecutor dualPool;
+
     /**
      * Creates IGFS instance with given context.
      *
@@ -214,6 +218,9 @@ public final class IgfsImpl implements IgfsEx {
 
         igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
         igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, 
Integer.MAX_VALUE, 5000L,
+            new LinkedBlockingQueue<Runnable>(), new 
IgfsThreadFactory(cfg.getName()), null) : null;
     }
 
     /**
@@ -233,28 +240,29 @@ public final class IgfsImpl implements IgfsEx {
         // Clear interrupted flag temporarily.
         boolean interrupted = Thread.interrupted();
 
-        // Force all workers to finish their batches.
-        for (IgfsFileWorker w : workerMap.values())
-            w.cancel();
+        if (secondaryFs != null) {
+            // Force all workers to finish their batches.
+            for (IgfsFileWorkerBatch batch : workerMap.values())
+                batch.finish();
 
-        // Wait for all writers to finish their execution.
-        for (IgfsFileWorker w : workerMap.values()) {
-            try {
-                w.join();
-            }
-            catch (InterruptedException e) {
-                U.error(log, e.getMessage(), e);
+            // Wait for all writers to finish their execution.
+            for (IgfsFileWorker w : workerMap.values()) {
+                try {
+                    w.join();
+                }
+                catch (InterruptedException e) {
+                    U.error(log, e.getMessage(), e);
+                }
             }
-        }
-
-        workerMap.clear();
 
-        if (secondaryFs instanceof AutoCloseable)
-            U.closeQuiet((AutoCloseable)secondaryFs);
+            if (secondaryFs instanceof AutoCloseable)
+                U.closeQuiet((AutoCloseable)secondaryFs);
+        }
 
         igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
         igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
 
+        // Restore interrupted flag.
         if (interrupted)
             Thread.currentThread().interrupt();
     }
@@ -273,34 +281,26 @@ public final class IgfsImpl implements IgfsEx {
 
         if (enterBusy()) {
             try {
-                IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out);
-
-                while (true) {
-                    IgfsFileWorker worker = workerMap.get(path);
-
-                    if (worker != null) {
-                        if (worker.addBatch(batch)) // Added batch to active 
worker.
-                            break;
-                        else
-                            workerMap.remove(path, worker); // Worker is 
stopping. Remove it from map.
+                // Create new batch.
+                IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out) 
{
+                    @Override protected void onDone() {
+                        workerMap.remove(path, this);
                     }
-                    else {
-                        worker = new IgfsFileWorker("igfs-file-worker-" + 
path) {
-                            @Override protected void onFinish() {
-                                workerMap.remove(path, this);
-                            }
-                        };
+                };
 
-                        boolean b = worker.addBatch(batch);
+                // Submit it to the thread pool immediately.
+                assert dualPool != null;
 
-                        assert b;
+                dualPool.submit(batch);
 
-                        if (workerMap.putIfAbsent(path, worker) == null) {
-                            worker.start();
+                // Spin in case another batch is currently running.
+                while (true) {
+                    IgfsFileWorkerBatch prevBatch = 
workerMap.putIfAbsent(path, batch);
 
-                            break;
-                        }
-                    }
+                    if (prevBatch == null)
+                        break;
+                    else
+                        prevBatch.await();
                 }
 
                 return batch;
@@ -494,7 +494,8 @@ public final class IgfsImpl implements IgfsEx {
         A.notNull(path, "path");
 
         return safeOp(new Callable<IgfsFile>() {
-            @Override public IgfsFile call() throws Exception {
+            @Override
+            public IgfsFile call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Get file info: " + path);
 
@@ -988,7 +989,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0");
 
         return safeOp(new Callable<IgfsInputStreamAdapter>() {
-            @Override public IgfsInputStreamAdapter call() throws Exception {
+            @Override
+            public IgfsInputStreamAdapter call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Open file for reading [path=" + path + ", 
bufSize=" + bufSize + ']');
 
@@ -1075,7 +1077,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(bufSize >= 0, "bufSize >= 0");
 
         return safeOp(new Callable<IgfsOutputStream>() {
-            @Override public IgfsOutputStream call() throws Exception {
+            @Override
+            public IgfsOutputStream call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Open file for writing [path=" + path + ", 
bufSize=" + bufSize + ", overwrite=" +
                         overwrite + ", props=" + props + ']');
@@ -1090,7 +1093,7 @@ public final class IgfsImpl implements IgfsEx {
                     await(path);
 
                     IgfsSecondaryOutputStreamDescriptor desc = 
meta.createDual(secondaryFs, path, simpleCreate,
-                        props, overwrite, bufSize, (short)replication, 
groupBlockSize(), affKey);
+                        props, overwrite, bufSize, (short) replication, 
groupBlockSize(), affKey);
 
                     batch = newBatch(path, desc.out());
 
@@ -1333,7 +1336,8 @@ public final class IgfsImpl implements IgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsMetrics metrics() {
         return safeOp(new Callable<IgfsMetrics>() {
-            @Override public IgfsMetrics call() throws Exception {
+            @Override
+            public IgfsMetrics call() throws Exception {
                 IgfsPathSummary sum = new IgfsPathSummary();
 
                 summary0(ROOT_ID, sum);
@@ -1343,8 +1347,7 @@ public final class IgfsImpl implements IgfsEx {
                 if (secondaryFs != null) {
                     try {
                         secondarySpaceSize = secondaryFs.usedSpaceSize();
-                    }
-                    catch (IgniteException e) {
+                    } catch (IgniteException e) {
                         LT.warn(log, e, "Failed to get secondary file system 
consumed space size.");
 
                         secondarySpaceSize = -1;
@@ -2097,4 +2100,35 @@ public final class IgfsImpl implements IgfsEx {
         else
             throw new IllegalStateException("Failed to perform IGFS action 
because grid is stopping.");
     }
+
+/**
+ * IGFS thread factory.
+ */
+@SuppressWarnings("NullableProblems")
+private static class IgfsThreadFactory implements ThreadFactory {
+    /** IGFS name. */
+    private final String name;
+
+    /** Counter. */
+    private final AtomicLong ctr = new AtomicLong();
+
+    /**
+     * Constructor.
+     *
+     * @param name IGFS name.
+     */
+    private IgfsThreadFactory(String name) {
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+
+        t.setName("igfs-<" + name + ">-batch-worker-thread-" + 
ctr.incrementAndGet());
+        t.setDaemon(true);
+
+        return t;
+    }
+}
 }

Reply via email to