This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit 518071297ee09d47ca8e9945cd050f386fc36ed4 Author: Gary Gregory <gardgreg...@gmail.com> AuthorDate: Thu Oct 29 20:15:55 2020 -0400 [IO-510] Add and adapt ReadAheadInputStream and BufferedFileChannelInputStream from Apache Spark. --- src/changes/changes.xml | 3 + .../io/input/BufferedFileChannelInputStream.java | 243 ++++++++++++ .../commons/io/input/ReadAheadInputStream.java | 437 +++++++++++++++++++++ .../commons/io/input/AbstractInputStreamTest.java | 164 ++++++++ .../input/BufferedFileChannelInputStreamTest.java | 44 +++ .../commons/io/input/ReadAheadInputStreamTest.java | 49 +++ 6 files changed, 940 insertions(+) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 41cc4f3..9ef4a98 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -81,6 +81,9 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="add" due-to="Gary Gregory"> Let org.apache.commons.io.filefilter classes work with java.nio.file.Files#newDirectoryStream(Path, DirectoryStream.Filter). </action> + <action issue="IO-510" dev="ggregory" type="add" due-to="Gary Gregory, Apache Spark, David Mollitor"> + Add and adapt ReadAheadInputStream and BufferedFileChannelInputStream from Apache Spark. + </action> <!-- UPDATES --> <action dev="ggregory" type="update" due-to="Dependabot"> Update junit-jupiter from 5.6.2 to 5.7.0 #153. diff --git a/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java b/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java new file mode 100644 index 0000000..e24c1a0 --- /dev/null +++ b/src/main/java/org/apache/commons/io/input/BufferedFileChannelInputStream.java @@ -0,0 +1,243 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; + +import org.apache.commons.io.IOUtils; + +import sun.misc.Cleaner; +import sun.nio.ch.DirectBuffer; + +/** + * {@link InputStream} implementation which uses a direct buffer to read a file to avoid an extra copy of data between Java + * and native memory which happens when using {@link java.io.BufferedInputStream}. Unfortunately, this is not something + * already available in the JDK: {@code sun.nio.ch.ChannelInputStream} supports reading a file using NIO, but does not + * support buffering. + * <p> + * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was + * called {@code NioBufferedFileInputStream}.
+ * </p> + * + * @since 2.9.0 + */ +@SuppressWarnings("restriction") +public final class BufferedFileChannelInputStream extends InputStream { + + private final ByteBuffer byteBuffer; + + private final FileChannel fileChannel; + + /** + * Constructs a new instance for the given File. + * + * @param file The file to stream. + * @throws IOException If an I/O error occurs + */ + public BufferedFileChannelInputStream(final File file) throws IOException { + this(file, IOUtils.DEFAULT_BUFFER_SIZE); + } + + /** + * Constructs a new instance for the given File and buffer size. + * + * @param file The file to stream. + * @param bufferSizeInBytes buffer size. + * @throws IOException If an I/O error occurs + */ + public BufferedFileChannelInputStream(final File file, final int bufferSizeInBytes) throws IOException { + this(file.toPath(), bufferSizeInBytes); + } + + /** + * Constructs a new instance for the given Path. + * + * @param path The path to stream. + * @throws IOException If an I/O error occurs + */ + public BufferedFileChannelInputStream(final Path path) throws IOException { + this(path, IOUtils.DEFAULT_BUFFER_SIZE); + } + + /** + * Constructs a new instance for the given Path and buffer size. + * + * @param path The path to stream. + * @param bufferSizeInBytes buffer size. + * @throws IOException If an I/O error occurs + */ + public BufferedFileChannelInputStream(final Path path, final int bufferSizeInBytes) throws IOException { + Objects.requireNonNull(path, "path"); + fileChannel = FileChannel.open(path, StandardOpenOption.READ); + byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes); + byteBuffer.flip(); + } + + @Override + public synchronized int available() throws IOException { + return byteBuffer.remaining(); + } + + /** + * In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible to access the method + * sun.misc.Cleaner.clean() to invoke it. The type changed to jdk.internal.ref.Cleaner in later JDKs, and the + * .clean() method is not accessible even with reflection. However sun.misc.Unsafe added a invokeCleaner() method in + * JDK 9+ and this is still accessible with reflection. + */ + private void bufferCleaner(final DirectBuffer buffer) { + // + // Ported from StorageUtils.scala. + // +// private val bufferCleaner: DirectBuffer => Unit = +// if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { +// val cleanerMethod = +// Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer]) +// val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe") +// unsafeField.setAccessible(true) +// val unsafe = unsafeField.get(null).asInstanceOf[Unsafe] +// buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer) +// } else { +// val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean") +// buffer: DirectBuffer => { +// // Careful to avoid the return type of .cleaner(), which changes with JDK +// val cleaner: AnyRef = buffer.cleaner() +// if (cleaner != null) { +// cleanerMethod.invoke(cleaner) +// } +// } +// } + // + final String specVer = System.getProperty("java.specification.version"); + if ("1.8".equals(specVer)) { + // On Java 8. + final Cleaner cleaner = buffer.cleaner(); + if (cleaner != null) { + cleaner.clean(); + } + } else { + // On Java 9 and up, but compiled on Java 8. 
+ try { + final Class<?> cls = Class.forName("sun.misc.Unsafe"); + final Method cleanerMethod = cls.getMethod("invokeCleaner", ByteBuffer.class); + final Field unsafeField = cls.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + cleanerMethod.invoke(unsafeField.get(null), buffer); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public synchronized void close() throws IOException { + try { + fileChannel.close(); + } finally { + dispose(byteBuffer); + } + } + + /** + * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause + * errors if one attempts to read from the disposed buffer. However, neither the bytes allocated to direct buffers + * nor file descriptors opened for memory-mapped buffers put pressure on the garbage collector. Waiting for garbage + * collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no + * standard API to manually dispose of these kinds of buffers. + */ + private void dispose(final ByteBuffer buffer) { + if (buffer instanceof sun.nio.ch.DirectBuffer) { + bufferCleaner((sun.nio.ch.DirectBuffer) buffer); + } + } + + @Override + public synchronized int read() throws IOException { + if (!refill()) { + return -1; + } + return byteBuffer.get() & 0xFF; + } + + @Override + public synchronized int read(final byte[] b, final int offset, int len) throws IOException { + if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) { + throw new IndexOutOfBoundsException(); + } + if (!refill()) { + return -1; + } + len = Math.min(len, byteBuffer.remaining()); + byteBuffer.get(b, offset, len); + return len; + } + + /** + * Checks whether data is left to be read from the input stream. + * + * @return true if data is left, false otherwise + */ + private boolean refill() throws IOException { + if (!byteBuffer.hasRemaining()) { + byteBuffer.clear(); + int nRead = 0; + while (nRead == 0) { + nRead = fileChannel.read(byteBuffer); + } + byteBuffer.flip(); + if (nRead < 0) { + return false; + } + } + return true; + } + + @Override + public synchronized long skip(final long n) throws IOException { + if (n <= 0L) { + return 0L; + } + if (byteBuffer.remaining() >= n) { + // The buffered content is enough to skip + byteBuffer.position(byteBuffer.position() + (int) n); + return n; + } + final long skippedFromBuffer = byteBuffer.remaining(); + final long toSkipFromFileChannel = n - skippedFromBuffer; + // Discard everything we have read in the buffer. + byteBuffer.position(0); + byteBuffer.flip(); + return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel); + } + + private long skipFromFileChannel(final long n) throws IOException { + final long currentFilePosition = fileChannel.position(); + final long size = fileChannel.size(); + if (n > size - currentFilePosition) { + fileChannel.position(size); + return size - currentFilePosition; + } + fileChannel.position(currentFilePosition + n); + return n; + } + +} diff --git a/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java b/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java new file mode 100644 index 0000000..e6abaad --- /dev/null +++ b/src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java @@ -0,0 +1,437 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +// import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount + * of data has been read from the current buffer. It does so by maintaining two buffers: an active buffer and a read + * ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read ahead + * buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, + * we flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. + * <p> + * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. + * </p> + * + * @since 2.9.0 + */ +public class ReadAheadInputStream extends InputStream { + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a new daemon executor service. + * + * @return a new daemon executor service. + */ + private static ExecutorService newExecutorService() { + return Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread); + } + + /** + * Creates a new daemon thread. + * + * @param r the thread's runnable. + * @return a new daemon thread. + */ + private static Thread newThread(Runnable r) { + final Thread thread = new Thread(r, "commons-io-read-ahead"); + thread.setDaemon(true); + return thread; + } + + private final ReentrantLock stateChangeLock = new ReentrantLock(); + + // @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + // @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + // @GuardedBy("stateChangeLock") + private boolean endOfStream; + + // @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + // @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + // @GuardedBy("stateChangeLock") + private Throwable readException; + + // @GuardedBy("stateChangeLock") + // whether the close method is called. + private boolean isClosed; + + // @GuardedBy("stateChangeLock") + // true when the close method will close the underlying input stream. This is valid only if + // `isClosed` is true. + private boolean isUnderlyingInputStreamBeingClosed; + + // @GuardedBy("stateChangeLock") + // whether there is a read ahead task running, + private boolean isReading; + + // Whether there is a reader waiting for data. 
+ private final AtomicBoolean isWaiting = new AtomicBoolean(false); + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService; + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + /** + * Creates an instance with the specified buffer size and read-ahead threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + */ + public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { + this(inputStream, bufferSizeInBytes, newExecutorService()); + } + + /** + * Creates an instance with the specified buffer size and read-ahead threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param executorService An executor service for the read-ahead thread. + */ + public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, + final ExecutorService executorService) { + if (bufferSizeInBytes <= 0) { + throw new IllegalArgumentException( + "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); + } + this.executorService = executorService; + this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); + this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); + this.underlyingInputStream = inputStream; + this.activeBuffer.flip(); + this.readAheadBuffer.flip(); + } + + @Override + public int available() throws IOException { + stateChangeLock.lock(); + // Make sure we have no integer overflow. + try { + return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); + } finally { + stateChangeLock.unlock(); + } + } + + private void checkReadException() throws IOException { + if (readAborted) { + if (readException instanceof IOException) { + throw (IOException) readException; + } + throw new IOException(readException); + } + } + + @Override + public void close() throws IOException { + boolean isSafeToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + if (isClosed) { + return; + } + isClosed = true; + if (!isReading) { + // Nobody is reading, so we can close the underlying input stream in this method. + isSafeToCloseUnderlyingInputStream = true; + // Flip this to make sure the read ahead task will not close the underlying input stream. + isUnderlyingInputStreamBeingClosed = true; + } + } finally { + stateChangeLock.unlock(); + } + + try { + executorService.shutdownNow(); + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + if (isSafeToCloseUnderlyingInputStream) { + underlyingInputStream.close(); + } + } + } + + private void closeUnderlyingInputStreamIfNecessary() { + boolean needToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + isReading = false; + if (isClosed && !isUnderlyingInputStreamBeingClosed) { + // close method cannot close underlyingInputStream because we were reading. + needToCloseUnderlyingInputStream = true; + } + } finally { + stateChangeLock.unlock(); + } + if (needToCloseUnderlyingInputStream) { + try { + underlyingInputStream.close(); + } catch (final IOException e) { + // TODO ? 
+ } + } + } + + private boolean isEndOfStream() { + return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; + } + + @Override + public int read() throws IOException { + if (activeBuffer.hasRemaining()) { + // short path - just get one byte. + return activeBuffer.get() & 0xFF; + } + final byte[] oneByteArray = oneByte.get(); + return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; + } + + @Override + public int read(final byte[] b, final int offset, int len) throws IOException { + if (offset < 0 || len < 0 || len > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + + if (!activeBuffer.hasRemaining()) { + // No remaining in active buffer - lock and switch to write ahead buffer. + stateChangeLock.lock(); + try { + waitForAsyncReadComplete(); + if (!readAheadBuffer.hasRemaining()) { + // The first read. + readAsync(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return -1; + } + } + // Swap the newly read read ahead buffer in place of empty active buffer. + swapBuffers(); + // After swapping buffers, trigger another async read for read ahead buffer. + readAsync(); + } finally { + stateChangeLock.unlock(); + } + } + len = Math.min(len, activeBuffer.remaining()); + activeBuffer.get(b, offset, len); + + return len; + } + + /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ + private void readAsync() throws IOException { + stateChangeLock.lock(); + final byte[] arr; + try { + arr = readAheadBuffer.array(); + if (endOfStream || readInProgress) { + return; + } + checkReadException(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + readInProgress = true; + } finally { + stateChangeLock.unlock(); + } + executorService.execute(() -> { + stateChangeLock.lock(); + try { + if (isClosed) { + readInProgress = false; + return; + } + // Flip this so that the close method will not close the underlying input stream when we + // are reading. + isReading = true; + } finally { + stateChangeLock.unlock(); + } + + // Please note that it is safe to release the lock and read into the read ahead buffer + // because either of following two conditions will hold: + // + // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. + // + // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits + // for this async read to complete. + // + // So there is no race condition in both the situations. + int read = 0; + int off = 0, len = arr.length; + Throwable exception = null; + try { + // try to fill the read ahead buffer. + // if a reader is waiting, possibly return early. + do { + read = underlyingInputStream.read(arr, off, len); + if (read <= 0) { + break; + } + off += read; + len -= read; + } while (len > 0 && !isWaiting.get()); + } catch (final Throwable ex) { + exception = ex; + if (ex instanceof Error) { + // `readException` may not be reported to the user. Rethrow Error to make sure at least + // The user can see Error in UncaughtExceptionHandler. 
+ throw (Error) ex; + } + } finally { + stateChangeLock.lock(); + try { + readAheadBuffer.limit(off); + if (read < 0 || (exception instanceof EOFException)) { + endOfStream = true; + } else if (exception != null) { + readAborted = true; + readException = exception; + } + readInProgress = false; + signalAsyncReadComplete(); + } finally { + stateChangeLock.unlock(); + } + closeUnderlyingInputStreamIfNecessary(); + } + }); + } + + private void signalAsyncReadComplete() { + stateChangeLock.lock(); + try { + asyncReadComplete.signalAll(); + } finally { + stateChangeLock.unlock(); + } + } + + @Override + public long skip(final long n) throws IOException { + if (n <= 0L) { + return 0L; + } + if (n <= activeBuffer.remaining()) { + // Only skipping from active buffer is sufficient + activeBuffer.position((int) n + activeBuffer.position()); + return n; + } + stateChangeLock.lock(); + long skipped; + try { + skipped = skipInternal(n); + } finally { + stateChangeLock.unlock(); + } + return skipped; + } + + /** + * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is + * already acquired in the caller before calling this function. + */ + private long skipInternal(final long n) throws IOException { + assert stateChangeLock.isLocked(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return 0; + } + if (available() >= n) { + // we can skip from the internal buffers + int toSkip = (int) n; + // We need to skip from both active buffer and read ahead buffer + toSkip -= activeBuffer.remaining(); + assert toSkip > 0; // skipping from activeBuffer already handled. + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(toSkip + readAheadBuffer.position()); + swapBuffers(); + // Trigger async read to emptied read ahead buffer. + readAsync(); + return n; + } + final int skippedBytes = available(); + final long toSkip = n - skippedBytes; + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + final long skippedFromInputStream = underlyingInputStream.skip(toSkip); + readAsync(); + return skippedBytes + skippedFromInputStream; + } + + /** + * Flips the active and read ahead buffer + */ + private void swapBuffers() { + final ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + + private void waitForAsyncReadComplete() throws IOException { + stateChangeLock.lock(); + try { + isWaiting.set(true); + // There is only one reader, and one writer, so the writer should signal only once, + // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. + while (readInProgress) { + asyncReadComplete.await(); + } + } catch (final InterruptedException e) { + final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + isWaiting.set(false); + stateChangeLock.unlock(); + } + checkReadException(); + } +} diff --git a/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java b/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java new file mode 100644 index 0000000..eaef96e --- /dev/null +++ b/src/test/java/org/apache/commons/io/input/AbstractInputStreamTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests functionality of {@link BufferedFileChannelInputStream}. + * + * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was + * called {@code GenericFileInputStreamSuite}. + */ +public abstract class AbstractInputStreamTest { + + private byte[] randomBytes; + + protected File inputFile; + + protected InputStream[] inputStreams; + + @BeforeEach + public void setUp() throws IOException { + // Create a byte array of size 2 MB with random bytes + randomBytes = RandomUtils.nextBytes(2 * 1024 * 1024); + inputFile = File.createTempFile("temp-file", ".tmp"); + FileUtils.writeByteArrayToFile(inputFile, randomBytes); + } + + @AfterEach + public void tearDown() throws IOException { + inputFile.delete(); + + for (final InputStream is : inputStreams) { + is.close(); + } + } + + @Test + public void testBytesSkipped() throws IOException { + for (final InputStream inputStream : inputStreams) { + assertEquals(1024, inputStream.skip(1024)); + for (int i = 1024; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + } + } + + @Test + public void testBytesSkippedAfterEOF() throws IOException { + for (final InputStream inputStream : inputStreams) { + assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1)); + assertEquals(-1, inputStream.read()); + } + } + + @Test + public void testBytesSkippedAfterRead() throws IOException { + for (final InputStream inputStream : inputStreams) { + for (int i = 0; i < 1024; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + assertEquals(1024, inputStream.skip(1024)); + for (int i = 2048; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + } + } + + @Test + public void testNegativeBytesSkippedAfterRead() throws IOException { + for (final InputStream inputStream : inputStreams) { + for (int i = 0; i < 1024; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + // Skipping negative bytes should essential be a no-op + assertEquals(0, inputStream.skip(-1)); + assertEquals(0, inputStream.skip(-1024)); + assertEquals(0, inputStream.skip(Long.MIN_VALUE)); + assertEquals(1024, inputStream.skip(1024)); + for (int i = 2048; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + } + } + + @Test + public void testReadMultipleBytes() throws IOException { + for (final InputStream inputStream : inputStreams) { + final byte[] readBytes = new byte[8 * 1024]; + int i = 0; + while 
(i < randomBytes.length) { + final int read = inputStream.read(readBytes, 0, 8 * 1024); + for (int j = 0; j < read; j++) { + assertEquals(randomBytes[i], readBytes[j]); + i++; + } + } + } + } + + @Test + public void testReadOneByte() throws IOException { + for (final InputStream inputStream : inputStreams) { + for (final byte randomByte : randomBytes) { + assertEquals(randomByte, (byte) inputStream.read()); + } + } + } + + @Test + public void testReadPastEOF() throws IOException { + final InputStream is = inputStreams[0]; + final byte[] buf = new byte[1024]; + int read; + while ((read = is.read(buf, 0, buf.length)) != -1) { + + } + + final int readAfterEOF = is.read(buf, 0, buf.length); + assertEquals(-1, readAfterEOF); + } + + @Test + public void testSkipFromFileChannel() throws IOException { + for (final InputStream inputStream : inputStreams) { + // Since the buffer is smaller than the skipped bytes, this will guarantee + // we skip from underlying file channel. + assertEquals(1024, inputStream.skip(1024)); + for (int i = 1024; i < 2048; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + assertEquals(256, inputStream.skip(256)); + assertEquals(256, inputStream.skip(256)); + assertEquals(512, inputStream.skip(512)); + for (int i = 3072; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + } + } +} diff --git a/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java b/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java new file mode 100644 index 0000000..2580abe --- /dev/null +++ b/src/test/java/org/apache/commons/io/input/BufferedFileChannelInputStreamTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import java.io.IOException; +import java.io.InputStream; + +import org.junit.jupiter.api.BeforeEach; + +/** + * Tests functionality of {@link BufferedFileChannelInputStream}. + * + * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 wher it was + * called {@code BufferedFileChannelInputStreamSuite}. 
+ */ +public class BufferedFileChannelInputStreamTest extends AbstractInputStreamTest { + + @SuppressWarnings("resource") + @Override + @BeforeEach + public void setUp() throws IOException { + super.setUp(); + // @formatter:off + inputStreams = new InputStream[] { + new BufferedFileChannelInputStream(inputFile), // default + new BufferedFileChannelInputStream(inputFile, 123) // small, unaligned buffer + }; + //@formatter:on + } +} diff --git a/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java new file mode 100644 index 0000000..cf1c643 --- /dev/null +++ b/src/test/java/org/apache/commons/io/input/ReadAheadInputStreamTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import java.io.IOException; +import java.io.InputStream; + +import org.junit.jupiter.api.BeforeEach; + +/** + * Tests {@link ReadAheadInputStream}. + * + * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was + * called {@code ReadAheadInputStreamSuite}. + */ +public class ReadAheadInputStreamTest extends AbstractInputStreamTest { + + @SuppressWarnings("resource") + @Override + @BeforeEach + public void setUp() throws IOException { + super.setUp(); + inputStreams = new InputStream[] { + // Tests equal and aligned buffers of wrapped an outer stream. + new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 8 * 1024), 8 * 1024), + // Tests aligned buffers, wrapped bigger than outer. + new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 3 * 1024), 2 * 1024), + // Tests aligned buffers, wrapped smaller than outer. + new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 2 * 1024), 3 * 1024), + // Tests unaligned buffers, wrapped bigger than outer. + new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 321), 123), + // Tests unaligned buffers, wrapped smaller than outer. + new ReadAheadInputStream(new BufferedFileChannelInputStream(inputFile, 123), 321)}; + } +}
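
For readers skimming the diff, here is a minimal usage sketch (not part of the commit) showing how the two new classes are intended to compose, mirroring the wiring used in ReadAheadInputStreamTest above. It assumes commons-io 2.9.0+ on the classpath; the class name ReadAheadExample, the path /tmp/data.bin, and the 8 KiB buffer sizes are illustrative placeholders.

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.commons.io.input.BufferedFileChannelInputStream;
    import org.apache.commons.io.input.ReadAheadInputStream;

    public class ReadAheadExample {

        public static void main(final String[] args) throws IOException {
            // Placeholder path; any readable file works.
            final Path path = Paths.get("/tmp/data.bin");

            // BufferedFileChannelInputStream reads the file through a direct NIO buffer;
            // ReadAheadInputStream then fills its second buffer asynchronously on a daemon
            // thread while the caller drains the active one.
            try (InputStream in = new ReadAheadInputStream(
                    new BufferedFileChannelInputStream(path, 8 * 1024), 8 * 1024)) {
                final byte[] buffer = new byte[4096];
                long total = 0;
                int n;
                while ((n = in.read(buffer, 0, buffer.length)) != -1) {
                    total += n;
                }
                System.out.println("Read " + total + " bytes");
            }
        }
    }

Per the close() implementation in the diff, closing the outer ReadAheadInputStream shuts down its read-ahead executor and closes the wrapped stream, so a single try-with-resources block is sufficient.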