This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new 8cbbea9552 Minor tweaks
8cbbea9552 is described below

commit 8cbbea9552fd3c28a9563d0b653deda330c4a6d5
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 21 20:24:25 2026 +0900

    Minor tweaks
---
 .../groovy/runtime/async/FlowPublisherAdapter.java | 17 +++--
 .../groovy/transform/AsyncDeferFlowTest.groovy     | 81 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 5 deletions(-)

diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index 3c56af6bc1..e4102642dc 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -187,7 +187,9 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
             @Override
             public void onError(Throwable t) {
                 if (done.compareAndSet(false, true)) {
-                    cf.completeExceptionally(t);
+                    // §2.13 requires non-null, but defend against 
non-compliant publishers
+                    cf.completeExceptionally(t != null ? t
+                            : new NullPointerException("onError called with 
null (Reactive Streams §2.13 violation)"));
                     // Cancel subscription to release resources (idempotent 
per §3.7)
                     Flow.Subscription sub = subRef.getAndSet(null);
                     if (sub != null) sub.cancel();
@@ -319,16 +321,21 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                         if (!queue.offer(new ValueSignal(item))) {
                             cancelSubscription();
                             closed.set(true);
-                            queue.offer(new ErrorSignal(
-                                    new CancellationException(
-                                            "Item delivery interrupted and 
queue full")));
+                            // Clear the queue to guarantee room for the 
terminal signal;
+                            // without this, a full queue could leave the 
consumer blocked
+                            // in take() with no signal to unblock it.
+                            queue.clear();
+                            queue.offer(COMPLETE);
                         }
                     }
                 }
 
                 @Override
                 public void onError(Throwable t) {
-                    putTerminalSignal(new ErrorSignal(t));
+                    // §2.13 requires non-null, but defend against 
non-compliant publishers
+                    Throwable cause = t != null ? t
+                            : new NullPointerException("onError called with 
null (Reactive Streams §2.13 violation)");
+                    putTerminalSignal(new ErrorSignal(cause));
                 }
 
                 @Override
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
index a875c13c30..aae15147e7 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
@@ -519,4 +519,85 @@ class AsyncDeferFlowTest {
         '''
     }
 
+    // ---- Defensive handling: non-compliant Flow.Publisher ----
+
+    @Test
+    void testAwaitPublisherOnErrorNull() {
+        assertScript '''
+            import java.util.concurrent.Flow
+
+            // Non-compliant publisher that calls onError(null) — violates 
§2.13
+            class NullErrorPublisher implements Flow.Publisher<String> {
+                void subscribe(Flow.Subscriber<? super String> subscriber) {
+                    subscriber.onSubscribe(new Flow.Subscription() {
+                        void request(long n) {
+                            subscriber.onError(null)
+                        }
+                        void cancel() {}
+                    })
+                }
+            }
+
+            class OnErrorNullTest {
+                Flow.Publisher pub
+                OnErrorNullTest(Flow.Publisher p) { this.pub = p }
+                async run() {
+                    try {
+                        await pub
+                        assert false : 'Should have thrown'
+                    } catch (NullPointerException e) {
+                        assert e.message.contains('2.13')
+                        return 'caught'
+                    }
+                }
+            }
+
+            assert await(new OnErrorNullTest(new NullErrorPublisher()).run()) 
== 'caught'
+        '''
+    }
+
+    @Test
+    void testForAwaitPublisherOnErrorNull() {
+        assertScript '''
+            import java.util.concurrent.Flow
+
+            // Non-compliant publisher that emits one item then calls 
onError(null)
+            class ItemThenNullErrorPublisher implements Flow.Publisher<String> 
{
+                void subscribe(Flow.Subscriber<? super String> subscriber) {
+                    subscriber.onSubscribe(new Flow.Subscription() {
+                        boolean sent = false
+                        void request(long n) {
+                            if (!sent) {
+                                sent = true
+                                subscriber.onNext("item1")
+                            } else {
+                                subscriber.onError(null)
+                            }
+                        }
+                        void cancel() {}
+                    })
+                }
+            }
+
+            class ForAwaitOnErrorNullTest {
+                Flow.Publisher pub
+                ForAwaitOnErrorNullTest(Flow.Publisher p) { this.pub = p }
+                async run() {
+                    def items = []
+                    try {
+                        for await (item in pub) {
+                            items << item
+                        }
+                        assert false : 'Should have thrown'
+                    } catch (NullPointerException e) {
+                        assert e.message.contains('2.13')
+                    }
+                    return items
+                }
+            }
+
+            assert await(new ForAwaitOnErrorNullTest(new 
ItemThenNullErrorPublisher()).run()) == ['item1']
+        '''
+    }
+
 }

Reply via email to