This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit f79a3d66a3ac6d84ff0c757c9b18f7c91cae5811 Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Sat Nov 9 11:00:48 2024 -0500 Add support to ObservableInputStream for setting a consumer for ProxyInputStream.afterRead(int) --- src/changes/changes.xml | 1 + .../commons/io/input/ObservableInputStream.java | 41 ++++++++++++++++++++++ .../io/input/ObservableInputStreamTest.java | 29 +++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 61019ee59..6f623c115 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -71,6 +71,7 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="add" issue="IO-861" due-to="Gary Gregory">Add support to BoundedInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to ChecksumInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to ThrottledInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> + <action dev="ggregory" type="add" due-to="Gary Gregory">Add support to ObservableInputStream for setting a consumer for ProxyInputStream.afterRead(int).</action> <!-- UPDATE --> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, #688.</action> <action dev="ggregory" type="update" due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, #685, #686, #694, #696, #698.</action> diff --git a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java index 5fd27b2c4..7be7b7a2c 100644 --- a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java +++ b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java @@ -43,6 +43,42 @@ import org.apache.commons.io.function.IOConsumer; */ public class ObservableInputStream extends ProxyInputStream { + /** + * For subclassing builders from {@link BoundedInputStream} subclassses. + * + * @param <T> The subclass. + * @since 2.18.0 + */ + public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> { + + private List<Observer> observers; + + /** + * Sets the list of observer callbacks. + * + * @param observers The list of observer callbacks. + */ + public void setObservers(final List<Observer> observers) { + this.observers = observers; + } + + } + + + /** + * Builds instances of {@link ObservableInputStream}. + * + * @since 2.18.0 + */ + public static class Builder extends AbstractBuilder<Builder> { + + @Override + public ObservableInputStream get() throws IOException { + return new ObservableInputStream(this); + } + + } + /** * Abstracts observer callback for {@link ObservableInputStream}s. */ @@ -109,6 +145,11 @@ public class ObservableInputStream extends ProxyInputStream { private final List<Observer> observers; + ObservableInputStream(final AbstractBuilder builder) throws IOException { + super(builder); + this.observers = builder.observers; + } + /** * Constructs a new ObservableInputStream for the given InputStream. * diff --git a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java index f45861403..d5bb39dd6 100644 --- a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java +++ b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; @@ -27,10 +28,12 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.ObservableInputStream.Observer; import org.apache.commons.io.output.NullOutputStream; +import org.apache.commons.io.test.CustomIOException; import org.junit.jupiter.api.Test; /** @@ -143,6 +146,32 @@ public class ObservableInputStreamTest { return new ObservableInputStream(origin); } + @Test + public void testAfterReadConsumer() throws Exception { + final AtomicBoolean boolRef = new AtomicBoolean(); + // @formatter:off + try (InputStream bounded = new ObservableInputStream.Builder() + .setCharSequence("Hi") + .setAfterRead(i -> boolRef.set(true)) + .get()) { + IOUtils.consume(bounded); + } + // @formatter:on + assertTrue(boolRef.get()); + // Throwing + final String message = "test exception message"; + // @formatter:off + try (InputStream bounded = new ObservableInputStream.Builder() + .setCharSequence("Hi") + .setAfterRead(i -> { + throw new CustomIOException(message); + }) + .get()) { + assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage()); + } + // @formatter:on + } + @SuppressWarnings("resource") @Test public void testAvailableAfterClose() throws Exception {