This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new f043c2cb5b Minor tweaks
f043c2cb5b is described below

commit f043c2cb5bf9eb921dc104e7d9ebd8c3093ba684
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 17:14:04 2026 +0900

    Minor tweaks
---
 .../apache/groovy/parser/antlr4/AstBuilder.java    |  4 +++
 .../groovy/runtime/async/FlowPublisherAdapter.java | 18 ++++++++++
 src/spec/doc/core-async-await.adoc                 |  8 ++++-
 src/spec/test/AsyncAwaitSpecTest.groovy            | 20 ++++++++---
 .../runtime/async}/AsyncConcurrencyTest.groovy     |  3 +-
 .../codehaus/groovy/transform/AsyncApiTest.groovy  | 40 +++++++++++++++++++++-
 .../groovy/transform/AsyncAwaitSyntaxTest.groovy   | 36 ++++++++++++++++---
 .../groovy/transform/AsyncDeferFlowTest.groovy     | 31 +++++++++++------
 8 files changed, 138 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java 
b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 6f9dfcc36b..198067cf4d 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -925,6 +925,10 @@ public class AstBuilder extends 
GroovyParserBaseVisitor<Object> {
 
     @Override
     public ExpressionStatement visitYieldReturnStmtAlt(final 
YieldReturnStmtAltContext ctx) {
+        if (asyncContextDepth == 0) {
+            throw createParsingFailedException(
+                    "`yield return` can only be used inside an async method or 
async closure", ctx);
+        }
         Expression expr = (Expression) this.visit(ctx.expression());
         Expression yieldCall = AsyncTransformHelper.buildYieldReturnCall(expr);
         return configureAST(new ExpressionStatement(yieldCall), ctx);
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index fa94f02ac2..cf15cb91a6 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -68,6 +68,10 @@ import java.util.concurrent.atomic.AtomicReference;
  *       blocking {@code put()} with a non-blocking {@code offer()} fallback
  *       when the publisher thread is interrupted — this prevents both silent
  *       item loss and consumer deadlock even under unexpected interrupts</li>
+ *   <li>Terminal callbacks atomically close the upstream side
+ *       ({@code closedRef}) and clear/cancel the stored subscription.
+ *       This makes post-terminal {@code onNext} calls from non-compliant
+ *       publishers harmless and releases resources promptly.</li>
  *   <li>Back-pressure is enforced by requesting exactly one item after
  *       each consumed element; demand is signalled <em>before</em>
  *       {@code moveNext()} returns, preventing livelock when producer and
@@ -210,6 +214,9 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                 // Publisher completed before emitting — resolve to null
                 if (done.compareAndSet(false, true)) {
                     cf.complete(null);
+                    // Mirror onNext/onError cleanup for prompt resource 
release.
+                    Flow.Subscription sub = subRef.getAndSet(null);
+                    if (sub != null) sub.cancel();
                 }
             }
         });
