This is an automated email from the ASF dual-hosted git repository.
ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba62929 Apply nanoseconds precision for timeout duration. (#453)
4ba62929 is described below
commit 4ba62929ba78f8a01818ffea6f230f04809c0ceb
Author: maxxedev <[email protected]>
AuthorDate: Sun Apr 23 06:15:43 2023 -0700
Apply nanoseconds precision for timeout duration. (#453)
Also improve javadocs
---
.../apache/commons/io/input/QueueInputStream.java | 20 ++++++++++----------
.../commons/io/input/QueueInputStreamTest.java | 8 +++++---
2 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
index bcbf79f5..6019ab55 100644
--- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -103,8 +103,8 @@ public class QueueInputStream extends InputStream {
* @return this.
*/
public Builder setTimeout(final Duration timeout) {
- if (timeout != null && timeout.toMillis() < 0) {
- throw new IllegalArgumentException("waitTime must not be
negative");
+ if (timeout != null && timeout.toNanos() < 0) {
+ throw new IllegalArgumentException("timeout must not be
negative");
}
this.timeout = timeout != null ? timeout : Duration.ZERO;
return this;
@@ -124,17 +124,17 @@ public class QueueInputStream extends InputStream {
private final BlockingQueue<Integer> blockingQueue;
- private final long timeoutMillis;
+ private final long timeoutNanos;
/**
- * Constructs a new instance with no limit to its internal buffer size and
zero wait time.
+ * Constructs a new instance with no limit to its internal queue size and
zero timeout.
*/
public QueueInputStream() {
this(new LinkedBlockingQueue<>());
}
/**
- * Constructs a new instance with given buffer and zero wait time.
+ * Constructs a new instance with given queue and zero timeout.
*
* @param blockingQueue backing queue for the stream.
*/
@@ -143,14 +143,14 @@ public class QueueInputStream extends InputStream {
}
/**
- * Constructs a new instance with given buffer and wait time.
+ * Constructs a new instance with given queue and timeout.
*
* @param blockingQueue backing queue for the stream.
* @param timeout how long to wait before giving up when polling the
queue.
*/
private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final
Duration timeout) {
this.blockingQueue = Objects.requireNonNull(blockingQueue,
"blockingQueue");
- this.timeoutMillis = Objects.requireNonNull(timeout,
"timeout").toMillis();
+ this.timeoutNanos = Objects.requireNonNull(timeout,
"timeout").toNanos();
}
/**
@@ -168,7 +168,7 @@ public class QueueInputStream extends InputStream {
* @return the timeout duration.
*/
Duration getTimeout() {
- return Duration.ofMillis(timeoutMillis);
+ return Duration.ofNanos(timeoutNanos);
}
/**
@@ -183,13 +183,13 @@ public class QueueInputStream extends InputStream {
/**
* Reads and returns a single byte.
*
- * @return either the byte read or {@code -1} if the wait time elapses
before a queue element is available.
+ * @return the byte read, or {@code -1} if a timeout occurs before a queue
element is available.
* @throws IllegalStateException if thread is interrupted while waiting.
*/
@Override
public int read() {
try {
- final Integer value = blockingQueue.poll(timeoutMillis,
TimeUnit.MILLISECONDS);
+ final Integer value = blockingQueue.poll(timeoutNanos,
TimeUnit.NANOSECONDS);
return value == null ? EOF : 0xFF & value;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
index e42db56f..dcb90cbc 100644
--- a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java
@@ -32,7 +32,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@@ -158,7 +158,7 @@ public class QueueInputStreamTest {
final QueueOutputStream outputStream =
inputStream.newQueueOutputStream()) {
// read in a background thread
- final AtomicReference<Boolean> result = new AtomicReference<>();
+ final AtomicBoolean result = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final Thread thread = new Thread(() -> {
// when thread is interrupted, verify ...
@@ -205,8 +205,10 @@ public class QueueInputStreamTest {
@ParameterizedTest(name = "inputData={0}")
@MethodSource("inputData")
public void testUnbufferedReadWriteWithTimeout(final String inputData)
throws IOException {
- try (QueueInputStream inputStream =
QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
+ final Duration timeout = Duration.ofMinutes(2);
+ try (QueueInputStream inputStream =
QueueInputStream.builder().setTimeout(timeout).get();
final QueueOutputStream outputStream =
inputStream.newQueueOutputStream()) {
+ assertEquals(timeout, inputStream.getTimeout());
writeUnbuffered(outputStream, inputData);
final String actualData = assertTimeout(Duration.ofSeconds(1), ()
-> readUnbuffered(inputStream, inputData.length()));
assertEquals(inputData, actualData);