This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git

commit f79a3d66a3ac6d84ff0c757c9b18f7c91cae5811
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Sat Nov 9 11:00:48 2024 -0500

    Add support to ObservableInputStream for setting a consumer for
    ProxyInputStream.afterRead(int)
---
 src/changes/changes.xml                            |  1 +
 .../commons/io/input/ObservableInputStream.java    | 41 ++++++++++++++++++++++
 .../io/input/ObservableInputStreamTest.java        | 29 +++++++++++++++
 3 files changed, 71 insertions(+)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 61019ee59..6f623c115 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -71,6 +71,7 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" issue="IO-861" due-to="Gary 
Gregory">Add support to BoundedInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to ChecksumInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to ThrottledInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
+      <action dev="ggregory" type="add"                due-to="Gary 
Gregory">Add support to ObservableInputStream for setting a consumer for 
ProxyInputStream.afterRead(int).</action>
       <!-- UPDATE -->
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump org.apache.commons:commons-parent from 74 to 78 #670, #676, #679, 
#688.</action>
       <action dev="ggregory" type="update"             due-to="Gary 
Gregory">Bump commons.bytebuddy.version from 1.15.1 to 1.15.10 #672, #673, 
#685, #686, #694, #696, #698.</action>
diff --git 
a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java 
b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
index 5fd27b2c4..7be7b7a2c 100644
--- a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
@@ -43,6 +43,42 @@ import org.apache.commons.io.function.IOConsumer;
  */
 public class ObservableInputStream extends ProxyInputStream {
 
+    /**
+     * For subclassing builders from {@link BoundedInputStream} subclassses.
+     *
+     * @param <T> The subclass.
+     * @since 2.18.0
+     */
+    public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> 
extends ProxyInputStream.AbstractBuilder<ObservableInputStream, T> {
+
+        private List<Observer> observers;
+
+        /**
+         * Sets the list of observer callbacks.
+         *
+         * @param observers The list of observer callbacks.
+         */
+        public void setObservers(final List<Observer> observers) {
+            this.observers = observers;
+        }
+
+    }
+
+
+    /**
+     * Builds instances of {@link ObservableInputStream}.
+     *
+     * @since 2.18.0
+     */
+    public static class Builder extends AbstractBuilder<Builder> {
+
+        @Override
+        public ObservableInputStream get() throws IOException {
+            return new ObservableInputStream(this);
+        }
+
+    }
+
     /**
      * Abstracts observer callback for {@link ObservableInputStream}s.
      */
@@ -109,6 +145,11 @@ public class ObservableInputStream extends 
ProxyInputStream {
 
     private final List<Observer> observers;
 
+    ObservableInputStream(final AbstractBuilder builder) throws IOException {
+        super(builder);
+        this.observers = builder.observers;
+    }
+
     /**
      * Constructs a new ObservableInputStream for the given InputStream.
      *
diff --git 
a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java 
b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
index f45861403..d5bb39dd6 100644
--- a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -27,10 +28,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.ObservableInputStream.Observer;
 import org.apache.commons.io.output.NullOutputStream;
+import org.apache.commons.io.test.CustomIOException;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -143,6 +146,32 @@ public class ObservableInputStreamTest {
         return new ObservableInputStream(origin);
     }
 
+    @Test
+    public void testAfterReadConsumer() throws Exception {
+        final AtomicBoolean boolRef = new AtomicBoolean();
+        // @formatter:off
+        try (InputStream bounded = new ObservableInputStream.Builder()
+                .setCharSequence("Hi")
+                .setAfterRead(i -> boolRef.set(true))
+                .get()) {
+            IOUtils.consume(bounded);
+        }
+        // @formatter:on
+        assertTrue(boolRef.get());
+        // Throwing
+        final String message = "test exception message";
+        // @formatter:off
+        try (InputStream bounded = new ObservableInputStream.Builder()
+                .setCharSequence("Hi")
+                .setAfterRead(i -> {
+                    throw new CustomIOException(message);
+                })
+                .get()) {
+            assertEquals(message, assertThrowsExactly(CustomIOException.class, 
() -> IOUtils.consume(bounded)).getMessage());
+        }
+        // @formatter:on
+    }
+
     @SuppressWarnings("resource")
     @Test
     public void testAvailableAfterClose() throws Exception {

Reply via email to