This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git

commit f48c5a6d27f091365bd969049b60627ac458aef8
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Sat Nov 9 10:58:09 2024 -0500

    Add support to BoundedInputStream for setting a consumer for
    ProxyInputStream.afterRead(int)
---
 src/changes/changes.xml                            |   1 +
 .../commons/io/input/BoundedInputStream.java       |  98 ++++++++++++-----
 .../commons/io/input/BoundedInputStreamTest.java   | 122 +++++++++++++++++----
 3 files changed, 168 insertions(+), 53 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 73f3885d8..ea1be6752 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -68,6 +68,7 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" issue="IO-861" due-to="Gary 
Gregory">Add ProxyInputStream.AbstractBuilder. Supports setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to AutoCloseInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to BOMInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
+      <action dev="ggregory" type="add" issue="IO-861" due-to="Gary 
Gregory">Add support to BoundedInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <!-- UPDATE -->
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, 
#688.</action>
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, 
#685, #686, #694, #696, #698.</action>
diff --git a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java 
b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
index f3b07b718..d23cf5696 100644
--- a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.build.AbstractStreamBuilder;
+import org.apache.commons.io.function.IOBiConsumer;
 
 //@formatter:off
 /**
@@ -73,6 +73,15 @@ import org.apache.commons.io.build.AbstractStreamBuilder;
  *   .get();
  * }
  * </pre>
+ * <h2>Listening for the max count reached</h2>
+ * <pre>{@code
+ * BoundedInputStream s = BoundedInputStream.builder()
+ *   .setPath(Paths.get("MyFile.xml"))
+ *   .setMaxCount(1024)
+ *   .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached 
with a last read count of %,d%n", max, count))
+ *   .get();
+ * }
+ * </pre>
  * @see Builder
  * @since 2.0
  */
