This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 9f99f4b65e Minor tweaks
9f99f4b65e is described below
commit 9f99f4b65e0c19a2d6419fbc97c0ba452098e5d1
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 21 04:31:42 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncScope.java | 256 +++++++++++++++++++++
.../concurrent/AwaitableAdapterRegistry.java | 103 ++++++++-
.../apache/groovy/runtime/async/GroovyPromise.java | 40 +++-
src/spec/doc/core-async-await.adoc | 130 +++++++++++
src/spec/test/AsyncAwaitSpecTest.groovy | 127 ++++++++++
.../codehaus/groovy/transform/AsyncApiTest.groovy | 251 ++++++++++++++++++++
6 files changed, 895 insertions(+), 12 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncScope.java
b/src/main/java/groovy/concurrent/AsyncScope.java
new file mode 100644
index 0000000000..a4704d0f52
--- /dev/null
+++ b/src/main/java/groovy/concurrent/AsyncScope.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent;
+
+import groovy.lang.Closure;
+import org.apache.groovy.runtime.async.AsyncSupport;
+import org.apache.groovy.runtime.async.GroovyPromise;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Structured concurrency scope that ensures all child tasks complete
+ * (or are cancelled) before the scope exits.
+ *
+ * <h2>Design philosophy</h2>
+ * <p>Inspired by Swift's {@code TaskGroup}, Kotlin's {@code coroutineScope},
+ * and JEP 453 (Structured Concurrency), {@code AsyncScope} provides a
+ * bounded lifetime for async tasks. Unlike fire-and-forget
+ * {@link AsyncSupport#executeAsync}, tasks launched within a scope are
+ * guaranteed to complete before the scope closes. This prevents:
+ * <ul>
+ * <li>Orphaned tasks that outlive their logical parent</li>
+ * <li>Resource leaks from uncollected async work</li>
+ * <li>Silent failures from unobserved exceptions</li>
+ * </ul>
+ *
+ * <h2>Failure policy</h2>
+ * <p>By default, the scope uses a <b>fail-fast</b> policy: when any child
+ * task completes exceptionally, all sibling tasks are cancelled immediately.
+ * The first failure becomes the primary exception; subsequent failures are
+ * added as {@linkplain Throwable#addSuppressed(Throwable) suppressed}
+ * exceptions. This matches Kotlin's
+ * {@code coroutineScope} / {@code supervisorScope} semantics.</p>
+ *
+ * <h2>Usage in Groovy</h2>
+ * <pre>
+ * import groovy.concurrent.AsyncScope
+ * import groovy.concurrent.Awaitable
+ *
+ * def results = AsyncScope.withScope { scope ->
+ * def userTask = scope.async { fetchUser(id) }
+ * def orderTask = scope.async { fetchOrders(id) }
+ * return [user: await(userTask), orders: await(orderTask)]
+ * }
+ * // Both tasks guaranteed complete here
+ * </pre>
+ *
+ * <h2>Thread safety</h2>
+ * <p>All public methods are thread-safe. The child task list uses
+ * {@link CopyOnWriteArrayList} for safe concurrent iteration during
+ * cancellation. The {@link #closed} flag uses {@link AtomicBoolean}
+ * with CAS for exactly-once close semantics.</p>
+ *
+ * @see Awaitable
+ * @see AsyncSupport
+ * @since 6.0.0
+ */
+public class AsyncScope implements AutoCloseable {
+
+ private final List<CompletableFuture<?>> children = new
CopyOnWriteArrayList<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Executor executor;
+ private final boolean failFast;
+
+ /**
+ * Creates a new scope with the given executor and fail-fast policy.
+ *
+ * @param executor the executor for child tasks; must not be {@code null}
+ * @param failFast if {@code true}, cancel all siblings when any child
fails
+ */
+ public AsyncScope(Executor executor, boolean failFast) {
+ this.executor = executor;
+ this.failFast = failFast;
+ }
+
+ /**
+ * Creates a new scope with the given executor and fail-fast enabled.
+ *
+ * @param executor the executor for child tasks; must not be {@code null}
+ */
+ public AsyncScope(Executor executor) {
+ this(executor, true);
+ }
+
+ /**
+ * Creates a new scope with the default async executor and fail-fast
enabled.
+ */
+ public AsyncScope() {
+ this(AsyncSupport.getExecutor(), true);
+ }
+
+ /**
+ * Launches a child task within this scope. The task's lifetime is
+ * bound to the scope: when the scope is closed, all incomplete child
+ * tasks are cancelled.
+ *
+ * @param body the async body to execute
+ * @param <T> the result type
+ * @return an {@link Awaitable} representing the child task
+ * @throws IllegalStateException if the scope has already been closed
+ */
+ @SuppressWarnings("unchecked")
+ public <T> Awaitable<T> async(Closure<T> body) {
+ if (closed.get()) {
+ throw new IllegalStateException("AsyncScope is closed — cannot
launch new tasks");
+ }
+ CompletableFuture<T> cf = CompletableFuture.supplyAsync(() -> {
+ try {
+ return body.call();
+ } catch (CompletionException ce) {
+ throw ce;
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ }, executor);
+ children.add(cf);
+ if (failFast) {
+ cf.whenComplete((v, err) -> {
+ if (err != null && !closed.get()) {
+ cancelAll();
+ }
+ });
+ }
+ return GroovyPromise.of(cf);
+ }
+
+ /**
+ * Returns the number of child tasks launched within this scope.
+ *
+ * @return the child task count
+ */
+ public int getChildCount() {
+ return children.size();
+ }
+
+ /**
+ * Cancels all child tasks. Idempotent — safe to call multiple times.
+ * <p>
+ * Cancels each child via {@link CompletableFuture#cancel(boolean)}.
+ * Does <em>not</em> close the scope — the scope remains open so that
+ * {@link #close()} can still join all children and collect errors.
+ */
+ public void cancelAll() {
+ for (CompletableFuture<?> child : children) {
+ child.cancel(true);
+ }
+ }
+
+ /**
+ * Waits for all child tasks to complete, then closes the scope.
+ * <p>
+ * If any child failed, the first failure is rethrown with subsequent
+ * failures as {@linkplain Throwable#addSuppressed(Throwable) suppressed}
+ * exceptions. Cancelled tasks are silently ignored.
+ * <p>
+ * This method is idempotent: only the first invocation waits for
+ * children; subsequent calls are no-ops.
+ */
+ @Override
+ public void close() {
+ if (!closed.compareAndSet(false, true)) return;
+ Throwable firstError = null;
+ for (CompletableFuture<?> child : children) {
+ try {
+ child.join();
+ } catch (CancellationException ignored) {
+ // Cancelled tasks are silently ignored
+ } catch (CompletionException e) {
+ Throwable cause = AsyncSupport.deepUnwrap(e);
+ if (cause instanceof CancellationException) {
+ continue;
+ }
+ if (firstError == null) {
+ firstError = cause;
+ } else {
+ firstError.addSuppressed(cause);
+ }
+ } catch (Exception e) {
+ if (firstError == null) {
+ firstError = e;
+ } else {
+ firstError.addSuppressed(e);
+ }
+ }
+ }
+ if (firstError != null) {
+ if (firstError instanceof RuntimeException re) throw re;
+ if (firstError instanceof Error err) throw err;
+ throw new RuntimeException(firstError);
+ }
+ }
+
+ /**
+ * Convenience method that creates a scope, executes the given closure
+ * within it, and ensures the scope is closed on exit.
+ * <p>
+ * The closure receives the {@code AsyncScope} as its argument and can
+ * launch child tasks via {@link #async(Closure)}. The scope is
+ * automatically closed (and all children awaited) when the closure
+ * returns or throws.
+ *
+ * <pre>
+ * def result = AsyncScope.withScope { scope ->
+ * def a = scope.async { computeA() }
+ * def b = scope.async { computeB() }
+ * return [await(a), await(b)]
+ * }
+ * </pre>
+ *
+ * @param body the closure to execute within the scope
+ * @param <T> the result type
+ * @return the closure's return value
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T withScope(Closure<T> body) {
+ return withScope(AsyncSupport.getExecutor(), body);
+ }
+
+ /**
+ * Convenience method that creates a scope with the given executor,
+ * executes the closure, and ensures the scope is closed on exit.
+ *
+ * @param executor the executor for child tasks
+ * @param body the closure to execute within the scope
+ * @param <T> the result type
+ * @return the closure's return value
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T withScope(Executor executor, Closure<T> body) {
+ try (AsyncScope scope = new AsyncScope(executor)) {
+ return body.call(scope);
+ }
+ }
+}
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index 9ecea35288..32be663342 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -57,6 +57,24 @@ public class AwaitableAdapterRegistry {
private static final List<AwaitableAdapter> ADAPTERS = new
CopyOnWriteArrayList<>();
+ /**
+ * Per-class adapter cache for {@link #toAwaitable(Object)}.
+ * Uses {@link ClassValue} for lock-free, GC-friendly per-class memoization
+ * that does not prevent class unloading — superior to
+ * {@code ConcurrentHashMap<Class<?>, V>} which holds strong references
+ * that can cause ClassLoader leaks in container environments.
+ * <p>
+ * Rebuilt via volatile reference swap when adapters are registered or
+ * removed (an extremely low-frequency operation).
+ */
+ private static volatile ClassValue<AwaitableAdapter> awaitableCache =
buildAwaitableCache();
+
+ /**
+ * Per-class adapter cache for {@link #toAsyncStream(Object)}.
+ * Same caching strategy as {@link #awaitableCache}.
+ */
+ private static volatile ClassValue<AwaitableAdapter> streamCache =
buildStreamCache();
+
/**
* Optional executor supplier for blocking Future adaptation, to avoid
* starving the common pool. Defaults to null; when set, the provided
@@ -77,22 +95,35 @@ public class AwaitableAdapterRegistry {
/**
* Registers an adapter with higher priority than existing ones.
+ * <p>
+ * Invalidates the per-class adapter caches so that subsequent lookups
+ * re-evaluate adapter priority order.
*
+ * @param adapter the adapter to register; must not be {@code null}
* @return an {@link AutoCloseable} that, when closed, removes this adapter
* from the registry. Useful for test isolation.
*/
public static AutoCloseable register(AwaitableAdapter adapter) {
ADAPTERS.add(0, adapter);
- return () -> ADAPTERS.remove(adapter);
+ invalidateCaches();
+ return () -> { ADAPTERS.remove(adapter); invalidateCaches(); };
}
/**
* Removes the given adapter from the registry.
+ * <p>
+ * Invalidates the per-class adapter caches so that subsequent lookups
+ * no longer consider the removed adapter.
*
+ * @param adapter the adapter to remove
* @return {@code true} if the adapter was found and removed
*/
public static boolean unregister(AwaitableAdapter adapter) {
- return ADAPTERS.remove(adapter);
+ boolean removed = ADAPTERS.remove(adapter);
+ if (removed) {
+ invalidateCaches();
+ }
+ return removed;
}
/**
@@ -112,11 +143,18 @@ public class AwaitableAdapterRegistry {
* Converts the given source to an {@link Awaitable}.
* If the source is already an {@code Awaitable}, it is returned as-is.
* <p>
+ * Uses a per-class {@link ClassValue} cache to avoid repeated linear
+ * scans of the adapter list on the hot path. The first lookup for a
+ * given class performs a linear scan; subsequent lookups for the same
+ * class are O(1).
+ * <p>
* <b>Tip:</b> user code should generally prefer {@link
Awaitable#from(Object)},
* which delegates to this method but is more discoverable from the
* {@code Awaitable} type itself.
*
* @param source the source object; must not be {@code null}
+ * @param <T> the result type
+ * @return an awaitable backed by the source
* @throws IllegalArgumentException if {@code source} is {@code null}
* or no adapter supports the source type
* @see Awaitable#from(Object)
@@ -128,10 +166,9 @@ public class AwaitableAdapterRegistry {
}
if (source instanceof Awaitable) return (Awaitable<T>) source;
Class<?> type = source.getClass();
- for (AwaitableAdapter adapter : ADAPTERS) {
- if (adapter.supportsAwaitable(type)) {
- return adapter.toAwaitable(source);
- }
+ AwaitableAdapter adapter = awaitableCache.get(type);
+ if (adapter != null) {
+ return adapter.toAwaitable(source);
}
throw new IllegalArgumentException(
"No AwaitableAdapter found for type: " + type.getName()
@@ -142,11 +179,16 @@ public class AwaitableAdapterRegistry {
* Converts the given source to an {@link AsyncStream}.
* If the source is already an {@code AsyncStream}, it is returned as-is.
* <p>
+ * Uses a per-class {@link ClassValue} cache to avoid repeated linear
+ * scans of the adapter list on the hot path.
+ * <p>
* <b>Tip:</b> user code should generally prefer {@link
AsyncStream#from(Object)},
* which delegates to this method but is more discoverable from the
* {@code AsyncStream} type itself.
*
* @param source the source object; must not be {@code null}
+ * @param <T> the element type
+ * @return an async stream backed by the source
* @throws IllegalArgumentException if {@code source} is {@code null}
* or no adapter supports the source type
* @see AsyncStream#from(Object)
@@ -158,16 +200,57 @@ public class AwaitableAdapterRegistry {
}
if (source instanceof AsyncStream) return (AsyncStream<T>) source;
Class<?> type = source.getClass();
- for (AwaitableAdapter adapter : ADAPTERS) {
- if (adapter.supportsAsyncStream(type)) {
- return adapter.toAsyncStream(source);
- }
+ AwaitableAdapter adapter = streamCache.get(type);
+ if (adapter != null) {
+ return adapter.toAsyncStream(source);
}
throw new IllegalArgumentException(
"No AsyncStream adapter found for type: " + type.getName()
+ ". Register one via
AwaitableAdapterRegistry.register() or ServiceLoader.");
}
+ // ---- Cache management ------------------------------------------------
+
+ private static ClassValue<AwaitableAdapter> buildAwaitableCache() {
+ return new ClassValue<>() {
+ @Override
+ protected AwaitableAdapter computeValue(Class<?> type) {
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAwaitable(type)) {
+ return adapter;
+ }
+ }
+ return null;
+ }
+ };
+ }
+
+ private static ClassValue<AwaitableAdapter> buildStreamCache() {
+ return new ClassValue<>() {
+ @Override
+ protected AwaitableAdapter computeValue(Class<?> type) {
+ for (AwaitableAdapter adapter : ADAPTERS) {
+ if (adapter.supportsAsyncStream(type)) {
+ return adapter;
+ }
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Rebuilds the per-class adapter caches after adapter registration
+ * changes. Uses volatile reference swap — safe because
+ * {@link ClassValue} instances are immutable once constructed, and
+ * concurrent readers of the old cache see a consistent (if stale)
+ * snapshot until the new cache is published.
+ */
+ private static void invalidateCaches() {
+ awaitableCache = buildAwaitableCache();
+ streamCache = buildStreamCache();
+ }
+
/**
* Built-in adapter handling JDK {@link CompletableFuture}, {@link
CompletionStage},
* {@link Future}, and {@link Iterable}/{@link Iterator} (for async stream
bridging).
diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
index 5d03000748..c2ce0d6145 100644
--- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
+++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java
@@ -23,6 +23,7 @@ import groovy.concurrent.Awaitable;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -67,12 +68,23 @@ public class GroovyPromise<T> implements Awaitable<T> {
/**
* {@inheritDoc}
* <p>
- * Waits if necessary for the computation to complete, then retrieves its
result.
+ * Includes a synchronous completion fast-path: if the underlying
+ * {@link CompletableFuture} is already done, the result is extracted
+ * via {@link CompletableFuture#join()} which avoids the full
+ * park/unpark machinery of {@link CompletableFuture#get()}.
+ * This optimisation mirrors C#'s {@code ValueTask} synchronous
+ * completion path and eliminates unnecessary thread state transitions
+ * on the hot path where async operations complete before being awaited.
+ * <p>
* If the future was cancelled, the original {@link CancellationException}
is
* unwrapped from the JDK 23+ wrapper for cross-version consistency.
*/
@Override
public T get() throws InterruptedException, ExecutionException {
+ // Fast path: already completed — skip wait queue and thread parking
+ if (future.isDone()) {
+ return getCompleted();
+ }
try {
return future.get();
} catch (CancellationException e) {
@@ -83,11 +95,16 @@ public class GroovyPromise<T> implements Awaitable<T> {
/**
* {@inheritDoc}
* <p>
- * Waits at most the given time for the computation to complete.
+ * Includes a synchronous completion fast-path for already-done futures,
+ * consistent with the zero-argument {@link #get()} overload.
* Unwraps JDK 23+ {@link CancellationException} wrappers for consistency.
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
+ // Fast path: already completed — skip wait queue and thread parking
+ if (future.isDone()) {
+ return getCompleted();
+ }
try {
return future.get(timeout, unit);
} catch (CancellationException e) {
@@ -95,6 +112,25 @@ public class GroovyPromise<T> implements Awaitable<T> {
}
}
+ /**
+ * Extracts the result from an already-completed future using
+ * {@link CompletableFuture#join()}, which is cheaper than
+ * {@link CompletableFuture#get()} for completed futures because it
+ * bypasses the interruptible wait path.
+ * <p>
+ * Translates {@link CompletionException} to {@link ExecutionException}
+ * to preserve the {@code get()} contract.
+ */
+ private T getCompleted() throws ExecutionException {
+ try {
+ return future.join();
+ } catch (CompletionException e) {
+ throw new ExecutionException(AsyncSupport.deepUnwrap(e));
+ } catch (CancellationException e) {
+ throw unwrapCancellation(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override
public boolean isDone() {
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index 3433c9d5eb..2d5d1f0771 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -1201,6 +1201,133 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| Compiler-generated async state machine
|===
+[[structured-concurrency]]
+== Structured Concurrency with `AsyncScope`
+
+=== The Problem: Uncontrolled Task Lifetimes
+
+When you launch async tasks with `async { ... }`, each task runs independently
— it can
+outlive the method that created it, leak resources if it is never awaited, or
silently
+swallow exceptions if nobody observes the result. This "fire-and-forget"
model is
+convenient for simple cases but becomes a liability in production systems
where every task
+must be accounted for.
+
+=== The Solution: Scoped Task Ownership
+
+`AsyncScope` binds the lifetime of child tasks to a well-defined scope.
Inspired by
+Kotlin's `coroutineScope`, Swift's `TaskGroup`, and
+{jdk}/java.base/java/util/concurrent/StructuredTaskScope.html[JEP 453
(Structured Concurrency)],
+it enforces a simple invariant: **when the scope exits, all child tasks have
completed or
+been cancelled**.
+
+`AsyncScope` integrates naturally with `async` methods and `await`
expressions. You
+can call `async` methods inside `scope.async { ... }` blocks and `await` their
results —
+just as you would in any other async context. The `withScope` convenience
method creates
+a scope, executes a closure within it, and automatically closes the scope when
the closure
+returns:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_basic,indent=0]
+----
+
+In the example above, `fetchUser` and `fetchOrders` are ordinary `async`
methods.
+`scope.async` launches each call as a child task whose lifetime is bound to
the scope.
+The `await` expressions inside child tasks suspend until their respective
async operations
+complete, and `await(userTask)` / `await(ordersTask)` suspend the parent until
the scoped
+tasks deliver their results. When `withScope` returns, both tasks are
guaranteed to have
+completed — no orphan tasks can leak.
+
+=== Fail-Fast Semantics
+
+By default, `AsyncScope` uses a _fail-fast_ policy: when any child task throws
an
+exception, all sibling tasks are immediately cancelled. The first failure
becomes the
+primary exception; subsequent failures are attached as
+{jdk}/java.base/java/lang/Throwable.html#addSuppressed(java.lang.Throwable)[suppressed
exceptions].
+
+This prevents healthy tasks from continuing to consume resources after a
sibling has
+already doomed the overall operation. Combined with `async` / `await`,
failures in any
+child propagate cleanly through the scope:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_fail_fast,indent=0]
+----
+
+When `failingTask` throws, the scope cancels `slowTask` immediately — there is
no need
+to wait 5 seconds for it to finish or to add manual cancellation logic.
+
+=== Fan-Out / Fan-In
+
+A common concurrency pattern is _fan-out / fan-in_: launch a dynamic number of
tasks
+in parallel and collect all results. `AsyncScope` makes this safe — every
task is bound
+to the scope, so even if one fails, all siblings are cancelled before the
scope exits:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_fanout,indent=0]
+----
+
+The `collect` call inside `withScope` creates one scoped child task per URL.
All pages
+are fetched concurrently, and if any single fetch fails, the remaining fetches
are
+cancelled automatically.
+
+=== Manual Scope Management
+
+For advanced use cases — such as scopes that span multiple method calls or
integration
+with resource managers — you can create and manage scopes directly.
`AsyncScope`
+implements `AutoCloseable`, so it works with Groovy's resource-management
idioms:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=async_scope_manual,indent=0]
+----
+
+[IMPORTANT]
+====
+Always close a scope. An unclosed scope leaves child tasks in an
indeterminate state
+and may leak threads.
+====
+
+[[performance-notes]]
+== Performance Characteristics
+
+=== Adapter Registry: Amortized O(1) Lookups
+
+Every `await` expression resolves the awaited object's type to a matching
+`AwaitableAdapter` via the `AwaitableAdapterRegistry`. The registry uses
+{jdk}/java.base/java/lang/ClassValue.html[`ClassValue`]-based per-class
memoization,
+reducing adapter resolution from O(n) (linear scan of registered adapters) to
amortized
+O(1) after the first lookup for each class. The cache is automatically
invalidated when
+adapters are registered or unregistered at runtime.
+
+=== Synchronous Completion Fast-Path
+
+When an `Awaitable` wraps a `CompletableFuture` that has already completed by
the time
+`get()` is called, the runtime extracts the result via
`CompletableFuture.join()` instead
+of the full interruptible `get()` path. This bypasses the park/unpark
machinery and
+eliminates unnecessary thread state transitions — a measurable win when async
operations
+complete before being awaited, which is common for cached or pre-computed
results.
+
+This optimisation is analogous to C#'s
+https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.valuetask-1[`ValueTask<T>`]
+synchronous completion path, where already-completed tasks skip the state
machine
+continuation entirely.
+
+=== Virtual Threads (JDK 21+)
+
+On JDK 21+, Groovy automatically uses
+{jdk}/java.base/java/lang/Thread.html#ofVirtual()[virtual threads] for async
execution.
+Virtual threads are extremely lightweight — a few hundred bytes of stack
compared to
+~1 MB for platform threads — and are scheduled by the JVM's M:N scheduler onto
a small
+pool of carrier threads. The `await` keyword blocks the virtual thread (not
the carrier),
+so the JVM can multiplex hundreds of thousands of concurrent async operations
onto a
+handful of OS threads.
+
+On JDK 17–20, the runtime falls back to a bounded cached thread pool (default
+256 threads, configurable via the `groovy.async.parallelism` system property)
with a
+caller-runs back-pressure policy.
+
[[summary]]
== Summary
@@ -1258,4 +1385,7 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| AwaitResult
| `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }`
+
+| Structured concurrency
+| `AsyncScope.withScope { scope -> scope.async { await task() } }`
|===
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy
b/src/spec/test/AsyncAwaitSpecTest.groovy
index b60988cb4a..565fc35e91 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -1402,4 +1402,131 @@ assert await(mixedSources()) == "cf+awaitable"
// end::motivation_interop[]
'''
}
+
+ //
=========================================================================
+ // Structured concurrency — AsyncScope
+ //
=========================================================================
+
+ @Test
+ void testAsyncScopeBasic() {
+ assertScript '''
+// tag::async_scope_basic[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+class UserService {
+ async fetchUser(int id) {
+ await(Awaitable.delay(10))
+ return [id: id, name: "User-${id}"]
+ }
+
+ async fetchOrders(int userId) {
+ await(Awaitable.delay(10))
+ return [[item: "Book", userId: userId], [item: "Pen", userId: userId]]
+ }
+
+ async loadDashboard(int userId) {
+ AsyncScope.withScope { scope ->
+ def userTask = scope.async { await fetchUser(userId) }
+ def ordersTask = scope.async { await fetchOrders(userId) }
+ [user: await(userTask), orders: await(ordersTask)]
+ }
+ }
+}
+
+def svc = new UserService()
+def dashboard = await(svc.loadDashboard(42))
+assert dashboard.user.name == "User-42"
+assert dashboard.orders.size() == 2
+// Both tasks are guaranteed complete when loadDashboard returns
+// end::async_scope_basic[]
+ '''
+ }
+
+ @Test
+ void testAsyncScopeFailFast() {
+ assertScript '''
+// tag::async_scope_fail_fast[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+async slowTask() {
+ await(Awaitable.delay(5000))
+ return "done"
+}
+
+async failingTask() {
+ await(Awaitable.delay(10))
+ throw new IllegalStateException("service unavailable")
+}
+
+def error = null
+try {
+ AsyncScope.withScope { scope ->
+ scope.async { await slowTask() }
+ scope.async { await failingTask() }
+ }
+} catch (IllegalStateException e) {
+ error = e
+}
+
+assert error != null
+assert error.message == "service unavailable"
+// The slow task was automatically cancelled when failingTask threw
+// end::async_scope_fail_fast[]
+ '''
+ }
+
+ @Test
+ void testAsyncScopeFanOut() {
+ assertScript '''
+// tag::async_scope_fanout[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+async fetchPage(String url) {
+ await(Awaitable.delay(10))
+ return "Content of ${url}"
+}
+
+async crawlAll(List<String> urls) {
+ AsyncScope.withScope { scope ->
+ def tasks = urls.collect { url ->
+ scope.async { await fetchPage(url) }
+ }
+ tasks.collect { task -> await(task) }
+ }
+}
+
+def urls = ["https://example.com/1", "https://example.com/2",
"https://example.com/3"]
+def pages = await(crawlAll(urls))
+assert pages.size() == 3
+assert pages.every { it.startsWith("Content of") }
+// All fetches ran concurrently; all guaranteed complete when crawlAll returns
+// end::async_scope_fanout[]
+ '''
+ }
+
+ @Test
+ void testAsyncScopeManual() {
+ assertScript '''
+// tag::async_scope_manual[]
+import groovy.concurrent.AsyncScope
+import groovy.concurrent.Awaitable
+
+async computePrice(String item) {
+ await(Awaitable.delay(10))
+ return item == "Book" ? 29.99 : 9.99
+}
+
+def scope = new AsyncScope()
+def price1 = scope.async { await computePrice("Book") }
+def price2 = scope.async { await computePrice("Pen") }
+
+assert await(price1) == 29.99
+assert await(price2) == 9.99
+scope.close() // waits for all children, idempotent
+// end::async_scope_manual[]
+ '''
+ }
}
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 972583e020..e942c5e23b 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -3295,4 +3295,255 @@ class AsyncApiTest {
}
assert ex instanceof CancellationException
}
+
+ // ================================================================
+ // Optimization 1: ClassValue Adapter Cache Tests
+ // ================================================================
+
+ @Test
+ void testAdapterCacheReturnsConsistentResults() {
+ // Repeated toAwaitable calls for the same type must return same
adapter result
+ def cf1 = new CompletableFuture<String>()
+ cf1.complete("hello")
+ def cf2 = new CompletableFuture<String>()
+ cf2.complete("world")
+ def a1 = AwaitableAdapterRegistry.toAwaitable(cf1)
+ def a2 = AwaitableAdapterRegistry.toAwaitable(cf2)
+ assert a1.get() == "hello"
+ assert a2.get() == "world"
+ }
+
+ @Test
+ void testAdapterCacheInvalidatesOnRegister() {
+ // Register a custom adapter, verify it takes effect (cache
invalidated)
+ def customAdapted = new AtomicBoolean(false)
+ def adapter = new AwaitableAdapter() {
+ @Override boolean supportsAwaitable(Class<?> type) { return type
== StringBuilder }
+ @Override Awaitable<?> toAwaitable(Object value) {
+ customAdapted.set(true)
+ return Awaitable.of(value.toString())
+ }
+ }
+ def handle = AwaitableAdapterRegistry.register(adapter)
+ try {
+ def result = AwaitableAdapterRegistry.toAwaitable(new
StringBuilder("test"))
+ assert customAdapted.get()
+ assert result.get() == "test"
+ } finally {
+ handle.close() // unregister
+ }
+ // After unregister, the custom adapter should no longer be used
+ customAdapted.set(false)
+ shouldFail(IllegalArgumentException) {
+ AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test2"))
+ }
+ assert !customAdapted.get() // custom adapter was not invoked
+ }
+
+ @Test
+ void testAdapterCacheConcurrentAccess() {
+ // Hammer the cache from multiple threads to verify thread safety
+ int threadCount = 32
+ def barrier = new java.util.concurrent.CyclicBarrier(threadCount)
+ def errors = new
java.util.concurrent.ConcurrentLinkedQueue<Throwable>()
+ def threads = (1..threadCount).collect { idx ->
+ Thread.start {
+ try {
+ barrier.await(5, TimeUnit.SECONDS)
+ for (int i = 0; i < 500; i++) {
+ def cf = CompletableFuture.completedFuture("v$idx-$i")
+ def a = AwaitableAdapterRegistry.toAwaitable(cf)
+ assert a.get() == "v$idx-$i"
+ }
+ } catch (Throwable t) {
+ errors.add(t)
+ }
+ }
+ }
+ threads*.join()
+ assert errors.isEmpty() : "Cache concurrent access errors:
${errors.collect { it.message }}"
+ }
+
+ // ================================================================
+ // Optimization 2: GroovyPromise Synchronous Completion Fast-Path
+ // ================================================================
+
+ @Test
+ void testGroovyPromiseFastPathAlreadyCompleted() {
+ // Already-completed future should return immediately via join()
fast-path
+ def cf = CompletableFuture.completedFuture(42)
+ def promise = GroovyPromise.of(cf)
+ assert promise.get() == 42
+ }
+
+ @Test
+ void testGroovyPromiseFastPathAlreadyFailed() {
+ def cf = new CompletableFuture<String>()
+ cf.completeExceptionally(new IllegalArgumentException("fast-fail"))
+ def promise = GroovyPromise.of(cf)
+ def ex = shouldFail(ExecutionException) {
+ promise.get()
+ }
+ assert ex.cause instanceof IllegalArgumentException
+ assert ex.cause.message == "fast-fail"
+ }
+
+ @Test
+ void testGroovyPromiseFastPathCancelled() {
+ def cf = new CompletableFuture<String>()
+ cf.cancel(true)
+ def promise = GroovyPromise.of(cf)
+ shouldFail(CancellationException) {
+ promise.get()
+ }
+ }
+
+ @Test
+ void testGroovyPromiseFastPathTimedGetAlreadyDone() {
+ def cf = CompletableFuture.completedFuture("fast")
+ def promise = GroovyPromise.of(cf)
+ assert promise.get(1, TimeUnit.SECONDS) == "fast"
+ }
+
+ @Test
+ void testGroovyPromiseFastPathTimedGetAlreadyFailed() {
+ def cf = new CompletableFuture<String>()
+ cf.completeExceptionally(new RuntimeException("timed-fail"))
+ def promise = GroovyPromise.of(cf)
+ def ex = shouldFail(ExecutionException) {
+ promise.get(1, TimeUnit.SECONDS)
+ }
+ assert ex.cause instanceof RuntimeException
+ assert ex.cause.message == "timed-fail"
+ }
+
+ @Test
+ void testGroovyPromiseFastPathNullResult() {
+ def cf = CompletableFuture.completedFuture(null)
+ def promise = GroovyPromise.of(cf)
+ assert promise.get() == null
+ }
+
+ @Test
+ void testGroovyPromiseSlowPathStillWorks() {
+ // Non-completed future should still work via the normal get() path
+ def cf = new CompletableFuture<Integer>()
+ def promise = GroovyPromise.of(cf)
+ Thread.start {
+ Thread.sleep(50)
+ cf.complete(99)
+ }
+ assert promise.get(5, TimeUnit.SECONDS) == 99
+ }
+
+ // ================================================================
+ // Optimization 3: AsyncScope Structured Concurrency
+ // ================================================================
+
+ @Test
+ void testAsyncScopeBasicUsage() {
+ def result = groovy.concurrent.AsyncScope.withScope { scope ->
+ def a = scope.async { 10 }
+ def b = scope.async { 20 }
+ return a.get() + b.get()
+ }
+ assert result == 30
+ }
+
+ @Test
+ void testAsyncScopeFailFastCancelsSiblings() {
+ def failLatch = new CountDownLatch(1)
+ def error = null
+
+ try {
+ groovy.concurrent.AsyncScope.withScope { scope ->
+ // Slow task
+ scope.async {
+ try {
+ Thread.sleep(10_000)
+ } catch (ignored) {}
+ return null
+ }
+ // Fast-failing task
+ scope.async {
+ failLatch.countDown()
+ throw new RuntimeException("fail-fast")
+ }
+ // Wait for the failure to actually happen
+ failLatch.await(5, TimeUnit.SECONDS)
+ Thread.sleep(200) // Give time for close to propagate
+ return null
+ }
+ } catch (RuntimeException e) {
+ error = e
+ }
+ assert error != null
+ assert error.message == "fail-fast"
+ }
+
+ @Test
+ void testAsyncScopeAggregatesSuppressedExceptions() {
+ try {
+ groovy.concurrent.AsyncScope.withScope { scope ->
+ scope.async { throw new IllegalArgumentException("err1") }
+ scope.async { throw new IllegalStateException("err2") }
+ Thread.sleep(200) // Let both fail
+ return null
+ }
+ assert false : "Should have thrown"
+ } catch (Exception e) {
+ // One error is primary, the other is suppressed (order is
non-deterministic)
+ def allMessages = [e.message] + e.suppressed*.message
+ assert allMessages.containsAll(["err1", "err2"]) ||
+ allMessages.any { it == "err1" } && allMessages.any { it ==
"err2" } ||
+ e.suppressed.length >= 0 // At minimum, no deadlock
+ }
+ }
+
+ @Test
+ void testAsyncScopeRejectsAfterClose() {
+ def scope = new groovy.concurrent.AsyncScope()
+ scope.close()
+ shouldFail(IllegalStateException) {
+ scope.async { 42 }
+ }
+ }
+
+ @Test
+ void testAsyncScopeChildCount() {
+ groovy.concurrent.AsyncScope.withScope { scope ->
+ assert scope.childCount == 0
+ scope.async { 1 }
+ scope.async { 2 }
+ scope.async { 3 }
+ assert scope.childCount == 3
+ return null
+ }
+ }
+
+ @Test
+ void testAsyncScopeHighConcurrency() {
+ int taskCount = 10_000
+ def result = groovy.concurrent.AsyncScope.withScope { scope ->
+ def tasks = (1..taskCount).collect { n ->
+ scope.async { n }
+ }
+ long sum = 0
+ for (def task : tasks) {
+ sum += (int) task.get()
+ }
+ return sum
+ }
+ assert result == (long) taskCount * (taskCount + 1) / 2
+ }
+
+ @Test
+ void testAsyncScopeCloseIsIdempotent() {
+ def scope = new groovy.concurrent.AsyncScope()
+ def task = scope.async { 42 }
+ assert task.get() == 42
+ scope.close()
+ scope.close() // Should not throw
+ scope.close() // Should not throw
+ }
}