This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new c69441c4b5 Minor tweaks
c69441c4b5 is described below
commit c69441c4b5fec2e025e7945ccd9b5362d14889a6
Author: Daniel Sun <[email protected]>
AuthorDate: Sun Mar 8 00:20:20 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncStream.java | 18 +-
src/main/java/groovy/concurrent/Awaitable.java | 194 +++++++++++++++++++++
.../concurrent/AwaitableAdapterRegistry.java | 48 ++++-
.../apache/groovy/parser/antlr4/AstBuilder.java | 4 +-
.../groovy/runtime/async/AsyncStreamGenerator.java | 118 +++++++++++--
.../apache/groovy/runtime/async/AsyncSupport.java | 119 +++++++++++++
.../apache/groovy/runtime/async/GroovyPromise.java | 24 ++-
src/spec/doc/core-async-await.adoc | 108 ++++++++++--
src/spec/test/AsyncAwaitSpecTest.groovy | 59 ++++++-
.../groovy/transform/AsyncAwaitSyntaxTest.groovy | 81 +++++++++
.../groovy/transform/AsyncCoverageTest.groovy | 139 +++++++++++++++
11 files changed, 870 insertions(+), 42 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java
b/src/main/java/groovy/concurrent/AsyncStream.java
index d6acfc5beb..f9c0ca89c9 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -44,7 +44,7 @@ package groovy.concurrent;
* @see AwaitableAdapterRegistry
* @since 6.0.0
*/
-public interface AsyncStream<T> {
+public interface AsyncStream<T> extends AutoCloseable {
/**
* Asynchronously advances to the next element. Returns an {@link
Awaitable}
@@ -59,6 +59,22 @@ public interface AsyncStream<T> {
*/
T getCurrent();
+ /**
+ * Closes the stream and releases any associated resources.
+ * <p>
+ * The default implementation is a no-op. Implementations that bridge to
+ * generators, publishers, or other resource-owning sources may override
+ * this to propagate cancellation upstream. Compiler-generated
+ * {@code for await} loops invoke {@code close()} automatically from a
+ * {@code finally} block, including on early {@code break}, {@code return},
+ * and exceptional exit.
+ *
+ * @since 6.0.0
+ */
+ @Override
+ default void close() {
+ }
+
/**
* Returns an empty {@code AsyncStream} that completes immediately.
*/
diff --git a/src/main/java/groovy/concurrent/Awaitable.java
b/src/main/java/groovy/concurrent/Awaitable.java
index 7cb541f7b2..4640bebcef 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
/**
@@ -51,6 +54,10 @@ import java.util.function.Function;
* {@code Promise.allSettled()}</li>
* <li>{@link #delay(long) Awaitable.delay(ms)} — like
* {@code Task.Delay()} / {@code setTimeout}</li>
+ * <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like
+ * Kotlin's {@code withTimeout} or a JavaScript promise raced against a
timer</li>
+ * <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task,
fallback, ms)} —
+ * like a timeout with fallback/default value</li>
* </ul>
* <p>
* <b>Static factories:</b>
@@ -61,6 +68,17 @@ import java.util.function.Function;
* {@code Task.FromException()} / {@code Promise.reject()}</li>
* </ul>
* <p>
+ * <b>Instance continuations</b> provide ergonomic composition without exposing
+ * raw {@link CompletableFuture} APIs:
+ * <ul>
+ * <li>{@link #then(Function)} and {@link #thenCompose(Function)} for
success chaining</li>
+ * <li>{@link #thenAccept(Consumer)} for side-effecting continuations</li>
+ * <li>{@link #exceptionally(Function)}, {@link #whenComplete(BiConsumer)},
+ * and {@link #handle(BiFunction)} for failure/completion handling</li>
+ * <li>{@link #orTimeout(long, TimeUnit)} and
+ * {@link #completeOnTimeout(Object, long, TimeUnit)} for deadline
composition</li>
+ * </ul>
+ * <p>
* Third-party frameworks (RxJava, Reactor, etc.) can integrate by registering
* an {@link AwaitableAdapter} via {@link AwaitableAdapterRegistry}.
* <p>
@@ -121,12 +139,124 @@ public interface Awaitable<T> {
*/
<U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>>
fn);
+ /**
+ * Returns a new {@code Awaitable} that, when this one completes normally,
+ * invokes the given consumer and completes with {@code null}.
+ * <p>
+ * This is analogous to JavaScript's {@code promise.then(v =>
sideEffect(v))}
+ * when the result value is no longer needed, and to the consumer-oriented
+ * continuations commonly used with C#, Kotlin, and Swift async APIs.
+ *
+ * @param action the side-effecting consumer to invoke on success
+ * @return a new awaitable that completes after the action runs
+ * @since 6.0.0
+ */
+ default Awaitable<Void> thenAccept(Consumer<? super T> action) {
+ return GroovyPromise.of(toCompletableFuture().thenAccept(action));
+ }
+
/**
* Returns a new {@code Awaitable} that, if this one completes
exceptionally,
* applies the given function to the exception to produce a recovery value.
*/
Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
+ /**
+ * Returns a new {@code Awaitable} that invokes the given action when this
+ * computation completes, regardless of success or failure.
+ * <p>
+ * The supplied throwable is transparently unwrapped so handlers see the
+ * original failure rather than {@link ExecutionException} /
+ * {@link java.util.concurrent.CompletionException} wrappers.
+ * This is analogous to JavaScript's {@code Promise.prototype.finally()}
and
+ * to completion callbacks in C#, Kotlin, and Swift.
+ *
+ * @param action the completion callback receiving the result or failure
+ * @return a new awaitable that completes with the original result
+ * @since 6.0.0
+ */
+ default Awaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable>
action) {
+ return GroovyPromise.of(toCompletableFuture().whenComplete((value,
error) ->
+ action.accept(value, error == null ? null :
AsyncSupport.deepUnwrap(error))));
+ }
+
+ /**
+ * Returns a new {@code Awaitable} that handles both the successful and the
+ * exceptional completion paths in a single continuation.
+ * <p>
+ * The supplied throwable is transparently unwrapped so the handler sees
the
+ * original failure. This mirrors Java's {@code
CompletableFuture.handle()},
+ * while providing the same “single place for success/failure projection”
+ * convenience commonly used in C#, Kotlin, and Swift async code.
+ *
+ * @param fn the handler receiving either the value or the failure
+ * @param <U> the projected result type
+ * @return a new awaitable holding the handler's result
+ * @since 6.0.0
+ */
+ default <U> Awaitable<U> handle(BiFunction<? super T, Throwable, ? extends
U> fn) {
+ return GroovyPromise.of(toCompletableFuture().handle((value, error) ->
+ fn.apply(value, error == null ? null :
AsyncSupport.deepUnwrap(error))));
+ }
+
+ /**
+ * Returns a new {@code Awaitable} that fails with {@link TimeoutException}
+ * if this computation does not complete within the specified duration.
+ * <p>
+ * Unlike {@link #get(long, TimeUnit)}, this is a non-blocking, composable
+ * timeout combinator: it returns another {@code Awaitable} that can itself
+ * be awaited, chained, or passed to {@link #all(Object...)} / {@link
#any(Object...)}.
+ * This plays a role similar to Kotlin's {@code withTimeout} while
+ * preserving Groovy's awaitable abstraction.
+ *
+ * @param duration the timeout duration in milliseconds
+ * @return a new awaitable with timeout semantics
+ * @since 6.0.0
+ */
+ default Awaitable<T> orTimeout(long duration) {
+ return Awaitable.timeout(this, duration, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns a new {@code Awaitable} that fails with {@link TimeoutException}
+ * if this computation does not complete within the specified duration.
+ *
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @return a new awaitable with timeout semantics
+ * @since 6.0.0
+ */
+ default Awaitable<T> orTimeout(long duration, TimeUnit unit) {
+ return Awaitable.timeout(this, duration, unit);
+ }
+
+ /**
+ * Returns a new {@code Awaitable} that completes with the supplied
fallback
+ * value if this computation does not finish before the timeout expires.
+ *
+ * @param fallback the value to use when the timeout expires
+ * @param duration the timeout duration in milliseconds
+ * @return a new awaitable that yields either the original result or the
fallback
+ * @since 6.0.0
+ */
+ default Awaitable<T> completeOnTimeout(T fallback, long duration) {
+ return Awaitable.timeoutOr(this, fallback, duration,
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns a new {@code Awaitable} that completes with the supplied
fallback
+ * value if this computation does not finish before the timeout expires.
+ *
+ * @param fallback the value to use when the timeout expires
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @return a new awaitable that yields either the original result or the
fallback
+ * @since 6.0.0
+ */
+ default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit
unit) {
+ return Awaitable.timeoutOr(this, fallback, duration, unit);
+ }
+
/**
* Converts this {@code Awaitable} to a JDK {@link CompletableFuture}
* for interoperability with APIs that require it.
@@ -236,6 +366,70 @@ public interface Awaitable<T> {
return AsyncSupport.delay(duration, unit);
}
+ /**
+ * Adapts the given source to an {@code Awaitable} and applies a
non-blocking
+ * timeout to it.
+ * <p>
+ * The source may be a Groovy {@link Awaitable}, a JDK
+ * {@link CompletableFuture}/{@link java.util.concurrent.CompletionStage},
+ * or any type supported by {@link AwaitableAdapterRegistry}. This
provides
+ * a concise timeout combinator analogous to Kotlin's {@code withTimeout},
+ * but as a value-level operation that returns another awaitable.
+ *
+ * @param source the async source to time out
+ * @param duration the timeout duration in milliseconds
+ * @return a new awaitable that fails with {@link TimeoutException} on
timeout
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> timeout(Object source, long duration) {
+ return AsyncSupport.timeout(source, duration, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Adapts the given source to an {@code Awaitable} and applies a
non-blocking
+ * timeout to it.
+ *
+ * @param source the async source to time out
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @return a new awaitable that fails with {@link TimeoutException} on
timeout
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit
unit) {
+ return AsyncSupport.timeout(source, duration, unit);
+ }
+
+ /**
+ * Adapts the given source to an {@code Awaitable} and returns a new
+ * awaitable that yields the supplied fallback value if the timeout expires
+ * first.
+ *
+ * @param source the async source to wait for
+ * @param fallback the fallback value to use on timeout
+ * @param duration the timeout duration in milliseconds
+ * @return a new awaitable yielding either the original result or the
fallback
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> timeoutOr(Object source, T fallback, long
duration) {
+ return AsyncSupport.timeoutOr(source, fallback, duration,
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Adapts the given source to an {@code Awaitable} and returns a new
+ * awaitable that yields the supplied fallback value if the timeout expires
+ * first.
+ *
+ * @param source the async source to wait for
+ * @param fallback the fallback value to use on timeout
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @return a new awaitable yielding either the original result or the
fallback
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> timeoutOr(Object source, T fallback, long
duration, TimeUnit unit) {
+ return AsyncSupport.timeoutOr(source, fallback, duration, unit);
+ }
+
// ---- Executor configuration ----
/**
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 174986e017..291505974a 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -45,7 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
* <li>{@link Future} (adapted via a blocking wrapper)</li>
* <li>JDK {@link Flow.Publisher} — single-value
* ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
- * with backpressure support</li>
+ * with backpressure support and upstream cancellation when the
+ * resulting {@link AsyncStream} is {@linkplain AsyncStream#close()
closed}</li>
* </ul>
* <p>
* Additional adapters can be registered at runtime via {@link #register}.
@@ -283,41 +285,58 @@ public class AwaitableAdapterRegistry {
private static <T> AsyncStream<T>
publisherToAsyncStream(Flow.Publisher<T> publisher) {
BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256);
AtomicReference<Flow.Subscription> subRef = new
AtomicReference<>();
+ AtomicBoolean closedRef = new AtomicBoolean(false);
publisher.subscribe(new Flow.Subscriber<T>() {
@Override
public void onSubscribe(Flow.Subscription s) {
- subRef.set(s);
- s.request(1);
+ if (!closedRef.get()) {
+ subRef.set(s);
+ s.request(1);
+ } else {
+ s.cancel();
+ }
}
@Override
public void onNext(T item) {
- queue.add(new ValueSignal<>(item));
+ if (!closedRef.get()) {
+ queue.offer(new ValueSignal<>(item));
+ }
}
@Override
public void onError(Throwable t) {
- queue.add(new ErrorSignal(t));
+ if (!closedRef.get()) {
+ queue.offer(new ErrorSignal(t));
+ }
}
@Override
public void onComplete() {
- queue.add(COMPLETE_SENTINEL);
+ if (!closedRef.get()) {
+ queue.offer(COMPLETE_SENTINEL);
+ }
}
});
return new AsyncStream<T>() {
private volatile T current;
+ private final AtomicBoolean streamClosed = new
AtomicBoolean(false);
@Override
public Awaitable<Boolean> moveNext() {
+ if (streamClosed.get()) {
+ return Awaitable.of(false);
+ }
CompletableFuture<Boolean> cf = new CompletableFuture<>();
try {
Object signal = queue.take();
if (signal == COMPLETE_SENTINEL) {
+ streamClosed.set(true);
cf.complete(false);
} else if (signal instanceof ErrorSignal es) {
+ streamClosed.set(true);
cf.completeExceptionally(es.error());
} else if (signal instanceof ValueSignal<?> vs) {
current = (T) vs.value();
@@ -326,6 +345,9 @@ public class AwaitableAdapterRegistry {
if (sub != null) sub.request(1);
}
} catch (InterruptedException e) {
+ if (streamClosed.get()) {
+ return Awaitable.of(false);
+ }
// Throw directly instead of storing in the
CompletableFuture.
// On JDK 23+, CF.get() wraps stored
CancellationExceptions in a
// new CancellationException("get"), discarding our
message and
@@ -344,6 +366,20 @@ public class AwaitableAdapterRegistry {
public T getCurrent() {
return current;
}
+
+ @Override
+ public void close() {
+ if (!streamClosed.compareAndSet(false, true)) {
+ return;
+ }
+ closedRef.set(true);
+ Flow.Subscription subscription = subRef.getAndSet(null);
+ if (subscription != null) {
+ subscription.cancel();
+ }
+ queue.clear();
+ queue.offer(COMPLETE_SENTINEL);
+ }
};
}
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 01e37f88a8..3f221e35d8 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -525,8 +525,10 @@ public class AstBuilder extends
GroovyParserBaseVisitor<Object> {
BlockStatement whileBody = block(getItemStmt, loopBody);
WhileStatement whileStmt = new WhileStatement(condition, whileBody);
+ Expression closeCall =
AsyncTransformHelper.buildCloseStreamCall(varX(streamVar));
+ TryCatchStatement tryFinally = new TryCatchStatement(whileStmt,
block(new ExpressionStatement(closeCall)));
- return configureAST(block(streamDecl, whileStmt), ctx);
+ return configureAST(block(streamDecl, tryFinally), ctx);
}
@Override
diff --git
a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 39c7a5d251..7893556035 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -23,6 +23,8 @@ import groovy.concurrent.Awaitable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A producer/consumer implementation of {@link AsyncStream} used by
@@ -38,6 +40,10 @@ import java.util.concurrent.SynchronousQueue;
* consumer has consumed the previous element (mirroring C#'s async
* iterator suspension semantics).
* <p>
+ * Calling {@link #close()} marks the stream as closed and interrupts any
+ * currently blocked producer/consumer threads so that early termination from
+ * {@code for await} can cancel the generator promptly.
+ * <p>
* This class is an internal implementation detail and should not be referenced
* directly by user code.
*
@@ -49,21 +55,57 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
private static final Object DONE = new Object();
private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicReference<Thread> producerThread = new
AtomicReference<>();
+ private final AtomicReference<Thread> consumerThread = new
AtomicReference<>();
private volatile T current;
+ /**
+ * Registers the given thread as the producer for this stream.
+ * Called by the async runtime immediately after the producer
+ * thread starts, <em>before</em> the generator body executes.
+ * <p>
+ * If the stream has already been {@linkplain #close() closed} by the
+ * time this method runs, the thread is immediately interrupted so
+ * that the generator body can exit promptly.
+ *
+ * @param thread the producer thread to register
+ */
+ void attachProducer(Thread thread) {
+ producerThread.set(thread);
+ if (closed.get()) {
+ thread.interrupt();
+ }
+ }
+
+ /**
+ * Unregisters the given thread as the producer for this stream.
+ * Called from a {@code finally} block after the generator body
+ * completes (normally or exceptionally).
+ *
+ * @param thread the producer thread to unregister
+ */
+ void detachProducer(Thread thread) {
+ producerThread.compareAndSet(thread, null);
+ }
+
/**
* Produces the next element. Called from the generator body when
* a {@code yield return expr} statement is executed. Blocks until
* the consumer is ready.
*/
public void yield(Object value) {
+ if (closed.get()) {
+ throw streamClosed(null);
+ }
try {
queue.put(new Item(value));
} catch (InterruptedException e) {
+ if (closed.get()) {
+ throw streamClosed(e);
+ }
Thread.currentThread().interrupt();
- CancellationException ce = new CancellationException("Interrupted
during yield");
- ce.initCause(e);
- throw ce;
+ throw interrupted("Interrupted during yield", e);
}
}
@@ -72,12 +114,17 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
* If interrupted, the completion signal is delivered on a best-effort
basis.
*/
public void complete() {
+ if (closed.get()) {
+ return;
+ }
try {
queue.put(DONE);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Best-effort delivery: use non-blocking offer as fallback
- queue.offer(DONE);
+ if (!closed.get()) {
+ Thread.currentThread().interrupt();
+ // Best-effort delivery: use non-blocking offer as fallback
+ queue.offer(DONE);
+ }
}
}
@@ -86,24 +133,37 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
* If interrupted, the error signal is delivered on a best-effort basis.
*/
public void error(Throwable t) {
+ if (closed.get()) {
+ return;
+ }
+ ErrorItem item = new ErrorItem(t != null ? t : new
NullPointerException("null error in generator"));
try {
- queue.put(new ErrorItem(t != null ? t : new
NullPointerException("null error in generator")));
+ queue.put(item);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Best-effort delivery: use non-blocking offer as fallback
- queue.offer(new ErrorItem(t != null ? t : new
NullPointerException("null error in generator")));
+ if (!closed.get()) {
+ Thread.currentThread().interrupt();
+ // Best-effort delivery: use non-blocking offer as fallback
+ queue.offer(item);
+ }
}
}
@Override
@SuppressWarnings("unchecked")
public Awaitable<Boolean> moveNext() {
+ if (closed.get()) {
+ return Awaitable.of(false);
+ }
+ Thread currentThread = Thread.currentThread();
+ consumerThread.set(currentThread);
try {
Object next = queue.take();
if (next == DONE) {
+ closed.set(true);
return Awaitable.of(false);
}
if (next instanceof ErrorItem ei) {
+ closed.set(true);
Throwable cause = ei.error;
if (cause instanceof Error err) throw err;
throw AsyncSupport.sneakyThrow(cause);
@@ -111,10 +171,13 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
current = (T) ((Item) next).value;
return Awaitable.of(true);
} catch (InterruptedException e) {
+ if (closed.get()) {
+ return Awaitable.of(false);
+ }
Thread.currentThread().interrupt();
- CancellationException ce = new CancellationException("Interrupted
during moveNext");
- ce.initCause(e);
- throw ce;
+ throw interrupted("Interrupted during moveNext", e);
+ } finally {
+ consumerThread.compareAndSet(currentThread, null);
}
}
@@ -123,6 +186,35 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
return current;
}
+ @Override
+ public void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ Thread producer = producerThread.getAndSet(null);
+ if (producer != null && producer != Thread.currentThread()) {
+ producer.interrupt();
+ }
+ Thread consumer = consumerThread.getAndSet(null);
+ if (consumer != null && consumer != Thread.currentThread()) {
+ consumer.interrupt();
+ }
+ }
+
+ private static CancellationException interrupted(String message,
InterruptedException cause) {
+ CancellationException ce = new CancellationException(message);
+ ce.initCause(cause);
+ return ce;
+ }
+
+ private static CancellationException streamClosed(InterruptedException
cause) {
+ CancellationException ce = new CancellationException("Async stream was
closed");
+ if (cause != null) {
+ ce.initCause(cause);
+ }
+ return ce;
+ }
+
// Wrapper to handle null values in the queue
private record Item(Object value) { }
private record ErrorItem(Throwable error) { }
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 8875fd52d0..633e7038d4 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -44,9 +45,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Internal runtime support for the {@code async}/{@code await} language
feature
@@ -69,11 +72,16 @@ import java.util.concurrent.TimeUnit;
* producer/consumer bridge for methods using {@code yield return}</li>
* <li><b>Async stream conversion</b> — {@code toAsyncStream()} adapts
* collections, arrays, and adapter-supported types for {@code for
await}</li>
+ * <li><b>Async stream cleanup</b> — {@code closeStream()} lets
compiler-generated
+ * {@code for await} loops release resources and propagate early-exit
+ * cancellation upstream</li>
* <li><b>Defer</b> — {@code createDeferScope()}, {@code defer()}, and
* {@code executeDeferScope()} implement Go-style deferred cleanup with
* LIFO execution and exception suppression</li>
* <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a
* shared {@link java.util.concurrent.ScheduledExecutorService}</li>
+ * <li><b>Timeouts</b> — {@code timeout()} and {@code timeoutOr()} apply
+ * non-blocking deadlines while preserving the {@link Awaitable}
abstraction</li>
* </ul>
* <p>
* <b>Thread pool configuration</b>
@@ -411,6 +419,7 @@ public class AsyncSupport {
public AsyncStream<T> doCall(Object... args) {
AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
CompletableFuture.runAsync(() -> {
+ gen.attachProducer(Thread.currentThread());
try {
Object[] allArgs = new Object[args.length + 1];
allArgs[0] = gen;
@@ -419,6 +428,8 @@ public class AsyncSupport {
gen.complete();
} catch (Throwable t) {
gen.error(t);
+ } finally {
+ gen.detachProducer(Thread.currentThread());
}
}, defaultExecutor);
return gen;
@@ -621,6 +632,94 @@ public class AsyncSupport {
return results;
}
+ /**
+ * Adapts the given source to an {@link Awaitable} and returns a new
+ * awaitable that fails with {@link java.util.concurrent.TimeoutException}
+ * if the source does not complete before the timeout expires.
+ * <p>
+ * The timeout does <em>not</em> cancel the original source automatically.
+ * This mirrors the value-level timeout composition style used by
+ * JavaScript's race-based patterns and keeps Groovy's semantics explicit.
+ * Callers that require cooperative cancellation can still invoke
+ * {@link Awaitable#cancel()} on the original task explicitly.
+ *
+ * @param source the async source to time out
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @param <T> the result type
+ * @return a new awaitable with timeout semantics
+ * @throws IllegalArgumentException if {@code source} is {@code null},
+ * {@code duration} is negative, or {@code unit} is {@code null}
+ * @since 6.0.0
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Awaitable<T> timeout(Object source, long duration,
TimeUnit unit) {
+ validateTimeoutArguments(source, duration, unit, "Awaitable.timeout");
+ CompletableFuture<T> sourceFuture = (CompletableFuture<T>)
toCompletableFuture(source);
+ CompletableFuture<T> result = new CompletableFuture<>();
+ TimeoutException te = new TimeoutException(
+ "Timed out after " + duration + " " +
unit.name().toLowerCase(Locale.ROOT));
+ scheduleTimeoutRace(sourceFuture, result, () ->
result.completeExceptionally(te), duration, unit);
+ return GroovyPromise.of(result);
+ }
+
+ /**
+ * Adapts the given source to an {@link Awaitable} and returns a new
+ * awaitable that yields the supplied fallback value if the timeout expires
+ * first.
+ *
+ * @param source the async source to wait for
+ * @param fallback the value to use when the timeout expires
+ * @param duration the timeout duration
+ * @param unit the time unit
+ * @param <T> the result type
+ * @return a new awaitable yielding either the source result or the
fallback
+ * @throws IllegalArgumentException if {@code source} is {@code null},
+ * {@code duration} is negative, or {@code unit} is {@code null}
+ * @since 6.0.0
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Awaitable<T> timeoutOr(Object source, T fallback, long
duration, TimeUnit unit) {
+ validateTimeoutArguments(source, duration, unit,
"Awaitable.timeoutOr");
+ CompletableFuture<T> sourceFuture = (CompletableFuture<T>)
toCompletableFuture(source);
+ CompletableFuture<T> result = new CompletableFuture<>();
+ scheduleTimeoutRace(sourceFuture, result, () ->
result.complete(fallback), duration, unit);
+ return GroovyPromise.of(result);
+ }
+
+ /**
+ * Shared logic for {@link #timeout} and {@link #timeoutOr}: schedules the
+ * timeout action and wires up source completion to cancel the timer and
+ * propagate the result or error.
+ */
+ private static <T> void scheduleTimeoutRace(
+ CompletableFuture<T> sourceFuture,
+ CompletableFuture<T> result,
+ Runnable onTimeout,
+ long duration, TimeUnit unit) {
+ ScheduledFuture<?> timeoutTask = DELAY_SCHEDULER.schedule(onTimeout,
duration, unit);
+ sourceFuture.whenComplete((value, error) -> {
+ timeoutTask.cancel(false);
+ if (error != null) {
+ result.completeExceptionally(deepUnwrap(error));
+ } else {
+ result.complete(value);
+ }
+ });
+ }
+
+ private static void validateTimeoutArguments(Object source, long duration,
TimeUnit unit, String methodName) {
+ if (source == null) {
+ throw new IllegalArgumentException(methodName + ": source must not
be null");
+ }
+ if (unit == null) {
+ throw new IllegalArgumentException(methodName + ": TimeUnit must
not be null");
+ }
+ if (duration < 0) {
+ throw new IllegalArgumentException(methodName + ": duration must
not be negative: " + duration);
+ }
+ }
+
// ---- delay ----------------------------------------------------------
/**
@@ -695,6 +794,23 @@ public class AsyncSupport {
return AwaitableAdapterRegistry.toAsyncStream(source);
}
+ /**
+ * Closes the given async stream if it is non-null.
+ * <p>
+ * Compiler-generated {@code for await} loops call this from a synthetic
+ * {@code finally} block so that early {@code break}, {@code return}, or
+ * exceptional exit reliably releases the underlying stream resources and
+ * propagates cancellation to upstream producers where supported.
+ *
+ * @param stream the stream to close, or {@code null}
+ * @since 6.0.0
+ */
+ public static void closeStream(Object stream) {
+ if (stream instanceof AsyncStream<?> asyncStream) {
+ asyncStream.close();
+ }
+ }
+
// ---- yield return / async generator ----------------------------------
/**
@@ -743,11 +859,14 @@ public class AsyncSupport {
public static <T> AsyncStream<T> generateAsyncStream(Closure<?> body) {
AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
CompletableFuture.runAsync(() -> {
+ gen.attachProducer(Thread.currentThread());
try {
body.call(gen);
gen.complete();
} catch (Throwable t) {
gen.error(t);
+ } finally {
+ gen.detachProducer(Thread.currentThread());
}
}, defaultExecutor);
return gen;
diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
index 0b3a85a03e..df22416b2d 100644
--- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
+++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
@@ -21,6 +21,7 @@ package org.apache.groovy.runtime.async;
import groovy.concurrent.Awaitable;
import java.util.Objects;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -59,12 +60,20 @@ public class GroovyPromise<T> implements Awaitable<T> {
@Override
public T get() throws InterruptedException, ExecutionException {
- return future.get();
+ try {
+ return future.get();
+ } catch (CancellationException e) {
+ throw unwrapCancellation(e);
+ }
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
- return future.get(timeout, unit);
+ try {
+ return future.get(timeout, unit);
+ } catch (CancellationException e) {
+ throw unwrapCancellation(e);
+ }
}
@Override
@@ -123,6 +132,17 @@ public class GroovyPromise<T> implements Awaitable<T> {
return future;
}
+ /**
+ * JDK 23+ wraps a stored {@link CancellationException} in a new instance
+ * with the generic message {@code "get"} when {@link
CompletableFuture#get()}
+ * is called. Unwrap it here so Groovy users consistently observe the
+ * original cancellation message and cause chain across all supported JDKs.
+ */
+ private static CancellationException
unwrapCancellation(CancellationException exception) {
+ Throwable cause = exception.getCause();
+ return cause instanceof CancellationException ce ? ce : exception;
+ }
+
@Override
public String toString() {
if (future.isDone()) {
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index 9d804f4381..a52c09e8d4 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -26,7 +26,8 @@
== Introduction
Groovy provides native `async`/`await` support as a language-level feature,
enabling developers to write
-asynchronous code in a sequential, readable style. Inspired by similar
constructs in JavaScript, C# and Go,
+asynchronous code in a sequential, readable style. Inspired by similar
constructs in JavaScript, C#, Kotlin,
+Swift, and Go,
Groovy's async/await integrates seamlessly with the JVM concurrency model
while maintaining Groovy's hallmark
conciseness and expressiveness.
@@ -350,7 +351,8 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=exception_multiple_tasks,indent=
The `groovy.concurrent.Awaitable` interface provides static combinator methods
for common async
coordination patterns, analogous to JavaScript's `Promise.all()` /
`Promise.any()` /
-`Promise.allSettled()` and C#'s `Task.WhenAll()` / `Task.WhenAny()`.
+`Promise.allSettled()`, C#'s `Task.WhenAll()` / `Task.WhenAny()`, and Kotlin's
coroutine
+coordination helpers.
=== `all` — Parallel Execution
@@ -380,6 +382,18 @@ each result is wrapped in an `AwaitResult` with
`isSuccess()`, `isFailure()`, `v
include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_settled,indent=0]
----
+=== Continuation Helpers
+
+Use `thenAccept`, `whenComplete`, and `handle` when you want continuation-style
+composition rather than a direct `await`. This is especially useful at API
boundaries
+where you want to attach logging, recovery, or side effects without immediately
+blocking for the result.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=continuation_helpers,indent=0]
+----
+
=== `delay` — Non-Blocking Pause
Create an awaitable timer (analogous to JavaScript's `setTimeout` wrapped in a
`Promise`):
@@ -389,6 +403,18 @@ Create an awaitable timer (analogous to JavaScript's
`setTimeout` wrapped in a `
include::../test/AsyncAwaitSpecTest.groovy[tags=delay_example,indent=0]
----
+=== `timeout` / `timeoutOr` — Non-Blocking Deadlines
+
+Apply a deadline without blocking the calling thread. `Awaitable.timeout(...)`
+fails with `TimeoutException`, while `Awaitable.timeoutOr(...)` completes with
a
+fallback value. The instance forms `orTimeout(...)` and
`completeOnTimeout(...)`
+provide the same capability on an existing awaitable.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_combinators,indent=0]
+----
+
[[flow-publisher]]
== `Flow.Publisher` Integration
@@ -411,6 +437,15 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=flow_publisher_await,indent=0]
include::../test/AsyncAwaitSpecTest.groovy[tags=flow_publisher_for_await,indent=0]
----
+[NOTE]
+====
+`for await` closes the adapted stream from a compiler-generated `finally`
block.
+For `Flow.Publisher` sources, this means an early `break`, `return`, or
exception
+cancels the upstream subscription rather than leaving it running in the
background.
+This is conceptually similar to JavaScript async iterators' `return()`, C#'s
+`await foreach` disposal, and Swift's `AsyncSequence` early-exit cleanup.
+====
+
[[adapter-registry]]
== Adapter Registry
@@ -507,7 +542,8 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=retry_pattern,indent=0]
=== Timeout
-Use `Awaitable.any()` to race an operation against a delay, implementing a
timeout:
+Use `Awaitable.timeout()` (or the instance form `orTimeout()`) to apply a
+deadline without dropping back to manual race logic:
[source,groovy]
----
@@ -610,9 +646,11 @@ This section provides architectural context for Groovy
language maintainers and
[[implementation-strategy]]
=== Thread-Based vs State Machine
-Languages like C# transform async methods into **state machine** classes at
-compile time. Each `await` point becomes a numbered state, and the compiler
rewrites the
-method body so it can be suspended and resumed without blocking an OS thread.
+Languages like C#, Kotlin, and Swift transform async workflows into
compiler-managed
+state machines or coroutine frames. Each suspension point becomes resumable
state,
+allowing the runtime to suspend the logical task without blocking an OS thread.
+JavaScript takes a different route again, relying on an event loop plus promise
+microtasks.
Groovy takes a different approach: each `async` method body runs on its **own
thread**
(a virtual thread on JDK 21+, or a pooled platform thread on JDK 17–20). The
`await`
@@ -650,7 +688,8 @@ consistent code generation for both the keyword and
annotation forms.
**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`,
`AwaitableAdapterRegistry`): `AsyncSupport` is the central runtime class
containing static
-methods for `await`, `async`, `defer`, `yield return`, and combinators.
`GroovyPromise` wraps
+methods for `await`, `async`, `defer`, `yield return`, timeout composition,
stream cleanup,
+and combinators. `GroovyPromise` wraps
`CompletableFuture` to implement the `Awaitable` interface, decoupling the
public API from the
JDK implementation. `AsyncStreamGenerator` implements the producer/consumer
pattern for async
generators using a `SynchronousQueue`. `AwaitableAdapterRegistry` provides the
SPI extension
@@ -665,7 +704,9 @@ All runtime components employ lock-free or
minimal-contention synchronization:
* `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free
iteration during adapter lookup
* `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
* `AsyncStreamGenerator.current` — `volatile` field for producer/consumer
visibility
-* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` for
thread-safe subscription management
+* `AsyncStreamGenerator` close state — `AtomicBoolean` +
`AtomicReference<Thread>` for prompt,
+ idempotent close/cancellation signalling
+* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` plus
close-aware queue signalling
* Defer scopes — per-task `ArrayDeque`, no cross-thread sharing
* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
@@ -673,101 +714,144 @@ All runtime components employ lock-free or
minimal-contention synchronization:
== Cross-Language Comparison
The following table compares Groovy's async/await with the corresponding
features in
-JavaScript and C#, for developers familiar with those languages.
+JavaScript, C#, Kotlin, and Swift, for developers familiar with those
languages.
-[cols="3,4,4,4"]
+[cols="2,3,3,3,3,3"]
|===
-| Feature | Groovy | JavaScript | C#
+| Feature | Groovy | JavaScript | C# | Kotlin | Swift
| **Async declaration**
| `async def foo() { }`
| `async function foo() { }`
| `async Task<T> Foo() { }`
+| `suspend fun foo(): T`
+| `func foo() async throws -> T`
| **Await expression**
| `await expr` or `await(expr)`
| `await expr`
| `await expr`
+| `deferred.await()` / suspend call
+| `try await expr`
| **Return type**
| `Awaitable<T>`
| `Promise<T>`
| `Task<T>` / `ValueTask<T>`
+| `Deferred<T>` / suspended `T`
+| `Task<T, Error>` / `async` function returns `T`
| **Async closure/lambda**
| `async { -> body }`
| `async () => body`
| `async () => body`
+| `scope.async { body }`
+| `Task { body }`
| **Async iteration**
| `for await (x in src) { }`
| `for await (x of src) { }`
| `await foreach (x in src) { }`
+| _(manual via `Flow`, `Channel`, or `Flow.collect`)_
+| `for try await x in seq`
| **Async generator**
| `yield return expr`
| `yield expr` (in `async function*`)
| `yield return expr` (in `IAsyncEnumerable<T>`)
+| `flow { emit(x) }` / channel producers
+| `AsyncStream { continuation.yield(x) }`
| **Deferred cleanup**
| `defer { cleanup }`
| _(none; use `try`/`finally`)_
-| _(none; use `try`/`finally` or `IAsyncDisposable`)_
+| `await using` / `try`/`finally`
+| `try`/`finally` / `use`
+| `defer`
| **Wait all**
| `Awaitable.all(a, b, c)`
| `Promise.all([a, b, c])`
| `Task.WhenAll(a, b, c)`
+| `awaitAll(a, b, c)`
+| `async let` / `withTaskGroup`
| **Wait any**
| `Awaitable.any(a, b, c)`
| `Promise.any([a, b, c])`
| `Task.WhenAny(a, b, c)`
+| `select {}` / manual race
+| `TaskGroup.next()`
| **Wait all settled**
| `Awaitable.allSettled(a, b, c)`
| `Promise.allSettled([a, b, c])`
| _(manual; no built-in equivalent)_
+| _(manual; typically `runCatching`)_
+| _(manual; typically `Result`)_
| **Pre-computed result**
| `Awaitable.of(value)`
| `Promise.resolve(value)`
| `Task.FromResult(value)`
+| `CompletableDeferred(value)` / `async { value }`
+| `Task { value }`
| **Pre-computed error**
| `Awaitable.failed(error)`
| `Promise.reject(error)`
| `Task.FromException(ex)`
+| `CompletableDeferred<T>().completeExceptionally(ex)`
+| `Task { throw error }`
| **Delay**
| `Awaitable.delay(ms)`
| `new Promise(r => setTimeout(r, ms))`
| `Task.Delay(ms)`
+| `delay(ms)`
+| `Task.sleep(for:)`
+
+| **Timeout**
+| `Awaitable.timeout(task, ms)` / `task.orTimeout(ms)`
+| `Promise.race([...])` / `AbortSignal.timeout()`
+| `task.WaitAsync(timeout)` / `CancelAfter`
+| `withTimeout(ms) { ... }`
+| _(manual race with `TaskGroup` / sleep)_
| **Cancellation**
| `awaitable.cancel()`
| `AbortController.abort()`
| `CancellationToken`
+| `Job.cancel()`
+| `Task.cancel()`
| **Executor/scheduler**
| `Awaitable.setExecutor(exec)`
| _(event loop; not configurable)_
| `TaskScheduler`
+| `CoroutineDispatcher`
+| task priority / executor chosen by runtime
| **Third-party support**
| `AwaitableAdapterRegistry` SPI
| _(native `thenable` protocol)_
| Custom awaiters via `GetAwaiter()`
+| coroutine adapters / bridges
+| `AsyncSequence` / `Task` interop patterns
| **Annotation form**
| `@Async`
| _(none)_
| _(none; attribute-driven async not supported)_
+| _(none)_
+| _(none)_
| **Implementation**
| Thread-per-task (virtual threads on JDK 21+)
| Event loop + microtask queue
| State machine (compiled IL transformation)
+| Coroutine state machine
+| Compiler-generated async state machine
|===
[[summary]]
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 4b03993f85..66f5735d06 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -638,6 +638,56 @@ assert await(delayedGreeting()) == "Hello after delay"
'''
}
+ @Test
+ void testContinuationHelpers() {
+ assertScript '''
+// tag::continuation_helpers[]
+import groovy.concurrent.Awaitable
+import java.util.concurrent.atomic.AtomicReference
+
+def observed = new AtomicReference()
+await(Awaitable.of("groovy").thenAccept { observed.set(it.toUpperCase()) })
+assert observed.get() == "GROOVY"
+
+def recovered = await(
+ Awaitable.failed(new IOException("boom"))
+ .handle { value, error -> "recovered from ${error.message}" }
+)
+assert recovered == "recovered from boom"
+
+def completion = new AtomicReference()
+assert await(Awaitable.of(7).whenComplete { value, error ->
+ completion.set("${value}:${error}")
+}) == 7
+assert completion.get() == "7:null"
+// end::continuation_helpers[]
+ '''
+ }
+
+ @Test
+ void testTimeoutCombinators() {
+ assertScript '''
+// tag::timeout_combinators[]
+import groovy.concurrent.Awaitable
+import java.util.concurrent.TimeoutException
+
+async slowCall() {
+ await(Awaitable.delay(5_000))
+ return "done"
+}
+
+try {
+ await(Awaitable.timeout(slowCall(), 50))
+ assert false : "should have timed out"
+} catch (TimeoutException e) {
+ assert e.message.contains("Timed out after 50")
+}
+
+assert await(Awaitable.timeoutOr(slowCall(), "cached", 50)) == "cached"
+// end::timeout_combinators[]
+ '''
+ }
+
//
=========================================================================
// 9. Flow.Publisher integration
//
=========================================================================
@@ -823,16 +873,11 @@ async longRunningTask() {
return "completed"
}
-async withTimeout(Awaitable task, long timeoutMs) {
- def timeout = Awaitable.delay(timeoutMs).then { throw new
TimeoutException("timed out") }
- return await Awaitable.any(task, timeout)
-}
-
try {
- await withTimeout(longRunningTask(), 50)
+ await Awaitable.timeout(longRunningTask(), 50)
assert false : "should have timed out"
} catch (TimeoutException e) {
- assert e.message == "timed out"
+ assert e.message.contains("Timed out after 50")
}
// end::timeout_pattern[]
'''
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index acbbce06ed..c2d30ba7f3 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -672,6 +672,87 @@ class AsyncAwaitSyntaxTest {
'''
}
+ @Test
+ void testForAwaitClosesCustomStreamOnEarlyReturn() {
+ assertScript '''
+ import groovy.concurrent.AsyncStream
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicInteger
+
+ class TrackingStream implements AsyncStream<Integer> {
+ private final List<Integer> values = [10, 20, 30]
+ private int index = 0
+ private Integer current
+ final AtomicInteger closeCount = new AtomicInteger()
+
+ Awaitable<Boolean> moveNext() {
+ if (index >= values.size()) return Awaitable.of(false)
+ current = values[index++]
+ return Awaitable.of(true)
+ }
+
+ Integer getCurrent() { current }
+
+ void close() { closeCount.incrementAndGet() }
+ }
+
+ class Svc {
+ async first(TrackingStream stream) {
+ for await (item in stream) {
+ return item
+ }
+ return -1
+ }
+ }
+
+ def stream = new TrackingStream()
+ assert new Svc().first(stream).get() == 10
+ assert stream.closeCount.get() == 1
+ '''
+ }
+
+ @Test
+ void testForAwaitCancelsPublisherOnEarlyReturn() {
+ assertScript '''
+ import java.util.concurrent.Flow
+ import java.util.concurrent.atomic.AtomicBoolean
+
+ class TestPublisher implements Flow.Publisher<Integer> {
+ final AtomicBoolean cancelled = new AtomicBoolean(false)
+
+ void subscribe(Flow.Subscriber<? super Integer> subscriber) {
+ subscriber.onSubscribe(new Flow.Subscription() {
+ boolean emitted
+
+ void request(long n) {
+ if (!emitted) {
+ emitted = true
+ subscriber.onNext(42)
+ }
+ }
+
+ void cancel() {
+ cancelled.set(true)
+ }
+ })
+ }
+ }
+
+ class Svc {
+ async first(TestPublisher publisher) {
+ for await (item in publisher) {
+ return item
+ }
+ return -1
+ }
+ }
+
+ def publisher = new TestPublisher()
+ assert new Svc().first(publisher).get() == 42
+ assert publisher.cancelled.get()
+ '''
+ }
+
@Test
void testForAwaitWithContinue() {
assertScript '''
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
index 2802193bf4..21e5ec380e 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
@@ -39,6 +39,7 @@ import java.util.concurrent.Flow
import java.util.concurrent.FutureTask
import java.util.concurrent.SubmissionPublisher
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
import static groovy.test.GroovyAssert.assertScript
@@ -109,6 +110,100 @@ class AsyncCoverageTest {
'''
}
+ @Test
+ void testThenAcceptRunsSideEffect() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicInteger
+
+ def seen = new AtomicInteger()
+ await(Awaitable.of(21).thenAccept { seen.set(it * 2) })
+ assert seen.get() == 42
+ '''
+ }
+
+ @Test
+ void testWhenCompleteSeesUnwrappedFailure() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicReference
+
+ def successful = new AtomicReference()
+ assert await(Awaitable.of("ok").whenComplete { value, error ->
+ successful.set([value, error])
+ }) == "ok"
+ assert successful.get()[0] == "ok"
+ assert successful.get()[1] == null
+
+ def failed = Awaitable.failed(new
IOException("boom")).whenComplete { value, error ->
+ assert value == null
+ assert error instanceof IOException
+ assert error.message == "boom"
+ }
+
+ try {
+ await(failed)
+ assert false : "Should have thrown"
+ } catch (IOException e) {
+ assert e.message == "boom"
+ }
+ '''
+ }
+
+ @Test
+ void testHandleProjectsSuccessAndFailure() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ def ok = Awaitable.of(5).handle { value, error ->
+ assert error == null
+ value * 2
+ }
+ assert await(ok) == 10
+
+ def recovered = Awaitable.failed(new IOException("boom")).handle {
value, error ->
+ assert value == null
+ "fallback: ${error.message}"
+ }
+ assert await(recovered) == "fallback: boom"
+ '''
+ }
+
+ @Test
+ void testTimeoutCombinators() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+ import java.util.concurrent.TimeUnit
+ import java.util.concurrent.TimeoutException
+
+ assert await(Awaitable.of("fast").orTimeout(1, TimeUnit.SECONDS))
== "fast"
+
+ try {
+ await(Awaitable.delay(10_000).then { "late" }.orTimeout(50,
TimeUnit.MILLISECONDS))
+ assert false : "Should have timed out"
+ } catch (TimeoutException e) {
+ assert e.message.contains("Timed out after 50")
+ }
+
+ try {
+ await(Awaitable.timeout(new CompletableFuture<String>(), 50,
TimeUnit.MILLISECONDS))
+ assert false : "Should have timed out"
+ } catch (TimeoutException e) {
+ assert e.message.contains("Timed out after 50")
+ }
+
+ def fallback = await(Awaitable.timeoutOr(
+ Awaitable.delay(10_000).then { "late" },
+ "cached",
+ 50,
+ TimeUnit.MILLISECONDS
+ ))
+ assert fallback == "cached"
+ assert await(Awaitable.of("value").completeOnTimeout("fallback",
1, TimeUnit.SECONDS)) == "value"
+ '''
+ }
+
// ================================================================
// Awaitable instance methods: isDone, cancel, isCancelled,
// isCompletedExceptionally, toCompletableFuture, get(timeout)
@@ -201,6 +296,27 @@ class AsyncCoverageTest {
'''
}
+ @Test
+ void testCancellationMessageIsStableAcrossJdks() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CancellationException
+
+ def original = new CancellationException("cancelled-by-user")
+ original.initCause(new InterruptedException("interrupt-cause"))
+
+ def failed = Awaitable.failed(original)
+ try {
+ failed.get()
+ assert false : "Should have thrown"
+ } catch (CancellationException e) {
+ assert e.message == "cancelled-by-user"
+ assert e.cause instanceof InterruptedException
+ assert e.cause.message == "interrupt-cause"
+ }
+ '''
+ }
+
// ================================================================
// Awaitable static: delay(long, TimeUnit), getExecutor,
// isVirtualThreadsAvailable
@@ -242,6 +358,29 @@ class AsyncCoverageTest {
'''
}
+ @Test
+ void testCloseStreamHelperClosesStream() {
+ AtomicBoolean closed = new AtomicBoolean(false)
+ AsyncSupport.closeStream(new AsyncStream<String>() {
+ @Override
+ Awaitable<Boolean> moveNext() {
+ Awaitable.of(false)
+ }
+
+ @Override
+ String getCurrent() {
+ null
+ }
+
+ @Override
+ void close() {
+ closed.set(true)
+ }
+ })
+ assert closed.get()
+ AsyncSupport.closeStream(null)
+ }
+
@Test
void testSetExecutorAndRestore() {
assertScript '''