@@ -84,7 +93,7 @@ public class BoundedInputStream extends ProxyInputStream {
      *
      * @param <T> The subclass.
      */
-    static abstract class AbstractBuilder<T extends AbstractBuilder<T>> 
extends AbstractStreamBuilder<BoundedInputStream, T> {
+    static abstract class AbstractBuilder<T extends AbstractBuilder<T>> 
extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> {
 
         /** The current count of bytes counted. */
         private long count;
@@ -92,6 +101,8 @@ public class BoundedInputStream extends ProxyInputStream {
         /** The max count of bytes to read. */
         private long maxCount = EOF;
 
+        private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop();
+
         /** Flag if {@link #close()} should be propagated, {@code true} by 
default. */
         private boolean propagateClose = true;
 
@@ -103,6 +114,10 @@ public class BoundedInputStream extends ProxyInputStream {
             return maxCount;
         }
 
+        IOBiConsumer<Long, Long> getOnMaxCount() {
+            return onMaxCount;
+        }
+
         boolean isPropagateClose() {
             return propagateClose;
         }
@@ -138,6 +153,25 @@ public class BoundedInputStream extends ProxyInputStream {
             return asThis();
         }
 
+        /**
+         * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} 
behavior, {@code null} resets to a NOOP.
+         * <p>
+         * The first Long is the max count of bytes to read. The second Long 
is the count of bytes read.
+         * </p>
+         * <p>
+         * This does <em>not</em> override a {@code BoundedInputStream} 
subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, 
long)}
+         * method.
+         * </p>
+         *
+         * @param onMaxCount the {@link BoundedInputStream#onMaxLength(long, long)} 
behavior.
+         * @return this instance.
+         * @since 2.18.0
+         */
+        public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) {
+            this.onMaxCount = onMaxCount != null ? onMaxCount : 
IOBiConsumer.noop();
+            return asThis();
+        }
+
         /**
          * Sets whether the {@link #close()} method should propagate to the 
underlying {@link InputStream}.
          * <p>
@@ -218,8 +252,11 @@ public class BoundedInputStream extends ProxyInputStream {
          * </p>
          * <ul>
          * <li>{@link #getInputStream()}</li>
-         * <li>maxCount</li>
-         * <li>propagateClose</li>
+         * <li>{@link #getAfterRead()}</li>
+         * <li>{@link #getCount()}</li>
+         * <li>{@link #getMaxCount()}</li>
+         * <li>{@link #isPropagateClose()}</li>
+         * <li>{@link #getOnMaxCount()}</li>
          * </ul>
          *
          * @return a new instance.
@@ -228,10 +265,9 @@ public class BoundedInputStream extends ProxyInputStream {
          * @throws IOException                   if an I/O error occurs.
          * @see #getInputStream()
          */
-        @SuppressWarnings("resource")
         @Override
         public BoundedInputStream get() throws IOException {
-            return new BoundedInputStream(getInputStream(), getCount(), 
getMaxCount(), isPropagateClose());
+            return new BoundedInputStream(this);
         }
 
     }
@@ -255,6 +291,8 @@ public class BoundedInputStream extends ProxyInputStream {
     /** The max count of bytes to read. */
     private final long maxCount;
 
+    private final IOBiConsumer<Long, Long> onMaxCount;
+
     /**
      * Flag if close should be propagated.
      *
@@ -262,6 +300,14 @@ public class BoundedInputStream extends ProxyInputStream {
      */
     private boolean propagateClose = true;
 
+    BoundedInputStream(final Builder builder) throws IOException {
+        super(builder);
+        this.count = builder.getCount();
+        this.maxCount = builder.getMaxCount();
+        this.propagateClose = builder.isPropagateClose();
+        this.onMaxCount = builder.getOnMaxCount();
+    }
+
     /**
      * Constructs a new {@link BoundedInputStream} that wraps the given input 
stream and is unlimited.
      *
@@ -273,6 +319,14 @@ public class BoundedInputStream extends ProxyInputStream {
         this(in, EOF);
     }
 
+    BoundedInputStream(final InputStream inputStream, final Builder builder) {
+        super(inputStream, builder);
+        this.count = builder.getCount();
+        this.maxCount = builder.getMaxCount();
+        this.propagateClose = builder.isPropagateClose();
+        this.onMaxCount = builder.getOnMaxCount();
+    }
+
     /**
      * Constructs a new {@link BoundedInputStream} that wraps the given input 
stream and limits it to a certain size.
      *
@@ -284,26 +338,7 @@ public class BoundedInputStream extends ProxyInputStream {
     public BoundedInputStream(final InputStream inputStream, final long 
maxCount) {
         // Some badly designed methods - e.g. the Servlet API - overload length
         // such that "-1" means stream finished
-        this(inputStream, 0, maxCount, true);
-    }
-
-    /**
-     * Constructs a new {@link BoundedInputStream} that wraps the given input 
stream and limits it to a certain size.
-     *
-     * @param inputStream    The wrapped input stream.
-     * @param count          The current number of bytes read.
-     * @param maxCount       The maximum number of bytes to return.
-     * @param propagateClose {@code true} if calling {@link #close()} 
propagates to the {@code close()} method of the underlying stream or {@code 
false} if it
-     *                       does not.
-     */
-    BoundedInputStream(final InputStream inputStream, final long count, final 
long maxCount, final boolean propagateClose) {
-        // Some badly designed methods - e.g. the Servlet API - overload length
-        // such that "-1" means stream finished
-        // Can't throw because we start from an InputStream.
-        super(inputStream);
-        this.count = count;
-        this.maxCount = maxCount;
-        this.propagateClose = propagateClose;
+        this(inputStream, builder().setMaxCount(maxCount));
     }
 
     /**
@@ -318,6 +353,7 @@ public class BoundedInputStream extends ProxyInputStream {
         if (n != EOF) {
             count += n;
         }
+        super.afterRead(n);
     }
 
     /**
@@ -422,15 +458,19 @@ public class BoundedInputStream extends ProxyInputStream {
 
     /**
      * A caller has caused a request that would cross the {@code maxLength} 
boundary.
+     * <p>
+     * Delegates to the consumer set in {@link 
Builder#setOnMaxCount(IOBiConsumer)}.
+     * </p>
      *
-     * @param maxLength The max count of bytes to read.
+     * @param max The max count of bytes to read.
      * @param count     The count of bytes read.
      * @throws IOException Subclasses may throw.
      * @since 2.12.0
      */
     @SuppressWarnings("unused")
-    protected void onMaxLength(final long maxLength, final long count) throws 
IOException {
-        // for subclasses
+    // TODO Rename to onMaxCount for 3.0
+    protected void onMaxLength(final long max, final long count) throws 
IOException {
+        onMaxCount.accept(max, count);
     }
 
     /**
diff --git 
a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java 
b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
index d81802ff4..f432d7a96 100644
--- a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
@@ -19,7 +19,9 @@ package org.apache.commons.io.input;
 import static org.apache.commons.io.IOUtils.EOF;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -29,6 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.test.CustomIOException;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -78,14 +81,27 @@ public class BoundedInputStreamTest {
     }
 
     @SuppressWarnings("deprecation")
+    @Test
+    public void testPublicConstructors() throws IOException {
+        final byte[] helloWorld = "Hello 
World".getBytes(StandardCharsets.UTF_8);
+        try (ByteArrayInputStream baos = new ByteArrayInputStream(helloWorld);
+                BoundedInputStream inputStream = new BoundedInputStream(baos)) 
{
+            assertSame(baos, inputStream.unwrap());
+        }
+        final long maxCount = 2;
+        try (ByteArrayInputStream baos = new ByteArrayInputStream(helloWorld);
+                BoundedInputStream inputStream = new BoundedInputStream(baos, 
maxCount)) {
+            assertSame(baos, inputStream.unwrap());
+            assertSame(maxCount, inputStream.getMaxCount());
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(longs = { -100, -1, 0, 1, 2, 4, 8, 16, 32, 64 })
     public void testCounts(final long startCount) throws Exception {
-
         final byte[] helloWorld = "Hello 
World".getBytes(StandardCharsets.UTF_8);
         final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
         final long actualStart = startCount < 0 ? 0 : startCount;
-
         // limit = length
         try (BoundedInputStream bounded = 
BoundedInputStream.builder().setInputStream(new 
ByteArrayInputStream(helloWorld)).setCount(startCount)
                 .setMaxCount(helloWorld.length).get()) {
@@ -149,7 +165,7 @@ public class BoundedInputStreamTest {
             assertTrue(bounded.markSupported());
         }
         // limit < length
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), hello.length)) {
+        try (BoundedInputStream bounded = 
BoundedInputStream.builder().setInputStream(new 
ByteArrayInputStream(helloWorld)).setMaxCount(hello.length).get()) {
             assertTrue(bounded.markSupported());
             assertEquals(hello.length, bounded.getMaxLength());
             assertEquals(0, bounded.getCount());
@@ -271,6 +287,65 @@ public class BoundedInputStreamTest {
         }
     }
 
+    @Test
+    public void testAfterReadConsumer() throws Exception {
+        final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
+        final AtomicBoolean boolRef = new AtomicBoolean();
+        // @formatter:off
+        try (InputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(hello))
+                .setMaxCount(hello.length)
+                .setAfterRead(i -> boolRef.set(true))
+                .get()) {
+            IOUtils.consume(bounded);
+        }
+        // @formatter:on
+        assertTrue(boolRef.get());
+        // Throwing
+        final String message = "test exception message";
+        // @formatter:off
+        try (InputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(hello))
+                .setMaxCount(hello.length)
+                .setAfterRead(i -> {
+                    throw new CustomIOException(message);
+                })
+                .get()) {
+            assertEquals(message, assertThrowsExactly(CustomIOException.class, 
() -> IOUtils.consume(bounded)).getMessage());
+        }
+        // @formatter:on
+    }
+
+    @Test
+    public void testOnMaxCountConsumer() throws Exception {
+        final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
+        final AtomicBoolean boolRef = new AtomicBoolean();
+        // @formatter:off
+        try (BoundedInputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(hello))
+                .setMaxCount(hello.length)
+                .setOnMaxCount(null) // should not blow up
+                .setOnMaxCount((m, c) -> boolRef.set(true))
+                .get()) {
+            IOUtils.consume(bounded);
+        }
+        // @formatter:on
+        assertTrue(boolRef.get());
+        // Throwing
+        final String message = "test exception message";
+        // @formatter:off
+        try (BoundedInputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(hello))
+                .setMaxCount(hello.length)
+                .setOnMaxCount((m, c) -> {
+                    throw new CustomIOException(message);
+                })
+                .get()) {
+            assertEquals(message, assertThrowsExactly(CustomIOException.class, 
() -> IOUtils.consume(bounded)).getMessage());
+        }
+        // @formatter:on
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testOnMaxLength() throws Exception {
@@ -278,12 +353,11 @@ public class BoundedInputStreamTest {
         final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
         final AtomicBoolean boolRef = new AtomicBoolean();
         // limit = length
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), helloWorld.length) {
-            @Override
-            protected void onMaxLength(final long max, final long readCount) {
-                boolRef.set(true);
-            }
-        }) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(helloWorld.length)
+                .setOnMaxCount((m, c) -> boolRef.set(true))
+                .get()) {
             assertTrue(bounded.markSupported());
             assertEquals(helloWorld.length, bounded.getMaxCount());
             assertEquals(helloWorld.length, bounded.getMaxLength());
@@ -313,12 +387,11 @@ public class BoundedInputStreamTest {
         // limit > length
         boolRef.set(false);
         final int length2 = helloWorld.length + 1;
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), length2) {
-            @Override
-            protected void onMaxLength(final long max, final long readCount) {
-                boolRef.set(true);
-            }
-        }) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(length2)
+                .setOnMaxCount((m, c) -> boolRef.set(true))
+                .get()) {
             assertTrue(bounded.markSupported());
             assertEquals(length2, bounded.getMaxLength());
             assertEquals(0, bounded.getCount());
@@ -343,12 +416,11 @@ public class BoundedInputStreamTest {
         }
         // limit < length
         boolRef.set(false);
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), hello.length) {
-            @Override
-            protected void onMaxLength(final long max, final long readCount) {
-                boolRef.set(true);
-            }
-        }) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder()
+                .setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(hello.length)
+                .setOnMaxCount((m, c) -> boolRef.set(true))
+                .get()) {
             assertTrue(bounded.markSupported());
             assertEquals(hello.length, bounded.getMaxLength());
             assertEquals(0, bounded.getCount());
@@ -428,7 +500,8 @@ public class BoundedInputStreamTest {
         final byte[] helloWorld = "Hello 
World".getBytes(StandardCharsets.UTF_8);
         final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
         // limit = length
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), helloWorld.length)) {
+        try (BoundedInputStream bounded = 
BoundedInputStream.builder().setInputStream(new 
ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length)
+                .get()) {
             assertTrue(bounded.markSupported());
             for (int i = 0; i < helloWorld.length; i++) {
                 assertEquals(helloWorld[i], bounded.read(), "limit = length 
byte[" + i + "]");
@@ -438,7 +511,8 @@ public class BoundedInputStreamTest {
             assertTrue(bounded.markSupported());
         }
         // limit > length
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), helloWorld.length + 1)) {
+        try (BoundedInputStream bounded = 
BoundedInputStream.builder().setInputStream(new 
ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length + 1)
+                .get()) {
             assertTrue(bounded.markSupported());
             for (int i = 0; i < helloWorld.length; i++) {
                 assertEquals(helloWorld[i], bounded.read(), "limit > length 
byte[" + i + "]");
@@ -448,7 +522,7 @@ public class BoundedInputStreamTest {
             assertTrue(bounded.markSupported());
         }
         // limit < length
-        try (BoundedInputStream bounded = new BoundedInputStream(new 
ByteArrayInputStream(helloWorld), hello.length)) {
+        try (BoundedInputStream bounded = 
BoundedInputStream.builder().setInputStream(new 
ByteArrayInputStream(helloWorld)).setMaxCount(hello.length).get()) {
             assertTrue(bounded.markSupported());
             for (int i = 0; i < hello.length; i++) {
                 assertEquals(hello[i], bounded.read(), "limit < length byte[" 
+ i + "]");

Reply via email to