This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new 7cd7f9edce Minor tweaks
7cd7f9edce is described below

commit 7cd7f9edcef1de93ca8df6a135a705c4e0a0f716
Author: Daniel Sun <[email protected]>
AuthorDate: Thu Mar 19 02:57:26 2026 +0900

    Minor tweaks
---
 .../concurrent/AwaitableAdapterRegistry.java       |   2 -
 .../apache/groovy/runtime/async/AsyncSupport.java  |  69 ++++----
 .../groovy/transform/AsyncTransformHelper.java     |  36 ++--
 src/spec/doc/core-async-await.adoc                 | 191 ++++++++-------------
 .../codehaus/groovy/transform/AsyncApiTest.groovy  |  82 +++++++++
 5 files changed, 200 insertions(+), 180 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java 
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 61398d154d..9ecea35288 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -252,8 +252,6 @@ public class AwaitableAdapterRegistry {
                 cf.completeExceptionally(ce);
             } catch (ExecutionException e) {
                 cf.completeExceptionally(e.getCause() != null ? e.getCause() : 
e);
-            } catch (CancellationException e) {
-                cf.completeExceptionally(e);
             } catch (Exception e) {
                 cf.completeExceptionally(e);
             }
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 96658f81e8..7996c110e3 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -456,13 +456,7 @@ public class AsyncSupport {
         if (awaitables == null || awaitables.length == 0) {
             return new ArrayList<>();
         }
-        for (int i = 0; i < awaitables.length; i++) {
-            if (awaitables[i] == null) {
-                throw new IllegalArgumentException("awaitAll: element at index 
" + i + " is null");
-            }
-        }
-        CompletableFuture<?>[] futures =
-            
Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCompletableFutures(awaitables, 
"awaitAll");
         try {
             CompletableFuture.allOf(futures).join();
         } catch (CompletionException e) {
@@ -490,13 +484,7 @@ public class AsyncSupport {
         if (awaitables == null || awaitables.length == 0) {
             throw new IllegalArgumentException("awaitAny requires at least one 
awaitable");
         }
-        for (int i = 0; i < awaitables.length; i++) {
-            if (awaitables[i] == null) {
-                throw new IllegalArgumentException("awaitAny: element at index 
" + i + " is null");
-            }
-        }
-        CompletableFuture<?>[] futures =
-            
Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCompletableFutures(awaitables, 
"awaitAny");
         try {
             return CompletableFuture.anyOf(futures).join();
         } catch (CompletionException e) {
@@ -521,13 +509,7 @@ public class AsyncSupport {
         if (awaitables == null || awaitables.length == 0) {
             return new ArrayList<>();
         }
-        for (int i = 0; i < awaitables.length; i++) {
-            if (awaitables[i] == null) {
-                throw new IllegalArgumentException("awaitAllSettled: element 
at index " + i + " is null");
-            }
-        }
-        CompletableFuture<?>[] futures =
-            
Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCompletableFutures(awaitables, 
"awaitAllSettled");
         CompletableFuture.allOf(
             Arrays.stream(futures)
                 .map(f -> f.handle((v, t) -> null))
@@ -545,6 +527,29 @@ public class AsyncSupport {
         return Awaitable.from(source).toCompletableFuture();
     }
 
+    /**
+     * Validates that no element is {@code null} and converts all elements to
+     * {@link CompletableFuture}s in one pass.  Used by combinator methods
+     * ({@code awaitAll}, {@code awaitAny}, {@code awaitAllSettled}, and their
+     * non-blocking {@code Async} variants) to eliminate duplicate
+     * null-checking and conversion loops.
+     *
+     * @param sources    the source objects to validate and convert
+     * @param callerName the method name for error messages
+     * @return an array of completable futures corresponding to the sources
+     * @throws IllegalArgumentException if any element is {@code null}
+     */
+    private static CompletableFuture<?>[] toCompletableFutures(Object[] 
sources, String callerName) {
+        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[sources.length];
+        for (int i = 0; i < sources.length; i++) {
+            if (sources[i] == null) {
+                throw new IllegalArgumentException(callerName + ": element at 
index " + i + " is null");
+            }
+            futures[i] = toCompletableFuture(sources[i]);
+        }
+        return futures;
+    }
+
     // ---- non-blocking combinators (return Awaitable) --------------------
 
     /**
@@ -556,13 +561,7 @@ public class AsyncSupport {
         if (sources == null || sources.length == 0) {
             return Awaitable.of(new ArrayList<>());
         }
-        CompletableFuture<?>[] futures = new CompletableFuture[sources.length];
-        for (int i = 0; i < sources.length; i++) {
-            if (sources[i] == null) {
-                throw new IllegalArgumentException("Awaitable.all: element at 
index " + i + " is null");
-            }
-            futures[i] = toCompletableFuture(sources[i]);
-        }
+        CompletableFuture<?>[] futures = toCompletableFutures(sources, 
"Awaitable.all");
         CompletableFuture<List<Object>> combined = 
CompletableFuture.allOf(futures)
                 .thenApply(v -> {
                     List<Object> results = new ArrayList<>(futures.length);
@@ -584,12 +583,7 @@ public class AsyncSupport {
         if (sources == null || sources.length == 0) {
             throw new IllegalArgumentException("Awaitable.any requires at 
least one source");
         }
-        for (int i = 0; i < sources.length; i++) {
-            if (sources[i] == null) {
-                throw new IllegalArgumentException("Awaitable.any: element at 
index " + i + " is null");
-            }
-        }
-        CompletableFuture<?>[] futures = 
Arrays.stream(sources).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCompletableFutures(sources, 
"Awaitable.any");
         return GroovyPromise.of((CompletableFuture<T>) 
CompletableFuture.anyOf(futures));
     }
 
@@ -602,12 +596,7 @@ public class AsyncSupport {
         if (sources == null || sources.length == 0) {
             return Awaitable.of(new ArrayList<>());
         }
-        for (int i = 0; i < sources.length; i++) {
-            if (sources[i] == null) {
-                throw new IllegalArgumentException("Awaitable.allSettled: 
element at index " + i + " is null");
-            }
-        }
-        CompletableFuture<?>[] futures = 
Arrays.stream(sources).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] futures = toCompletableFutures(sources, 
"Awaitable.allSettled");
         // Wait for all to settle (handle converts failures to non-exceptional 
completions)
         CompletableFuture<List<AwaitResult<Object>>> combined = 
CompletableFuture.allOf(
                 Arrays.stream(futures)
diff --git 
a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java 
b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
index b027f91cfc..6c07699678 100644
--- a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
+++ b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java
@@ -260,25 +260,7 @@ public final class AsyncTransformHelper {
      * @return {@code true} if at least one {@code yieldReturn} call is found
      */
     public static boolean containsYieldReturn(Statement code) {
-        boolean[] found = {false};
-        code.visit(new CodeVisitorSupport() {
-            @Override
-            public void 
visitStaticMethodCallExpression(StaticMethodCallExpression call) {
-                if (YIELD_RETURN_METHOD.equals(call.getMethod())
-                        && 
ASYNC_SUPPORT_CLASS.equals(call.getOwnerType().getName())) {
-                    found[0] = true;
-                }
-                if (!found[0]) {
-                    super.visitStaticMethodCallExpression(call);
-                }
-            }
-
-            @Override
-            public void visitClosureExpression(ClosureExpression expression) {
-                // Do not descend into nested closures — each manages its own 
generator
-            }
-        });
-        return found[0];
+        return containsStaticCall(code, YIELD_RETURN_METHOD);
     }
 
     // ---- rewriting utilities --------------------------------------------
@@ -353,11 +335,25 @@ public final class AsyncTransformHelper {
      * @return {@code true} if at least one {@code defer} call is found
      */
     public static boolean containsDefer(Statement code) {
+        return containsStaticCall(code, DEFER_METHOD);
+    }
+
+    /**
+     * Scans the given statement tree for a static method call to
+     * {@code AsyncSupport.<methodName>()}.  Does <em>not</em> descend
+     * into nested {@link ClosureExpression}s, since each nested closure
+     * manages its own transformation independently.
+     *
+     * @param code       the statement tree to scan
+     * @param methodName the method name to look for on {@code AsyncSupport}
+     * @return {@code true} if at least one matching call is found
+     */
+    private static boolean containsStaticCall(Statement code, String 
methodName) {
         boolean[] found = {false};
         code.visit(new CodeVisitorSupport() {
             @Override
             public void 
visitStaticMethodCallExpression(StaticMethodCallExpression call) {
-                if (DEFER_METHOD.equals(call.getMethod())
+                if (methodName.equals(call.getMethod())
                         && 
ASYNC_SUPPORT_CLASS.equals(call.getOwnerType().getName())) {
                     found[0] = true;
                 }
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index ef3a3fb9f9..3433c9d5eb 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -40,7 +40,7 @@ Key capabilities include:
 * **`yield return`** — produce asynchronous streams (generators)
 * **`defer`** — schedule Go-style cleanup actions that run when the method 
completes
 * **Framework integration** — built-in adapters for `CompletableFuture` and 
`Future`;
-`Flow.Publisher` support via the auto-discovered `FlowPublisherAdapter` (an 
internal runtime adapter);
+`Flow.Publisher` support via an auto-discovered runtime adapter;
 extensible to RxJava, Reactor, and Spring via the adapter registry
 
 On JDK 21+, async methods automatically leverage
@@ -115,7 +115,7 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=motivation_stream_processing,ind
 ----
 
 The producer yields values on demand, and the consumer pulls them with
-`for await`. A `SynchronousQueue` sits between the two, providing
+`for await`. Internally, the runtime uses a handoff queue between the two, 
providing
 natural **back-pressure**: the producer blocks on each `yield return` until the
 consumer is ready, preventing unbounded memory growth — all without requiring
 the developer to manage queues, signals, or synchronization.
@@ -124,7 +124,7 @@ the developer to manage queues, signals, or synchronization.
 
 Groovy does not force developers to abandon their existing async investments.
 The `await` keyword natively understands `CompletableFuture`, 
`CompletionStage`,
-`Future`, and `Flow.Publisher` (via the auto-discovered `FlowPublisherAdapter` 
runtime adapter):
+`Future`, and `Flow.Publisher` (via an auto-discovered runtime adapter):
 
 [source,groovy]
 ----
@@ -396,7 +396,7 @@ This is analogous to C#'s `yield return` within 
`IAsyncEnumerable<T>` and JavaSc
 generators.
 
 The producer (generator) and consumer (`for await` loop) coordinate via a
-`SynchronousQueue`, providing natural back-pressure: the producer blocks on 
each `yield return`
+handoff queue, providing natural back-pressure: the producer blocks on each 
`yield return`
 until the consumer is ready for the next value. This design means:
 
 * The generator body runs on its own thread, executing sequentially like any 
normal method
@@ -616,7 +616,7 @@ the reactive streams interface standardized in JDK 9. This 
enables seamless cons
 reactive streams from any library that implements the `Flow.Publisher` 
contract (Project Reactor,
 RxJava 3 via adapters, Vert.x, etc.).
 
-The internal adapter uses a bounded buffer (capacity 256) to bridge the 
push-based `Publisher`
+The internal adapter uses a small bounded buffer to bridge the push-based 
`Publisher`
 model to Groovy's pull-based `AsyncStream` model, preventing unbounded memory 
growth with
 fast publishers while maintaining throughput.  Back-pressure is enforced by 
requesting exactly
 one item at a time; demand for the next element is signalled _before_ the 
consumer's
@@ -678,8 +678,8 @@ asynchronous types from third-party frameworks. The 
built-in adapter handles:
 * `java.util.concurrent.Future` (blocking, delegated to a configurable 
executor)
 
 Additionally, `java.util.concurrent.Flow.Publisher` support (single-value 
`await` and
-multi-value `for await`) is provided by `FlowPublisherAdapter`
-(`org.apache.groovy.runtime.async`), which is auto-discovered via 
`ServiceLoader`.
+multi-value `for await`) is provided by an internal runtime adapter
+that is auto-discovered via `ServiceLoader`.
 This decoupled design allows applications to override the default
 reactive-streams bridge with a framework-specific adapter (e.g., one backed by 
Reactor's
 native scheduler) by registering a higher-priority adapter.
@@ -899,7 +899,10 @@ and **closure/lambda expressions**. Applying `async` to 
other declarations is a
 [[implementation-details]]
 == Implementation Notes
 
-This section provides architectural context for Groovy language maintainers 
and advanced users.
+This section provides architectural context for Groovy language maintainers 
and advanced
+users who want to understand how async/await works under the hood.  The 
descriptions
+focus on _design principles and behavioural guarantees_ rather than internal 
class names,
+so they remain valid even as the runtime implementation evolves.
 
 [[implementation-strategy]]
 === Thread-Based vs State Machine
@@ -935,45 +938,41 @@ for back-pressure, ensuring stable performance even 
without virtual threads.
 
 The async/await feature is implemented across three layers:
 
-**Grammar layer** (`GroovyLexer.g4`, `GroovyParser.g4`): Defines `async`, 
`await`, `defer` as contextual
-keywords, along with parser rules for `for await`, `yield return`, and `defer` 
statements.
-
-**AST transformation layer** (`AstBuilder`, `AsyncASTTransformation`, 
`AsyncTransformHelper`):
-The `AstBuilder` converts parse trees into AST nodes, rewriting `async` method 
bodies into calls
-to `AsyncSupport` static methods. The `@Async` annotation is processed by 
`AsyncASTTransformation`,
-which delegates to the same `AsyncTransformHelper` utility class. This shared 
helper ensures
-consistent code generation for both the keyword and annotation forms.
-
-**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
-`AsyncStreamGenerator`, `FlowPublisherAdapter`, `AwaitableAdapterRegistry`): 
`AsyncSupport` is the
-central runtime class containing static methods for `await`, `async`, `defer`, 
`yield return`,
-timeout composition, stream cleanup, and combinators. `GroovyPromise` wraps
-`CompletableFuture` to implement the `Awaitable` interface, decoupling the 
public API from the
-JDK implementation. `AbstractAsyncStream` is the template base class for 
queue-based `AsyncStream`
-implementations — it centralises the `moveNext()` signal dispatch, 
`getCurrent()`/`close()`
-lifecycle, and interrupt handling, using the Template Method pattern with hook 
methods
-(`beforeTake`, `afterValueConsumed`, `afterMoveNext`, `onMoveNextInterrupted`, 
`onClose`).
-`AsyncStreamGenerator` extends the template for generator-style streams backed 
by a
-`SynchronousQueue`. `FlowPublisherAdapter.FlowAsyncStream` extends the 
template for
-`Flow.Publisher` adaptation using a bounded `LinkedBlockingQueue` with 
one-at-a-time
-demand. `AwaitableAdapterRegistry` provides the SPI extension point for 
third-party async type
-support.
+**Grammar layer**: Defines `async`, `await`, `defer` as contextual keywords, 
along with
+parser rules for `for await`, `yield return`, and `defer` statements.
+
+**AST transformation layer**: Converts parse trees into AST nodes, rewriting 
`async`
+method bodies into calls to runtime support methods.  The `@Async` annotation 
is processed
+by the same transformation pipeline, ensuring consistent code generation for 
both the
+keyword and annotation forms.
+
+**Runtime layer**: Provides the static methods invoked by compiler-generated 
code —
+`await`, `async`, `defer`, `yield return`, timeout composition, stream 
cleanup, and
+combinators.  Key design decisions include:
+
+* A dedicated promise wrapper decouples the public `Awaitable` API from 
`CompletableFuture`,
+  so the implementation can evolve independently of the user-facing contract.
+* Queue-based `AsyncStream` implementations share a common template base class 
(Template
+  Method pattern) that centralises signal dispatch, lifecycle management, and 
interrupt
+  handling.  Subclasses only override a small number of hook methods — e.g. 
one subclass
+  powers async generators (`yield return`), another bridges `Flow.Publisher` 
sources.
+* The adapter registry (`AwaitableAdapterRegistry`) provides an SPI extension 
point for
+  third-party async types, with `ServiceLoader`-based auto-discovery and 
runtime
+  registration.
 
 ==== API Decoupling
 
 The public API that Groovy developers interact with is defined entirely in the
 `groovy.concurrent` package (`Awaitable`, `AsyncStream`, `AwaitResult`,
-`AwaitableAdapter`, `AwaitableAdapterRegistry`). The implementation classes
-(`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`, 
`AsyncStreamGenerator`,
-`FlowPublisherAdapter`) live in
-`org.apache.groovy.runtime.async` — an internal package.
+`AwaitableAdapter`, `AwaitableAdapterRegistry`).  All implementation classes 
live in an
+internal package that is not part of the public contract.
 
 This separation means:
 
 * User code depends on `Awaitable`, _not_ on `CompletableFuture`. If the JDK 
async
   infrastructure evolves (e.g., structured concurrency in future JDK 
releases), only the
   internal implementation needs to change — the public `Awaitable` contract 
remains stable.
-* `GroovyPromise` is the sole bridge between `Awaitable` and 
`CompletableFuture`.
+* The promise wrapper is the sole bridge between `Awaitable` and 
`CompletableFuture`.
   Replacing it (for example, with a structured-concurrency-based 
implementation) would be
   transparent to all application code.
 * `toCompletableFuture()` is the one explicit escape hatch for interoperating 
with Java
@@ -982,120 +981,76 @@ This separation means:
 [[thread-safety]]
 === Thread Safety and Robustness
 
-A key design goal is that **thread safety is the framework's responsibility, 
not the user's**.
-All concurrency control is encapsulated inside the runtime, so application 
code never needs
-explicit locks, atomics, or volatile annotations.
+A key design goal is that **thread safety is the framework's responsibility, 
not the
+user's**.  All concurrency control is encapsulated inside the runtime, so 
application code
+never needs explicit locks, atomics, or volatile annotations.
 
 ==== Lock-Free Synchronization
 
-All runtime components employ lock-free or minimal-contention synchronization:
-
-* `AsyncSupport.defaultExecutor` — `volatile` field for safe publication 
without locks
-* `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free 
iteration during adapter lookup;
-writes (adapter registration) are rare, reads (every `await`) are frequent
-* `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
-* `AbstractAsyncStream.current` — `volatile` field for cross-thread visibility
-* `AbstractAsyncStream.closed` — `AtomicBoolean` for lifecycle management 
(shared by all queue-based streams)
-* `AsyncStreamGenerator` — extends `AbstractAsyncStream`; adds 
`AtomicReference<Thread>` for prompt,
-idempotent close/cancellation signalling
-* `Flow.Publisher` adaptation (`FlowPublisherAdapter.FlowAsyncStream`) — 
extends `AbstractAsyncStream`;
-  the inherited `AtomicBoolean` closed flag governs the entire lifecycle (both 
upstream terminal signals
-  and consumer-side close), with `AtomicReference<Subscription>` for 
CAS-guarded onSubscribe (§2.5
-  compliance); all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` with a non-blocking
-  `offer()` fallback on interrupt, preventing both silent item loss and 
consumer deadlock; demand is
-  signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses 
shared cached `Awaitable`
-  instances
-  (defined on the `AsyncStream` interface) to eliminate per-call allocations 
on the hot path
-* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no 
sharing)
-* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
+All runtime components employ lock-free or minimal-contention synchronization. 
 Shared
+mutable state uses `volatile` fields for safe publication, `AtomicBoolean` for 
lifecycle
+flags, `AtomicReference` for thread and subscription tracking, and 
copy-on-write
+collections for read-heavy registries (such as the adapter list).  No 
component acquires a
+monitor lock during normal operation.
 
 ==== Async Generator Safety
 
-The `AsyncStreamGenerator` extends `AbstractAsyncStream` — a template base 
class that
-centralises the `moveNext()` signal dispatch, lifecycle management, and 
interrupt handling
-common to all queue-based `AsyncStream` implementations.  The generator adds 
the producer-side
-API (`yield`, `complete`, `error`) and overrides four template hooks 
(`beforeTake`,
-`afterMoveNext`, `onMoveNextInterrupted`, `onClose`).  Several concurrency 
hazards are
-handled transparently:
+The async generator (powering `yield return` / `for await`) uses a handoff 
queue between
+the producer and consumer threads, with several concurrency safeguards handled
+transparently by the runtime:
 
 [cols="2,3"]
 |===
-| Concern | Mechanism
+| Concern | Guarantee
 
 | **Back-pressure**
-| A `SynchronousQueue` between producer and consumer ensures the producer 
blocks on each
-`yield return` until the consumer calls `moveNext()`. No unbounded buffering 
can occur.
+| The producer blocks on each `yield return` until the consumer calls 
`moveNext()`.
+No unbounded buffering can occur.
 
 | **Cooperative cancellation**
-| The producer thread is tracked via `AtomicReference<Thread>`. When the 
consumer calls
-`close()` (explicitly or via compiler-generated `finally`), the producer 
thread is interrupted,
-allowing it to clean up promptly even if blocked in I/O or a long computation.
-
-| **TOCTOU race prevention**
-| `moveNext()` uses a double-check pattern: the `closed` flag is re-checked 
_after_
-registering the consumer thread. This closes a race window where `close()` 
could execute
-between the initial check and the `consumerThread.set()` call, which would 
leave the
-consumer stranded in `queue.take()` with no one to interrupt it.
-
-| **Thread-safe consumer tracking**
-| The consumer thread is tracked via `AtomicReference<Thread>` during 
`moveNext()`. This
-enables `close()` to interrupt a blocked consumer.  Note: concurrent 
`moveNext()` calls
-from multiple threads are not supported and may produce unpredictable results 
— async
-generators are inherently single-consumer (just like C#'s `IAsyncEnumerator`).
+| When the consumer calls `close()` (explicitly or via the compiler-generated 
`finally`
+block), the producer thread is interrupted, allowing it to clean up promptly 
even if
+blocked in I/O or a long computation.
+
+| **Race-free lifecycle**
+| The `closed` flag is checked _after_ registering the consumer thread in 
`moveNext()`,
+closing a race window where `close()` could execute between the initial check 
and the
+thread registration, which would leave the consumer blocked with no one to 
interrupt it.
 
 | **Idempotent close**
-| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to 
call multiple
-times from any thread without side effects.
+| `close()` can be called multiple times from any thread without side effects.
 
 | **Signal delivery under interrupt**
-| If the producer's `complete()` or `error()` signal is interrupted and the 
non-blocking
-fallback delivery fails (no consumer waiting), the generator force-closes 
itself. This
-prevents the consumer from blocking indefinitely on a subsequent `moveNext()` 
— a defensive
-measure against unexpected thread interruption outside the normal close path.
+| If a terminal signal (`complete` or `error`) cannot be delivered because the 
thread is
+interrupted and no consumer is waiting, the generator force-closes itself to 
prevent the
+consumer from blocking indefinitely on a subsequent `moveNext()`.
 |===
 
 ==== Flow.Publisher Adaptation Safety
 
-The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol 
into the pull-based
-`AsyncStream` model via a named inner class `FlowAsyncStream` that also extends
-`AbstractAsyncStream`.  It overrides three template hooks (`beforeTake`, 
`afterValueConsumed`,
-`onClose`) and inherits the default `onMoveNextInterrupted` behaviour.  A 
single `AtomicBoolean`
-closed flag (inherited from the template) governs the entire lifecycle — set 
by the first terminal
-signal, by the consumer's `close()`, or by an interrupt.  Several concurrency 
hazards are handled
-transparently:
+The `Flow.Publisher` bridge converts the push-based `Publisher` protocol into 
the
+pull-based `AsyncStream` model, with the following guarantees:
 
 [cols="2,3"]
 |===
-| Concern | Mechanism
+| Concern | Guarantee
 
 | **Back-pressure**
-| A bounded `LinkedBlockingQueue` (capacity 2) bridges push and pull.  Demand 
is capped at
-one item per `moveNext()` call via `request(1)`.  The small capacity reflects 
the one-at-a-time
-demand model: at most one value plus a racing terminal signal.  Demand is 
signalled _before_
-`moveNext()` returns, preventing livelock when producer and consumer share a 
thread pool.
+| A small bounded buffer bridges push and pull.  Demand is capped at one item 
per
+`moveNext()` call.  Demand is signalled _before_ `moveNext()` returns, 
preventing
+livelock when producer and consumer share a thread pool.
 
 | **Interrupt-safe signal delivery**
-| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking 
`put()` for normal
-delivery, with a non-blocking `offer()` fallback when the publisher thread is 
interrupted.
-If `offer()` also fails (queue full from a misbehaving publisher), `onNext` 
cancels the
-upstream subscription and injects an error signal; terminal signals 
(`onError`/`onComplete`)
-share a common `putTerminalSignal()` helper that atomically CAS-closes the 
stream, cancels
-the subscription, and delivers the signal.
+| Subscriber callbacks use blocking delivery with a non-blocking fallback when 
the
+publisher thread is interrupted.  If the fallback also fails, the upstream 
subscription
+is cancelled and an error signal is injected to unblock the consumer.
 
 | **Allocation-free hot path**
-| `moveNext()` returns shared cached `Awaitable<Boolean>` constants defined on 
the `AsyncStream`
-interface for the value and end-of-stream cases, eliminating per-call 
`CompletableFuture` +
-`GroovyPromise` allocation.  Error signals are thrown directly (matching 
`AsyncStreamGenerator`
-behaviour) rather than wrapped in a failed `CompletableFuture`.
-
-| **Non-blocking close**
-| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to 
inject the completion
-sentinel after clearing the queue — this cannot throw `InterruptedException` 
and effectively
-always succeeds because the queue was just drained.
+| `moveNext()` returns shared cached `Awaitable<Boolean>` constants for the 
value and
+end-of-stream cases, eliminating per-call object allocation on the hot path.
 
 | **Idempotent close**
-| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to 
call multiple
-times from any thread without side effects.
+| `close()` can be called multiple times from any thread without side effects.
 |===
 
 These mechanisms ensure that `yield return` / `for await` code remains as 
simple as writing
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index fc76e4c017..972583e020 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -3213,4 +3213,86 @@ class AsyncApiTest {
         assert gen.getCurrent() == 'after-null'
         assert gen.moveNext().get() == false
     }
+
+    // ---- Tests for toCompletableFutures null-validation helper ----
+
+    void testAwaitAllRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.awaitAll(Awaitable.of(1), null, Awaitable.of(3))
+        }
+        assert ex.message.contains('awaitAll')
+        assert ex.message.contains('index 1')
+    }
+
+    void testAwaitAnyRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.awaitAny(Awaitable.of(1), null)
+        }
+        assert ex.message.contains('awaitAny')
+        assert ex.message.contains('index 1')
+    }
+
+    void testAwaitAllSettledRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.awaitAllSettled(null, Awaitable.of(2))
+        }
+        assert ex.message.contains('awaitAllSettled')
+        assert ex.message.contains('index 0')
+    }
+
+    void testAllAsyncRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.allAsync(Awaitable.of('a'), null)
+        }
+        assert ex.message.contains('Awaitable.all')
+        assert ex.message.contains('index 1')
+    }
+
+    void testAnyAsyncRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.anyAsync(null, Awaitable.of('b'))
+        }
+        assert ex.message.contains('Awaitable.any')
+        assert ex.message.contains('index 0')
+    }
+
+    void testAllSettledAsyncRejectsNullElement() {
+        def ex = shouldFail(IllegalArgumentException) {
+            AsyncSupport.allSettledAsync(Awaitable.of(1), Awaitable.of(2), 
null)
+        }
+        assert ex.message.contains('Awaitable.allSettled')
+        assert ex.message.contains('index 2')
+    }
+
+    void testAwaitAllAcceptsMixedTypes() {
+        // Verify toCompletableFutures correctly converts mixed types
+        def cf = CompletableFuture.completedFuture(10)
+        def awaitable = Awaitable.of(20)
+        def results = AsyncSupport.awaitAll(cf, awaitable)
+        assert results == [10, 20]
+    }
+
+    void testAwaitAllSettledAcceptsMixedTypes() {
+        def cf = CompletableFuture.completedFuture('ok')
+        def failedCf = new CompletableFuture()
+        failedCf.completeExceptionally(new RuntimeException('boom'))
+        def results = AsyncSupport.awaitAllSettled(cf, failedCf)
+        assert results.size() == 2
+        assert results[0].isSuccess()
+        assert results[0].value == 'ok'
+        assert results[1].isFailure()
+        assert results[1].error.message == 'boom'
+    }
+
+    // ---- Tests for AwaitableAdapterRegistry.completeFrom simplified 
exception handling ----
+
+    void testAdapterRegistryHandlesCancelledFuture() {
+        def future = new CompletableFuture()
+        future.cancel(true)
+        def awaitable = Awaitable.from(future)
+        def ex = shouldFail(CancellationException) {
+            awaitable.get()
+        }
+        assert ex instanceof CancellationException
+    }
 }

Reply via email to