@@ -324,6 +331,10 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
 
             @Override
             public void onError(Throwable t) {
+                // First terminal signal wins. Ignore duplicate terminal 
callbacks.
+                if (!closedRef.compareAndSet(false, true)) {
+                    return;
+                }
                 // Cancel subscription eagerly to release upstream resources
                 Flow.Subscription sub = subRef.getAndSet(null);
                 if (sub != null) sub.cancel();
@@ -345,6 +356,13 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
 
             @Override
             public void onComplete() {
+                // First terminal signal wins. Ignore duplicate terminal 
callbacks.
+                if (!closedRef.compareAndSet(false, true)) {
+                    return;
+                }
+                // Clear subscription consistently with other terminal paths.
+                Flow.Subscription sub = subRef.getAndSet(null);
+                if (sub != null) sub.cancel();
                 try {
                     // Blocking put() guarantees the consumer will see the 
sentinel,
                     // even if the queue was temporarily full from buffered 
values.
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index fa43090a1e..d0ebd0d859 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -405,6 +405,10 @@ until the consumer is ready for the next value. This 
design means:
 interrupted, preventing resource leaks
 * The generator can mix `yield return` with `await`, performing I/O between 
yields
 
+NOTE: `yield return` can only appear inside an `async` method or `async` 
closure/lambda.
+Using it elsewhere produces a compile-time error:
+_"`yield return` can only be used inside an async method or async 
closure"_.
+
 === Basic Generator
 
 [source,groovy]
@@ -619,6 +623,9 @@ one item at a time; demand for the next element is 
signalled _before_ the consum
 `moveNext()` awaitable completes, so the publisher can begin producing the 
next value while
 the consumer processes the current one.
 
+The examples below wait for the subscriber handshake, with a bounded 
timeout,
+before submitting values; this avoids timing races and unbounded waits in CI 
environments.
+
 === Awaiting a Single Value
 
 [source,groovy]
@@ -1277,4 +1284,3 @@ JavaScript, C#, Kotlin, and Swift, for developers 
familiar with those languages.
 | AwaitResult
 | `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }`
 |===
-
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 945466fea4..b60988cb4a 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -699,10 +699,15 @@ assert 
await(Awaitable.completeOnTimeoutMillis(slowCall(), "cached", 50)) == "ca
 import java.util.concurrent.SubmissionPublisher
 
 def publisher = new SubmissionPublisher<String>()
-Thread.start {
-    while (publisher.numberOfSubscribers == 0) {
-        Thread.yield()
+def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L 
->
+    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
+    while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
+        Thread.sleep(1)
     }
+    assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher 
subscription"
+}
+Thread.start {
+    waitForSubscriber(publisher)
     publisher.submit("hello from publisher")
     publisher.close()
 }
@@ -731,8 +736,15 @@ class StreamConsumer {
 
 def publisher = new SubmissionPublisher<Integer>()
 def future = new StreamConsumer().consumeAll(publisher)
+def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L 
->
+    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
+    while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
+        Thread.sleep(1)
+    }
+    assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher 
subscription"
+}
 Thread.start {
-    Thread.sleep(50)
+    waitForSubscriber(publisher)
     (1..5).each { publisher.submit(it) }
     publisher.close()
 }
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy 
b/src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
similarity index 99%
rename from 
src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
rename to 
src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
index 58be5d29c6..7e74e0bf82 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
+++ 
b/src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
@@ -17,9 +17,8 @@
  *  under the License.
  */
 
-package org.codehaus.groovy.transform
+package org.apache.groovy.runtime.async
 
-import org.apache.groovy.runtime.async.AsyncStreamGenerator
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.DisplayName
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index feaa8d1a89..07072bccbb 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask
 import java.util.concurrent.SubmissionPublisher
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
 
 import static groovy.test.GroovyAssert.shouldFail
 
@@ -1455,17 +1456,54 @@ class AsyncApiTest {
 
     @Test
     void testFlowPublisherToAwaitableOnCompleteWithoutValue() {
+        def cancelled = new AtomicBoolean(false)
         def pub = new Flow.Publisher<String>() {
             void subscribe(Flow.Subscriber<? super String> s) {
                 s.onSubscribe(new Flow.Subscription() {
                     void request(long n) {}
-                    void cancel() {}
+                    void cancel() { cancelled.set(true) }
                 })
                 s.onComplete()
             }
         }
         Awaitable<String> aw = Awaitable.from(pub)
         assert aw.get() == null
+        assert cancelled.get() : 'Subscription should be cancelled/cleared on 
terminal completion'
+    }
+
+    @Test
+    void testFlowPublisherAsyncStreamIgnoresLateOnNextAfterComplete() {
+        def producerRef = new AtomicReference<Thread>()
+        def pub = new Flow.Publisher<Integer>() {
+            void subscribe(Flow.Subscriber<? super Integer> s) {
+                s.onSubscribe(new Flow.Subscription() {
+                    void request(long n) {}
+                    void cancel() {}
+                })
+                producerRef.set(Thread.start {
+                    // Non-compliant publisher: emits onNext after terminal 
signal.
+                    s.onComplete()
+                    for (int i = 0; i < 1000; i++) {
+                        s.onNext(i)
+                    }
+                })
+            }
+        }
+        AsyncStream<Integer> stream = AsyncStream.from(pub)
+        Thread producer = producerRef.get()
+        assert producer != null
+        try {
+            producer.join(1000)
+            assert !producer.isAlive() : 'late onNext after terminal should 
not block producer'
+            assert stream.moveNext().get() == false
+            assert stream.moveNext().get() == false
+        } finally {
+            stream.close()
+            if (producer != null && producer.isAlive()) {
+                producer.interrupt()
+                producer.join(1000)
+            }
+        }
     }
 
     @Test
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index d0b93cb186..73acb454d2 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -1762,6 +1762,36 @@ class AsyncAwaitSyntaxTest {
         assert err.message.contains('defer') && err.message.contains('async')
     }
 
+    @Test
+    void testYieldReturnOutsideAsyncMethodFailsCompileTime() {
+        def err = shouldFail CompilationFailedException, '''
+            class Svc {
+                def normalMethod() {
+                    yield return 1
+                }
+            }
+        '''
+        assert err.message.contains('yield return') && 
err.message.contains('async')
+    }
+
+    @Test
+    void testYieldReturnOutsideAsyncClosureFailsCompileTime() {
+        def err = shouldFail CompilationFailedException, '''
+            def fn = { ->
+                yield return 1
+            }
+        '''
+        assert err.message.contains('yield return') && 
err.message.contains('async')
+    }
+
+    @Test
+    void testYieldReturnAtScriptTopLevelFailsCompileTime() {
+        def err = shouldFail CompilationFailedException, '''
+            yield return 1
+        '''
+        assert err.message.contains('yield return') && 
err.message.contains('async')
+    }
+
     @Test
     void testDeferInsideAsyncMethodSucceeds() {
         assertScript '''
@@ -2026,14 +2056,12 @@ class AsyncAwaitSyntaxTest {
     }
 
     @Test
-    void testYieldReturnWithAnnotation() {
+    void testYieldReturnInAsyncMethod() {
         assertScript '''
-            import groovy.transform.Async
             import groovy.concurrent.AsyncStream
 
             class Gen {
-                @Async
-                def items() {
+                async items() {
                     yield return "x"
                     yield return "y"
                 }
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
index 0919b5736b..38c876fc86 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
@@ -33,11 +33,10 @@ import static groovy.test.GroovyAssert.assertScript
  * <p>{@code Flow.Publisher} instances are automatically adapted by the 
built-in
  * adapter, enabling seamless use with {@code await} and {@code for await}.
  *
- * <p><b>Test synchronisation:</b> Flow.Publisher tests use
+ * <p><b>Test synchronisation:</b> Flow.Publisher tests use bounded waits on
  * {@code SubmissionPublisher.getNumberOfSubscribers()} to wait until the
- * subscription handshake is complete before submitting items.  This
- * eliminates the race between subscription establishment and item
- * delivery that caused intermittent failures with hard-coded delays.
+ * subscription handshake is complete before submitting items. This keeps
+ * tests deterministic while preventing unbounded hangs on regressions.
  */
 class AsyncDeferFlowTest {
 
@@ -265,7 +264,9 @@ class AsyncDeferFlowTest {
             }
             def future = task()
             // Wait until the subscriber is registered with the publisher
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             subscribed.countDown()
             def result = await(future)
             assert result == 'hello'
@@ -288,7 +289,9 @@ class AsyncDeferFlowTest {
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new FlowTest().consumePublisher(publisher)
             // Wait until the for-await loop has subscribed to the publisher
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             (1..5).each { publisher.submit(it) }
             publisher.close()
             def result = await(future)
@@ -312,7 +315,9 @@ class AsyncDeferFlowTest {
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new FlowTest().consumeWithError(publisher)
             // Wait until the for-await loop has subscribed
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             publisher.submit(1)
             publisher.submit(2)
             publisher.closeExceptionally(new RuntimeException('stream-error'))
@@ -340,7 +345,9 @@ class AsyncDeferFlowTest {
             }
             def future = task()
             // Wait until the for-await loop has subscribed
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             ['a', 'b', 'c'].each { publisher.submit(it) }
             publisher.close()
             def result = await(future)
@@ -368,7 +375,9 @@ class AsyncDeferFlowTest {
             def publisher = new 
java.util.concurrent.SubmissionPublisher<Integer>()
             def future = new CombinedTest().processStream(publisher)
             // Wait until the for-await loop has subscribed
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             (1..3).each { publisher.submit(it) }
             publisher.close()
             def result = await(future)
@@ -388,7 +397,9 @@ class AsyncDeferFlowTest {
             }
             def future = task()
             // Wait until the subscriber is registered
-            while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+            def deadline = System.nanoTime() + 5_000_000_000L
+            while (publisher.getNumberOfSubscribers() == 0 && 
System.nanoTime() < deadline) { Thread.sleep(1) }
+            assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting 
for publisher subscription'
             publisher.submit(42)
             publisher.submit(99)  // second value ignored by await
             publisher.close()

Reply via email to