This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git

commit 518071297ee09d47ca8e9945cd050f386fc36ed4
Author: Gary Gregory <gardgreg...@gmail.com>
AuthorDate: Thu Oct 29 20:15:55 2020 -0400

    [IO-510] Add and adapt ReadAheadInputStream and
    BufferedFileChannelInputStream from Apache Spark.
---
 src/changes/changes.xml                            |   3 +
 .../io/input/BufferedFileChannelInputStream.java   | 243 ++++++++++++
 .../commons/io/input/ReadAheadInputStream.java     | 437 +++++++++++++++++++++
 .../commons/io/input/AbstractInputStreamTest.java  | 164 ++++++++
 .../input/BufferedFileChannelInputStreamTest.java  |  44 +++
 .../commons/io/input/ReadAheadInputStreamTest.java |  49 +++
 6 files changed, 940 insertions(+)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 41cc4f3..9ef4a98 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -81,6 +81,9 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" due-to="Gary Gregory">
         Let org.apache.commons.io.filefilter classes work with 
java.nio.file.Files#newDirectoryStream(Path, DirectoryStream.Filter).
       </action>
+      <action issue="IO-510" dev="ggregory" type="add" due-to="Gary Gregory, Apache Spark, David Mollitor">
+        Add and adapt ReadAheadInputStream and BufferedFileChannelInputStream from Apache Spark.
+      </action>
       <!-- UPDATES -->
       <action dev="ggregory" type="update" due-to="Dependabot">
         Update junit-jupiter from 5.6.2 to 5.7.0 #153.
