This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new 7dde6e5d71 Minor tweaks
7dde6e5d71 is described below

commit 7dde6e5d711aceb66296e38feb29207644426a46
Author: Daniel Sun <[email protected]>
AuthorDate: Wed Mar 11 00:56:06 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/AsyncStream.java   |   8 +-
 src/main/java/groovy/concurrent/AwaitResult.java   |   9 ++
 src/main/java/groovy/concurrent/Awaitable.java     |  46 +++++++-
 .../java/groovy/concurrent/AwaitableAdapter.java   |  12 ++-
 .../groovy/runtime/async/AsyncStreamGenerator.java |  29 +++--
 src/spec/doc/core-async-await.adoc                 |  21 ++--
 .../codehaus/groovy/transform/AsyncApiTest.groovy  | 117 ++++++++++++++++++++-
 .../groovy/transform/AsyncPatternsTest.groovy      |   2 +-
 8 files changed, 225 insertions(+), 19 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AsyncStream.java b/src/main/java/groovy/concurrent/AsyncStream.java
index f9c0ca89c9..26c7abe316 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -83,7 +83,13 @@ public interface AsyncStream<T> extends AutoCloseable {
         return (AsyncStream<T>) EMPTY;
     }
 
-    /** Singleton empty stream instance. */
+    /**
+     * Singleton empty stream instance.
+     * <p>
+     * This is an implementation detail backing {@link #empty()}.
+     * User code should call {@code AsyncStream.empty()} rather than
+     * referencing this field directly.
+     */
     AsyncStream<Object> EMPTY = new AsyncStream<>() {
         @Override public Awaitable<Boolean> moveNext() { return Awaitable.of(false); }
         @Override public Object getCurrent() { return null; }
diff --git a/src/main/java/groovy/concurrent/AwaitResult.java b/src/main/java/groovy/concurrent/AwaitResult.java
index 31aedbee80..ea89d97dff 100644
--- a/src/main/java/groovy/concurrent/AwaitResult.java
+++ b/src/main/java/groovy/concurrent/AwaitResult.java
@@ -47,6 +47,10 @@ public final class AwaitResult<T> {
 
     /**
      * Creates a successful result with the given value.
+     *
+     * @param value the computation result (may be {@code null})
+     * @param <T>   the value type
+     * @return a success result wrapping the value
      */
     @SuppressWarnings("unchecked")
     public static <T> AwaitResult<T> success(Object value) {
@@ -55,6 +59,11 @@ public final class AwaitResult<T> {
 
     /**
      * Creates a failure result with the given exception.
+     *
+     * @param error the exception that caused the failure; must not be {@code null}
+     * @param <T>   the value type (never actually used, since the result is a failure)
+     * @return a failure result wrapping the exception
+     * @throws NullPointerException if {@code error} is {@code null}
      */
     public static <T> AwaitResult<T> failure(Throwable error) {
         return new AwaitResult<>(null, Objects.requireNonNull(error), false);
diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java
index 4640bebcef..b64d556d87 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -94,16 +94,30 @@ public interface Awaitable<T> {
 
     /**
      * Blocks until the computation completes and returns the result.
+     *
+     * @return the computed result
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the computation completed exceptionally
      */
     T get() throws InterruptedException, ExecutionException;
 
     /**
      * Blocks until the computation completes or the timeout expires.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return the computed result
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the computation completed exceptionally
+     * @throws TimeoutException if the wait timed out
      */
     T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
 
     /**
-     * Returns {@code true} if the computation has completed (normally or exceptionally).
+     * Returns {@code true} if the computation has completed (normally,
+     * exceptionally, or via cancellation).
+     *
+     * @return {@code true} if complete
      */
     boolean isDone();
 
@@ -118,24 +132,37 @@ public interface Awaitable<T> {
 
     /**
      * Returns {@code true} if the computation was cancelled before completing normally.
+     *
+     * @return {@code true} if cancelled
      */
     boolean isCancelled();
 
     /**
      * Returns {@code true} if this computation completed exceptionally
      * (including cancellation).
+     *
+     * @return {@code true} if completed with an error or cancellation
      */
     boolean isCompletedExceptionally();
 
     /**
      * Returns a new {@code Awaitable} whose result is obtained by applying the
      * given function to this awaitable's result when it completes.
+     *
+     * @param fn the mapping function
+     * @param <U> the type of the mapped result
+     * @return a new awaitable holding the mapped result
      */
     <U> Awaitable<U> then(Function<? super T, ? extends U> fn);
 
     /**
-     * Returns a new {@code Awaitable} produced by applying the given function
-     * to this awaitable's result, flattening the nested {@code Awaitable}.
+     * Returns a new {@code Awaitable} produced by applying the given async
+     * function to this awaitable's result, flattening the nested {@code Awaitable}.
+     * This is the monadic {@code flatMap} operation for awaitables.
+     *
+     * @param fn the async mapping function that returns an {@code Awaitable}
+     * @param <U> the type of the inner awaitable's result
+     * @return a new awaitable holding the inner result
      */
     <U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>> fn);
 
@@ -158,6 +185,11 @@ public interface Awaitable<T> {
     /**
      * Returns a new {@code Awaitable} that, if this one completes exceptionally,
      * applies the given function to the exception to produce a recovery value.
+     * The throwable passed to the function is deeply unwrapped to strip JDK
+     * wrapper layers.
+     *
+     * @param fn the recovery function
+     * @return a new awaitable that recovers from failures
      */
     Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
 
@@ -260,6 +292,8 @@ public interface Awaitable<T> {
     /**
      * Converts this {@code Awaitable} to a JDK {@link CompletableFuture}
      * for interoperability with APIs that require it.
+     *
+     * @return a {@code CompletableFuture} representing this computation
      */
     CompletableFuture<T> toCompletableFuture();
 
@@ -267,6 +301,12 @@ public interface Awaitable<T> {
 
     /**
      * Returns an already-completed {@code Awaitable} with the given value.
+     * Analogous to C#'s {@code Task.FromResult()} or JavaScript's
+     * {@code Promise.resolve()}.
+     *
+     * @param value the result value (may be {@code null})
+     * @param <T>   the result type
+     * @return a completed awaitable
      */
     static <T> Awaitable<T> of(T value) {
         return new GroovyPromise<>(CompletableFuture.completedFuture(value));
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapter.java b/src/main/java/groovy/concurrent/AwaitableAdapter.java
index 961ddbf5a7..148440a595 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapter.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapter.java
@@ -60,7 +60,12 @@ public interface AwaitableAdapter {
 
     /**
      * Returns {@code true} if this adapter can convert instances of the given
-     * type to {@link AsyncStream}.
+     * type to {@link AsyncStream}.  Defaults to {@code false}; override if
+     * this adapter supports multi-value stream conversion (e.g., for Reactor
+     * {@code Flux} or RxJava {@code Observable}).
+     *
+     * @param type the source class to check
+     * @return {@code true} if this adapter can produce an {@code AsyncStream}
      */
     default boolean supportsAsyncStream(Class<?> type) {
         return false;
@@ -69,6 +74,11 @@ public interface AwaitableAdapter {
     /**
      * Converts the given source object to an {@link AsyncStream}.
      * Called only when {@link #supportsAsyncStream} returned {@code true}.
+     *
+     * @param source the source object (e.g., a Reactor {@code Flux})
+     * @param <T>    the element type
+     * @return an async stream backed by the source
+     * @throws UnsupportedOperationException if not overridden (default)
      */
     default <T> AsyncStream<T> toAsyncStream(Object source) {
         throw new UnsupportedOperationException("AsyncStream conversion not supported by " + getClass().getName());
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 51396f4ef4..38c66e62a7 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -144,7 +144,13 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
 
     /**
      * Signals that the generator has completed (no more elements).
-     * If interrupted, the completion signal is delivered on a best-effort basis.
+     * <p>
+     * If the blocking {@code queue.put()} is interrupted, a best-effort
+     * non-blocking {@code queue.offer()} is attempted.  If that also fails
+     * (no consumer is currently blocked in {@code take()}), the stream is
+     * force-closed to prevent the consumer from blocking indefinitely on a
+     * subsequent {@link #moveNext()} call.  This defensive close ensures no
+     * thread leak occurs even under unexpected interrupt timing.
      */
     public void complete() {
         if (closed.get()) {
@@ -155,15 +161,25 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
         } catch (InterruptedException e) {
             if (!closed.get()) {
                 Thread.currentThread().interrupt();
-                // Best-effort delivery: use non-blocking offer as fallback
-                queue.offer(DONE);
+                // Best-effort: non-blocking handoff to a waiting consumer.
+                // If no consumer is waiting, offer() returns false and the DONE
+                // signal is lost — force-close to unblock future moveNext() calls.
+                if (!queue.offer(DONE)) {
+                    close();
+                }
             }
         }
     }
 
     /**
      * Signals that the generator failed with an exception.
-     * If interrupted, the error signal is delivered on a best-effort basis.
+     * <p>
+     * If the blocking {@code queue.put()} is interrupted, a best-effort
+     * non-blocking {@code queue.offer()} is attempted.  If that also fails,
+     * the stream is force-closed (same rationale as {@link #complete()}).
+     * The original error is not propagated to the consumer in this edge case;
+     * instead the consumer sees a clean stream closure — this is acceptable
+     * because the interrupt itself indicates an external cancellation.
      */
     public void error(Throwable t) {
         if (closed.get()) {
@@ -175,8 +191,9 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
         } catch (InterruptedException e) {
             if (!closed.get()) {
                 Thread.currentThread().interrupt();
-                // Best-effort delivery: use non-blocking offer as fallback
-                queue.offer(item);
+                if (!queue.offer(item)) {
+                    close();
+                }
             }
         }
     }
diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc
index aa9e56bb80..a491c347bf 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -973,17 +973,26 @@ Several concurrency hazards are handled transparently:
 allowing it to clean up promptly even if blocked in I/O or a long computation.
 
 | **TOCTOU race prevention**
-| `moveNext()` uses a CAS-based double-check after taking from the queue: if the generator was
-closed between the call to `moveNext()` and the queue handoff, the taken value is discarded
-and `false` is returned, preventing stale values from leaking to the consumer.
+| `moveNext()` uses a double-check pattern: the `closed` flag is re-checked _after_
+registering the consumer thread. This closes a race window where `close()` could execute
+between the initial check and the `consumerThread.set()` call, which would leave the
+consumer stranded in `queue.take()` with no one to interrupt it.
 
-| **Single-consumer enforcement**
-| A single consumer is enforced by CAS on an internal flag. Attempting to call `moveNext()`
-from multiple threads concurrently throws `IllegalStateException`, catching misuse early.
+| **Thread-safe consumer tracking**
+| The consumer thread is tracked via `AtomicReference<Thread>` during `moveNext()`. This
+enables `close()` to interrupt a blocked consumer.  Note: concurrent `moveNext()` calls
+from multiple threads are not supported and may produce unpredictable results — async
+generators are inherently single-consumer (just like C#'s `IAsyncEnumerator`).
 
 | **Idempotent close**
 | `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to call multiple
 times from any thread without side effects.
+
+| **Signal delivery under interrupt**
+| If the producer's `complete()` or `error()` signal is interrupted and the non-blocking
+fallback delivery fails (no consumer waiting), the generator force-closes itself. This
+prevents the consumer from blocking indefinitely on a subsequent `moveNext()` — a defensive
+measure against unexpected thread interruption outside the normal close path.
 |===
 
 These mechanisms ensure that `yield return` / `for await` code remains as simple as writing
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 2ed5ec1a64..7df7d1a152 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1906,7 +1906,7 @@ class AsyncApiTest {
 
     @Test
     void testGroovyPromiseIsCompletedExceptionally() {
-        def cf = new java.util.concurrent.CompletableFuture<>()
+        def cf = new CompletableFuture<>()
         def promise = GroovyPromise.of(cf)
         assert !promise.isCompletedExceptionally()
         cf.completeExceptionally(new RuntimeException())
@@ -1919,4 +1919,119 @@ class AsyncApiTest {
             new GroovyPromise(null)
         }
     }
+
+    // ================================================================
+    // AsyncStreamGenerator: complete()/error() robustness under interrupt
+    // ================================================================
+
+    /**
+     * Verifies that when a producer's complete() signal is interrupted and
+     * the best-effort offer() fails (no consumer waiting), the generator
+     * force-closes so a subsequent moveNext() returns false instead of
+     * blocking indefinitely.
+     */
+    @Test
+    void testGeneratorCompleteForceClosesOnOfferFailure() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        def producerThread = Thread.currentThread()
+        gen.attachProducer(producerThread)
+
+        // Interrupt the current thread so that queue.put(DONE) inside
+        // complete() throws InterruptedException.  Since no consumer is
+        // blocked in take(), the non-blocking offer(DONE) will also fail,
+        // triggering the force-close path.
+        producerThread.interrupt()
+        gen.complete()
+
+        // Clear the interrupt flag set by the force-close path
+        Thread.interrupted()
+
+        gen.detachProducer(producerThread)
+
+        // Consumer should see a cleanly closed stream — not block forever
+        def result = gen.moveNext()
+        assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
+    }
+
+    /**
+     * Verifies that when a producer's error() signal is interrupted and
+     * the best-effort offer() fails, the generator force-closes so the
+     * consumer does not hang.
+     */
+    @Test
+    void testGeneratorErrorForceClosesOnOfferFailure() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        def producerThread = Thread.currentThread()
+        gen.attachProducer(producerThread)
+
+        producerThread.interrupt()
+        gen.error(new RuntimeException("test error"))
+
+        Thread.interrupted()
+        gen.detachProducer(producerThread)
+
+        def result = gen.moveNext()
+        assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
+    }
+
+    /**
+     * Verifies that complete() is a no-op when the stream is already closed.
+     */
+    @Test
+    void testGeneratorCompleteAfterClose() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        gen.close()
+        // Should not throw or block
+        gen.complete()
+        def result = gen.moveNext()
+        assert !AsyncSupport.await(result)
+    }
+
+    /**
+     * Verifies that error() is a no-op when the stream is already closed.
+     */
+    @Test
+    void testGeneratorErrorAfterClose() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        gen.close()
+        // Should not throw or block
+        gen.error(new RuntimeException("ignored"))
+        def result = gen.moveNext()
+        assert !AsyncSupport.await(result)
+    }
+
+    /**
+     * Verifies that close() is idempotent — multiple calls do not throw or
+     * cause double-interrupt of threads.
+     */
+    @Test
+    void testGeneratorCloseIdempotent() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        gen.close()
+        gen.close()
+        gen.close()
+        // All should be no-ops; moveNext should still return false
+        assert !AsyncSupport.await(gen.moveNext())
+    }
+
+    /**
+     * Verifies that attachProducer on an already-closed stream immediately
+     * interrupts the producer thread, allowing the generator body to exit.
+     */
+    @Test
+    void testAttachProducerOnClosedStreamInterrupts() {
+        def gen = new AsyncStreamGenerator<Integer>()
+        gen.close()
+
+        def interrupted = new AtomicBoolean(false)
+        def latch = new CountDownLatch(1)
+        def t = new Thread({
+            gen.attachProducer(Thread.currentThread())
+            interrupted.set(Thread.currentThread().isInterrupted())
+            latch.countDown()
+        })
+        t.start()
+        latch.await(5, TimeUnit.SECONDS)
+        assert interrupted.get() : "Producer should be interrupted when attached to a closed stream"
+    }
 }
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
index 76174a9bd5..5a0737f9cc 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
@@ -696,7 +696,7 @@ class AsyncPatternsTest {
     }
 
     @Test
-    void testCancellationPattern() {
+    void testCancellationPatternWithClosureExpression() {
         assertScript '''
             import java.util.concurrent.CancellationException
             import groovy.concurrent.Awaitable

Reply via email to