Repository: incubator-ignite Updated Branches: refs/heads/ignite-625-1 [created] 542a92d12
# IGNITE-625: WIP on file batches rework. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5ea4b42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5ea4b42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5ea4b42 Branch: refs/heads/ignite-625-1 Commit: a5ea4b4263a6377ae6efc95057aa78ab34ba9103 Parents: bed567f Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Apr 7 15:30:11 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Apr 7 15:30:11 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsFileWorkerBatch.java | 196 ++++++------------- .../processors/igfs/IgfsFileWorkerTask.java | 32 --- .../internal/processors/igfs/IgfsImpl.java | 3 +- 3 files changed, 61 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java index 27f9b7d..782e1d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java @@ -19,29 +19,24 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; /** * Work batch is an abstraction of the logically grouped tasks. */ public class IgfsFileWorkerBatch { - /** Completion latch. */ - private final CountDownLatch completeLatch = new CountDownLatch(1); - - /** Finish guard. */ - private final AtomicBoolean finishGuard = new AtomicBoolean(); - - /** Lock for finish operation. */ - private final ReadWriteLock finishLock = new ReentrantReadWriteLock(); + /** Stop marker. */ + private static final byte[] STOP_MARKER = new byte[0]; /** Tasks queue. */ - private final BlockingDeque<IgfsFileWorkerTask> queue = new LinkedBlockingDeque<>(); + private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>(); + + /** Future which completes when batch is finished. */ + private final GridFutureAdapter fut = new GridFutureAdapter(); /** Path to the file in the primary file system. */ private final IgfsPath path; @@ -49,11 +44,8 @@ public class IgfsFileWorkerBatch { /** Output stream to the file. */ private final OutputStream out; - /** Caught exception. */ - private volatile IgniteCheckedException err; - - /** Last task marker. */ - private boolean lastTask; + /** Finishing flag. */ + private volatile boolean finishing; /** * Constructor. @@ -70,23 +62,37 @@ public class IgfsFileWorkerBatch { } /** - * Perform write. + * Perform write if batch is not finishing yet. * * @param data Data to be written. - * @return {@code True} in case operation was enqueued. + * @return {@code True} in case write was enqueued. */ - boolean write(final byte[] data) { - return addTask(new IgfsFileWorkerTask() { - @Override public void execute() throws IgniteCheckedException { - try { - out.write(data); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to write data to the file due to secondary file system " + - "exception: " + path, e); - } - } - }); + synchronized boolean write(final byte[] data) { + if (!finishing) { + queue.add(data); + + return true; + } + else + return false; + } + + /** + * Add the last task to that batch which will release all the resources. + */ + synchronized void finish() { + if (!finishing) { + finishing = true; + + queue.add(STOP_MARKER); + } + } + + /** + * @return {@code True} if finish was called on this batch. + */ + boolean finishing() { + return finishing; } /** @@ -94,64 +100,41 @@ public class IgfsFileWorkerBatch { */ void process() { try { - boolean cancelled = false; - - while (!cancelled) { + while (!fut.isDone()) { try { - IgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS); + byte[] data = queue.poll(1000, TimeUnit.MILLISECONDS); - if (task == null) + if (data == null) continue; - task.execute(); + if (data == STOP_MARKER) { + assert queue.isEmpty(); - if (lastTask) - cancelled = true; - } - catch (IgniteCheckedException e) { - err = e; + fut.onDone(); + } - cancelled = true; + try { + out.write(data); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to write data to the file due to secondary " + + "file system exception: " + path, e); + } } - catch (InterruptedException ignore) { + catch (InterruptedException e) { Thread.currentThread().interrupt(); - cancelled = true; + fut.onDone(e); + } + catch (Exception e) { + fut.onDone(e); } } } finally { - try { - onComplete(); - } - finally { - U.closeQuiet(out); + assert fut.isDone(); - completeLatch.countDown(); - } - } - } - - /** - * Add the last task to that batch which will release all the resources. - */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - void finish() { - if (finishGuard.compareAndSet(false, true)) { - finishLock.writeLock().lock(); - - try { - queue.add(new IgfsFileWorkerTask() { - @Override public void execute() { - assert queue.isEmpty(); - - lastTask = true; - } - }); - } - finally { - finishLock.writeLock().unlock(); - } + U.closeQuiet(out); } } @@ -161,29 +144,7 @@ public class IgfsFileWorkerBatch { * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. */ void await() throws IgniteCheckedException { - try { - completeLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - - IgniteCheckedException err0 = err; - - if (err0 != null) - throw err0; - } - - /** - * Await for that worker batch to complete in case it was marked as finished. - * - * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing. - */ - void awaitIfFinished() throws IgniteCheckedException { - if (finishGuard.get()) - await(); + fut.get(); } /** @@ -194,43 +155,4 @@ public class IgfsFileWorkerBatch { IgfsPath path() { return path; } - - /** - * Callback invoked when all the tasks within the batch are completed. - */ - protected void onComplete() { - // No-op. - } - - /** - * Add task to the queue. - * - * @param task Task to add. - * @return {@code True} in case the task was added to the queue. - */ - private boolean addTask(IgfsFileWorkerTask task) { - finishLock.readLock().lock(); - - try { - if (!finishGuard.get()) { - try { - queue.put(task); - - return true; - } - catch (InterruptedException ignore) { - // Task was not enqueued due to interruption. - Thread.currentThread().interrupt(); - - return false; - } - } - else - return false; - - } - finally { - finishLock.readLock().unlock(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java deleted file mode 100644 index ba788b4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import org.apache.ignite.*; - -/** - * Generic IGFS worker task which could potentially throw an exception. - */ -public interface IgfsFileWorkerTask { - /** - * Execute task logic. - * - * @throws IgniteCheckedException If failed. - */ - public void execute() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5ea4b42/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 824f178..4b4203e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -351,7 +351,8 @@ public final class IgfsImpl implements IgfsEx { if (batch != null) { try { - batch.awaitIfFinished(); + if (batch.finishing()) + batch.await(); } catch (IgniteCheckedException ignore) { // No-op.