This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit 4c1e1108e999a4883939e34cf4da8a9891ab947c Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Sat Nov 9 11:00:18 2024 -0500 Add support to ThrottledInputStream for setting a consumer for ProxyInputStream.afterRead(int) --- src/changes/changes.xml | 1 + .../commons/io/input/ThrottledInputStream.java | 16 +++++------ .../commons/io/input/ThrottledInputStreamTest.java | 32 ++++++++++++++++++++++ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index f44b4d8f8..61019ee59 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -70,6 +70,7 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to BOMInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" issue="IO-861" due-to="Gary Gregory">Add support to BoundedInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to ChecksumInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> + <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to ThrottledInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <!-- UPDATE --> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, #688.</action> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, #685, #686, #694, #696, #698.</action> diff --git a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java index 55f626978..1fcc78183 100644 --- a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java +++ b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java @@ -24,8 +24,6 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.build.AbstractStreamBuilder; - /** * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found @@ -74,7 +72,7 @@ public final class ThrottledInputStream extends CountingInputStream { * @see #get() */ // @formatter:on - public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> { + public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> { /** * Effectively not throttled. @@ -103,7 +101,7 @@ public final class ThrottledInputStream extends CountingInputStream { @SuppressWarnings("resource") @Override public ThrottledInputStream get() throws IOException { - return new ThrottledInputStream(getInputStream(), maxBytesPerSecond); + return new ThrottledInputStream(this); } /** @@ -147,12 +145,12 @@ public final class ThrottledInputStream extends CountingInputStream { private final long startTime = System.currentTimeMillis(); private Duration totalSleepDuration = Duration.ZERO; - private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) { - super(proxy); - if (maxBytesPerSecond <= 0) { - throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " is invalid."); + private ThrottledInputStream(final Builder builder) throws IOException { + super(builder); + if (builder.maxBytesPerSecond <= 0) { + throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid."); } - this.maxBytesPerSecond = maxBytesPerSecond; + this.maxBytesPerSecond = builder.maxBytesPerSecond; } @Override diff --git a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java index 749daa679..a072329e3 100644 --- a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java +++ b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java @@ -18,10 +18,16 @@ package org.apache.commons.io.input; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.io.InputStream; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.test.CustomIOException; import org.junit.jupiter.api.Test; /** @@ -35,6 +41,32 @@ public class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInpu return ThrottledInputStream.builder().setInputStream(createOriginInputStream()).get(); } + @Test + public void testAfterReadConsumer() throws Exception { + final AtomicBoolean boolRef = new AtomicBoolean(); + // @formatter:off + try (InputStream bounded = ThrottledInputStream.builder() + .setCharSequence("Hi") + .setAfterRead(i -> boolRef.set(true)) + .get()) { + IOUtils.consume(bounded); + } + // @formatter:on + assertTrue(boolRef.get()); + // Throwing + final String message = "test exception message"; + // @formatter:off + try (InputStream bounded = ThrottledInputStream.builder() + .setCharSequence("Hi") + .setAfterRead(i -> { + throw new CustomIOException(message); + }) + .get()) { + assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage()); + } + // @formatter:on + } + @Test public void testCalSleepTimeMs() { // case 0: initial - no read, no sleep