This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git

commit df770deffd9917b06da33cc547fea22ebfc29226
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Sat Apr 22 09:25:29 2023 -0400

    Use a builder instead of adding another constructor
    
    Fix Javadoc
    Follow existing code conventions
---
 .../apache/commons/io/input/QueueInputStream.java  | 106 ++++++++++++++-----
 .../commons/io/input/QueueInputStreamTest.java     | 115 ++++++++++-----------
 2 files changed, 138 insertions(+), 83 deletions(-)

diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
index 5efaeb7e..bb9e0a07 100644
--- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -28,15 +28,16 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.build.AbstractStreamBuilder;
 import org.apache.commons.io.output.QueueOutputStream;
 
 /**
- * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input 
stream provides what's written in queue
- * output stream.
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input 
stream provides what's written in queue output stream.
  *
  * <p>
  * Example usage:
  * </p>
+ *
  * <pre>
  * QueueInputStream inputStream = new QueueInputStream();
  * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
@@ -45,13 +46,12 @@ import org.apache.commons.io.output.QueueOutputStream;
  * inputStream.read();
  * </pre>
  * <p>
- * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue 
input/output streams may be used safely in a
- * single thread or multiple threads. Also, unlike JDK classes, no special 
meaning is attached to initial or current
- * thread. Instances can be used longer after initial threads exited.
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue 
input/output streams may be used safely in a single thread or multiple threads.
+ * Also, unlike JDK classes, no special meaning is attached to initial or 
current thread. Instances can be used longer after initial threads exited.
  * </p>
  * <p>
- * Closing a {@link QueueInputStream} has no effect. The methods in this class 
can be called after the stream has been
- * closed without generating an {@link IOException}.
+ * Closing a {@link QueueInputStream} has no effect. The methods in this class 
can be called after the stream has been closed without generating an
+ * {@link IOException}.
  * </p>
  *
  * @see QueueOutputStream
@@ -59,8 +59,72 @@ import org.apache.commons.io.output.QueueOutputStream;
  */
 public class QueueInputStream extends InputStream {
 
+    /**
+     * Builds a new {@link QueueInputStream} instance.
+     * <p>
+     * For example:
+     * </p>
+     *
+     * <pre>{@code
+     * QueueInputStream s = QueueInputStream.builder()
+     *   .setBlockingQueue(new LinkedBlockingQueue<>())
+     *   .setTimeout(Duration.ZERO)
+     *   .get()}
+     * </pre>
+     * <p>
+     *
+     * @since 2.12.0
+     */
+    public static class Builder extends 
AbstractStreamBuilder<QueueInputStream, Builder> {
+
+        private BlockingQueue<Integer> blockingQueue = new 
LinkedBlockingQueue<>();
+        private Duration timeout = Duration.ZERO;
+
+        @Override
+        public QueueInputStream get() throws IOException {
+            return new QueueInputStream(blockingQueue, timeout);
+        }
+
+        /**
+         * Sets backing queue for the stream.
+         *
+         * @param blockingQueue backing queue for the stream.
+         * @return this
+         */
+        public Builder setBlockingQueue(final BlockingQueue<Integer> 
blockingQueue) {
+            this.blockingQueue = blockingQueue != null ? blockingQueue : new 
LinkedBlockingQueue<>();
+            return this;
+        }
+
+        /**
+         * Sets the polling timeout.
+         *
+         * @param timeout the polling timeout.
+         * @return this.
+         */
+        public Builder setTimeout(final Duration timeout) {
+            if (timeout != null && timeout.toMillis() < 0) {
+                throw new IllegalArgumentException("waitTime must not be 
negative");
+            }
+            this.timeout = timeout != null ? timeout : Duration.ZERO;
+            return this;
+        }
+
+    }
+
+    /**
+     * Constructs a new {@link Builder}.
+     *
+     * @return a new {@link Builder}.
+     * @since 2.12.0
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
     private final BlockingQueue<Integer> blockingQueue;
-    private final long waitTimeMillis;
+
+    private final long timeoutMillis;
 
     /**
      * Constructs a new instance with no limit to its internal buffer size and 
zero wait time.
@@ -72,7 +136,7 @@ public class QueueInputStream extends InputStream {
     /**
      * Constructs a new instance with given buffer and zero wait time.
      *
-     * @param blockingQueue backing queue for the stream
+     * @param blockingQueue backing queue for the stream.
      */
     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
         this(blockingQueue, Duration.ZERO);
@@ -81,24 +145,18 @@ public class QueueInputStream extends InputStream {
     /**
      * Constructs a new instance with given buffer and wait time.
      *
-     * @param blockingQueue backing queue for the stream
-     * @param waitTime how long to wait if necessary for a queue element is 
available
-     * @since 2.12.0
+     * @param blockingQueue backing queue for the stream.
+     * @param timeout       how long to wait before giving up when polling the 
queue.
      */
-    public QueueInputStream(final BlockingQueue<Integer> blockingQueue, final 
Duration waitTime) {
+    private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final 
Duration timeout) {
         this.blockingQueue = Objects.requireNonNull(blockingQueue, 
"blockingQueue");
-        Objects.requireNonNull(waitTime, "waitTime");
-        if (waitTime.toMillis() < 0) {
-            throw new IllegalArgumentException("waitTime must not be 
negative");
-        }
-        this.waitTimeMillis = waitTime.toMillis();
+        this.timeoutMillis = Objects.requireNonNull(timeout, 
"timeout").toMillis();
     }
 
     /**
-     * Creates a new QueueOutputStream instance connected to this. Writes to 
the output stream will be visible to this
-     * input stream.
+     * Creates a new QueueOutputStream instance connected to this. Writes to 
the output stream will be visible to this input stream.
      *
-     * @return QueueOutputStream connected to this stream
+     * @return QueueOutputStream connected to this stream.
      */
     public QueueOutputStream newQueueOutputStream() {
         return new QueueOutputStream(blockingQueue);
@@ -107,13 +165,13 @@ public class QueueInputStream extends InputStream {
     /**
      * Reads and returns a single byte.
      *
-     * @return either the byte read or {@code -1} if the wait time elapses 
before a queue element is available
-     * @throws IllegalStateException if thread is interrupted while waiting
+     * @return either the byte read or {@code -1} if the wait time elapses 
before a queue element is available.
+     * @throws IllegalStateException if thread is interrupted while waiting.
      */
     @Override
     public int read() {
         try {
-            final Integer value = blockingQueue.poll(waitTimeMillis, 
TimeUnit.MILLISECONDS);
+            final Integer value = blockingQueue.poll(timeoutMillis, 
TimeUnit.MILLISECONDS);
             return value == null ? EOF : 0xFF & value;
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
index b99e3562..3943c1db 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -16,23 +16,11 @@
  */
 package org.apache.commons.io.input;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeout;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import com.google.common.base.Stopwatch;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.QueueOutputStream;
-import org.apache.commons.io.output.QueueOutputStreamTest;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -47,10 +35,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.QueueOutputStream;
+import org.apache.commons.io.output.QueueOutputStreamTest;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.base.Stopwatch;
+
 /**
  * Test {@link QueueInputStream}.
  *
- * @see {@link QueueOutputStreamTest}
+ * @see QueueOutputStreamTest
  */
 public class QueueInputStreamTest {
 
@@ -71,80 +71,78 @@ public class QueueInputStreamTest {
         // @formatter:on
     }
 
+    private int defaultBufferSize() {
+        return 8192;
+    }
+
+    private String readUnbuffered(final InputStream inputStream) throws 
IOException {
+        return readUnbuffered(inputStream, Integer.MAX_VALUE);
+    }
+
+    private String readUnbuffered(final InputStream inputStream, final int 
maxBytes) throws IOException {
+        if (maxBytes == 0) {
+            return "";
+        }
+
+        final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        int n = -1;
+        while ((n = inputStream.read()) != -1) {
+            byteArrayOutputStream.write(n);
+            if (byteArrayOutputStream.size() >= maxBytes) {
+                break;
+            }
+        }
+        return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
+    }
+
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
-    public void bufferedReads(final String inputData) throws IOException {
+    public void testBufferedReads(final String inputData) throws IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
         try (BufferedInputStream inputStream = new BufferedInputStream(new 
QueueInputStream(queue));
                 final QueueOutputStream outputStream = new 
QueueOutputStream(queue)) {
-            outputStream.write(inputData.getBytes(UTF_8));
-            final String actualData = IOUtils.toString(inputStream, UTF_8);
+            outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
+            final String actualData = IOUtils.toString(inputStream, 
StandardCharsets.UTF_8);
             assertEquals(inputData, actualData);
         }
     }
 
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
-    public void bufferedReadWrite(final String inputData) throws IOException {
+    public void testBufferedReadWrite(final String inputData) throws 
IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
         try (BufferedInputStream inputStream = new BufferedInputStream(new 
QueueInputStream(queue));
                 final BufferedOutputStream outputStream = new 
BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
-            outputStream.write(inputData.getBytes(UTF_8));
+            outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
             outputStream.flush();
-            final String dataCopy = IOUtils.toString(inputStream, UTF_8);
+            final String dataCopy = IOUtils.toString(inputStream, 
StandardCharsets.UTF_8);
             assertEquals(inputData, dataCopy);
         }
     }
 
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
-    public void bufferedWrites(final String inputData) throws IOException {
+    public void testBufferedWrites(final String inputData) throws IOException {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
         try (QueueInputStream inputStream = new QueueInputStream(queue);
                 final BufferedOutputStream outputStream = new 
BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
-            outputStream.write(inputData.getBytes(UTF_8));
+            outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
             outputStream.flush();
             final String actualData = readUnbuffered(inputStream);
             assertEquals(inputData, actualData);
         }
     }
 
-    private int defaultBufferSize() {
-        return 8192;
-    }
-
     @Test
-    public void invalidArguments() {
+    public void testInvalidArguments() {
         assertThrows(NullPointerException.class, () -> new 
QueueInputStream(null), "queue is required");
-        assertThrows(NullPointerException.class, () -> new 
QueueInputStream(new LinkedBlockingQueue<>(), null), "waitTime is required");
-        assertThrows(IllegalArgumentException.class, () -> new 
QueueInputStream(new LinkedBlockingQueue<>(), Duration.ofMillis(-1)),
-                "waitTime must not be negative");
-    }
-
-    private String readUnbuffered(final InputStream inputStream) throws 
IOException {
-        return readUnbuffered(inputStream, Integer.MAX_VALUE);
-    }
-
-    private String readUnbuffered(final InputStream inputStream, final int 
maxBytes) throws IOException {
-        if (maxBytes == 0) {
-            return "";
-        }
-
-        final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-        int n = -1;
-        while ((n = inputStream.read()) != -1) {
-            byteArrayOutputStream.write(n);
-            if (byteArrayOutputStream.size() >= maxBytes) {
-                break;
-            }
-        }
-        return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
+        assertThrows(IllegalArgumentException.class, () -> 
QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime 
must not be negative");
     }
 
     @Test
     @DisplayName("If read is interrupted while waiting, then exception is 
thrown")
-    public void timeoutInterrupted() throws Exception {
-        try (QueueInputStream inputStream = new QueueInputStream(new 
LinkedBlockingQueue<>(), Duration.ofMinutes(2));
+    public void testTimeoutInterrupted() throws Exception {
+        try (QueueInputStream inputStream = 
QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
                 final QueueOutputStream outputStream = 
inputStream.newQueueOutputStream()) {
 
             // read in a background thread
@@ -169,22 +167,21 @@ public class QueueInputStreamTest {
 
     @Test
     @DisplayName("If data is not available in queue, then read will wait until 
wait time elapses")
-    public void timeoutUnavailableData() throws IOException {
-        try (QueueInputStream inputStream = new QueueInputStream(new 
LinkedBlockingQueue<>(), Duration.ofMillis(500));
+    public void testTimeoutUnavailableData() throws IOException {
+        try (QueueInputStream inputStream = 
QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
                 final QueueOutputStream outputStream = 
inputStream.newQueueOutputStream()) {
-
             final Stopwatch stopwatch = Stopwatch.createStarted();
             final String actualData = assertTimeout(Duration.ofSeconds(1), () 
-> readUnbuffered(inputStream, 3));
             stopwatch.stop();
             assertEquals("", actualData);
 
-            assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500);
+            assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> 
stopwatch.toString());
         }
     }
 
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
-    public void unbufferedReadWrite(final String inputData) throws IOException 
{
+    public void testUnbufferedReadWrite(final String inputData) throws 
IOException {
         try (QueueInputStream inputStream = new QueueInputStream();
                 final QueueOutputStream outputStream = 
inputStream.newQueueOutputStream()) {
             writeUnbuffered(outputStream, inputData);
@@ -195,8 +192,8 @@ public class QueueInputStreamTest {
 
     @ParameterizedTest(name = "inputData={0}")
     @MethodSource("inputData")
-    public void unbufferedReadWriteWithTimeout(final String inputData) throws 
IOException {
-        try (QueueInputStream inputStream = new QueueInputStream(new 
LinkedBlockingQueue<>(), Duration.ofMinutes(2));
+    public void testUnbufferedReadWriteWithTimeout(final String inputData) 
throws IOException {
+        try (QueueInputStream inputStream = 
QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
                 final QueueOutputStream outputStream = 
inputStream.newQueueOutputStream()) {
             writeUnbuffered(outputStream, inputData);
             final String actualData = assertTimeout(Duration.ofSeconds(1), () 
-> readUnbuffered(inputStream, inputData.length()));
@@ -205,7 +202,7 @@ public class QueueInputStreamTest {
     }
 
     private void writeUnbuffered(final QueueOutputStream outputStream, final 
String inputData) throws IOException {
-        final byte[] bytes = inputData.getBytes(UTF_8);
+        final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
         outputStream.write(bytes, 0, bytes.length);
     }
 }

Reply via email to