This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new f043c2cb5b Minor tweaks
f043c2cb5b is described below
commit f043c2cb5bf9eb921dc104e7d9ebd8c3093ba684
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 17:14:04 2026 +0900
Minor tweaks
---
.../apache/groovy/parser/antlr4/AstBuilder.java | 4 +++
.../groovy/runtime/async/FlowPublisherAdapter.java | 18 ++++++++++
src/spec/doc/core-async-await.adoc | 8 ++++-
src/spec/test/AsyncAwaitSpecTest.groovy | 20 ++++++++---
.../runtime/async}/AsyncConcurrencyTest.groovy | 3 +-
.../codehaus/groovy/transform/AsyncApiTest.groovy | 40 +++++++++++++++++++++-
.../groovy/transform/AsyncAwaitSyntaxTest.groovy | 36 ++++++++++++++++---
.../groovy/transform/AsyncDeferFlowTest.groovy | 31 +++++++++++------
8 files changed, 138 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 6f9dfcc36b..198067cf4d 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -925,6 +925,10 @@ public class AstBuilder extends
GroovyParserBaseVisitor<Object> {
@Override
public ExpressionStatement visitYieldReturnStmtAlt(final
YieldReturnStmtAltContext ctx) {
+ if (asyncContextDepth == 0) {
+ throw createParsingFailedException(
+ "`yield return` can only be used inside an async method or
async closure", ctx);
+ }
Expression expr = (Expression) this.visit(ctx.expression());
Expression yieldCall = AsyncTransformHelper.buildYieldReturnCall(expr);
return configureAST(new ExpressionStatement(yieldCall), ctx);
diff --git
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index fa94f02ac2..cf15cb91a6 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -68,6 +68,10 @@ import java.util.concurrent.atomic.AtomicReference;
* blocking {@code put()} with a non-blocking {@code offer()} fallback
* when the publisher thread is interrupted — this prevents both silent
* item loss and consumer deadlock even under unexpected interrupts</li>
+ * <li>Terminal callbacks atomically close the upstream side
+ * ({@code closedRef}) and clear/cancel the stored subscription.
+ * This makes post-terminal {@code onNext} calls from non-compliant
+ * publishers harmless and releases resources promptly.</li>
* <li>Back-pressure is enforced by requesting exactly one item after
* each consumed element; demand is signalled <em>before</em>
* {@code moveNext()} returns, preventing livelock when producer and
@@ -210,6 +214,9 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
// Publisher completed before emitting — resolve to null
if (done.compareAndSet(false, true)) {
cf.complete(null);
+ // Mirror onNext/onError cleanup for prompt resource
release.
+ Flow.Subscription sub = subRef.getAndSet(null);
+ if (sub != null) sub.cancel();
}
}
});
@@ -324,6 +331,10 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
@Override
public void onError(Throwable t) {
+ // First terminal signal wins. Ignore duplicate terminal
callbacks.
+ if (!closedRef.compareAndSet(false, true)) {
+ return;
+ }
// Cancel subscription eagerly to release upstream resources
Flow.Subscription sub = subRef.getAndSet(null);
if (sub != null) sub.cancel();
@@ -345,6 +356,13 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
@Override
public void onComplete() {
+ // First terminal signal wins. Ignore duplicate terminal
callbacks.
+ if (!closedRef.compareAndSet(false, true)) {
+ return;
+ }
+ // Clear subscription consistently with other terminal paths.
+ Flow.Subscription sub = subRef.getAndSet(null);
+ if (sub != null) sub.cancel();
try {
// Blocking put() guarantees the consumer will see the
sentinel,
// even if the queue was temporarily full from buffered
values.
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index fa43090a1e..d0ebd0d859 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -405,6 +405,10 @@ until the consumer is ready for the next value. This
design means:
interrupted, preventing resource leaks
* The generator can mix `yield return` with `await`, performing I/O between
yields
+NOTE: `yield return` can only appear inside an `async` method or `async`
closure/lambda.
+Using it elsewhere produces a compile-time error:
+_"`yield return` can only be used inside an async method or async
closure"_.
+
=== Basic Generator
[source,groovy]
@@ -619,6 +623,9 @@ one item at a time; demand for the next element is
signalled _before_ the consum
`moveNext()` awaitable completes, so the publisher can begin producing the
next value while
the consumer processes the current one.
+The examples below wait, with a bounded timeout, for the subscriber to
register before
+submitting values; this avoids timing races and unbounded hangs in CI
environments.
+
=== Awaiting a Single Value
[source,groovy]
@@ -1277,4 +1284,3 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| AwaitResult
| `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }`
|===
-
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 945466fea4..b60988cb4a 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -699,10 +699,15 @@ assert
await(Awaitable.completeOnTimeoutMillis(slowCall(), "cached", 50)) == "ca
import java.util.concurrent.SubmissionPublisher
def publisher = new SubmissionPublisher<String>()
-Thread.start {
- while (publisher.numberOfSubscribers == 0) {
- Thread.yield()
+def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L
->
+ long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
+ while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
+ Thread.sleep(1)
}
+ assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher
subscription"
+}
+Thread.start {
+ waitForSubscriber(publisher)
publisher.submit("hello from publisher")
publisher.close()
}
@@ -731,8 +736,15 @@ class StreamConsumer {
def publisher = new SubmissionPublisher<Integer>()
def future = new StreamConsumer().consumeAll(publisher)
+def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L
->
+ long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
+ while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
+ Thread.sleep(1)
+ }
+ assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher
subscription"
+}
Thread.start {
- Thread.sleep(50)
+ waitForSubscriber(publisher)
(1..5).each { publisher.submit(it) }
publisher.close()
}
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
b/src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
similarity index 99%
rename from
src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
rename to
src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
index 58be5d29c6..7e74e0bf82 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy
+++
b/src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.codehaus.groovy.transform
+package org.apache.groovy.runtime.async
-import org.apache.groovy.runtime.async.AsyncStreamGenerator
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.DisplayName
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index feaa8d1a89..07072bccbb 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask
import java.util.concurrent.SubmissionPublisher
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
import static groovy.test.GroovyAssert.shouldFail
@@ -1455,17 +1456,54 @@ class AsyncApiTest {
@Test
void testFlowPublisherToAwaitableOnCompleteWithoutValue() {
+ def cancelled = new AtomicBoolean(false)
def pub = new Flow.Publisher<String>() {
void subscribe(Flow.Subscriber<? super String> s) {
s.onSubscribe(new Flow.Subscription() {
void request(long n) {}
- void cancel() {}
+ void cancel() { cancelled.set(true) }
})
s.onComplete()
}
}
Awaitable<String> aw = Awaitable.from(pub)
assert aw.get() == null
+ assert cancelled.get() : 'Subscription should be cancelled/cleared on
terminal completion'
+ }
+
+ @Test
+ void testFlowPublisherAsyncStreamIgnoresLateOnNextAfterComplete() {
+ def producerRef = new AtomicReference<Thread>()
+ def pub = new Flow.Publisher<Integer>() {
+ void subscribe(Flow.Subscriber<? super Integer> s) {
+ s.onSubscribe(new Flow.Subscription() {
+ void request(long n) {}
+ void cancel() {}
+ })
+ producerRef.set(Thread.start {
+ // Non-compliant publisher: emits onNext after terminal
signal.
+ s.onComplete()
+ for (int i = 0; i < 1000; i++) {
+ s.onNext(i)
+ }
+ })
+ }
+ }
+ AsyncStream<Integer> stream = AsyncStream.from(pub)
+ Thread producer = producerRef.get()
+ assert producer != null
+ try {
+ producer.join(1000)
+ assert !producer.isAlive() : 'late onNext after terminal should
not block producer'
+ assert stream.moveNext().get() == false
+ assert stream.moveNext().get() == false
+ } finally {
+ stream.close()
+ if (producer != null && producer.isAlive()) {
+ producer.interrupt()
+ producer.join(1000)
+ }
+ }
}
@Test
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index d0b93cb186..73acb454d2 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -1762,6 +1762,36 @@ class AsyncAwaitSyntaxTest {
assert err.message.contains('defer') && err.message.contains('async')
}
+ @Test
+ void testYieldReturnOutsideAsyncMethodFailsCompileTime() {
+ def err = shouldFail CompilationFailedException, '''
+ class Svc {
+ def normalMethod() {
+ yield return 1
+ }
+ }
+ '''
+ assert err.message.contains('yield return') &&
err.message.contains('async')
+ }
+
+ @Test
+ void testYieldReturnOutsideAsyncClosureFailsCompileTime() {
+ def err = shouldFail CompilationFailedException, '''
+ def fn = { ->
+ yield return 1
+ }
+ '''
+ assert err.message.contains('yield return') &&
err.message.contains('async')
+ }
+
+ @Test
+ void testYieldReturnAtScriptTopLevelFailsCompileTime() {
+ def err = shouldFail CompilationFailedException, '''
+ yield return 1
+ '''
+ assert err.message.contains('yield return') &&
err.message.contains('async')
+ }
+
@Test
void testDeferInsideAsyncMethodSucceeds() {
assertScript '''
@@ -2026,14 +2056,12 @@ class AsyncAwaitSyntaxTest {
}
@Test
- void testYieldReturnWithAnnotation() {
+ void testYieldReturnInAsyncMethod() {
assertScript '''
- import groovy.transform.Async
import groovy.concurrent.AsyncStream
class Gen {
- @Async
- def items() {
+ async items() {
yield return "x"
yield return "y"
}
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
index 0919b5736b..38c876fc86 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy
@@ -33,11 +33,10 @@ import static groovy.test.GroovyAssert.assertScript
* <p>{@code Flow.Publisher} instances are automatically adapted by the
built-in
* adapter, enabling seamless use with {@code await} and {@code for await}.
*
- * <p><b>Test synchronisation:</b> Flow.Publisher tests use
+ * <p><b>Test synchronisation:</b> Flow.Publisher tests use bounded waits on
* {@code SubmissionPublisher.getNumberOfSubscribers()} to wait until the
- * subscription handshake is complete before submitting items. This
- * eliminates the race between subscription establishment and item
- * delivery that caused intermittent failures with hard-coded delays.
+ * subscription handshake is complete before submitting items. This keeps
+ * tests deterministic while preventing unbounded hangs on regressions.
*/
class AsyncDeferFlowTest {
@@ -265,7 +264,9 @@ class AsyncDeferFlowTest {
}
def future = task()
// Wait until the subscriber is registered with the publisher
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
subscribed.countDown()
def result = await(future)
assert result == 'hello'
@@ -288,7 +289,9 @@ class AsyncDeferFlowTest {
def publisher = new
java.util.concurrent.SubmissionPublisher<Integer>()
def future = new FlowTest().consumePublisher(publisher)
// Wait until the for-await loop has subscribed to the publisher
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
(1..5).each { publisher.submit(it) }
publisher.close()
def result = await(future)
@@ -312,7 +315,9 @@ class AsyncDeferFlowTest {
def publisher = new
java.util.concurrent.SubmissionPublisher<Integer>()
def future = new FlowTest().consumeWithError(publisher)
// Wait until the for-await loop has subscribed
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
publisher.submit(1)
publisher.submit(2)
publisher.closeExceptionally(new RuntimeException('stream-error'))
@@ -340,7 +345,9 @@ class AsyncDeferFlowTest {
}
def future = task()
// Wait until the for-await loop has subscribed
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
['a', 'b', 'c'].each { publisher.submit(it) }
publisher.close()
def result = await(future)
@@ -368,7 +375,9 @@ class AsyncDeferFlowTest {
def publisher = new
java.util.concurrent.SubmissionPublisher<Integer>()
def future = new CombinedTest().processStream(publisher)
// Wait until the for-await loop has subscribed
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
(1..3).each { publisher.submit(it) }
publisher.close()
def result = await(future)
@@ -388,7 +397,9 @@ class AsyncDeferFlowTest {
}
def future = task()
// Wait until the subscriber is registered
- while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
+ def deadline = System.nanoTime() + 5_000_000_000L
+ while (publisher.getNumberOfSubscribers() == 0 &&
System.nanoTime() < deadline) { Thread.sleep(1) }
+ assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting
for publisher subscription'
publisher.submit(42)
publisher.submit(99) // second value ignored by await
publisher.close()