This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new c69441c4b5 Minor tweaks
c69441c4b5 is described below

commit c69441c4b5fec2e025e7945ccd9b5362d14889a6
Author: Daniel Sun <[email protected]>
AuthorDate: Sun Mar 8 00:20:20 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/AsyncStream.java   |  18 +-
 src/main/java/groovy/concurrent/Awaitable.java     | 194 +++++++++++++++++++++
 .../concurrent/AwaitableAdapterRegistry.java       |  48 ++++-
 .../apache/groovy/parser/antlr4/AstBuilder.java    |   4 +-
 .../groovy/runtime/async/AsyncStreamGenerator.java | 118 +++++++++++--
 .../apache/groovy/runtime/async/AsyncSupport.java  | 119 +++++++++++++
 .../apache/groovy/runtime/async/GroovyPromise.java |  24 ++-
 src/spec/doc/core-async-await.adoc                 | 108 ++++++++++--
 src/spec/test/AsyncAwaitSpecTest.groovy            |  59 ++++++-
 .../groovy/transform/AsyncAwaitSyntaxTest.groovy   |  81 +++++++++
 .../groovy/transform/AsyncCoverageTest.groovy      | 139 +++++++++++++++
 11 files changed, 870 insertions(+), 42 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AsyncStream.java b/src/main/java/groovy/concurrent/AsyncStream.java
index d6acfc5beb..f9c0ca89c9 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -44,7 +44,7 @@ package groovy.concurrent;
  * @see AwaitableAdapterRegistry
  * @since 6.0.0
  */