diff --git 
a/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java 
b/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java
new file mode 100644
index 0000000..e24c1a0
--- /dev/null
+++ 
b/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+
+import org.apache.commons.io.IOUtils;
+
+import sun.misc.Cleaner;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * {@link InputStream} implementation which uses a direct buffer to read a file to avoid an extra copy of data between
+ * Java and native memory, which happens when using {@link java.io.BufferedInputStream}. Unfortunately, this is not
+ * something already available in the JDK: {@code sun.nio.ch.ChannelInputStream} supports reading a file using NIO,
+ * but does not support buffering.
+ * <p>
+ * The direct buffer is freed eagerly by {@link #close()} instead of waiting for garbage collection. The JDK-internal
+ * APIs needed for that are reached only through reflection, so this class does not reference
+ * {@code sun.misc.Cleaner} or {@code sun.nio.ch.DirectBuffer} at compile time: the former no longer exists in Java 9+,
+ * and the latter is not exported there.
+ * </p>
+ * <p>
+ * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
+ * called {@code NioBufferedFileInputStream}.
+ * </p>
+ *
+ * @since 2.9.0
+ */
+public final class BufferedFileChannelInputStream extends InputStream {
+
+    private final ByteBuffer byteBuffer;
+
+    private final FileChannel fileChannel;
+
+    /**
+     * Constructs a new instance for the given File.
+     *
+     * @param file The file to stream.
+     * @throws IOException If an I/O error occurs
+     */
+    public BufferedFileChannelInputStream(final File file) throws IOException {
+        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs a new instance for the given File and buffer size.
+     *
+     * @param file The file to stream.
+     * @param bufferSizeInBytes buffer size, must be greater than 0.
+     * @throws IOException If an I/O error occurs
+     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
+     */
+    public BufferedFileChannelInputStream(final File file, final int bufferSizeInBytes) throws IOException {
+        this(Objects.requireNonNull(file, "file").toPath(), bufferSizeInBytes);
+    }
+
+    /**
+     * Constructs a new instance for the given Path.
+     *
+     * @param path The path to stream.
+     * @throws IOException If an I/O error occurs
+     */
+    public BufferedFileChannelInputStream(final Path path) throws IOException {
+        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs a new instance for the given Path and buffer size.
+     *
+     * @param path The path to stream.
+     * @param bufferSizeInBytes buffer size, must be greater than 0.
+     * @throws IOException If an I/O error occurs
+     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not greater than 0.
+     */
+    public BufferedFileChannelInputStream(final Path path, final int bufferSizeInBytes) throws IOException {
+        Objects.requireNonNull(path, "path");
+        if (bufferSizeInBytes <= 0) {
+            // A zero-capacity buffer would make refill() loop forever: reading into a buffer with no space reads 0.
+            throw new IllegalArgumentException(
+                "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
+        }
+        // Allocate the buffer before opening the channel so that a failed allocation cannot leak an open file.
+        byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+        // Start with an empty buffer (position 0, limit 0) so that the first read triggers a refill.
+        byteBuffer.flip();
+        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    /**
+     * Returns the number of bytes currently held in the buffer, which can be read without touching the file.
+     *
+     * @return the number of buffered bytes.
+     * @throws IOException never thrown by this implementation; declared by {@link InputStream}.
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        return byteBuffer.remaining();
+    }
+
+    /**
+     * Frees the native memory of a direct buffer without waiting for garbage collection.
+     * <p>
+     * In Java 8, the type of {@code DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access
+     * the method {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in
+     * later JDKs, and the {@code clean()} method is not accessible even with reflection. However,
+     * {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
+     * reflection.
+     * </p>
+     * <p>
+     * Ported from Apache Spark's {@code StorageUtils.scala}. Both paths use reflection so that no JDK-internal type is
+     * needed at compile time.
+     * </p>
+     *
+     * @param buffer a direct buffer.
+     * @throws IllegalStateException if the platform does not allow the buffer to be cleaned.
+     */
+    private static void bufferCleaner(final ByteBuffer buffer) {
+        try {
+            if ("1.8".equals(System.getProperty("java.specification.version"))) {
+                // On Java 8: ((sun.nio.ch.DirectBuffer) buffer).cleaner().clean(), through the public interface.
+                final Object cleaner = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner").invoke(buffer);
+                if (cleaner != null) {
+                    Class.forName("sun.misc.Cleaner").getMethod("clean").invoke(cleaner);
+                }
+            } else {
+                // On Java 9 and up: sun.misc.Unsafe.theUnsafe.invokeCleaner(buffer).
+                final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+                final Method invokeCleaner = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
+                final Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
+                theUnsafe.setAccessible(true);
+                invokeCleaner.invoke(theUnsafe.get(null), buffer);
+            }
+        } catch (final ReflectiveOperationException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Closes the underlying file channel and frees the direct buffer.
+     *
+     * @throws IOException If an I/O error occurs while closing the channel.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        // Empty the buffer first so that a later read() or skip() can never access its native memory after it has
+        // been freed below; such calls go to the closed channel instead, which throws ClosedChannelException.
+        byteBuffer.position(0);
+        byteBuffer.flip();
+        try {
+            fileChannel.close();
+        } finally {
+            dispose(byteBuffer);
+        }
+    }
+
+    /**
+     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause
+     * errors if one attempts to read from the disposed buffer. However, neither the bytes allocated to direct buffers
+     * nor file descriptors opened for memory-mapped buffers put pressure on the garbage collector. Waiting for garbage
+     * collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no
+     * standard API to manually dispose of these kinds of buffers.
+     *
+     * @param buffer the buffer to dispose; heap buffers are ignored.
+     */
+    private static void dispose(final ByteBuffer buffer) {
+        if (buffer.isDirect()) {
+            bufferCleaner(buffer);
+        }
+    }
+
+    @Override
+    public synchronized int read() throws IOException {
+        if (!refill()) {
+            return -1;
+        }
+        return byteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
+        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            // Per the InputStream contract, a zero-length read returns 0 without reading from the file.
+            return 0;
+        }
+        if (!refill()) {
+            return -1;
+        }
+        len = Math.min(len, byteBuffer.remaining());
+        byteBuffer.get(b, offset, len);
+        return len;
+    }
+
+    /**
+     * Refills the buffer from the file channel if it has been fully consumed, and reports whether data is left to be
+     * read.
+     *
+     * @return true if data is left, false at end of file.
+     * @throws IOException If an I/O error occurs, including reading from a closed channel.
+     */
+    private boolean refill() throws IOException {
+        if (!byteBuffer.hasRemaining()) {
+            byteBuffer.clear();
+            int nRead = 0;
+            // The buffer has free space (its capacity is validated to be positive); keep reading until at least one
+            // byte arrives or end of file (-1) is reported.
+            while (nRead == 0) {
+                nRead = fileChannel.read(byteBuffer);
+            }
+            byteBuffer.flip();
+            if (nRead < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public synchronized long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+        if (byteBuffer.remaining() >= n) {
+            // The buffered content is enough to skip.
+            byteBuffer.position(byteBuffer.position() + (int) n);
+            return n;
+        }
+        final long skippedFromBuffer = byteBuffer.remaining();
+        final long toSkipFromFileChannel = n - skippedFromBuffer;
+        // Discard everything we have read in the buffer.
+        byteBuffer.position(0);
+        byteBuffer.flip();
+        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
+    }
+
+    /**
+     * Advances the file channel position by up to {@code n} bytes, stopping at the end of the file.
+     *
+     * @param n the number of bytes to skip, greater than 0.
+     * @return the number of bytes actually skipped.
+     * @throws IOException If an I/O error occurs.
+     */
+    private long skipFromFileChannel(final long n) throws IOException {
+        final long currentFilePosition = fileChannel.position();
+        final long size = fileChannel.size();
+        if (n > size - currentFilePosition) {
+            fileChannel.position(size);
+            return size - currentFilePosition;
+        }
+        fileChannel.position(currentFilePosition + n);
+        return n;
+    }
+
+}
diff --git 
a/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java 
b/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java
new file mode 100644
index 0000000..e6abaad
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+// import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements {@link InputStream} to asynchronously read ahead from an 
underlying input stream when a specified amount
+ * of data has been read from the current buffer. It does so by maintaining 
two buffers: an active buffer and a read
+ * ahead buffer. The active buffer contains data which should be returned when 
a read() call is issued. The read ahead
+ * buffer is used to asynchronously read from the underlying input stream. 
When the current active buffer is exhausted,
+ * we flip the two buffers so that we can start reading from the read ahead 
buffer without being blocked by disk I/O.
+ * <p>
+ * This class was ported and adapted from Apache Spark commit 
933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
+ * </p>
+ *
+ * @since 2.9.0
+ */
+public class ReadAheadInputStream extends InputStream {
+
+    private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+    /**
+     * Creates a new daemon executor service.
+     *
+     * @return a new daemon executor service.
+     */
+    private static ExecutorService newExecutorService() {
+        return 
Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread);
+    }
+
+    /**
+     * Creates a new daemon thread.
+     * 
+     * @param r the thread's runnable.
+     * @return a new daemon thread.
+     */
+    private static Thread newThread(Runnable r) {
+        final Thread thread = new Thread(r, "commons-io-read-ahead");
+        thread.setDaemon(true);
+        return thread;
+    }
+
+    private final ReentrantLock stateChangeLock = new ReentrantLock();
+
+    // @GuardedBy("stateChangeLock")
+    private ByteBuffer activeBuffer;
+
+    // @GuardedBy("stateChangeLock")
+    private ByteBuffer readAheadBuffer;
+
+    // @GuardedBy("stateChangeLock")
+    private boolean endOfStream;
+
+    // @GuardedBy("stateChangeLock")
+    // true if async read is in progress
+    private boolean readInProgress;
+
+    // @GuardedBy("stateChangeLock")
+    // true if read is aborted due to an exception in reading from underlying 
input stream.
+    private boolean readAborted;
+
+    // @GuardedBy("stateChangeLock")
+    private Throwable readException;
+
+    // @GuardedBy("stateChangeLock")
+    // whether the close method is called.
+    private boolean isClosed;
+
+    // @GuardedBy("stateChangeLock")
+    // true when the close method will close the underlying input stream. This 
is valid only if
+    // `isClosed` is true.
+    private boolean isUnderlyingInputStreamBeingClosed;
+
+    // @GuardedBy("stateChangeLock")
+    // whether there is a read ahead task running,
+    private boolean isReading;
+
+    // Whether there is a reader waiting for data.
+    private final AtomicBoolean isWaiting = new AtomicBoolean(false);
+
+    private final InputStream underlyingInputStream;
+
+    private final ExecutorService executorService;
+
+    private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+    /**
+     * Creates an instance with the specified buffer size and read-ahead 
threshold
+     *
+     * @param inputStream The underlying input stream.
+     * @param bufferSizeInBytes The buffer size.
+     */
+    public ReadAheadInputStream(final InputStream inputStream, final int 
bufferSizeInBytes) {
+        this(inputStream, bufferSizeInBytes, newExecutorService());
+    }
+
+    /**
+     * Creates an instance with the specified buffer size and read-ahead 
threshold
+     *
+     * @param inputStream The underlying input stream.
+     * @param bufferSizeInBytes The buffer size.
+     * @param executorService An executor service for the read-ahead thread.
+     */
+    public ReadAheadInputStream(final InputStream inputStream, final int 
bufferSizeInBytes,
+        final ExecutorService executorService) {
+        if (bufferSizeInBytes <= 0) {
+            throw new IllegalArgumentException(
+                "bufferSizeInBytes should be greater than 0, but the value is 
" + bufferSizeInBytes);
+        }
+        this.executorService = executorService;
+        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+        this.underlyingInputStream = inputStream;
+        this.activeBuffer.flip();
+        this.readAheadBuffer.flip();
+    }
+
+    @Override
+    public int available() throws IOException {
+        stateChangeLock.lock();
+        // Make sure we have no integer overflow.
+        try {
+            return (int) Math.min(Integer.MAX_VALUE, (long) 
activeBuffer.remaining() + readAheadBuffer.remaining());
+        } finally {
+            stateChangeLock.unlock();
+        }
+    }
+
+    private void checkReadException() throws IOException {
+        if (readAborted) {
+            if (readException instanceof IOException) {
+                throw (IOException) readException;
+            }
+            throw new IOException(readException);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        boolean isSafeToCloseUnderlyingInputStream = false;
+        stateChangeLock.lock();
+        try {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+            if (!isReading) {
+                // Nobody is reading, so we can close the underlying input 
stream in this method.
+                isSafeToCloseUnderlyingInputStream = true;
+                // Flip this to make sure the read ahead task will not close 
the underlying input stream.
+                isUnderlyingInputStreamBeingClosed = true;
+            }
+        } finally {
+            stateChangeLock.unlock();
+        }
+
+        try {
+            executorService.shutdownNow();
+            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        } catch (final InterruptedException e) {
+            final InterruptedIOException iio = new 
InterruptedIOException(e.getMessage());
+            iio.initCause(e);
+            throw iio;
+        } finally {
+            if (isSafeToCloseUnderlyingInputStream) {
+                underlyingInputStream.close();
+            }
+        }
+    }
+
+    private void closeUnderlyingInputStreamIfNecessary() {
+        boolean needToCloseUnderlyingInputStream = false;
+        stateChangeLock.lock();
+        try {
+            isReading = false;
+            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
+                // close method cannot close underlyingInputStream because we 
were reading.
+                needToCloseUnderlyingInputStream = true;
+            }
+        } finally {
+            stateChangeLock.unlock();
+        }
+        if (needToCloseUnderlyingInputStream) {
+            try {
+                underlyingInputStream.close();
+            } catch (final IOException e) {
+                // TODO ?
+            }
+        }
+    }
+
+    private boolean isEndOfStream() {
+        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() 
&& endOfStream;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (activeBuffer.hasRemaining()) {
+            // short path - just get one byte.
+            return activeBuffer.get() & 0xFF;
+        }
+        final byte[] oneByteArray = oneByte.get();
+        return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b, final int offset, int len) throws 
IOException {
+        if (offset < 0 || len < 0 || len > b.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+
+        if (!activeBuffer.hasRemaining()) {
+            // No remaining in active buffer - lock and switch to write ahead 
buffer.
+            stateChangeLock.lock();
+            try {
+                waitForAsyncReadComplete();
+                if (!readAheadBuffer.hasRemaining()) {
+                    // The first read.
+                    readAsync();
+                    waitForAsyncReadComplete();
+                    if (isEndOfStream()) {
+                        return -1;
+                    }
+                }
+                // Swap the newly read read ahead buffer in place of empty 
active buffer.
+                swapBuffers();
+                // After swapping buffers, trigger another async read for read 
ahead buffer.
+                readAsync();
+            } finally {
+                stateChangeLock.unlock();
+            }
+        }
+        len = Math.min(len, activeBuffer.remaining());
+        activeBuffer.get(b, offset, len);
+
+        return len;
+    }
+
+    /** Read data from underlyingInputStream to readAheadBuffer 
asynchronously. */
+    private void readAsync() throws IOException {
+        stateChangeLock.lock();
+        final byte[] arr;
+        try {
+            arr = readAheadBuffer.array();
+            if (endOfStream || readInProgress) {
+                return;
+            }
+            checkReadException();
+            readAheadBuffer.position(0);
+            readAheadBuffer.flip();
+            readInProgress = true;
+        } finally {
+            stateChangeLock.unlock();
+        }
+        executorService.execute(() -> {
+            stateChangeLock.lock();
+            try {
+                if (isClosed) {
+                    readInProgress = false;
+                    return;
+                }
+                // Flip this so that the close method will not close the 
underlying input stream when we
+                // are reading.
+                isReading = true;
+            } finally {
+                stateChangeLock.unlock();
+            }
+
+            // Please note that it is safe to release the lock and read into 
the read ahead buffer
+            // because either of following two conditions will hold:
+            //
+            // 1. The active buffer has data available to read so the reader 
will not read from the read ahead buffer.
+            //
+            // 2. This is the first time read is called or the active buffer 
is exhausted, in that case the reader waits
+            // for this async read to complete.
+            //
+            // So there is no race condition in both the situations.
+            int read = 0;
+            int off = 0, len = arr.length;
+            Throwable exception = null;
+            try {
+                // try to fill the read ahead buffer.
+                // if a reader is waiting, possibly return early.
+                do {
+                    read = underlyingInputStream.read(arr, off, len);
+                    if (read <= 0) {
+                        break;
+                    }
+                    off += read;
+                    len -= read;
+                } while (len > 0 && !isWaiting.get());
+            } catch (final Throwable ex) {
+                exception = ex;
+                if (ex instanceof Error) {
+                    // `readException` may not be reported to the user. 
Rethrow Error to make sure at least
+                    // The user can see Error in UncaughtExceptionHandler.
+                    throw (Error) ex;
+                }
+            } finally {
+                stateChangeLock.lock();
+                try {
+                    readAheadBuffer.limit(off);
+                    if (read < 0 || (exception instanceof EOFException)) {
+                        endOfStream = true;
+                    } else if (exception != null) {
+                        readAborted = true;
+                        readException = exception;
+                    }
+                    readInProgress = false;
+                    signalAsyncReadComplete();
+                } finally {
+                    stateChangeLock.unlock();
+                }
+                closeUnderlyingInputStreamIfNecessary();
+            }
+        });
+    }
+
+    private void signalAsyncReadComplete() {
+        stateChangeLock.lock();
+        try {
+            asyncReadComplete.signalAll();
+        } finally {
+            stateChangeLock.unlock();
+        }
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+        if (n <= activeBuffer.remaining()) {
+            // Only skipping from active buffer is sufficient
+            activeBuffer.position((int) n + activeBuffer.position());
+            return n;
+        }
+        stateChangeLock.lock();
+        long skipped;
+        try {
+            skipped = skipInternal(n);
+        } finally {
+            stateChangeLock.unlock();
+        }
+        return skipped;
+    }
+
+    /**
+     * Internal skip function which should be called only from skip(). The 
assumption is that the stateChangeLock is
+     * already acquired in the caller before calling this function.
+     */
+    private long skipInternal(final long n) throws IOException {
+        assert stateChangeLock.isLocked();
+        waitForAsyncReadComplete();
+        if (isEndOfStream()) {
+            return 0;
+        }
+        if (available() >= n) {
+            // we can skip from the internal buffers
+            int toSkip = (int) n;
+            // We need to skip from both active buffer and read ahead buffer
+            toSkip -= activeBuffer.remaining();
+            assert toSkip > 0; // skipping from activeBuffer already handled.
+            activeBuffer.position(0);
+            activeBuffer.flip();
+            readAheadBuffer.position(toSkip + readAheadBuffer.position());
+            swapBuffers();
+            // Trigger async read to emptied read ahead buffer.
+            readAsync();
+            return n;
+        }
+        final int skippedBytes = available();
+        final long toSkip = n - skippedBytes;
+        activeBuffer.position(0);
+        activeBuffer.flip();
+        readAheadBuffer.position(0);
+        readAheadBuffer.flip();
+        final long skippedFromInputStream = underlyingInputStream.skip(toSkip);
+        readAsync();
+        return skippedBytes + skippedFromInputStream;
+    }
+
+    /**
+     * Flips the active and read ahead buffer
+     */
+    private void swapBuffers() {
+        final ByteBuffer temp = activeBuffer;
+        activeBuffer = readAheadBuffer;
+        readAheadBuffer = temp;
+    }
+
+    private void waitForAsyncReadComplete() throws IOException {
+        stateChangeLock.lock();
+        try {
+            isWaiting.set(true);
+            // There is only one reader, and one writer, so the writer should 
signal only once,
+            // but a while loop checking the wake up condition is still needed 
to avoid spurious wakeups.
+            while (readInProgress) {
+                asyncReadComplete.await();
+            }
+        } catch (final InterruptedException e) {
+            final InterruptedIOException iio = new 
InterruptedIOException(e.getMessage());
+            iio.initCause(e);
+            throw iio;
+        } finally {
+            isWaiting.set(false);
+            stateChangeLock.unlock();
+        }
+        checkReadException();
+    }
+}
diff --git 
a/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java 
b/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java
new file mode 100644
index 0000000..eaef96e
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract base for tests of {@link InputStream} implementations that read {@link #inputFile}.
+ * <p>
+ * {@link #setUp()} writes 2 MB of random bytes to a temporary file; subclasses then populate {@link #inputStreams}
+ * with streams over that file, and every test runs against each of them.
+ * </p>
+ * <p>
+ * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
+ * called {@code GenericFileInputStreamSuite}.
+ * </p>
+ */
+public abstract class AbstractInputStreamTest {
+
+    private byte[] randomBytes;
+
+    protected File inputFile;
+
+    protected InputStream[] inputStreams;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        // Create a byte array of size 2 MB with random bytes
+        randomBytes = RandomUtils.nextBytes(2 * 1024 * 1024);
+        inputFile = File.createTempFile("temp-file", ".tmp");
+        FileUtils.writeByteArrayToFile(inputFile, randomBytes);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        // Close the streams before deleting the file: an open file cannot be deleted on some platforms (e.g. Windows).
+        try {
+            if (inputStreams != null) {
+                for (final InputStream is : inputStreams) {
+                    is.close();
+                }
+            }
+        } finally {
+            if (inputFile != null) {
+                inputFile.delete();
+            }
+        }
+    }
+
+    @Test
+    public void testBytesSkipped() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            assertEquals(1024, inputStream.skip(1024));
+            for (int i = 1024; i < randomBytes.length; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+        }
+    }
+
+    @Test
+    public void testBytesSkippedAfterEOF() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1));
+            assertEquals(-1, inputStream.read());
+        }
+    }
+
+    @Test
+    public void testBytesSkippedAfterRead() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            for (int i = 0; i < 1024; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+            assertEquals(1024, inputStream.skip(1024));
+            for (int i = 2048; i < randomBytes.length; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+        }
+    }
+
+    @Test
+    public void testNegativeBytesSkippedAfterRead() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            for (int i = 0; i < 1024; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+            // Skipping negative bytes should essentially be a no-op
+            assertEquals(0, inputStream.skip(-1));
+            assertEquals(0, inputStream.skip(-1024));
+            assertEquals(0, inputStream.skip(Long.MIN_VALUE));
+            assertEquals(1024, inputStream.skip(1024));
+            for (int i = 2048; i < randomBytes.length; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+        }
+    }
+
+    @Test
+    public void testReadMultipleBytes() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            final byte[] readBytes = new byte[8 * 1024];
+            int i = 0;
+            while (i < randomBytes.length) {
+                final int read = inputStream.read(readBytes, 0, readBytes.length);
+                if (read == -1) {
+                    // Without this check a premature EOF would make this loop spin forever.
+                    throw new AssertionError("Unexpected end of stream after " + i + " bytes");
+                }
+                for (int j = 0; j < read; j++) {
+                    assertEquals(randomBytes[i], readBytes[j]);
+                    i++;
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testReadOneByte() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            for (final byte randomByte : randomBytes) {
+                assertEquals(randomByte, (byte) inputStream.read());
+            }
+        }
+    }
+
+    @Test
+    public void testReadPastEOF() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            final byte[] buf = new byte[1024];
+            // Drain the stream; the content itself is verified by the other tests.
+            while (inputStream.read(buf, 0, buf.length) != -1) {
+                // keep reading until EOF
+            }
+            assertEquals(-1, inputStream.read(buf, 0, buf.length));
+        }
+    }
+
+    @Test
+    public void testSkipFromFileChannel() throws IOException {
+        for (final InputStream inputStream : inputStreams) {
+            // Nothing has been read yet, so no data should be buffered and this first skip exercises skipping in the
+            // underlying source rather than in an internal buffer.
+            assertEquals(1024, inputStream.skip(1024));
+            for (int i = 1024; i < 2048; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+            assertEquals(256, inputStream.skip(256));
+            assertEquals(256, inputStream.skip(256));
+            assertEquals(512, inputStream.skip(512));
+            for (int i = 3072; i < randomBytes.length; i++) {
+                assertEquals(randomBytes[i], (byte) inputStream.read());
+            }
+        }
+    }
+}
diff --git 
a/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java
 
b/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java
new file mode 100644
index 0000000..2580abe
--- /dev/null
+++ 
b/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Tests functionality of {@link BufferedFileChannelInputStream}.
+ * <p>
+ * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
+ * called {@code BufferedFileChannelInputStreamSuite}.
+ * </p>
+ */
+public class BufferedFileChannelInputStreamTest extends AbstractInputStreamTest {
+
+    /**
+     * Creates the streams under test: one using the default buffer size and one using a small 123-byte buffer.
+     *
+     * @throws IOException if the superclass setup fails.
+     */
+    // NOTE(review): the streams are stored in a field, presumably closed by the superclass after each test — confirm.
+    @SuppressWarnings("resource")
+    @Override
+    @BeforeEach
+    public void setUp() throws IOException {
+        super.setUp();
+        // @formatter:off
+        inputStreams = new InputStream[] {
+            new BufferedFileChannelInputStream(inputFile), // default
+            new BufferedFileChannelInputStream(inputFile, 123) // small, unaligned buffer
+        };
+        // @formatter:on
+    }
+}
diff --git a/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java
new file mode 100644
index 0000000..cf1c643
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Tests {@link ReadAheadInputStream}.
+ * <p>
+ * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
+ * called {@code ReadAheadInputStreamSuite}.
+ * </p>
+ */
+public class ReadAheadInputStreamTest extends AbstractInputStreamTest {
+
+    /**
+     * Creates the streams under test. Each one wraps a {@link BufferedFileChannelInputStream} in a
+     * {@link ReadAheadInputStream}, covering equal, larger, and smaller inner buffers with both aligned and
+     * unaligned sizes.
+     *
+     * @throws IOException if the superclass setup fails.
+     */
+    // NOTE(review): the streams are stored in a field, presumably closed by the superclass after each test — confirm.
+    @SuppressWarnings("resource")
+    @Override
+    @BeforeEach
+    public void setUp() throws IOException {
+        super.setUp();
+        // @formatter:off
+        inputStreams = new InputStream[] {
+            // Tests equal and aligned buffers of wrapped and outer stream.
+            new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 8 * 1024), 8 * 1024),
+            // Tests aligned buffers, wrapped bigger than outer.
+            new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 3 * 1024), 2 * 1024),
+            // Tests aligned buffers, wrapped smaller than outer.
+            new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 2 * 1024), 3 * 1024),
+            // Tests unaligned buffers, wrapped bigger than outer.
+            new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 321), 123),
+            // Tests unaligned buffers, wrapped smaller than outer.
+            new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 123), 321)
+        };
+        // @formatter:on
+    }
+}

Reply via email to