This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit df770deffd9917b06da33cc547fea22ebfc29226 Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Sat Apr 22 09:25:29 2023 -0400 Use a builder instead of adding another constructor Fix Javadoc Follow existing code conventions --- .../apache/commons/io/input/QueueInputStream.java | 106 ++++++++++++++----- .../commons/io/input/QueueInputStreamTest.java | 115 ++++++++++----------- 2 files changed, 138 insertions(+), 83 deletions(-) diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java index 5efaeb7e..bb9e0a07 100644 --- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java +++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java @@ -28,15 +28,16 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.build.AbstractStreamBuilder; import org.apache.commons.io.output.QueueOutputStream; /** - * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue - * output stream. + * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. * * <p> * Example usage: * </p> + * * <pre> * QueueInputStream inputStream = new QueueInputStream(); * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); @@ -45,13 +46,12 @@ import org.apache.commons.io.output.QueueOutputStream; * inputStream.read(); * </pre> * <p> - * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a - * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current - * thread. Instances can be used longer after initial threads exited. + * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. + * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. * </p> * <p> - * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been - * closed without generating an {@link IOException}. + * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an + * {@link IOException}. * </p> * * @see QueueOutputStream @@ -59,8 +59,72 @@ import org.apache.commons.io.output.QueueOutputStream; */ public class QueueInputStream extends InputStream { + /** + * Builds a new {@link QueueInputStream} instance. + * <p> + * For example: + * </p> + * + * <pre>{@code + * QueueInputStream s = QueueInputStream.builder() + * .setBlockingQueue(new LinkedBlockingQueue<>()) + * .setTimeout(Duration.ZERO) + * .get()} + * </pre> + * <p> + * + * @since 2.12.0 + */ + public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { + + private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); + private Duration timeout = Duration.ZERO; + + @Override + public QueueInputStream get() throws IOException { + return new QueueInputStream(blockingQueue, timeout); + } + + /** + * Sets backing queue for the stream. + * + * @param blockingQueue backing queue for the stream. + * @return this + */ + public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { + this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); + return this; + } + + /** + * Sets the polling timeout. + * + * @param timeout the polling timeout. + * @return this. + */ + public Builder setTimeout(final Duration timeout) { + if (timeout != null && timeout.toMillis() < 0) { + throw new IllegalArgumentException("waitTime must not be negative"); + } + this.timeout = timeout != null ? timeout : Duration.ZERO; + return this; + } + + } + + /** + * Constructs a new {@link Builder}. + * + * @return a new {@link Builder}. + * @since 2.12.0 + */ + public static Builder builder() { + return new Builder(); + } + private final BlockingQueue<Integer> blockingQueue; - private final long waitTimeMillis; + + private final long timeoutMillis; /** * Constructs a new instance with no limit to its internal buffer size and zero wait time. @@ -72,7 +136,7 @@ public class QueueInputStream extends InputStream { /** * Constructs a new instance with given buffer and zero wait time. * - * @param blockingQueue backing queue for the stream + * @param blockingQueue backing queue for the stream. */ public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { this(blockingQueue, Duration.ZERO); @@ -81,24 +145,18 @@ public class QueueInputStream extends InputStream { /** * Constructs a new instance with given buffer and wait time. * - * @param blockingQueue backing queue for the stream - * @param waitTime how long to wait if necessary for a queue element is available - * @since 2.12.0 + * @param blockingQueue backing queue for the stream. + * @param timeout how long to wait before giving up when polling the queue. */ - public QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration waitTime) { + private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); - Objects.requireNonNull(waitTime, "waitTime"); - if (waitTime.toMillis() < 0) { - throw new IllegalArgumentException("waitTime must not be negative"); - } - this.waitTimeMillis = waitTime.toMillis(); + this.timeoutMillis = Objects.requireNonNull(timeout, "timeout").toMillis(); } /** - * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this - * input stream. + * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. * - * @return QueueOutputStream connected to this stream + * @return QueueOutputStream connected to this stream. */ public QueueOutputStream newQueueOutputStream() { return new QueueOutputStream(blockingQueue); @@ -107,13 +165,13 @@ public class QueueInputStream extends InputStream { /** * Reads and returns a single byte. * - * @return either the byte read or {@code -1} if the wait time elapses before a queue element is available - * @throws IllegalStateException if thread is interrupted while waiting + * @return either the byte read or {@code -1} if the wait time elapses before a queue element is available. + * @throws IllegalStateException if thread is interrupted while waiting. */ @Override public int read() { try { - final Integer value = blockingQueue.poll(waitTimeMillis, TimeUnit.MILLISECONDS); + final Integer value = blockingQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); return value == null ? EOF : 0xFF & value; } catch (final InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java index b99e3562..3943c1db 100644 --- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java +++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java @@ -16,23 +16,11 @@ */ package org.apache.commons.io.input; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeout; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.google.common.base.Stopwatch; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.output.QueueOutputStream; -import org.apache.commons.io.output.QueueOutputStreamTest; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; @@ -47,10 +35,22 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.QueueOutputStream; +import org.apache.commons.io.output.QueueOutputStreamTest; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.base.Stopwatch; + /** * Test {@link QueueInputStream}. * - * @see {@link QueueOutputStreamTest} + * @see QueueOutputStreamTest */ public class QueueInputStreamTest { @@ -71,80 +71,78 @@ public class QueueInputStreamTest { // @formatter:on } + private int defaultBufferSize() { + return 8192; + } + + private String readUnbuffered(final InputStream inputStream) throws IOException { + return readUnbuffered(inputStream, Integer.MAX_VALUE); + } + + private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException { + if (maxBytes == 0) { + return ""; + } + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + int n = -1; + while ((n = inputStream.read()) != -1) { + byteArrayOutputStream.write(n); + if (byteArrayOutputStream.size() >= maxBytes) { + break; + } + } + return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name()); + } + @ParameterizedTest(name = "inputData={0}") @MethodSource("inputData") - public void bufferedReads(final String inputData) throws IOException { + public void testBufferedReads(final String inputData) throws IOException { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); final QueueOutputStream outputStream = new QueueOutputStream(queue)) { - outputStream.write(inputData.getBytes(UTF_8)); - final String actualData = IOUtils.toString(inputStream, UTF_8); + outputStream.write(inputData.getBytes(StandardCharsets.UTF_8)); + final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8); assertEquals(inputData, actualData); } } @ParameterizedTest(name = "inputData={0}") @MethodSource("inputData") - public void bufferedReadWrite(final String inputData) throws IOException { + public void testBufferedReadWrite(final String inputData) throws IOException { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) { - outputStream.write(inputData.getBytes(UTF_8)); + outputStream.write(inputData.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); - final String dataCopy = IOUtils.toString(inputStream, UTF_8); + final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8); assertEquals(inputData, dataCopy); } } @ParameterizedTest(name = "inputData={0}") @MethodSource("inputData") - public void bufferedWrites(final String inputData) throws IOException { + public void testBufferedWrites(final String inputData) throws IOException { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); try (QueueInputStream inputStream = new QueueInputStream(queue); final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) { - outputStream.write(inputData.getBytes(UTF_8)); + outputStream.write(inputData.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); final String actualData = readUnbuffered(inputStream); assertEquals(inputData, actualData); } } - private int defaultBufferSize() { - return 8192; - } - @Test - public void invalidArguments() { + public void testInvalidArguments() { assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required"); - assertThrows(NullPointerException.class, () -> new QueueInputStream(new LinkedBlockingQueue<>(), null), "waitTime is required"); - assertThrows(IllegalArgumentException.class, () -> new QueueInputStream(new LinkedBlockingQueue<>(), Duration.ofMillis(-1)), - "waitTime must not be negative"); - } - - private String readUnbuffered(final InputStream inputStream) throws IOException { - return readUnbuffered(inputStream, Integer.MAX_VALUE); - } - - private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException { - if (maxBytes == 0) { - return ""; - } - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - int n = -1; - while ((n = inputStream.read()) != -1) { - byteArrayOutputStream.write(n); - if (byteArrayOutputStream.size() >= maxBytes) { - break; - } - } - return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name()); + assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative"); } @Test @DisplayName("If read is interrupted while waiting, then exception is thrown") - public void timeoutInterrupted() throws Exception { - try (QueueInputStream inputStream = new QueueInputStream(new LinkedBlockingQueue<>(), Duration.ofMinutes(2)); + public void testTimeoutInterrupted() throws Exception { + try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get(); final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { // read in a background thread @@ -169,22 +167,21 @@ public class QueueInputStreamTest { @Test @DisplayName("If data is not available in queue, then read will wait until wait time elapses") - public void timeoutUnavailableData() throws IOException { - try (QueueInputStream inputStream = new QueueInputStream(new LinkedBlockingQueue<>(), Duration.ofMillis(500)); + public void testTimeoutUnavailableData() throws IOException { + try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get(); final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { - final Stopwatch stopwatch = Stopwatch.createStarted(); final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3)); stopwatch.stop(); assertEquals("", actualData); - assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500); + assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString()); } } @ParameterizedTest(name = "inputData={0}") @MethodSource("inputData") - public void unbufferedReadWrite(final String inputData) throws IOException { + public void testUnbufferedReadWrite(final String inputData) throws IOException { try (QueueInputStream inputStream = new QueueInputStream(); final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { writeUnbuffered(outputStream, inputData); @@ -195,8 +192,8 @@ public class QueueInputStreamTest { @ParameterizedTest(name = "inputData={0}") @MethodSource("inputData") - public void unbufferedReadWriteWithTimeout(final String inputData) throws IOException { - try (QueueInputStream inputStream = new QueueInputStream(new LinkedBlockingQueue<>(), Duration.ofMinutes(2)); + public void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException { + try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get(); final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { writeUnbuffered(outputStream, inputData); final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length())); @@ -205,7 +202,7 @@ public class QueueInputStreamTest { } private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException { - final byte[] bytes = inputData.getBytes(UTF_8); + final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8); outputStream.write(bytes, 0, bytes.length); } }