-public interface AsyncStream<T> {
+public interface AsyncStream<T> extends AutoCloseable {
 
     /**
      * Asynchronously advances to the next element. Returns an {@link Awaitable}
@@ -59,6 +59,22 @@ public interface AsyncStream<T> {
      */
     T getCurrent();
 
+    /**
+     * Closes the stream and releases any associated resources.
+     * <p>
+     * The default implementation is a no-op.  Implementations that bridge to
+     * generators, publishers, or other resource-owning sources may override
+     * this to propagate cancellation upstream.  Compiler-generated
+     * {@code for await} loops invoke {@code close()} automatically from a
+     * {@code finally} block, including on early {@code break}, {@code return},
+     * and exceptional exit.
+     *
+     * @since 6.0.0
+     */
+    @Override
+    default void close() {
+    }
+
     /**
      * Returns an empty {@code AsyncStream} that completes immediately.
      */
diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java
index 7cb541f7b2..4640bebcef 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -51,6 +54,10 @@ import java.util.function.Function;
  *       {@code Promise.allSettled()}</li>
  *   <li>{@link #delay(long) Awaitable.delay(ms)} — like
  *       {@code Task.Delay()} / {@code setTimeout}</li>
+ *   <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like
+ *       Kotlin's {@code withTimeout} or a JavaScript promise raced against a timer</li>
+ *   <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task, fallback, ms)} —
+ *       like a timeout with fallback/default value</li>
  * </ul>
  * <p>
  * <b>Static factories:</b>
@@ -61,6 +68,17 @@ import java.util.function.Function;
  *       {@code Task.FromException()} / {@code Promise.reject()}</li>
  * </ul>
  * <p>
+ * <b>Instance continuations</b> provide ergonomic composition without exposing
+ * raw {@link CompletableFuture} APIs:
+ * <ul>
+ *   <li>{@link #then(Function)} and {@link #thenCompose(Function)} for success chaining</li>
+ *   <li>{@link #thenAccept(Consumer)} for side-effecting continuations</li>
+ *   <li>{@link #exceptionally(Function)}, {@link #whenComplete(BiConsumer)},
+ *       and {@link #handle(BiFunction)} for failure/completion handling</li>
+ *   <li>{@link #orTimeout(long, TimeUnit)} and
+ *       {@link #completeOnTimeout(Object, long, TimeUnit)} for deadline composition</li>
+ * </ul>
+ * <p>
  * Third-party frameworks (RxJava, Reactor, etc.) can integrate by registering
  * an {@link AwaitableAdapter} via {@link AwaitableAdapterRegistry}.
  * <p>
@@ -121,12 +139,124 @@ public interface Awaitable<T> {
      */
     <U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>> fn);
 
+    /**
+     * Returns a new {@code Awaitable} that, when this one completes normally,
+     * invokes the given consumer and completes with {@code null}.
+     * <p>
+     * This is analogous to JavaScript's {@code promise.then(v =&gt; sideEffect(v))}
+     * when the result value is no longer needed, and to the consumer-oriented
+     * continuations commonly used with C#, Kotlin, and Swift async APIs.
+     *
+     * @param action the side-effecting consumer to invoke on success
+     * @return a new awaitable that completes after the action runs
+     * @since 6.0.0
+     */
+    default Awaitable<Void> thenAccept(Consumer<? super T> action) {
+        return GroovyPromise.of(toCompletableFuture().thenAccept(action));
+    }
+
     /**
      * Returns a new {@code Awaitable} that, if this one completes exceptionally,
      * applies the given function to the exception to produce a recovery value.
      */
     Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
 
+    /**
+     * Returns a new {@code Awaitable} that invokes the given action when this
+     * computation completes, regardless of success or failure.
+     * <p>
+     * The supplied throwable is transparently unwrapped so handlers see the
+     * original failure rather than {@link ExecutionException} /
+     * {@link java.util.concurrent.CompletionException} wrappers.
+     * This is analogous to JavaScript's {@code Promise.prototype.finally()} and
+     * to completion callbacks in C#, Kotlin, and Swift.
+     *
+     * @param action the completion callback receiving the result or failure
+     * @return a new awaitable that completes with the original result
+     * @since 6.0.0
+     */
+    default Awaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
+        return GroovyPromise.of(toCompletableFuture().whenComplete((value, error) ->
+                action.accept(value, error == null ? null : AsyncSupport.deepUnwrap(error))));
+    }
+
+    /**
+     * Returns a new {@code Awaitable} that handles both the successful and the
+     * exceptional completion paths in a single continuation.
+     * <p>
+     * The supplied throwable is transparently unwrapped so the handler sees the
+     * original failure.  This mirrors Java's {@code CompletableFuture.handle()},
+     * while providing the same “single place for success/failure projection”
+     * convenience commonly used in C#, Kotlin, and Swift async code.
+     *
+     * @param fn the handler receiving either the value or the failure
+     * @param <U> the projected result type
+     * @return a new awaitable holding the handler's result
+     * @since 6.0.0
+     */
+    default <U> Awaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
+        return GroovyPromise.of(toCompletableFuture().handle((value, error) ->
+                fn.apply(value, error == null ? null : AsyncSupport.deepUnwrap(error))));
+    }
+
+    /**
+     * Returns a new {@code Awaitable} that fails with {@link TimeoutException}
+     * if this computation does not complete within the specified duration.
+     * <p>
+     * Unlike {@link #get(long, TimeUnit)}, this is a non-blocking, composable
+     * timeout combinator: it returns another {@code Awaitable} that can itself
+     * be awaited, chained, or passed to {@link #all(Object...)} / {@link #any(Object...)}.
+     * This plays a role similar to Kotlin's {@code withTimeout} while
+     * preserving Groovy's awaitable abstraction.
+     *
+     * @param duration the timeout duration in milliseconds
+     * @return a new awaitable with timeout semantics
+     * @since 6.0.0
+     */
+    default Awaitable<T> orTimeout(long duration) {
+        return Awaitable.timeout(this, duration, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns a new {@code Awaitable} that fails with {@link TimeoutException}
+     * if this computation does not complete within the specified duration.
+     *
+     * @param duration the timeout duration
+     * @param unit the time unit
+     * @return a new awaitable with timeout semantics
+     * @since 6.0.0
+     */
+    default Awaitable<T> orTimeout(long duration, TimeUnit unit) {
+        return Awaitable.timeout(this, duration, unit);
+    }
+
+    /**
+     * Returns a new {@code Awaitable} that completes with the supplied fallback
+     * value if this computation does not finish before the timeout expires.
+     *
+     * @param fallback the value to use when the timeout expires
+     * @param duration the timeout duration in milliseconds
+     * @return a new awaitable that yields either the original result or the fallback
+     * @since 6.0.0
+     */
+    default Awaitable<T> completeOnTimeout(T fallback, long duration) {
+        return Awaitable.timeoutOr(this, fallback, duration, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns a new {@code Awaitable} that completes with the supplied fallback
+     * value if this computation does not finish before the timeout expires.
+     *
+     * @param fallback the value to use when the timeout expires
+     * @param duration the timeout duration
+     * @param unit the time unit
+     * @return a new awaitable that yields either the original result or the fallback
+     * @since 6.0.0
+     */
+    default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit unit) {
+        return Awaitable.timeoutOr(this, fallback, duration, unit);
+    }
+
     /**
      * Converts this {@code Awaitable} to a JDK {@link CompletableFuture}
      * for interoperability with APIs that require it.
@@ -236,6 +366,70 @@ public interface Awaitable<T> {
         return AsyncSupport.delay(duration, unit);
     }
 
+    /**
+     * Adapts the given source to an {@code Awaitable} and applies a non-blocking
+     * timeout to it.
+     * <p>
+     * The source may be a Groovy {@link Awaitable}, a JDK
+     * {@link CompletableFuture}/{@link java.util.concurrent.CompletionStage},
+     * or any type supported by {@link AwaitableAdapterRegistry}.  This provides
+     * a concise timeout combinator analogous to Kotlin's {@code withTimeout},
+     * but as a value-level operation that returns another awaitable.
+     *
+     * @param source the async source to time out
+     * @param duration the timeout duration in milliseconds
+     * @return a new awaitable that fails with {@link TimeoutException} on timeout
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> timeout(Object source, long duration) {
+        return AsyncSupport.timeout(source, duration, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Adapts the given source to an {@code Awaitable} and applies a non-blocking
+     * timeout to it.
+     *
+     * @param source the async source to time out
+     * @param duration the timeout duration
+     * @param unit the time unit
+     * @return a new awaitable that fails with {@link TimeoutException} on timeout
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit unit) {
+        return AsyncSupport.timeout(source, duration, unit);
+    }
+
+    /**
+     * Adapts the given source to an {@code Awaitable} and returns a new
+     * awaitable that yields the supplied fallback value if the timeout expires
+     * first.
+     *
+     * @param source the async source to wait for
+     * @param fallback the fallback value to use on timeout
+     * @param duration the timeout duration in milliseconds
+     * @return a new awaitable yielding either the original result or the fallback
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> timeoutOr(Object source, T fallback, long duration) {
+        return AsyncSupport.timeoutOr(source, fallback, duration, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Adapts the given source to an {@code Awaitable} and returns a new
+     * awaitable that yields the supplied fallback value if the timeout expires
+     * first.
+     *
+     * @param source the async source to wait for
+     * @param fallback the fallback value to use on timeout
+     * @param duration the timeout duration
+     * @param unit the time unit
+     * @return a new awaitable yielding either the original result or the fallback
+     * @since 6.0.0
+     */
+    static <T> Awaitable<T> timeoutOr(Object source, T fallback, long duration, TimeUnit unit) {
+        return AsyncSupport.timeoutOr(source, fallback, duration, unit);
+    }
+
     // ---- Executor configuration ----
 
     /**
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 174986e017..291505974a 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -45,7 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
  *   <li>{@link Future} (adapted via a blocking wrapper)</li>
  *   <li>JDK {@link Flow.Publisher} — single-value
  *       ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream})
- *       with backpressure support</li>
+ *       with backpressure support and upstream cancellation when the
+ *       resulting {@link AsyncStream} is {@linkplain AsyncStream#close() closed}</li>
  * </ul>
  * <p>
  * Additional adapters can be registered at runtime via {@link #register}.
@@ -283,41 +285,58 @@ public class AwaitableAdapterRegistry {
         private static <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) {
             BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256);
             AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
+            AtomicBoolean closedRef = new AtomicBoolean(false);
 
             publisher.subscribe(new Flow.Subscriber<T>() {
                 @Override
                 public void onSubscribe(Flow.Subscription s) {
-                    subRef.set(s);
-                    s.request(1);
+                    if (!closedRef.get()) {
+                        subRef.set(s);
+                        s.request(1);
+                    } else {
+                        s.cancel();
+                    }
                 }
 
                 @Override
                 public void onNext(T item) {
-                    queue.add(new ValueSignal<>(item));
+                    if (!closedRef.get()) {
+                        queue.offer(new ValueSignal<>(item));
+                    }
                 }
 
                 @Override
                 public void onError(Throwable t) {
-                    queue.add(new ErrorSignal(t));
+                    if (!closedRef.get()) {
+                        queue.offer(new ErrorSignal(t));
+                    }
                 }
 
                 @Override
                 public void onComplete() {
-                    queue.add(COMPLETE_SENTINEL);
+                    if (!closedRef.get()) {
+                        queue.offer(COMPLETE_SENTINEL);
+                    }
                 }
             });
 
             return new AsyncStream<T>() {
                 private volatile T current;
+                private final AtomicBoolean streamClosed = new AtomicBoolean(false);
 
                 @Override
                 public Awaitable<Boolean> moveNext() {
+                    if (streamClosed.get()) {
+                        return Awaitable.of(false);
+                    }
                     CompletableFuture<Boolean> cf = new CompletableFuture<>();
                     try {
                         Object signal = queue.take();
                         if (signal == COMPLETE_SENTINEL) {
+                            streamClosed.set(true);
                             cf.complete(false);
                         } else if (signal instanceof ErrorSignal es) {
+                            streamClosed.set(true);
                             cf.completeExceptionally(es.error());
                         } else if (signal instanceof ValueSignal<?> vs) {
                             current = (T) vs.value();
@@ -326,6 +345,9 @@ public class AwaitableAdapterRegistry {
                             if (sub != null) sub.request(1);
                         }
                     } catch (InterruptedException e) {
+                        if (streamClosed.get()) {
+                            return Awaitable.of(false);
+                        }
                         // Throw directly instead of storing in the CompletableFuture.
                         // On JDK 23+, CF.get() wraps stored CancellationExceptions in a
                         // new CancellationException("get"), discarding our message and
@@ -344,6 +366,20 @@ public class AwaitableAdapterRegistry {
                 public T getCurrent() {
                     return current;
                 }
+
+                @Override
+                public void close() {
+                    if (!streamClosed.compareAndSet(false, true)) {
+                        return;
+                    }
+                    closedRef.set(true);
+                    Flow.Subscription subscription = subRef.getAndSet(null);
+                    if (subscription != null) {
+                        subscription.cancel();
+                    }
+                    queue.clear();
+                    queue.offer(COMPLETE_SENTINEL);
+                }
             };
         }
 
diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
index 01e37f88a8..3f221e35d8 100644
--- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
+++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java
@@ -525,8 +525,10 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> {
 
         BlockStatement whileBody = block(getItemStmt, loopBody);
         WhileStatement whileStmt = new WhileStatement(condition, whileBody);
+        Expression closeCall = AsyncTransformHelper.buildCloseStreamCall(varX(streamVar));
+        TryCatchStatement tryFinally = new TryCatchStatement(whileStmt, block(new ExpressionStatement(closeCall)));
 
-        return configureAST(block(streamDecl, whileStmt), ctx);
+        return configureAST(block(streamDecl, tryFinally), ctx);
     }
 
     @Override
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 39c7a5d251..7893556035 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -23,6 +23,8 @@ import groovy.concurrent.Awaitable;
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A producer/consumer implementation of {@link AsyncStream} used by
@@ -38,6 +40,10 @@ import java.util.concurrent.SynchronousQueue;
  * consumer has consumed the previous element (mirroring C#'s async
  * iterator suspension semantics).
  * <p>
+ * Calling {@link #close()} marks the stream as closed and interrupts any
+ * currently blocked producer/consumer threads so that early termination from
+ * {@code for await} can cancel the generator promptly.
+ * <p>
  * This class is an internal implementation detail and should not be referenced
  * directly by user code.
  *
@@ -49,21 +55,57 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
     private static final Object DONE = new Object();
 
     private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<Thread> producerThread = new AtomicReference<>();
+    private final AtomicReference<Thread> consumerThread = new AtomicReference<>();
     private volatile T current;
 
+    /**
+     * Registers the given thread as the producer for this stream.
+     * Called by the async runtime immediately after the producer
+     * thread starts, <em>before</em> the generator body executes.
+     * <p>
+     * If the stream has already been {@linkplain #close() closed} by the
+     * time this method runs, the thread is immediately interrupted so
+     * that the generator body can exit promptly.
+     *
+     * @param thread the producer thread to register
+     */
+    void attachProducer(Thread thread) {
+        producerThread.set(thread);
+        if (closed.get()) {
+            thread.interrupt();
+        }
+    }
+
+    /**
+     * Unregisters the given thread as the producer for this stream.
+     * Called from a {@code finally} block after the generator body
+     * completes (normally or exceptionally).
+     *
+     * @param thread the producer thread to unregister
+     */
+    void detachProducer(Thread thread) {
+        producerThread.compareAndSet(thread, null);
+    }
+
     /**
      * Produces the next element. Called from the generator body when
      * a {@code yield return expr} statement is executed. Blocks until
      * the consumer is ready.
      */
     public void yield(Object value) {
+        if (closed.get()) {
+            throw streamClosed(null);
+        }
         try {
             queue.put(new Item(value));
         } catch (InterruptedException e) {
+            if (closed.get()) {
+                throw streamClosed(e);
+            }
             Thread.currentThread().interrupt();
-            CancellationException ce = new CancellationException("Interrupted during yield");
-            ce.initCause(e);
-            throw ce;
+            throw interrupted("Interrupted during yield", e);
         }
     }
 
@@ -72,12 +114,17 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
      * If interrupted, the completion signal is delivered on a best-effort basis.
      */
     public void complete() {
+        if (closed.get()) {
+            return;
+        }
         try {
             queue.put(DONE);
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // Best-effort delivery: use non-blocking offer as fallback
-            queue.offer(DONE);
+            if (!closed.get()) {
+                Thread.currentThread().interrupt();
+                // Best-effort delivery: use non-blocking offer as fallback
+                queue.offer(DONE);
+            }
         }
     }
 
@@ -86,24 +133,37 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
      * If interrupted, the error signal is delivered on a best-effort basis.
      */
     public void error(Throwable t) {
+        if (closed.get()) {
+            return;
+        }
+        ErrorItem item = new ErrorItem(t != null ? t : new NullPointerException("null error in generator"));
         try {
-            queue.put(new ErrorItem(t != null ? t : new NullPointerException("null error in generator")));
+            queue.put(item);
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // Best-effort delivery: use non-blocking offer as fallback
-            queue.offer(new ErrorItem(t != null ? t : new NullPointerException("null error in generator")));
+            if (!closed.get()) {
+                Thread.currentThread().interrupt();
+                // Best-effort delivery: use non-blocking offer as fallback
+                queue.offer(item);
+            }
         }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public Awaitable<Boolean> moveNext() {
+        if (closed.get()) {
+            return Awaitable.of(false);
+        }
+        Thread currentThread = Thread.currentThread();
+        consumerThread.set(currentThread);
         try {
             Object next = queue.take();
             if (next == DONE) {
+                closed.set(true);
                 return Awaitable.of(false);
             }
             if (next instanceof ErrorItem ei) {
+                closed.set(true);
                 Throwable cause = ei.error;
                 if (cause instanceof Error err) throw err;
                 throw AsyncSupport.sneakyThrow(cause);
@@ -111,10 +171,13 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
             current = (T) ((Item) next).value;
             return Awaitable.of(true);
         } catch (InterruptedException e) {
+            if (closed.get()) {
+                return Awaitable.of(false);
+            }
             Thread.currentThread().interrupt();
-            CancellationException ce = new CancellationException("Interrupted during moveNext");
-            ce.initCause(e);
-            throw ce;
+            throw interrupted("Interrupted during moveNext", e);
+        } finally {
+            consumerThread.compareAndSet(currentThread, null);
         }
     }
 
@@ -123,6 +186,35 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
         return current;
     }
 
+    @Override
+    public void close() {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        Thread producer = producerThread.getAndSet(null);
+        if (producer != null && producer != Thread.currentThread()) {
+            producer.interrupt();
+        }
+        Thread consumer = consumerThread.getAndSet(null);
+        if (consumer != null && consumer != Thread.currentThread()) {
+            consumer.interrupt();
+        }
+    }
+
+    private static CancellationException interrupted(String message, InterruptedException cause) {
+        CancellationException ce = new CancellationException(message);
+        ce.initCause(cause);
+        return ce;
+    }
+
+    private static CancellationException streamClosed(InterruptedException cause) {
+        CancellationException ce = new CancellationException("Async stream was closed");
+        if (cause != null) {
+            ce.initCause(cause);
+        }
+        return ce;
+    }
+
     // Wrapper to handle null values in the queue
     private record Item(Object value) { }
     private record ErrorItem(Throwable error) { }
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 8875fd52d0..633e7038d4 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -44,9 +45,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Internal runtime support for the {@code async}/{@code await} language feature
@@ -69,11 +72,16 @@ import java.util.concurrent.TimeUnit;
  *       producer/consumer bridge for methods using {@code yield return}</li>
  *   <li><b>Async stream conversion</b> — {@code toAsyncStream()} adapts
  *       collections, arrays, and adapter-supported types for {@code for await}</li>
+ *   <li><b>Async stream cleanup</b> — {@code closeStream()} lets compiler-generated
+ *       {@code for await} loops release resources and propagate early-exit
+ *       cancellation upstream</li>
  *   <li><b>Defer</b> — {@code createDeferScope()}, {@code defer()}, and
  *       {@code executeDeferScope()} implement Go-style deferred cleanup with
  *       LIFO execution and exception suppression</li>
  *   <li><b>Delay</b> — {@code delay()} provides non-blocking delays using a
  *       shared {@link java.util.concurrent.ScheduledExecutorService}</li>
+ *   <li><b>Timeouts</b> — {@code timeout()} and {@code timeoutOr()} apply
+ *       non-blocking deadlines while preserving the {@link Awaitable} abstraction</li>
  * </ul>
  * <p>
  * <b>Thread pool configuration</b>
@@ -411,6 +419,7 @@ public class AsyncSupport {
             public AsyncStream<T> doCall(Object... args) {
                 AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
                 CompletableFuture.runAsync(() -> {
+                    gen.attachProducer(Thread.currentThread());
                     try {
                         Object[] allArgs = new Object[args.length + 1];
                         allArgs[0] = gen;
@@ -419,6 +428,8 @@ public class AsyncSupport {
                         gen.complete();
                     } catch (Throwable t) {
                         gen.error(t);
+                    } finally {
+                        gen.detachProducer(Thread.currentThread());
                     }
                 }, defaultExecutor);
                 return gen;
@@ -621,6 +632,94 @@ public class AsyncSupport {
         return results;
     }
 
+    /**
+     * Races the given source against a deadline.
+     * <p>
+     * The source is first adapted to a {@link CompletableFuture}.  The returned
+     * awaitable settles with the source's own outcome when the source finishes
+     * first, and fails with {@link java.util.concurrent.TimeoutException} when
+     * the deadline elapses first.
+     * <p>
+     * Expiry does <em>not</em> cancel the original source.  This mirrors the
+     * value-level timeout composition style used by JavaScript's race-based
+     * patterns and keeps Groovy's semantics explicit: callers that need
+     * cooperative cancellation can invoke {@link Awaitable#cancel()} on the
+     * original task themselves.
+     *
+     * @param source the async source to time out
+     * @param duration the timeout duration
+     * @param unit the unit in which {@code duration} is expressed
+     * @param <T> the result type
+     * @return a new awaitable with timeout semantics
+     * @throws IllegalArgumentException if {@code source} is {@code null},
+     *         {@code duration} is negative, or {@code unit} is {@code null}
+     * @since 6.0.0
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Awaitable<T> timeout(Object source, long duration, TimeUnit unit) {
+        validateTimeoutArguments(source, duration, unit, "Awaitable.timeout");
+        CompletableFuture<T> pending = (CompletableFuture<T>) toCompletableFuture(source);
+        // The exception is built here, on the calling thread, rather than
+        // inside the timer callback that eventually delivers it.
+        String message = "Timed out after " + duration + " " + unit.name().toLowerCase(Locale.ROOT);
+        TimeoutException expiry = new TimeoutException(message);
+        CompletableFuture<T> outcome = new CompletableFuture<>();
+        Runnable failOnExpiry = () -> outcome.completeExceptionally(expiry);
+        scheduleTimeoutRace(pending, outcome, failOnExpiry, duration, unit);
+        return GroovyPromise.of(outcome);
+    }
+
+    /**
+     * Adapts the given source to an {@link Awaitable} and returns a new
+     * awaitable that yields the supplied fallback value if the timeout expires
+     * first.
+     *
+     * @param source the async source to wait for
+     * @param fallback the value to use when the timeout expires
+     * @param duration the timeout duration
+     * @param unit the time unit
+     * @param <T> the result type
+     * @return a new awaitable yielding either the source result or the 
fallback
+     * @throws IllegalArgumentException if {@code source} is {@code null},
+     *         {@code duration} is negative, or {@code unit} is {@code null}
+     * @since 6.0.0
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Awaitable<T> timeoutOr(Object source, T fallback, long 
duration, TimeUnit unit) {
+        validateTimeoutArguments(source, duration, unit, 
"Awaitable.timeoutOr");
+        CompletableFuture<T> sourceFuture = (CompletableFuture<T>) 
toCompletableFuture(source);
+        CompletableFuture<T> result = new CompletableFuture<>();
+        scheduleTimeoutRace(sourceFuture, result, () -> 
result.complete(fallback), duration, unit);
+        return GroovyPromise.of(result);
+    }
+
+    /**
+     * Shared logic for {@link #timeout} and {@link #timeoutOr}: schedules the
+     * timeout action on the delay scheduler and forwards the source's outcome
+     * (value, or deeply unwrapped error) to {@code result}.
+     * <p>
+     * The timer's lifetime is tied to {@code result} rather than to the source:
+     * it is cancelled as soon as {@code result} settles by any route.  That
+     * covers the source finishing first, and also a consumer completing or
+     * cancelling {@code result} directly, in which case the timer would
+     * otherwise stay armed for the full duration.  When the timeout action
+     * itself settles {@code result}, the cancellation request arrives after
+     * the action has done its work and has no further effect on the outcome.
+     *
+     * @param sourceFuture the adapted source whose outcome is forwarded
+     * @param result the future handed back to the caller
+     * @param onTimeout the action run when the deadline elapses; both callers
+     *                  pass an action that settles {@code result}
+     * @param duration the timeout duration
+     * @param unit the unit in which {@code duration} is expressed
+     * @param <T> the result type
+     */
+    private static <T> void scheduleTimeoutRace(
+            CompletableFuture<T> sourceFuture,
+            CompletableFuture<T> result,
+            Runnable onTimeout,
+            long duration, TimeUnit unit) {
+        ScheduledFuture<?> timeoutTask = DELAY_SCHEDULER.schedule(onTimeout, duration, unit);
+        // Disarm the timer whenever the result settles, not only when the
+        // source completes; cancel(false) never interrupts a running action.
+        result.whenComplete((ignoredValue, ignoredError) -> timeoutTask.cancel(false));
+        sourceFuture.whenComplete((value, error) -> {
+            if (error != null) {
+                result.completeExceptionally(deepUnwrap(error));
+            } else {
+                result.complete(value);
+            }
+        });
+    }
+
+    private static void validateTimeoutArguments(Object source, long duration, 
TimeUnit unit, String methodName) {
+        if (source == null) {
+            throw new IllegalArgumentException(methodName + ": source must not 
be null");
+        }
+        if (unit == null) {
+            throw new IllegalArgumentException(methodName + ": TimeUnit must 
not be null");
+        }
+        if (duration < 0) {
+            throw new IllegalArgumentException(methodName + ": duration must 
not be negative: " + duration);
+        }
+    }
+
     // ---- delay ----------------------------------------------------------
 
     /**
@@ -695,6 +794,23 @@ public class AsyncSupport {
         return AwaitableAdapterRegistry.toAsyncStream(source);
     }
 
+    /**
+     * Closes the given object if it is an {@link AsyncStream}.
+     * <p>
+     * A {@code null} argument, or any object that is not an
+     * {@code AsyncStream}, is ignored.
+     * <p>
+     * Compiler-generated {@code for await} loops call this from a synthetic
+     * {@code finally} block so that early {@code break}, {@code return}, or
+     * exceptional exit reliably releases the underlying stream resources and
+     * propagates cancellation to upstream producers where supported.
+     *
+     * @param stream the stream to close, or {@code null}
+     * @since 6.0.0
+     */
+    public static void closeStream(Object stream) {
+        if (!(stream instanceof AsyncStream<?> asyncStream)) {
+            return;
+        }
+        asyncStream.close();
+    }
+
     // ---- yield return / async generator ----------------------------------
 
     /**
@@ -743,11 +859,14 @@ public class AsyncSupport {
     public static <T> AsyncStream<T> generateAsyncStream(Closure<?> body) {
         AsyncStreamGenerator<T> gen = new AsyncStreamGenerator<>();
         CompletableFuture.runAsync(() -> {
+            gen.attachProducer(Thread.currentThread());
             try {
                 body.call(gen);
                 gen.complete();
             } catch (Throwable t) {
                 gen.error(t);
+            } finally {
+                gen.detachProducer(Thread.currentThread());
             }
         }, defaultExecutor);
         return gen;
diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java 
b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
index 0b3a85a03e..df22416b2d 100644
--- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
+++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
@@ -21,6 +21,7 @@ package org.apache.groovy.runtime.async;
 import groovy.concurrent.Awaitable;
 
 import java.util.Objects;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -59,12 +60,20 @@ public class GroovyPromise<T> implements Awaitable<T> {
 
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        return future.get();
+        try {
+            return future.get();
+        } catch (CancellationException e) {
+            throw unwrapCancellation(e);
+        }
     }
 
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
-        return future.get(timeout, unit);
+        try {
+            return future.get(timeout, unit);
+        } catch (CancellationException e) {
+            throw unwrapCancellation(e);
+        }
     }
 
     @Override
@@ -123,6 +132,17 @@ public class GroovyPromise<T> implements Awaitable<T> {
         return future;
     }
 
+    /**
+     * Returns the original cancellation when the given one merely wraps it.
+     * <p>
+     * JDK 23+ wraps a stored {@link CancellationException} in a new instance
+     * with the generic message {@code "get"} when {@link CompletableFuture#get()}
+     * is called.  Unwrapping here lets Groovy users consistently observe the
+     * original cancellation message and cause chain across all supported JDKs.
+     *
+     * @param exception the cancellation thrown by the underlying future
+     * @return the cause if it is itself a {@code CancellationException},
+     *         otherwise {@code exception} unchanged
+     */
+    private static CancellationException unwrapCancellation(CancellationException exception) {
+        if (exception.getCause() instanceof CancellationException original) {
+            return original;
+        }
+        return exception;
+    }
+
     @Override
     public String toString() {
         if (future.isDone()) {
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index 9d804f4381..a52c09e8d4 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -26,7 +26,8 @@
 == Introduction
 
 Groovy provides native `async`/`await` support as a language-level feature, 
enabling developers to write
-asynchronous code in a sequential, readable style. Inspired by similar 
constructs in JavaScript, C# and Go,
+asynchronous code in a sequential, readable style. Inspired by similar 
constructs in JavaScript, C#, Kotlin,
+Swift, and Go,
 Groovy's async/await integrates seamlessly with the JVM concurrency model 
while maintaining Groovy's hallmark
 conciseness and expressiveness.
 
@@ -350,7 +351,8 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=exception_multiple_tasks,indent=
 
 The `groovy.concurrent.Awaitable` interface provides static combinator methods 
for common async
 coordination patterns, analogous to JavaScript's `Promise.all()` / 
`Promise.any()` /
-`Promise.allSettled()` and C#'s `Task.WhenAll()` / `Task.WhenAny()`.
+`Promise.allSettled()`, C#'s `Task.WhenAll()` / `Task.WhenAny()`, and Kotlin's 
coroutine
+coordination helpers.
 
 === `all` — Parallel Execution
 
@@ -380,6 +382,18 @@ each result is wrapped in an `AwaitResult` with 
`isSuccess()`, `isFailure()`, `v
 include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_settled,indent=0]
 ----
 
+=== Continuation Helpers
+
+Use `thenAccept`, `whenComplete`, and `handle` when you want continuation-style
+composition rather than a direct `await`. This is especially useful at API 
boundaries
+where you want to attach logging, recovery, or side effects without immediately
+blocking for the result.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=continuation_helpers,indent=0]
+----
+
 === `delay` — Non-Blocking Pause
 
 Create an awaitable timer (analogous to JavaScript's `setTimeout` wrapped in a 
`Promise`):
@@ -389,6 +403,18 @@ Create an awaitable timer (analogous to JavaScript's 
`setTimeout` wrapped in a `
 include::../test/AsyncAwaitSpecTest.groovy[tags=delay_example,indent=0]
 ----
 
+=== `timeout` / `timeoutOr` — Non-Blocking Deadlines
+
+Apply a deadline without blocking the calling thread. `Awaitable.timeout(...)`
+fails with `TimeoutException`, while `Awaitable.timeoutOr(...)` completes with 
a
+fallback value. The instance forms `orTimeout(...)` and 
`completeOnTimeout(...)`
+provide the same capability on an existing awaitable.
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_combinators,indent=0]
+----
+
 [[flow-publisher]]
 == `Flow.Publisher` Integration
 
@@ -411,6 +437,15 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=flow_publisher_await,indent=0]
 
include::../test/AsyncAwaitSpecTest.groovy[tags=flow_publisher_for_await,indent=0]
 ----
 
+[NOTE]
+====
+`for await` closes the adapted stream from a compiler-generated `finally` 
block.
+For `Flow.Publisher` sources, this means an early `break`, `return`, or 
exception
+cancels the upstream subscription rather than leaving it running in the 
background.
+This is conceptually similar to JavaScript async iterators' `return()`, C#'s
+`await foreach` disposal, and Swift's `AsyncSequence` early-exit cleanup.
+====
+
 [[adapter-registry]]
 == Adapter Registry
 
@@ -507,7 +542,8 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=retry_pattern,indent=0]
 
 === Timeout
 
-Use `Awaitable.any()` to race an operation against a delay, implementing a 
timeout:
+Use `Awaitable.timeout()` (or the instance form `orTimeout()`) to apply a
+deadline without dropping back to manual race logic:
 
 [source,groovy]
 ----
@@ -610,9 +646,11 @@ This section provides architectural context for Groovy 
language maintainers and
 [[implementation-strategy]]
 === Thread-Based vs State Machine
 
-Languages like C# transform async methods into **state machine** classes at
-compile time. Each `await` point becomes a numbered state, and the compiler 
rewrites the
-method body so it can be suspended and resumed without blocking an OS thread.
+Languages like C#, Kotlin, and Swift transform async workflows into 
compiler-managed
+state machines or coroutine frames. Each suspension point becomes resumable 
state,
+allowing the runtime to suspend the logical task without blocking an OS thread.
+JavaScript takes a different route again, relying on an event loop plus promise
+microtasks.
 
 Groovy takes a different approach: each `async` method body runs on its **own 
thread**
 (a virtual thread on JDK 21+, or a pooled platform thread on JDK 17–20). The 
`await`
@@ -650,7 +688,8 @@ consistent code generation for both the keyword and 
annotation forms.
 
 **Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`,
 `AwaitableAdapterRegistry`): `AsyncSupport` is the central runtime class 
containing static
-methods for `await`, `async`, `defer`, `yield return`, and combinators. 
`GroovyPromise` wraps
+methods for `await`, `async`, `defer`, `yield return`, timeout composition, 
stream cleanup,
+and combinators. `GroovyPromise` wraps
 `CompletableFuture` to implement the `Awaitable` interface, decoupling the 
public API from the
 JDK implementation. `AsyncStreamGenerator` implements the producer/consumer 
pattern for async
 generators using a `SynchronousQueue`. `AwaitableAdapterRegistry` provides the 
SPI extension
@@ -665,7 +704,9 @@ All runtime components employ lock-free or 
minimal-contention synchronization:
 * `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free 
iteration during adapter lookup
 * `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
 * `AsyncStreamGenerator.current` — `volatile` field for producer/consumer 
visibility
-* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` for 
thread-safe subscription management
+* `AsyncStreamGenerator` close state — `AtomicBoolean` + 
`AtomicReference<Thread>` for prompt,
+  idempotent close/cancellation signalling
+* `Flow.Publisher` adaptation — `AtomicReference<Subscription>` plus 
close-aware queue signalling
 * Defer scopes — per-task `ArrayDeque`, no cross-thread sharing
 * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
 
@@ -673,101 +714,144 @@ All runtime components employ lock-free or 
minimal-contention synchronization:
 == Cross-Language Comparison
 
 The following table compares Groovy's async/await with the corresponding 
features in
-JavaScript and C#, for developers familiar with those languages.
+JavaScript, C#, Kotlin, and Swift, for developers familiar with those 
languages.
 
-[cols="3,4,4,4"]
+[cols="2,3,3,3,3,3"]
 |===
-| Feature | Groovy | JavaScript | C#
+| Feature | Groovy | JavaScript | C# | Kotlin | Swift
 
 | **Async declaration**
 | `async def foo() { }`
 | `async function foo() { }`
 | `async Task<T> Foo() { }`
+| `suspend fun foo(): T`
+| `func foo() async throws -> T`
 
 | **Await expression**
 | `await expr` or `await(expr)`
 | `await expr`
 | `await expr`
+| `deferred.await()` / suspend call
+| `try await expr`
 
 | **Return type**
 | `Awaitable<T>`
 | `Promise<T>`
 | `Task<T>` / `ValueTask<T>`
+| `Deferred<T>` / suspended `T`
+| `Task<T, Error>` / `async` function returns `T`
 
 | **Async closure/lambda**
 | `async { -> body }`
 | `async () => body`
 | `async () => body`
+| `scope.async { body }`
+| `Task { body }`
 
 | **Async iteration**
 | `for await (x in src) { }`
 | `for await (x of src) { }`
 | `await foreach (x in src) { }`
+| _(no dedicated syntax; `Flow.collect { }` or `for (x in channel)`)_
+| `for try await x in seq`
 
 | **Async generator**
 | `yield return expr`
 | `yield expr` (in `async function*`)
 | `yield return expr` (in `IAsyncEnumerable<T>`)
+| `flow { emit(x) }` / channel producers
+| `AsyncStream { continuation.yield(x) }`
 
 | **Deferred cleanup**
 | `defer { cleanup }`
 | _(none; use `try`/`finally`)_
-| _(none; use `try`/`finally` or `IAsyncDisposable`)_
+| `await using` / `try`/`finally`
+| `try`/`finally` / `use`
+| `defer`
 
 | **Wait all**
 | `Awaitable.all(a, b, c)`
 | `Promise.all([a, b, c])`
 | `Task.WhenAll(a, b, c)`
+| `awaitAll(a, b, c)`
+| `async let` / `withTaskGroup`
 
 | **Wait any**
 | `Awaitable.any(a, b, c)`
 | `Promise.any([a, b, c])`
 | `Task.WhenAny(a, b, c)`
+| `select {}` / manual race
+| `TaskGroup.next()`
 
 | **Wait all settled**
 | `Awaitable.allSettled(a, b, c)`
 | `Promise.allSettled([a, b, c])`
 | _(manual; no built-in equivalent)_
+| _(manual; typically `runCatching`)_
+| _(manual; typically `Result`)_
 
 | **Pre-computed result**
 | `Awaitable.of(value)`
 | `Promise.resolve(value)`
 | `Task.FromResult(value)`
+| `CompletableDeferred(value)` / `async { value }`
+| `Task { value }`
 
 | **Pre-computed error**
 | `Awaitable.failed(error)`
 | `Promise.reject(error)`
 | `Task.FromException(ex)`
+| `CompletableDeferred<T>().apply { completeExceptionally(ex) }`
+| `Task { throw error }`
 
 | **Delay**
 | `Awaitable.delay(ms)`
 | `new Promise(r => setTimeout(r, ms))`
 | `Task.Delay(ms)`
+| `delay(ms)`
+| `Task.sleep(for:)`
+
+| **Timeout**
+| `Awaitable.timeout(task, ms)` / `task.orTimeout(ms)`
+| `Promise.race([...])` / `AbortSignal.timeout()`
+| `task.WaitAsync(timeout)` / `CancelAfter`
+| `withTimeout(ms) { ... }`
+| _(manual race with `TaskGroup` / sleep)_
 
 | **Cancellation**
 | `awaitable.cancel()`
 | `AbortController.abort()`
 | `CancellationToken`
+| `Job.cancel()`
+| `Task.cancel()`
 
 | **Executor/scheduler**
 | `Awaitable.setExecutor(exec)`
 | _(event loop; not configurable)_
 | `TaskScheduler`
+| `CoroutineDispatcher`
+| task priority / executor chosen by runtime
 
 | **Third-party support**
 | `AwaitableAdapterRegistry` SPI
 | _(native `thenable` protocol)_
 | Custom awaiters via `GetAwaiter()`
+| coroutine adapters / bridges
+| `AsyncSequence` / `Task` interop patterns
 
 | **Annotation form**
 | `@Async`
 | _(none)_
 | _(none; attribute-driven async not supported)_
+| _(none)_
+| _(none)_
 
 | **Implementation**
 | Thread-per-task (virtual threads on JDK 21+)
 | Event loop + microtask queue
 | State machine (compiled IL transformation)
+| Coroutine state machine
+| Compiler-generated async state machine
 |===
 
 [[summary]]
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index 4b03993f85..66f5735d06 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -638,6 +638,56 @@ assert await(delayedGreeting()) == "Hello after delay"
         '''
     }
 
+    @Test
+    void testContinuationHelpers() {
+        assertScript '''
+// tag::continuation_helpers[]
+import groovy.concurrent.Awaitable
+import java.util.concurrent.atomic.AtomicReference
+
+def observed = new AtomicReference()
+await(Awaitable.of("groovy").thenAccept { observed.set(it.toUpperCase()) })
+assert observed.get() == "GROOVY"
+
+def recovered = await(
+    Awaitable.failed(new IOException("boom"))
+        .handle { value, error -> "recovered from ${error.message}" }
+)
+assert recovered == "recovered from boom"
+
+def completion = new AtomicReference()
+assert await(Awaitable.of(7).whenComplete { value, error ->
+    completion.set("${value}:${error}")
+}) == 7
+assert completion.get() == "7:null"
+// end::continuation_helpers[]
+        '''
+    }
+
+    @Test
+    void testTimeoutCombinators() {
+        assertScript '''
+// tag::timeout_combinators[]
+import groovy.concurrent.Awaitable
+import java.util.concurrent.TimeoutException
+
+async slowCall() {
+    await(Awaitable.delay(5_000))
+    return "done"
+}
+
+try {
+    await(Awaitable.timeout(slowCall(), 50))
+    assert false : "should have timed out"
+} catch (TimeoutException e) {
+    assert e.message.contains("Timed out after 50")
+}
+
+assert await(Awaitable.timeoutOr(slowCall(), "cached", 50)) == "cached"
+// end::timeout_combinators[]
+        '''
+    }
+
     // 
=========================================================================
     // 9. Flow.Publisher integration
     // 
=========================================================================
@@ -823,16 +873,11 @@ async longRunningTask() {
     return "completed"
 }
 
-async withTimeout(Awaitable task, long timeoutMs) {
-    def timeout = Awaitable.delay(timeoutMs).then { throw new 
TimeoutException("timed out") }
-    return await Awaitable.any(task, timeout)
-}
-
 try {
-    await withTimeout(longRunningTask(), 50)
+    await Awaitable.timeout(longRunningTask(), 50)
     assert false : "should have timed out"
 } catch (TimeoutException e) {
-    assert e.message == "timed out"
+    assert e.message.contains("Timed out after 50")
 }
 // end::timeout_pattern[]
         '''
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
index acbbce06ed..c2d30ba7f3 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy
@@ -672,6 +672,87 @@ class AsyncAwaitSyntaxTest {
         '''
     }
 
+    @Test
+    void testForAwaitClosesCustomStreamOnEarlyReturn() {
+        assertScript '''
+            import groovy.concurrent.AsyncStream
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.atomic.AtomicInteger
+
+            class TrackingStream implements AsyncStream<Integer> {
+                private final List<Integer> values = [10, 20, 30]
+                private int index = 0
+                private Integer current
+                final AtomicInteger closeCount = new AtomicInteger()
+
+                Awaitable<Boolean> moveNext() {
+                    if (index >= values.size()) return Awaitable.of(false)
+                    current = values[index++]
+                    return Awaitable.of(true)
+                }
+
+                Integer getCurrent() { current }
+
+                void close() { closeCount.incrementAndGet() }
+            }
+
+            class Svc {
+                async first(TrackingStream stream) {
+                    for await (item in stream) {
+                        return item
+                    }
+                    return -1
+                }
+            }
+
+            def stream = new TrackingStream()
+            assert new Svc().first(stream).get() == 10
+            assert stream.closeCount.get() == 1
+        '''
+    }
+
+    @Test
+    void testForAwaitCancelsPublisherOnEarlyReturn() {
+        assertScript '''
+            import java.util.concurrent.Flow
+            import java.util.concurrent.atomic.AtomicBoolean
+
+            class TestPublisher implements Flow.Publisher<Integer> {
+                final AtomicBoolean cancelled = new AtomicBoolean(false)
+
+                void subscribe(Flow.Subscriber<? super Integer> subscriber) {
+                    subscriber.onSubscribe(new Flow.Subscription() {
+                        boolean emitted
+
+                        void request(long n) {
+                            if (!emitted) {
+                                emitted = true
+                                subscriber.onNext(42)
+                            }
+                        }
+
+                        void cancel() {
+                            cancelled.set(true)
+                        }
+                    })
+                }
+            }
+
+            class Svc {
+                async first(TestPublisher publisher) {
+                    for await (item in publisher) {
+                        return item
+                    }
+                    return -1
+                }
+            }
+
+            def publisher = new TestPublisher()
+            assert new Svc().first(publisher).get() == 42
+            assert publisher.cancelled.get()
+        '''
+    }
+
     @Test
     void testForAwaitWithContinue() {
         assertScript '''
diff --git 
a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
index 2802193bf4..21e5ec380e 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy
@@ -39,6 +39,7 @@ import java.util.concurrent.Flow
 import java.util.concurrent.FutureTask
 import java.util.concurrent.SubmissionPublisher
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
 
 import static groovy.test.GroovyAssert.assertScript
 
@@ -109,6 +110,100 @@ class AsyncCoverageTest {
         '''
     }
 
+    @Test
+    void testThenAcceptRunsSideEffect() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def seen = new AtomicInteger()
+            await(Awaitable.of(21).thenAccept { seen.set(it * 2) })
+            assert seen.get() == 42
+        '''
+    }
+
+    @Test
+    void testWhenCompleteSeesUnwrappedFailure() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.atomic.AtomicReference
+
+            def successful = new AtomicReference()
+            assert await(Awaitable.of("ok").whenComplete { value, error ->
+                successful.set([value, error])
+            }) == "ok"
+            assert successful.get()[0] == "ok"
+            assert successful.get()[1] == null
+
+            def failed = Awaitable.failed(new 
IOException("boom")).whenComplete { value, error ->
+                assert value == null
+                assert error instanceof IOException
+                assert error.message == "boom"
+            }
+
+            try {
+                await(failed)
+                assert false : "Should have thrown"
+            } catch (IOException e) {
+                assert e.message == "boom"
+            }
+        '''
+    }
+
+    @Test
+    void testHandleProjectsSuccessAndFailure() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+
+            def ok = Awaitable.of(5).handle { value, error ->
+                assert error == null
+                value * 2
+            }
+            assert await(ok) == 10
+
+            def recovered = Awaitable.failed(new IOException("boom")).handle { 
value, error ->
+                assert value == null
+                "fallback: ${error.message}"
+            }
+            assert await(recovered) == "fallback: boom"
+        '''
+    }
+
+    @Test
+    void testTimeoutCombinators() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CompletableFuture
+            import java.util.concurrent.TimeUnit
+            import java.util.concurrent.TimeoutException
+
+            assert await(Awaitable.of("fast").orTimeout(1, TimeUnit.SECONDS)) 
== "fast"
+
+            try {
+                await(Awaitable.delay(10_000).then { "late" }.orTimeout(50, 
TimeUnit.MILLISECONDS))
+                assert false : "Should have timed out"
+            } catch (TimeoutException e) {
+                assert e.message.contains("Timed out after 50")
+            }
+
+            try {
+                await(Awaitable.timeout(new CompletableFuture<String>(), 50, 
TimeUnit.MILLISECONDS))
+                assert false : "Should have timed out"
+            } catch (TimeoutException e) {
+                assert e.message.contains("Timed out after 50")
+            }
+
+            def fallback = await(Awaitable.timeoutOr(
+                Awaitable.delay(10_000).then { "late" },
+                "cached",
+                50,
+                TimeUnit.MILLISECONDS
+            ))
+            assert fallback == "cached"
+            assert await(Awaitable.of("value").completeOnTimeout("fallback", 
1, TimeUnit.SECONDS)) == "value"
+        '''
+    }
+
     // ================================================================
     // Awaitable instance methods: isDone, cancel, isCancelled,
     // isCompletedExceptionally, toCompletableFuture, get(timeout)
@@ -201,6 +296,27 @@ class AsyncCoverageTest {
         '''
     }
 
+    @Test
+    void testCancellationMessageIsStableAcrossJdks() {
+        assertScript '''
+            import groovy.concurrent.Awaitable
+            import java.util.concurrent.CancellationException
+
+            def original = new CancellationException("cancelled-by-user")
+            original.initCause(new InterruptedException("interrupt-cause"))
+
+            def failed = Awaitable.failed(original)
+            try {
+                failed.get()
+                assert false : "Should have thrown"
+            } catch (CancellationException e) {
+                assert e.message == "cancelled-by-user"
+                assert e.cause instanceof InterruptedException
+                assert e.cause.message == "interrupt-cause"
+            }
+        '''
+    }
+
     // ================================================================
     // Awaitable static: delay(long, TimeUnit), getExecutor,
     // isVirtualThreadsAvailable
@@ -242,6 +358,29 @@ class AsyncCoverageTest {
         '''
     }
 
+    @Test
+    void testCloseStreamHelperClosesStream() {
+        // Flipped by the stub's close() so the test can observe the call.
+        AtomicBoolean closeSeen = new AtomicBoolean()
+        AsyncStream<String> emptyStream = new AsyncStream<String>() {
+            @Override
+            Awaitable<Boolean> moveNext() {
+                Awaitable.of(false)
+            }
+
+            @Override
+            String getCurrent() {
+                null
+            }
+
+            @Override
+            void close() {
+                closeSeen.set(true)
+            }
+        }
+
+        AsyncSupport.closeStream(emptyStream)
+        assert closeSeen.get()
+
+        // A null stream must be accepted without throwing.
+        AsyncSupport.closeStream(null)
+    }
+
     @Test
     void testSetExecutorAndRestore() {
         assertScript '''

Reply via email to