# IGNITE-625: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb5eb422 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb5eb422 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb5eb422 Branch: refs/heads/ignite-625-1 Commit: fb5eb422ddf0ed3e2941e7248907b9b76193b546 Parents: a5ea4b4 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Apr 8 09:19:49 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Apr 8 09:19:49 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsFileWorker.java | 180 ------------------- .../processors/igfs/IgfsFileWorkerBatch.java | 14 +- .../internal/processors/igfs/IgfsImpl.java | 126 ++++++++----- 3 files changed, 92 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java deleted file mode 100644 index 8b04c41..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -/** - * IGFS file worker for DUAL modes. - */ -public class IgfsFileWorker extends IgfsThread { - /** Time during which thread remains alive since it's last batch is finished. */ - private static final long THREAD_REUSE_WAIT_TIME = 5000; - - /** Lock */ - private final Lock lock = new ReentrantLock(); - - /** Condition. */ - private final Condition cond = lock.newCondition(); - - /** Next queued batch. */ - private IgfsFileWorkerBatch nextBatch; - - /** Batch which is currently being processed. */ - private IgfsFileWorkerBatch curBatch; - - /** Cancellation flag. */ - private volatile boolean cancelled; - - /** - * Creates {@code IGFS} file worker. - * - * @param name Worker name. - */ - IgfsFileWorker(String name) { - super(name); - } - - /** - * Add worker batch. - * - * @return {@code True} if the batch was actually added. - */ - boolean addBatch(IgfsFileWorkerBatch batch) { - assert batch != null; - - lock.lock(); - - try { - if (!cancelled) { - assert nextBatch == null; // Remember, that write operations on a single file are exclusive. - - nextBatch = batch; - - cond.signalAll(); - - return true; - } - else - return false; - } - finally { - lock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - while (!cancelled) { - lock.lock(); - - try { - // If there are no more new batches, wait for several seconds before shutting down the thread. - if (!cancelled && nextBatch == null) - cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS); - - curBatch = nextBatch; - - nextBatch = null; - - if (cancelled && curBatch != null) - curBatch.finish(); // Mark the batch as finished if cancelled. - } - finally { - lock.unlock(); - } - - if (curBatch != null) - curBatch.process(); - else { - lock.lock(); - - try { - // No more new batches, we can safely release the worker as it was inactive for too long. - if (nextBatch == null) - cancelled = true; - } - finally { - lock.unlock(); - } - } - } - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - // Clear interrupted flag. - boolean interrupted = interrupted(); - - // Process the last batch if any. - if (nextBatch != null) - nextBatch.process(); - - onFinish(); - - // Reset interrupted flag. - if (interrupted) - interrupt(); - } - - /** - * Forcefully finish execution of all batches. - */ - void cancel() { - lock.lock(); - - try { - cancelled = true; - - if (curBatch != null) - curBatch.finish(); - - if (nextBatch != null) - nextBatch.finish(); - - cond.signalAll(); // Awake the main loop in case it is still waiting for the next batch. - } - finally { - lock.unlock(); - } - } - - /** - * Get current batch. - * - * @return Current batch. - */ - IgfsFileWorkerBatch currentBatch() { - lock.lock(); - - try { - return nextBatch == null ? curBatch : nextBatch; - } - finally { - lock.unlock(); - } - } - - /** - * Callback invoked when worker has processed all it's batches. - */ - protected void onFinish() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java index 782e1d9..352bf06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java @@ -28,10 +28,13 @@ import java.util.concurrent.*; /** * Work batch is an abstraction of the logically grouped tasks. */ -public class IgfsFileWorkerBatch { +public abstract class IgfsFileWorkerBatch implements Runnable { /** Stop marker. */ private static final byte[] STOP_MARKER = new byte[0]; + /** Cancel marker. */ + private static final byte[] CANCEL_MARKER = new byte[0]; + /** Tasks queue. */ private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>(); @@ -98,7 +101,7 @@ public class IgfsFileWorkerBatch { /** * Process the batch. */ - void process() { + public void run() { try { while (!fut.isDone()) { try { @@ -135,6 +138,8 @@ public class IgfsFileWorkerBatch { assert fut.isDone(); U.closeQuiet(out); + + onDone(); } } @@ -155,4 +160,9 @@ public class IgfsFileWorkerBatch { IgfsPath path() { return path; } + + /** + * Callback invoked when execution finishes. + */ + protected abstract void onDone(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb5eb422/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 4b4203e..3dac09d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; +import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -97,7 +98,7 @@ public final class IgfsImpl implements IgfsEx { private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** Writers map. */ - private final ConcurrentHashMap8<IgfsPath, IgfsFileWorker> workerMap = new ConcurrentHashMap8<>(); + private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>(); /** Delete futures. */ private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>(); @@ -120,6 +121,9 @@ public final class IgfsImpl implements IgfsEx { /** Eviction policy (if set). */ private IgfsPerBlockLruEvictionPolicy evictPlc; + /** Pool for threads working in DUAL mode. */ + private final IgniteThreadPoolExecutor dualPool; + /** * Creates IGFS instance with given context. * @@ -214,6 +218,9 @@ public final class IgfsImpl implements IgfsEx { igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, + new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; } /** @@ -233,28 +240,29 @@ public final class IgfsImpl implements IgfsEx { // Clear interrupted flag temporarily. boolean interrupted = Thread.interrupted(); - // Force all workers to finish their batches. - for (IgfsFileWorker w : workerMap.values()) - w.cancel(); + if (secondaryFs != null) { + // Force all workers to finish their batches. + for (IgfsFileWorkerBatch batch : workerMap.values()) + batch.finish(); - // Wait for all writers to finish their execution. - for (IgfsFileWorker w : workerMap.values()) { - try { - w.join(); - } - catch (InterruptedException e) { - U.error(log, e.getMessage(), e); + // Wait for all writers to finish their execution. + for (IgfsFileWorker w : workerMap.values()) { + try { + w.join(); + } + catch (InterruptedException e) { + U.error(log, e.getMessage(), e); + } } - } - - workerMap.clear(); - if (secondaryFs instanceof AutoCloseable) - U.closeQuiet((AutoCloseable)secondaryFs); + if (secondaryFs instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)secondaryFs); + } igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr); igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr); + // Restore interrupted flag. if (interrupted) Thread.currentThread().interrupt(); } @@ -273,34 +281,26 @@ public final class IgfsImpl implements IgfsEx { if (enterBusy()) { try { - IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out); - - while (true) { - IgfsFileWorker worker = workerMap.get(path); - - if (worker != null) { - if (worker.addBatch(batch)) // Added batch to active worker. - break; - else - workerMap.remove(path, worker); // Worker is stopping. Remove it from map. + // Create new batch. + IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out) { + @Override protected void onDone() { + workerMap.remove(path, this); } - else { - worker = new IgfsFileWorker("igfs-file-worker-" + path) { - @Override protected void onFinish() { - workerMap.remove(path, this); - } - }; + }; - boolean b = worker.addBatch(batch); + // Submit it to the thread pool immediately. + assert dualPool != null; - assert b; + dualPool.submit(batch); - if (workerMap.putIfAbsent(path, worker) == null) { - worker.start(); + // Spin in case another batch is currently running. + while (true) { + IgfsFileWorkerBatch prevBatch = workerMap.putIfAbsent(path, batch); - break; - } - } + if (prevBatch == null) + break; + else + prevBatch.await(); } return batch; @@ -494,7 +494,8 @@ public final class IgfsImpl implements IgfsEx { A.notNull(path, "path"); return safeOp(new Callable<IgfsFile>() { - @Override public IgfsFile call() throws Exception { + @Override + public IgfsFile call() throws Exception { if (log.isDebugEnabled()) log.debug("Get file info: " + path); @@ -988,7 +989,8 @@ public final class IgfsImpl implements IgfsEx { A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0"); return safeOp(new Callable<IgfsInputStreamAdapter>() { - @Override public IgfsInputStreamAdapter call() throws Exception { + @Override + public IgfsInputStreamAdapter call() throws Exception { if (log.isDebugEnabled()) log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']'); @@ -1075,7 +1077,8 @@ public final class IgfsImpl implements IgfsEx { A.ensure(bufSize >= 0, "bufSize >= 0"); return safeOp(new Callable<IgfsOutputStream>() { - @Override public IgfsOutputStream call() throws Exception { + @Override + public IgfsOutputStream call() throws Exception { if (log.isDebugEnabled()) log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" + overwrite + ", props=" + props + ']'); @@ -1090,7 +1093,7 @@ public final class IgfsImpl implements IgfsEx { await(path); IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate, - props, overwrite, bufSize, (short)replication, groupBlockSize(), affKey); + props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey); batch = newBatch(path, desc.out()); @@ -1333,7 +1336,8 @@ public final class IgfsImpl implements IgfsEx { /** {@inheritDoc} */ @Override public IgfsMetrics metrics() { return safeOp(new Callable<IgfsMetrics>() { - @Override public IgfsMetrics call() throws Exception { + @Override + public IgfsMetrics call() throws Exception { IgfsPathSummary sum = new IgfsPathSummary(); summary0(ROOT_ID, sum); @@ -1343,8 +1347,7 @@ public final class IgfsImpl implements IgfsEx { if (secondaryFs != null) { try { secondarySpaceSize = secondaryFs.usedSpaceSize(); - } - catch (IgniteException e) { + } catch (IgniteException e) { LT.warn(log, e, "Failed to get secondary file system consumed space size."); secondarySpaceSize = -1; @@ -2097,4 +2100,35 @@ public final class IgfsImpl implements IgfsEx { else throw new IllegalStateException("Failed to perform IGFS action because grid is stopping."); } + +/** + * IGFS thread factory. + */ +@SuppressWarnings("NullableProblems") +private static class IgfsThreadFactory implements ThreadFactory { + /** IGFS name. */ + private final String name; + + /** Counter. */ + private final AtomicLong ctr = new AtomicLong(); + + /** + * Constructor. + * + * @param name IGFS name. + */ + private IgfsThreadFactory(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + t.setName("igfs-<" + name + ">-batch-worker-thread-" + ctr.incrementAndGet()); + t.setDaemon(true); + + return t; + } +} }