This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git

commit 4c1e1108e999a4883939e34cf4da8a9891ab947c
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Sat Nov 9 11:00:18 2024 -0500

    Add support to ThrottledInputStream for setting a consumer for
    ProxyInputStream.afterRead(int)
---
 src/changes/changes.xml                            |  1 +
 .../commons/io/input/ThrottledInputStream.java     | 16 +++++------
 .../commons/io/input/ThrottledInputStreamTest.java | 32 ++++++++++++++++++++++
 3 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index f44b4d8f8..61019ee59 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -70,6 +70,7 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to BOMInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add" issue="IO-861" due-to="Gary 
Gregory">Add support to BoundedInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to ChecksumInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
+      <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to ThrottledInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <!-- UPDATE -->
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, 
#688.</action>
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, 
#685, #686, #694, #696, #698.</action>
diff --git 
a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java 
b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
index 55f626978..1fcc78183 100644
--- a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java
@@ -24,8 +24,6 @@ import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.build.AbstractStreamBuilder;
-
 /**
  * Provides bandwidth throttling on a specified InputStream. It is implemented 
as a wrapper on top of another InputStream instance. The throttling works by
  * examining the number of bytes read from the underlying InputStream from the 
beginning, and sleep()ing for a time interval if the byte-transfer is found
@@ -74,7 +72,7 @@ public final class ThrottledInputStream extends 
CountingInputStream {
      * @see #get()
      */
     // @formatter:on
-    public static class Builder extends 
AbstractStreamBuilder<ThrottledInputStream, Builder> {
+    public static class Builder extends AbstractBuilder<ThrottledInputStream, 
Builder> {
 
         /**
          * Effectively not throttled.
@@ -103,7 +101,7 @@ public final class ThrottledInputStream extends 
CountingInputStream {
         @SuppressWarnings("resource")
         @Override
         public ThrottledInputStream get() throws IOException {
-            return new ThrottledInputStream(getInputStream(), 
maxBytesPerSecond);
+            return new ThrottledInputStream(this);
         }
 
         /**
@@ -147,12 +145,12 @@ public final class ThrottledInputStream extends 
CountingInputStream {
     private final long startTime = System.currentTimeMillis();
     private Duration totalSleepDuration = Duration.ZERO;
 
-    private ThrottledInputStream(final InputStream proxy, final long 
maxBytesPerSecond) {
-        super(proxy);
-        if (maxBytesPerSecond <= 0) {
-            throw new IllegalArgumentException("Bandwidth " + 
maxBytesPerSecond + " is invalid.");
+    private ThrottledInputStream(final Builder builder) throws IOException {
+        super(builder);
+        if (builder.maxBytesPerSecond <= 0) {
+            throw new IllegalArgumentException("Bandwidth " + 
builder.maxBytesPerSecond + " is invalid.");
         }
-        this.maxBytesPerSecond = maxBytesPerSecond;
+        this.maxBytesPerSecond = builder.maxBytesPerSecond;
     }
 
     @Override
diff --git 
a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java 
b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
index 749daa679..a072329e3 100644
--- a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java
@@ -18,10 +18,16 @@
 package org.apache.commons.io.input;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.test.CustomIOException;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -35,6 +41,32 @@ public class ThrottledInputStreamTest extends 
ProxyInputStreamTest<ThrottledInpu
         return 
ThrottledInputStream.builder().setInputStream(createOriginInputStream()).get();
     }
 
+    @Test
+    public void testAfterReadConsumer() throws Exception {
+        final AtomicBoolean boolRef = new AtomicBoolean();
+        // @formatter:off
+        try (InputStream bounded = ThrottledInputStream.builder()
+                .setCharSequence("Hi")
+                .setAfterRead(i -> boolRef.set(true))
+                .get()) {
+            IOUtils.consume(bounded);
+        }
+        // @formatter:on
+        assertTrue(boolRef.get());
+        // Throwing
+        final String message = "test exception message";
+        // @formatter:off
+        try (InputStream bounded = ThrottledInputStream.builder()
+                .setCharSequence("Hi")
+                .setAfterRead(i -> {
+                    throw new CustomIOException(message);
+                })
+                .get()) {
+            assertEquals(message, assertThrowsExactly(CustomIOException.class, 
() -> IOUtils.consume(bounded)).getMessage());
+        }
+        // @formatter:on
+    }
+
     @Test
     public void testCalSleepTimeMs() {
         // case 0: initial - no read, no sleep

Reply via email to