This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 8cbbea9552 Minor tweaks
8cbbea9552 is described below
commit 8cbbea9552fd3c28a9563d0b653deda330c4a6d5
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 21 20:24:25 2026 +0900
Minor tweaks
---
.../groovy/runtime/async/FlowPublisherAdapter.java | 17 +++--
.../groovy/transform/AsyncDeferFlowTest.groovy | 81 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 5 deletions(-)
diff --git a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index 3c56af6bc1..e4102642dc 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -187,7 +187,9 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
@Override
public void onError(Throwable t) {
if (done.compareAndSet(false, true)) {
- cf.completeExceptionally(t);
+ // §2.13 requires non-null, but defend against non-compliant publishers
+ cf.completeExceptionally(t != null ? t
+ : new NullPointerException("onError called with null (Reactive Streams §2.13 violation)"));
// Cancel subscription to release resources (idempotent per §3.7)
Flow.Subscription sub = subRef.getAndSet(null);
if (sub != null) sub.cancel();
@@ -319,16 +321,21 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
if (!queue.offer(new ValueSignal(item))) {
cancelSubscription();
closed.set(true);
- queue.offer(new ErrorSignal(
- new CancellationException(
- "Item delivery interrupted and queue full")));
+ // Clear the queue to guarantee room for the terminal signal;
+ // without this, a full queue could leave the consumer blocked
+ // in take() with no signal to unblock it.
+ queue.clear();
+ queue.offer(COMPLETE);
}
}
}
@Override
public void onError(Throwable t) {
- putTerminalSignal(new ErrorSignal(t));
+ // §2.13 requires non-null, but defend against non-compliant publishers
+ Throwable cause = t != null ? t
+ : new NullPointerException("onError called with null (Reactive Streams §2.13 violation)");
+ putTerminalSignal(new ErrorSignal(cause));
}
@Override
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
index a875c13c30..aae15147e7 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
@@ -519,4 +519,85 @@ class AsyncDeferFlowTest {
'''
}
+ // ---- Defensive handling: non-compliant Flow.Publisher ----
+
+ @Test
+ void testAwaitPublisherOnErrorNull() {
+ assertScript '''
+ import java.util.concurrent.Flow
+
+ // Non-compliant publisher that calls onError(null) — violates §2.13
+ class NullErrorPublisher implements Flow.Publisher<String> {
+ void subscribe(Flow.Subscriber<? super String> subscriber) {
+ subscriber.onSubscribe(new Flow.Subscription() {
+ void request(long n) {
+ subscriber.onError(null)
+ }
+ void cancel() {}
+ })
+ }
+ }
+
+ class OnErrorNullTest {
+ Flow.Publisher pub
+ OnErrorNullTest(Flow.Publisher p) { this.pub = p }
+ async run() {
+ try {
+ await pub
+ assert false : 'Should have thrown'
+ } catch (NullPointerException e) {
+ assert e.message.contains('2.13')
+ return 'caught'
+ }
+ }
+ }
+
+ assert await(new OnErrorNullTest(new NullErrorPublisher()).run()) == 'caught'
+ '''
+ }
+
+ @Test
+ void testForAwaitPublisherOnErrorNull() {
+ assertScript '''
+ import java.util.concurrent.Flow
+
+ // Non-compliant publisher that emits one item then calls onError(null)
+ class ItemThenNullErrorPublisher implements Flow.Publisher<String> {
+ void subscribe(Flow.Subscriber<? super String> subscriber) {
+ subscriber.onSubscribe(new Flow.Subscription() {
+ boolean sent = false
+ void request(long n) {
+ if (!sent) {
+ sent = true
+ subscriber.onNext("item1")
+ } else {
+ subscriber.onError(null)
+ }
+ }
+ void cancel() {}
+ })
+ }
+ }
+
+ class ForAwaitOnErrorNullTest {
+ Flow.Publisher pub
+ ForAwaitOnErrorNullTest(Flow.Publisher p) { this.pub = p }
+ async run() {
+ def items = []
+ try {
+ for await (item in pub) {
+ items << item
+ }
+ assert false : 'Should have thrown'
+ } catch (NullPointerException e) {
+ assert e.message.contains('2.13')
+ }
+ return items
+ }
+ }
+
+ assert await(new ForAwaitOnErrorNullTest(new ItemThenNullErrorPublisher()).run()) == ['item1']
+ '''
+ }
+
}