This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit f48c5a6d27f091365bd969049b60627ac458aef8 Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Sat Nov 9 10:58:09 2024 -0500 Add support to BoundedInputStream for setting a consumer for ProxyInputStream.afterRead(int) --- src/changes/changes.xml | 1 + .../commons/io/input/BoundedInputStream.java | 98 ++++++++++++----- .../commons/io/input/BoundedInputStreamTest.java | 122 +++++++++++++++++---- 3 files changed, 168 insertions(+), 53 deletions(-) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 73f3885d8..ea1be6752 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -68,6 +68,7 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="add" issue="IO-861" due-to="Gary Gregory">Add ProxyInputStream.AbstractBuilder. Supports setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to AutoCloseInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to BOMInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> + <action dev="ggregory" type="add" issue="IO-861" due-to="Gary Gregory">Add support to BoundedInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <!-- UPDATE --> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, #688.</action> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, #685, #686, #694, #696, #698.</action> diff --git a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java index f3b07b718..d23cf5696 100644 --- a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java +++ 
b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.io.InputStream; import org.apache.commons.io.IOUtils; -import org.apache.commons.io.build.AbstractStreamBuilder; +import org.apache.commons.io.function.IOBiConsumer; //@formatter:off /** @@ -73,6 +73,15 @@ import org.apache.commons.io.build.AbstractStreamBuilder; * .get(); * } * </pre> + * <h2>Listening for the max count reached</h2> + * <pre>{@code + * BoundedInputStream s = BoundedInputStream.builder() + * .setPath(Paths.get("MyFile.xml")) + * .setMaxCount(1024) + * .setOnMaxCount((max, count) -> System.out.printf("Max count %,d reached with a last read count of %,d%n", max, count)) + * .get(); + * } + * </pre> * @see Builder * @since 2.0 */ @@ -84,7 +93,7 @@ public class BoundedInputStream extends ProxyInputStream { * * @param <T> The subclass. */ - static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends AbstractStreamBuilder<BoundedInputStream, T> { + static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<BoundedInputStream, T> { /** The current count of bytes counted. */ private long count; @@ -92,6 +101,8 @@ public class BoundedInputStream extends ProxyInputStream { /** The max count of bytes to read. */ private long maxCount = EOF; + private IOBiConsumer<Long, Long> onMaxCount = IOBiConsumer.noop(); + /** Flag if {@link #close()} should be propagated, {@code true} by default. */ private boolean propagateClose = true; @@ -103,6 +114,10 @@ public class BoundedInputStream extends ProxyInputStream { return maxCount; } + IOBiConsumer<Long, Long> getOnMaxCount() { + return onMaxCount; + } + boolean isPropagateClose() { return propagateClose; } @@ -138,6 +153,25 @@ public class BoundedInputStream extends ProxyInputStream { return asThis(); } + /** + * Sets the default {@link BoundedInputStream#onMaxLength(long, long)} behavior, {@code null} resets to a NOOP. 
+ * <p> + * The first Long is the max count of bytes to read. The second Long is the count of bytes read. + * </p> + * <p> + * This does <em>not</em> override a {@code BoundedInputStream} subclass' implementation of the {@link BoundedInputStream#onMaxLength(long, long)} + * method. + * </p> + * + * @param onMaxCount the {@link BoundedInputStream#onMaxLength(long, long)} behavior. + * @return this instance. + * @since 2.18.0 + */ + public T setOnMaxCount(final IOBiConsumer<Long, Long> onMaxCount) { + this.onMaxCount = onMaxCount != null ? onMaxCount : IOBiConsumer.noop(); + return asThis(); + } + /** * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}. * <p> @@ -218,8 +252,11 @@ public class BoundedInputStream extends ProxyInputStream { * </p> * <ul> * <li>{@link #getInputStream()}</li> - * <li>maxCount</li> - * <li>propagateClose</li> + * <li>{@link #getAfterRead()}</li> + * <li>{@link #getCount()}</li> + * <li>{@link #getMaxCount()}</li> + * <li>{@link #isPropagateClose()}</li> + * <li>{@link #getOnMaxCount()}</li> * </ul> * * @return a new instance. @@ -228,10 +265,9 @@ public class BoundedInputStream extends ProxyInputStream { * @throws IOException if an I/O error occurs. * @see #getInputStream() */ - @SuppressWarnings("resource") @Override public BoundedInputStream get() throws IOException { - return new BoundedInputStream(getInputStream(), getCount(), getMaxCount(), isPropagateClose()); + return new BoundedInputStream(this); } } @@ -255,6 +291,8 @@ public class BoundedInputStream extends ProxyInputStream { /** The max count of bytes to read. */ private final long maxCount; + private final IOBiConsumer<Long, Long> onMaxCount; + /** * Flag if close should be propagated. 
* @@ -262,6 +300,14 @@ public class BoundedInputStream extends ProxyInputStream { */ private boolean propagateClose = true; + BoundedInputStream(final Builder builder) throws IOException { + super(builder); + this.count = builder.getCount(); + this.maxCount = builder.getMaxCount(); + this.propagateClose = builder.isPropagateClose(); + this.onMaxCount = builder.getOnMaxCount(); + } + /** * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is unlimited. * @@ -273,6 +319,14 @@ public class BoundedInputStream extends ProxyInputStream { this(in, EOF); } + BoundedInputStream(final InputStream inputStream, final Builder builder) { + super(inputStream, builder); + this.count = builder.getCount(); + this.maxCount = builder.getMaxCount(); + this.propagateClose = builder.isPropagateClose(); + this.onMaxCount = builder.getOnMaxCount(); + } + /** * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size. * @@ -284,26 +338,7 @@ public class BoundedInputStream extends ProxyInputStream { public BoundedInputStream(final InputStream inputStream, final long maxCount) { // Some badly designed methods - e.g. the Servlet API - overload length // such that "-1" means stream finished - this(inputStream, 0, maxCount, true); - } - - /** - * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size. - * - * @param inputStream The wrapped input stream. - * @param count The current number of bytes read. - * @param maxCount The maximum number of bytes to return. - * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it - * does not. - */ - BoundedInputStream(final InputStream inputStream, final long count, final long maxCount, final boolean propagateClose) { - // Some badly designed methods - e.g. 
the Servlet API - overload length - // such that "-1" means stream finished - // Can't throw because we start from an InputStream. - super(inputStream); - this.count = count; - this.maxCount = maxCount; - this.propagateClose = propagateClose; + this(inputStream, builder().setMaxCount(maxCount)); } /** @@ -318,6 +353,7 @@ public class BoundedInputStream extends ProxyInputStream { if (n != EOF) { count += n; } + super.afterRead(n); } /** @@ -422,15 +458,19 @@ public class BoundedInputStream extends ProxyInputStream { /** * A caller has caused a request that would cross the {@code maxLength} boundary. + * <p> + * Delegates to the consumer set in {@link Builder#setOnMaxCount(IOBiConsumer)}. + * </p> * - * @param maxLength The max count of bytes to read. + * @param max The max count of bytes to read. * @param count The count of bytes read. * @throws IOException Subclasses may throw. * @since 2.12.0 */ @SuppressWarnings("unused") - protected void onMaxLength(final long maxLength, final long count) throws IOException { - // for subclasses + // TODO Rename to onMaxCount for 3.0 + protected void onMaxLength(final long max, final long count) throws IOException { + onMaxCount.accept(max, count); } /** diff --git a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java index d81802ff4..f432d7a96 100644 --- a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java +++ b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java @@ -19,7 +19,9 @@ package org.apache.commons.io.input; import static org.apache.commons.io.IOUtils.EOF; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static 
org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; @@ -29,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.test.CustomIOException; import org.apache.commons.lang3.mutable.MutableInt; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -78,14 +81,27 @@ public class BoundedInputStreamTest { } @SuppressWarnings("deprecation") + @Test + public void testPublicConstructors() throws IOException { + final byte[] helloWorld = "Hello World".getBytes(StandardCharsets.UTF_8); + try (ByteArrayInputStream baos = new ByteArrayInputStream(helloWorld); + BoundedInputStream inputStream = new BoundedInputStream(baos)) { + assertSame(baos, inputStream.unwrap()); + } + final long maxCount = 2; + try (ByteArrayInputStream baos = new ByteArrayInputStream(helloWorld); + BoundedInputStream inputStream = new BoundedInputStream(baos, maxCount)) { + assertSame(baos, inputStream.unwrap()); + assertSame(maxCount, inputStream.getMaxCount()); + } + } + @ParameterizedTest @ValueSource(longs = { -100, -1, 0, 1, 2, 4, 8, 16, 32, 64 }) public void testCounts(final long startCount) throws Exception { - final byte[] helloWorld = "Hello World".getBytes(StandardCharsets.UTF_8); final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8); final long actualStart = startCount < 0 ? 
0 : startCount; - // limit = length try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setCount(startCount) .setMaxCount(helloWorld.length).get()) { @@ -149,7 +165,7 @@ public class BoundedInputStreamTest { assertTrue(bounded.markSupported()); } // limit < length - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), hello.length)) { + try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(hello.length).get()) { assertTrue(bounded.markSupported()); assertEquals(hello.length, bounded.getMaxLength()); assertEquals(0, bounded.getCount()); @@ -271,6 +287,65 @@ public class BoundedInputStreamTest { } } + @Test + public void testAfterReadConsumer() throws Exception { + final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8); + final AtomicBoolean boolRef = new AtomicBoolean(); + // @formatter:off + try (InputStream bounded = BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(hello)) + .setMaxCount(hello.length) + .setAfterRead(i -> boolRef.set(true)) + .get()) { + IOUtils.consume(bounded); + } + // @formatter:on + assertTrue(boolRef.get()); + // Throwing + final String message = "test exception message"; + // @formatter:off + try (InputStream bounded = BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(hello)) + .setMaxCount(hello.length) + .setAfterRead(i -> { + throw new CustomIOException(message); + }) + .get()) { + assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage()); + } + // @formatter:on + } + + @Test + public void testOnMaxCountConsumer() throws Exception { + final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8); + final AtomicBoolean boolRef = new AtomicBoolean(); + // @formatter:off + try (BoundedInputStream bounded = BoundedInputStream.builder() + .setInputStream(new 
ByteArrayInputStream(hello)) + .setMaxCount(hello.length) + .setOnMaxCount(null) // should not blow up + .setOnMaxCount((m, c) -> boolRef.set(true)) + .get()) { + IOUtils.consume(bounded); + } + // @formatter:on + assertTrue(boolRef.get()); + // Throwing + final String message = "test exception message"; + // @formatter:off + try (BoundedInputStream bounded = BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(hello)) + .setMaxCount(hello.length) + .setOnMaxCount((m, c) -> { + throw new CustomIOException(message); + }) + .get()) { + assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage()); + } + // @formatter:on + } + @SuppressWarnings("deprecation") @Test public void testOnMaxLength() throws Exception { @@ -278,12 +353,11 @@ public class BoundedInputStreamTest { final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8); final AtomicBoolean boolRef = new AtomicBoolean(); // limit = length - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), helloWorld.length) { - @Override - protected void onMaxLength(final long max, final long readCount) { - boolRef.set(true); - } - }) { + try (BoundedInputStream bounded = BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(helloWorld)) + .setMaxCount(helloWorld.length) + .setOnMaxCount((m, c) -> boolRef.set(true)) + .get()) { assertTrue(bounded.markSupported()); assertEquals(helloWorld.length, bounded.getMaxCount()); assertEquals(helloWorld.length, bounded.getMaxLength()); @@ -313,12 +387,11 @@ public class BoundedInputStreamTest { // limit > length boolRef.set(false); final int length2 = helloWorld.length + 1; - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), length2) { - @Override - protected void onMaxLength(final long max, final long readCount) { - boolRef.set(true); - } - }) { + try (BoundedInputStream bounded = 
BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(helloWorld)) + .setMaxCount(length2) + .setOnMaxCount((m, c) -> boolRef.set(true)) + .get()) { assertTrue(bounded.markSupported()); assertEquals(length2, bounded.getMaxLength()); assertEquals(0, bounded.getCount()); @@ -343,12 +416,11 @@ public class BoundedInputStreamTest { } // limit < length boolRef.set(false); - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), hello.length) { - @Override - protected void onMaxLength(final long max, final long readCount) { - boolRef.set(true); - } - }) { + try (BoundedInputStream bounded = BoundedInputStream.builder() + .setInputStream(new ByteArrayInputStream(helloWorld)) + .setMaxCount(hello.length) + .setOnMaxCount((m, c) -> boolRef.set(true)) + .get()) { assertTrue(bounded.markSupported()); assertEquals(hello.length, bounded.getMaxLength()); assertEquals(0, bounded.getCount()); @@ -428,7 +500,8 @@ public class BoundedInputStreamTest { final byte[] helloWorld = "Hello World".getBytes(StandardCharsets.UTF_8); final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8); // limit = length - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), helloWorld.length)) { + try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length) + .get()) { assertTrue(bounded.markSupported()); for (int i = 0; i < helloWorld.length; i++) { assertEquals(helloWorld[i], bounded.read(), "limit = length byte[" + i + "]"); @@ -438,7 +511,8 @@ public class BoundedInputStreamTest { assertTrue(bounded.markSupported()); } // limit > length - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), helloWorld.length + 1)) { + try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length + 1) + 
.get()) { assertTrue(bounded.markSupported()); for (int i = 0; i < helloWorld.length; i++) { assertEquals(helloWorld[i], bounded.read(), "limit > length byte[" + i + "]"); @@ -448,7 +522,7 @@ public class BoundedInputStreamTest { assertTrue(bounded.markSupported()); } // limit < length - try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), hello.length)) { + try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(hello.length).get()) { assertTrue(bounded.markSupported()); for (int i = 0; i < hello.length; i++) { assertEquals(hello[i], bounded.read(), "limit < length byte[" + i + "]");