This is an automated email from the ASF dual-hosted git repository. paulk pushed a commit to branch GROOVY_9381_5 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit 9ebd690aa14dbcf2a7f13b75567eba6fa00972c7 Author: Paul King <[email protected]> AuthorDate: Fri Apr 3 15:40:26 2026 +1000 GROOVY-9381: Add native async/await support --- src/main/java/groovy/concurrent/AsyncChannel.java | 114 ++++++ .../groovy/concurrent/ChannelClosedException.java | 77 ++++ .../apache/groovy/parser/antlr4/AstBuilder.java | 4 +- .../apache/groovy/runtime/async/AsyncSupport.java | 53 ++- .../groovy/runtime/async/DefaultAsyncChannel.java | 306 ++++++++++++++ .../groovy/transform/AsyncTransformHelper.java | 16 +- src/spec/doc/core-async-await.adoc | 449 ++++++++++----------- src/spec/test/AsyncAwaitSpecTest.groovy | 392 +++++++++--------- src/test/groovy/groovy/AsyncAwaitTest.groovy | 175 +++++--- 9 files changed, 1095 insertions(+), 491 deletions(-) diff --git a/src/main/java/groovy/concurrent/AsyncChannel.java b/src/main/java/groovy/concurrent/AsyncChannel.java new file mode 100644 index 0000000000..8fbf5a2fdc --- /dev/null +++ b/src/main/java/groovy/concurrent/AsyncChannel.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent; + +import org.apache.groovy.runtime.async.DefaultAsyncChannel; + +/** + * An asynchronous channel for inter-task communication with optional buffering. + * <p> + * A channel coordinates producers and consumers without exposing explicit + * locks or shared mutable state, following the CSP (Communicating Sequential + * Processes) paradigm popularized by Go's channels. + * <p> + * Channels support both unbuffered (rendezvous) and buffered modes: + * <ul> + * <li><b>Unbuffered</b> — {@code create()} or {@code create(0)}. Each + * {@code send} suspends until a matching {@code receive} arrives.</li> + * <li><b>Buffered</b> — {@code create(n)}. Values are enqueued until the + * buffer fills, then senders suspend.</li> + * </ul> + * <p> + * Channels implement {@link Iterable}, so they work with {@code for await} + * and regular {@code for} loops — iteration yields received values until the + * channel is closed and drained: + * <pre>{@code + * def ch = AsyncChannel.create(2) + * async { ch.send('a'); ch.send('b'); ch.close() } + * for await (item in ch) { + * println item // prints 'a', then 'b' + * } + * }</pre> + * + * @param <T> the payload type + * @see Awaitable + * @since 6.0.0 + */ +public interface AsyncChannel<T> extends Iterable<T> { + + /** + * Creates an unbuffered (rendezvous) channel. + */ + static <T> AsyncChannel<T> create() { + return new DefaultAsyncChannel<>(); + } + + /** + * Creates a channel with the specified buffer capacity. + * + * @param capacity the maximum buffer size; 0 for unbuffered + */ + static <T> AsyncChannel<T> create(int capacity) { + return new DefaultAsyncChannel<>(capacity); + } + + /** Returns this channel's buffer capacity. */ + int getCapacity(); + + /** Returns the number of values currently buffered. */ + int getBufferedSize(); + + /** Returns {@code true} if this channel has been closed. */ + boolean isClosed(); + + /** + * Sends a value through this channel. + * <p> + * The returned {@link Awaitable} completes when the value has been + * delivered to a receiver or buffered. Sending to a closed channel + * fails immediately with {@link ChannelClosedException}. + * + * @param value the value to send; must not be {@code null} + * @return an Awaitable that completes when the send succeeds + * @throws NullPointerException if value is null + */ + Awaitable<Void> send(T value); + + /** + * Receives the next value from this channel. + * <p> + * The returned {@link Awaitable} completes when a value is available. + * Receiving from a closed, empty channel fails with + * {@link ChannelClosedException}. + * + * @return an Awaitable that yields the next value + */ + Awaitable<T> receive(); + + /** + * Closes this channel. Idempotent. + * <p> + * Buffered values remain receivable. Pending senders fail with + * {@link ChannelClosedException}. After all buffered values are + * drained, subsequent receives also fail. + * + * @return {@code true} if this call actually closed the channel + */ + boolean close(); +} diff --git a/src/main/java/groovy/concurrent/ChannelClosedException.java b/src/main/java/groovy/concurrent/ChannelClosedException.java new file mode 100644 index 0000000000..6a963b24af --- /dev/null +++ b/src/main/java/groovy/concurrent/ChannelClosedException.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent; + +/** + * Thrown when an {@link AsyncChannel} operation is attempted after the channel + * has been closed. + * <p> + * This exception is raised in the following situations: + * <ul> + * <li>{@link AsyncChannel#send(Object) send()} — the channel was closed before + * or during the send attempt. Pending senders that were waiting for buffer + * space when the channel closed also receive this exception.</li> + * <li>{@link AsyncChannel#receive() receive()} — the channel was closed and all + * buffered values have been drained. Note that values buffered before + * closure are still delivered normally; only once the buffer is exhausted + * does this exception appear.</li> + * </ul> + * <p> + * When used with {@code for await}, the loop infrastructure translates + * {@code ChannelClosedException} into a clean end-of-stream signal (i.e., + * the loop exits normally rather than propagating the exception): + * <pre> + * def ch = AsyncChannel.create() + * // ... producer sends values, then calls ch.close() + * for await (item in ch) { + * process(item) // processes all buffered values + * } + * // loop exits cleanly after the channel is closed and drained + * </pre> + * + * @see AsyncChannel#send(Object) + * @see AsyncChannel#receive() + * @see AsyncChannel#close() + * @since 6.0.0 + */ +public class ChannelClosedException extends IllegalStateException { + + private static final long serialVersionUID = 1L; + + /** + * Creates a {@code ChannelClosedException} with the specified detail message. + * + * @param message the detail message describing which operation failed + */ + public ChannelClosedException(String message) { + super(message); + } + + /** + * Creates a {@code ChannelClosedException} with the specified detail message + * and cause. + * + * @param message the detail message + * @param cause the underlying cause (e.g., an {@link InterruptedException} + * if the thread was interrupted while waiting on a channel operation) + */ + public ChannelClosedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java index 40b1c55c89..af40395cca 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java @@ -2971,10 +2971,10 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { ClosureExpression genClosure = new ClosureExpression(newParams, closure.getCode()); genClosure.setVariableScope(closure.getVariableScope()); genClosure.setSourcePosition(closure); - return configureAST(AsyncTransformHelper.buildWrapAsyncGeneratorCall( + return configureAST(AsyncTransformHelper.buildAsyncGeneratorCall( new ArgumentListExpression(genClosure)), ctx); } else { - return configureAST(AsyncTransformHelper.buildWrapAsyncCall( + return configureAST(AsyncTransformHelper.buildAsyncCall( new ArgumentListExpression(closure)), ctx); } } diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java index 52d3f3837e..1c4a21d46d 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java @@ -328,6 +328,33 @@ public class AsyncSupport { }; } + /** + * Starts a generator immediately, returning an {@link Iterable} backed by a + * {@link GeneratorBridge}. This is the runtime entry point for + * {@code async { ... yield return ... }} expressions. + * + * @param closure the generator closure; receives a GeneratorBridge as first parameter + * @param <T> the element type + * @return an Iterable that yields values from the generator + */ + @SuppressWarnings("unchecked") + public static <T> Iterable<T> asyncGenerator(Closure<?> closure) { + Objects.requireNonNull(closure, "closure must not be null"); + GeneratorBridge<T> bridge = new GeneratorBridge<>(); + Object[] args = new Object[]{bridge}; + defaultExecutor.execute(() -> { + try { + closure.call(args); + bridge.complete(); + } catch (GeneratorBridge.GeneratorClosedException ignored) { + // Consumer closed early — normal for break in for-await + } catch (Throwable t) { + bridge.completeExceptionally(t); + } + }); + return () -> bridge; + } + // ---- for-await (blocking iterable conversion) ----------------------- /** @@ -467,12 +494,26 @@ public class AsyncSupport { CompletableFuture<?>[] futures = Arrays.stream(sources) .map(s -> Awaitable.from(s).toCompletableFuture()) .toArray(CompletableFuture[]::new); - CompletableFuture<List<Object>> combined = CompletableFuture.allOf(futures) - .thenApply(v -> { - List<Object> results = new ArrayList<>(futures.length); - for (CompletableFuture<?> f : futures) results.add(f.join()); - return results; - }); + + // Track the temporally-first failure explicitly, since + // CompletableFuture.allOf() doesn't guarantee which exception + // propagates when multiple futures fail. + var firstError = new java.util.concurrent.atomic.AtomicReference<Throwable>(); + for (CompletableFuture<?> f : futures) { + f.whenComplete((v, e) -> { + if (e != null) firstError.compareAndSet(null, e); + }); + } + + CompletableFuture<List<Object>> combined = CompletableFuture.allOf( + Arrays.stream(futures).map(f -> f.handle((v, e) -> null)).toArray(CompletableFuture[]::new) + ).thenApply(v -> { + Throwable err = firstError.get(); + if (err != null) throw err instanceof CompletionException ce ? ce : new CompletionException(err); + List<Object> results = new ArrayList<>(futures.length); + for (CompletableFuture<?> f : futures) results.add(f.join()); + return results; + }); return GroovyPromise.of(combined); } diff --git a/src/main/java/org/apache/groovy/runtime/async/DefaultAsyncChannel.java b/src/main/java/org/apache/groovy/runtime/async/DefaultAsyncChannel.java new file mode 100644 index 0000000000..467bb3d229 --- /dev/null +++ b/src/main/java/org/apache/groovy/runtime/async/DefaultAsyncChannel.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.groovy.runtime.async; + +import groovy.concurrent.AsyncChannel; +import groovy.concurrent.Awaitable; +import groovy.concurrent.ChannelClosedException; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Default lock-based implementation of {@link AsyncChannel}. + * <p> + * Uses a {@link ReentrantLock} to coordinate access to the internal buffer + * and the waiting-sender/waiting-receiver queues. All operations return + * {@link Awaitable} immediately; the underlying {@link CompletableFuture} + * is completed asynchronously when matching counterparts arrive. + * + * @param <T> the payload type + * @see AsyncChannel + * @since 6.0.0 + */ +public final class DefaultAsyncChannel<T> implements AsyncChannel<T> { + + private final ReentrantLock lock = new ReentrantLock(); + private final Deque<T> buffer = new ArrayDeque<>(); + private final Deque<PendingSend<T>> waitingSenders = new ArrayDeque<>(); + private final Deque<CompletableFuture<T>> waitingReceivers = new ArrayDeque<>(); + private final int capacity; + private volatile boolean closed; + + public DefaultAsyncChannel() { + this(0); + } + + public DefaultAsyncChannel(int capacity) { + if (capacity < 0) { + throw new IllegalArgumentException("channel capacity must not be negative: " + capacity); + } + this.capacity = capacity; + } + + // ---- Query ---------------------------------------------------------- + + @Override + public int getCapacity() { + return capacity; + } + + @Override + public int getBufferedSize() { + lock.lock(); + try { + return buffer.size(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean isClosed() { + return closed; + } + + // ---- Core Operations ------------------------------------------------ + + @Override + public Awaitable<Void> send(T value) { + Objects.requireNonNull(value, "channel does not support null values"); + + CompletableFuture<Void> completion = new CompletableFuture<>(); + PendingSend<T> pending = new PendingSend<>(value, completion); + boolean queued; + + lock.lock(); + try { + if (closed) { + completion.completeExceptionally(closedForSend()); + queued = false; + } else if (deliverToWaitingReceiver(value)) { + completion.complete(null); + queued = false; + } else if (buffer.size() < capacity) { + buffer.addLast(value); + completion.complete(null); + queued = false; + } else { + waitingSenders.addLast(pending); + queued = true; + } + } finally { + lock.unlock(); + } + + if (queued) { + completion.whenComplete((ignored, error) -> { + if (error != null || completion.isCancelled()) { + removePendingSender(pending); + } + }); + } + + return GroovyPromise.of(completion); + } + + @Override + public Awaitable<T> receive() { + CompletableFuture<T> completion = new CompletableFuture<>(); + boolean queued; + + lock.lock(); + try { + T buffered = pollBuffer(); + if (buffered != null) { + completion.complete(buffered); + queued = false; + } else { + PendingSend<T> sender = pollPendingSender(); + if (sender != null) { + sender.completion.complete(null); + completion.complete(sender.value); + queued = false; + } else if (closed) { + completion.completeExceptionally(closedForReceive()); + queued = false; + } else { + waitingReceivers.addLast(completion); + queued = true; + } + } + } finally { + lock.unlock(); + } + + if (queued) { + completion.whenComplete((ignored, error) -> { + if (error != null || completion.isCancelled()) { + removePendingReceiver(completion); + } + }); + } + + return GroovyPromise.of(completion); + } + + @Override + public boolean close() { + lock.lock(); + try { + if (closed) return false; + closed = true; + + drainBufferToReceivers(); + + while (!waitingReceivers.isEmpty()) { + waitingReceivers.removeFirst().completeExceptionally(closedForReceive()); + } + while (!waitingSenders.isEmpty()) { + waitingSenders.removeFirst().completion.completeExceptionally(closedForSend()); + } + + return true; + } finally { + lock.unlock(); + } + } + + // ---- Iterable (for await / for loop) -------------------------------- + + /** + * Returns a blocking iterator that receives values until the channel + * is closed and drained. Each {@code next()} call blocks until a value + * is available. {@link ChannelClosedException} signals end-of-iteration. + */ + @Override + public Iterator<T> iterator() { + return new ChannelIterator(); + } + + private final class ChannelIterator implements Iterator<T> { + private T next; + private boolean done; + + @Override + public boolean hasNext() { + if (done) return false; + if (next != null) return true; + try { + next = AsyncSupport.await(receive()); + return true; + } catch (ChannelClosedException e) { + done = true; + return false; + } + } + + @Override + public T next() { + if (!hasNext()) throw new NoSuchElementException(); + T value = next; + next = null; + return value; + } + } + + // ---- toString ------------------------------------------------------- + + @Override + public String toString() { + lock.lock(); + try { + return "AsyncChannel{capacity=" + capacity + + ", buffered=" + buffer.size() + + ", waitingSenders=" + waitingSenders.size() + + ", waitingReceivers=" + waitingReceivers.size() + + ", closed=" + closed + '}'; + } finally { + lock.unlock(); + } + } + + // ---- Internal ------------------------------------------------------- + + private boolean deliverToWaitingReceiver(T value) { + while (!waitingReceivers.isEmpty()) { + CompletableFuture<T> receiver = waitingReceivers.removeFirst(); + if (receiver.complete(value)) return true; + } + return false; + } + + private void drainBufferToReceivers() { + while (!waitingReceivers.isEmpty() && !buffer.isEmpty()) { + CompletableFuture<T> receiver = waitingReceivers.removeFirst(); + if (receiver.complete(buffer.peekFirst())) { + buffer.removeFirst(); + } + } + } + + private T pollBuffer() { + if (buffer.isEmpty()) return null; + T value = buffer.removeFirst(); + refillBufferFromWaitingSenders(); + return value; + } + + private void refillBufferFromWaitingSenders() { + while (buffer.size() < capacity) { + PendingSend<T> sender = pollPendingSender(); + if (sender == null) return; + buffer.addLast(sender.value); + sender.completion.complete(null); + } + } + + private PendingSend<T> pollPendingSender() { + while (!waitingSenders.isEmpty()) { + PendingSend<T> sender = waitingSenders.removeFirst(); + if (!sender.completion.isDone()) return sender; + } + return null; + } + + private void removePendingSender(PendingSend<T> sender) { + lock.lock(); + try { waitingSenders.remove(sender); } finally { lock.unlock(); } + } + + private void removePendingReceiver(CompletableFuture<T> receiver) { + lock.lock(); + try { waitingReceivers.remove(receiver); } finally { lock.unlock(); } + } + + private static ChannelClosedException closedForSend() { + return new ChannelClosedException("channel is closed for send"); + } + + private static ChannelClosedException closedForReceive() { + return new ChannelClosedException("channel is closed"); + } + + private record PendingSend<T>(T value, CompletableFuture<Void> completion) {} +} diff --git a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java index 00b323e0a6..766fe72d78 100644 --- a/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java +++ b/src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java @@ -57,8 +57,8 @@ public final class AsyncTransformHelper { private static final String ASYNC_GEN_PARAM_NAME = "$__asyncGen__"; private static final String AWAIT_METHOD = "await"; private static final String YIELD_RETURN_METHOD = "yieldReturn"; - private static final String WRAP_ASYNC_METHOD = "wrapAsync"; - private static final String WRAP_ASYNC_GENERATOR_METHOD = "wrapAsyncGenerator"; + private static final String ASYNC_METHOD = "async"; + private static final String ASYNC_GENERATOR_METHOD = "asyncGenerator"; private static final String TO_BLOCKING_ITERABLE_METHOD = "toBlockingIterable"; private static final String CLOSE_ITERABLE_METHOD = "closeIterable"; private static final String CREATE_DEFER_SCOPE_METHOD = "createDeferScope"; @@ -100,17 +100,17 @@ public final class AsyncTransformHelper { } /** - * Builds {@code AsyncSupport.wrapAsync(closure)}. + * Builds {@code AsyncSupport.async(closure)} — starts immediately, returns Awaitable. */ - public static Expression buildWrapAsyncCall(Expression closure) { - return callX(ASYNC_SUPPORT_TYPE, WRAP_ASYNC_METHOD, ensureArgs(closure)); + public static Expression buildAsyncCall(Expression closure) { + return callX(ASYNC_SUPPORT_TYPE, ASYNC_METHOD, ensureArgs(closure)); } /** - * Builds {@code AsyncSupport.wrapAsyncGenerator(closure)}. + * Builds {@code AsyncSupport.asyncGenerator(closure)} — starts immediately, returns Iterable. */ - public static Expression buildWrapAsyncGeneratorCall(Expression closure) { - return callX(ASYNC_SUPPORT_TYPE, WRAP_ASYNC_GENERATOR_METHOD, ensureArgs(closure)); + public static Expression buildAsyncGeneratorCall(Expression closure) { + return callX(ASYNC_SUPPORT_TYPE, ASYNC_GENERATOR_METHOD, ensureArgs(closure)); } /** diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index b4d609cb75..afa0b298c8 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -21,134 +21,72 @@ = Async/Await (Incubating) -[[async-await]] +[[async-intro]] == Introduction Groovy provides native `async`/`await` support, enabling developers to write concurrent code in a sequential, readable style. Rather than dealing with callbacks, `CompletableFuture` chains, or manual thread management, you express -concurrency with two simple constructs: +concurrency with two constructs: -* **`async { ... }`** — execute a closure asynchronously, returning an `Awaitable` +* **`async { ... }`** — start a closure on a background thread, returning an `Awaitable` * **`await expr`** — block until an asynchronous result is available On JDK 21+, async tasks automatically leverage https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Thread.html#ofVirtual()[virtual threads] for optimal scalability. On JDK 17–20, a cached thread pool is used as a -fallback, providing correct behavior with slightly higher resource cost for -high-concurrency scenarios. +fallback. -A `defer` statement for cleanup is also provided, inspired by Go's `defer` keyword. +The examples throughout this guide use an online multiplayer card game as a +running theme — dealing hands, racing for the fastest play, streaming cards, +and managing tournament rounds. -[[async-basic]] -== Basic Usage +[[async-getting-started]] +== Getting Started -The simplest use of `async`/`await` spawns work in a background thread and -collects the result: +=== Your first async/await -[source,groovy] ----- -include::../test/AsyncAwaitSpecTest.groovy[tags=basic_async_await,indent=0] ----- - -`async { ... }` creates a _reusable async closure_ — the work does **not** -start immediately. You must call it with `()` to launch the computation and -get back an `Awaitable`: - -[source,groovy] ----- -def task = async { expensiveWork() } // defines the async closure — nothing runs yet -def awaitable = task() // () starts the work, returns an Awaitable -def result = await awaitable // blocks until the result is ready ----- - -Each `()` invocation starts a fresh, independent execution, so the same -async closure can be reused. If you want work to start immediately without -the extra `()` call, use `Awaitable.go`: +`async { ... }` starts work immediately on a background thread and returns an +`Awaitable`. Use `await` to collect the result: [source,groovy] ---- -def task = Awaitable.go { expensiveWork() } // starts immediately -def result = await task // no () needed +include::../test/AsyncAwaitSpecTest.groovy[tags=draw_card,indent=0] ---- -Here is a more realistic example — a sequential workflow where each step -depends on the previous one: +=== Exception handling -[source,groovy] ----- -include::../test/AsyncAwaitSpecTest.groovy[tags=sequential_workflow,indent=0] ----- - -The code reads top-to-bottom like synchronous code, but the entire workflow -runs in a background thread. No callbacks, no `.thenCompose()` chains. - -[[async-shared-state]] -=== A Note on Shared State - -Async closures run on a separate thread, so mutating shared variables from -multiple async closures is a race condition — just as it would be with -`Thread.start { ... }`: - -[source,groovy] ----- -// UNSAFE — count is shared and mutated without synchronization -var count = 0 -def tasks = (1..100).collect { async { count++ } } -tasks.each { await it() } -// count may not be 100! ----- - -Prefer returning values from async closures and collecting results with -`await` or combinators, rather than mutating shared state: - -[source,groovy] ----- -// SAFE — each async closure returns a value, no shared mutation -def tasks = (1..100).collect { n -> async { n } } -def results = await(tasks.collect { it() }) -assert results.sum() == 5050 ----- - -When shared mutable state is unavoidable, use `AtomicInteger`, -`ConcurrentHashMap`, or other thread-safe types from `java.util.concurrent`. - -[[async-exception-handling]] -=== Exception Handling - -`await` automatically unwraps `CompletionException` and `ExecutionException`, -rethrowing the original exception. Standard `try`/`catch` works naturally: +`await` unwraps `CompletionException` and `ExecutionException` automatically. +Standard `try`/`catch` works exactly as with synchronous code: [source,groovy] ---- include::../test/AsyncAwaitSpecTest.groovy[tags=exception_handling,indent=0] ---- -[[async-cf-interop]] -=== CompletableFuture Interop +=== CompletableFuture interop `await` works directly with `CompletableFuture`, `CompletionStage`, and `Future` -from Java libraries — no wrapping needed: +from Java libraries: [source,groovy] ---- include::../test/AsyncAwaitSpecTest.groovy[tags=cf_interop,indent=0] ---- -[[async-combinators]] -== Combinators +[[async-parallel]] +== Running Tasks in Parallel -`Awaitable` provides static combinator methods for coordinating multiple -concurrent tasks. +The real power of async/await appears when you need to run several tasks +concurrently and coordinate their results. -[[async-all]] -=== Waiting for All: `Awaitable.all()` and Multi-arg `await` +=== Waiting for all: `Awaitable.all()` -Launch several tasks in parallel and wait for all results: +Deal cards to multiple players at the same time and wait for all hands: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=parallel_tasks,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=deal_hands,indent=0] ---- Multi-argument `await` is syntactic sugar for `Awaitable.all()`: @@ -158,279 +96,324 @@ Multi-argument `await` is syntactic sugar for `Awaitable.all()`: include::../test/AsyncAwaitSpecTest.groovy[tags=multi_arg_await,indent=0] ---- -[[async-any]] === Racing: `Awaitable.any()` -Returns the result of the first task to complete (success or failure): +Returns the result of the first task to complete — useful for fallback +patterns or latency-sensitive code: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=racing_tasks,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=fastest_server,indent=0] ---- -[[async-first]] -=== First Success: `Awaitable.first()` +=== First success: `Awaitable.first()` Like JavaScript's `Promise.any()` — returns the first _successful_ result, -ignoring individual failures. Only fails when all tasks fail: +silently ignoring individual failures. Only fails when every task fails: [source,groovy] ---- include::../test/AsyncAwaitSpecTest.groovy[tags=first_success,indent=0] ---- -Typical use cases include hedged requests (send to multiple endpoints, use -whichever responds first) and graceful degradation (try primary, then fallback). - -[[async-all-settled]] -=== Inspecting All Outcomes: `Awaitable.allSettled()` +=== Inspecting all outcomes: `Awaitable.allSettled()` -Waits for all tasks to complete (succeed or fail) without throwing, returning -an `AwaitResult` list: +Waits for all tasks to finish (succeed or fail) without throwing. Returns an +`AwaitResult` list where each entry has `success`, `value`, and `error` fields: [source,groovy] ---- include::../test/AsyncAwaitSpecTest.groovy[tags=all_settled,indent=0] ---- -Each `AwaitResult` has `success` (boolean), `value` (if successful), and -`error` (if failed). +[[async-generators]] +== Generators and Streaming -[[async-go]] -=== Lightweight Spawn: `Awaitable.go()` +=== Producing values with `yield return` -A shorthand for spawning a task on the default executor: +An `async` closure containing `yield return` becomes a _generator_ — it lazily +produces a sequence of values. The generator runs on a background thread and +blocks on each `yield return` until the consumer is ready, providing natural +back-pressure: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=go_spawn,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=deck_generator,indent=0] ---- -[[async-structured]] -== Structured Concurrency - -Structured concurrency binds the lifetime of concurrent tasks to a scope. -When the scope exits, all child tasks are guaranteed to have completed (or -been cancelled). This prevents orphaned tasks, resource leaks, and silent -failures from unobserved exceptions. - -Use `AsyncScope.withScope` to create a scope: +Generators return a standard `Iterable`, so regular `for` loops and Groovy +collection methods (`collect`, `findAll`, `take`, etc.) work out of the box: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=structured_concurrency,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=generator_regular_for,indent=0] ---- -[[async-fail-fast]] -=== Fail-fast Cancellation +=== Consuming with `for await` -By default, if any child task fails, all siblings are cancelled immediately. -The first failure propagates as the scope's exception: +`for await` iterates over any async source. For generators and plain collections, +it works identically to a regular `for` loop: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=scope_fail_fast,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=for_await_generator,indent=0] ---- -[[async-scope-waits]] -=== All Children Complete +The key value of `for await` is with **reactive types** (Reactor `Flux`, RxJava +`Observable`) where it automatically converts the source to a blocking iterable +via the adapter SPI. Without `for await`, you would need to call the conversion +manually (e.g., `flux.toIterable()`). For generators and plain collections, +a regular `for` loop works identically. -The scope blocks until every child task finishes, even if no explicit `await` -is used: +[[async-channels]] +== Channels + +Channels provide Go-style inter-task communication. A producer sends values +into a channel; a consumer receives them as they arrive. The channel handles +synchronization and optional buffering — no shared mutable state needed: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=scope_waits,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=channel,indent=0] ---- -[[async-timeout]] -== Timeouts and Delays +Channels support unbuffered (rendezvous, `create()`) and buffered (`create(n)`) +modes. Sending blocks when the buffer is full; receiving blocks when empty. +With virtual threads, this blocking is essentially free. -[[async-delay]] -=== Non-blocking Delay +Since channels implement `Iterable`, they also work with regular `for` loops +and Groovy collection methods. -`Awaitable.delay()` pauses without blocking a thread: +[[async-defer]] +== Deferred Cleanup with `defer` + +The `defer` keyword schedules a cleanup action to run when the enclosing +`async` closure completes, regardless of success or failure. Multiple +deferred actions execute in LIFO order — last registered, first to run — +making it natural to pair resource acquisition with cleanup: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=delay,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=defer_basic,indent=0] ---- -[[async-or-timeout]] -=== Fail-fast Timeout - -Apply a deadline to any task. If the task doesn't complete in time, -a `TimeoutException` is thrown: +Deferred actions always run, even when an exception occurs: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=timeout,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=defer_exception,indent=0] ---- -[[async-complete-on-timeout]] -=== Fallback on Timeout +If a deferred action returns an `Awaitable` or `Future`, the result is awaited +before the next deferred action runs, ensuring orderly cleanup of asynchronous +resources. -Use a fallback value instead of throwing: +[[async-diving-deeper]] +== Diving Deeper + +[[async-structured]] +=== Structured Concurrency + +`AsyncScope` binds the lifetime of child tasks to a scope. When the scope +exits, all children are guaranteed complete (or cancelled). This prevents +orphaned tasks and silent failures: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_fallback,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=structured_concurrency,indent=0] ---- -[[async-generators]] -== Generators with `yield return` - -Async closures containing `yield return` become _generators_ — functions that -lazily produce a sequence of values. The generator runs on a background thread -and each `yield return` hands a value to the consumer, blocking until the -consumer is ready for the next one (natural back-pressure): +By default, the scope uses **fail-fast** semantics — if any child fails, +all siblings are cancelled immediately: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=yield_return,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=scope_fail_fast,indent=0] ---- -Since generators return a standard `Iterable`, you can consume them with a -regular `for` loop or any Groovy collection method — no special syntax needed: +The scope waits for every child to finish, even without explicit `await` calls: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=generator_regular_for,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=scope_waits,indent=0] ---- -With virtual threads (JDK 21+), blocking iteration is essentially free. -On JDK 17-20, the generator runs on a platform thread from the cached pool. +[[async-timeouts]] +=== Timeouts and Delays -[[async-for-await]] -== Iterating with `for await` +Apply a deadline to any task. If it doesn't complete in time, a +`TimeoutException` is thrown: -The `for await` loop works with any iterable source: +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=timeout,indent=0] +---- + +Or use a fallback value instead of throwing: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=for_await_no_generator,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_fallback,indent=0] ---- -It also works with generators: +`Awaitable.delay()` pauses without blocking a thread: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=for_await,indent=0] +include::../test/AsyncAwaitSpecTest.groovy[tags=delay,indent=0] ---- -The key value of `for await` is with **reactive types** (Reactor `Flux`, RxJava -`Observable`) where it automatically converts the source to a blocking iterable -via the `AwaitableAdapter` SPI. Without `for await`, you would need to call the -conversion manually (e.g., `flux.toIterable()` or `observable.blockingIterable()`). -For generators and plain collections, a regular `for` loop works identically. +[[async-adapters]] +=== Framework Adapters -[[async-defer]] -== Deferred Cleanup with `defer` +`await` natively understands `Awaitable`, `CompletableFuture`, `CompletionStage`, +and `Future`. Third-party async types can be supported by implementing the +`AwaitableAdapter` SPI and registering it via +`META-INF/services/groovy.concurrent.AwaitableAdapter`. -The `defer` keyword schedules a cleanup action to run when the enclosing -`async` closure completes, regardless of whether it succeeds or throws. -Deferred actions execute in LIFO (last-in, first-out) order, making it -natural to pair resource acquisition with cleanup: +Drop-in adapter modules are provided for: + +* **`groovy-reactor`** — `await` on `Mono`, `for await` over `Flux` +* **`groovy-rxjava`** — `await` on `Single`/`Maybe`/`Completable`, + `for await` over `Observable`/`Flowable` + +For example, without the adapter you must manually convert RxJava types: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=defer_basic,indent=0] +// Without groovy-rxjava — manual conversion +def result = Single.just('hello').toCompletionStage().toCompletableFuture().join() ---- -Deferred actions always run, even when an exception occurs: +With `groovy-rxjava` on the classpath, the conversion is transparent: [source,groovy] ---- -include::../test/AsyncAwaitSpecTest.groovy[tags=defer_exception,indent=0] +// With groovy-rxjava — adapter handles the plumbing +def result = await Awaitable.from(Single.just('hello')) ---- -If a deferred action itself returns an `Awaitable` or `Future`, the result -is awaited before the next deferred action runs, ensuring orderly cleanup -of asynchronous resources. - [[async-executor]] -== Executor Configuration +=== Executor Configuration By default, `async` uses: -* **JDK 21+**: a virtual-thread-per-task executor (essentially unlimited, - near-zero overhead per task) -* **JDK 17–20**: a cached daemon thread pool (max 256 threads by default, - configurable via the `groovy.async.parallelism` system property) +* **JDK 21+**: a virtual-thread-per-task executor +* **JDK 17–20**: a cached daemon thread pool (max 256 threads, configurable + via `groovy.async.parallelism` system property) -You can override the executor for specific needs: +You can override the executor: [source,groovy] ---- import org.apache.groovy.runtime.async.AsyncSupport import java.util.concurrent.Executors -// Use a custom executor AsyncSupport.setExecutor(Executors.newFixedThreadPool(4)) - -// Reset to the default -AsyncSupport.resetExecutor() +AsyncSupport.resetExecutor() // restore default ---- -[[async-adapter]] -== Framework Extensibility +[[async-best-practices]] +== Best Practices -`await` natively understands `Awaitable`, `CompletableFuture`, `CompletionStage`, -and `Future`. Third-party async types can be supported by implementing the -`AwaitableAdapter` SPI and registering it via -`META-INF/services/groovy.concurrent.AwaitableAdapter`. - -Drop-in adapter modules are provided for: - -* **`groovy-reactor`** — `await` on `Mono`, `for await` over `Flux` -* **`groovy-rxjava`** — `await` on `Single`/`Maybe`/`Completable`, - `for await` over `Observable`/`Flowable` +[[async-prefer-values]] +=== Prefer returning values over mutating shared state -For example, without the adapter you must manually convert RxJava types: +Async closures run on separate threads. Mutating shared variables from multiple +closures is a race condition: [source,groovy] ---- -// Without groovy-rxjava — manual conversion boilerplate -def result = Single.just('hello') - .toCompletionStage() - .toCompletableFuture() - .join() - -def items = [] -for (item in Observable.just(1, 2, 3).blockingIterable()) { - items << item -} +// UNSAFE — shared mutation without synchronization +var count = 0 +def tasks = (1..100).collect { async { count++ } } +tasks.each { await it } +// count may not be 100! ---- -With `groovy-rxjava` on the classpath, the adapter is auto-discovered and -the conversion happens transparently: +Instead, return values and collect results: [source,groovy] ---- -// With groovy-rxjava — await and for-await handle the plumbing -def result = await Awaitable.from(Single.just('hello')) - -def items = [] -for await (item in Observable.just(1, 2, 3)) { - items << item -} +// SAFE — no shared mutation +def tasks = (1..100).collect { n -> async { n } } +def results = await Awaitable.all(*tasks) +assert results.sum() == 5050 ---- +When shared mutable state is unavoidable, use the appropriate concurrency-aware +type, e.g. `AtomicInteger` for a shared counter, or thread-safe types from +`java.util.concurrent` for players concurrently drawing cards from a shared deck. + +[[async-choose-right-tool]] +=== Choosing the right tool + +[cols="2,3", options="header"] +|=== +| Feature | Use when... + +| `async`/`await` +| You have sequential steps involving I/O or blocking work and want code that reads top-to-bottom. + +| `Awaitable.all` / `any` / `first` +| You need to launch independent tasks and collect all results, race them, or take the first success. + +| `yield return` / `for await` +| You're producing or consuming a stream of values — paginated APIs, card dealing, log tailing. + +| `defer` +| You acquire resources at different points and want guaranteed cleanup without nested `try`/`finally`. + +| `AsyncChannel` +| Two or more tasks need to communicate — producer/consumer, fan-out/fan-in, or hand-off. + +| `AsyncScope` +| You want child task lifetimes tied to a scope with automatic cancellation on failure. + +| Framework adapters +| You're already using Reactor or RxJava and want `await` / `for await` to work with their types. +|=== + [[async-summary]] -== Summary of Keywords +== Quick Reference [cols="1,3"] |=== -| Keyword | Description - -| `async { ... }` | Execute a closure asynchronously, returning an `Awaitable` -| `await expr` | Block until the result is available; rethrows original exceptions -| `await(a, b, c)` | Wait for all — syntactic sugar for `await Awaitable.all(a, b, c)` -| `yield return expr` | Produce a value from an async generator -| `for await (x in source)` | Iterate over an async source (generator, Flux, Observable, etc.) -| `defer expr` | Schedule a cleanup action (LIFO order) inside an `async` closure -| `AsyncScope.withScope { scope -> ... }` | Structured concurrency scope — all children complete on exit +| Construct | Description + +| `async { ... }` +| Start a closure on a background thread. Returns `Awaitable` (or `Iterable` for generators). + +| `await expr` +| Block until the result is available. Rethrows the original exception. + +| `await(a, b, c)` +| Wait for all — syntactic sugar for `await Awaitable.all(a, b, c)`. + +| `yield return expr` +| Produce a value from an async generator. Consumer blocks until ready. + +| `for await (x in src)` +| Iterate over an async source (generator, channel, Flux, Observable, etc.). + +| `defer expr` +| Schedule a cleanup action (LIFO order) inside an `async` closure. + +| `AsyncChannel.create(n)` +| Create a buffered (or unbuffered) channel for inter-task communication. + +| `AsyncScope.withScope { ... }` +| Structured concurrency — all children complete (or are cancelled) on scope exit. + +| `Awaitable.orTimeoutMillis` +| Apply a deadline. Throws `TimeoutException` if the task exceeds it. + +| `Awaitable.completeOnTimeoutMillis` +| Apply a deadline with a fallback value instead of throwing. + +| `Awaitable.delay(ms)` +| Non-blocking pause. |=== -All three keywords (`async`, `await`, `defer`) are contextual — they can -still be used as variable or method names in existing code. +All keywords (`async`, `await`, `defer`) are contextual — they can still be +used as variable or method names in existing code. diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index 3cfb7d10b5..6d4f3c2aaf 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -23,52 +23,79 @@ import static groovy.test.GroovyAssert.assertScript class AsyncAwaitSpecTest { + // === Getting started === + @Test void testBasicAsyncAwait() { assertScript ''' // tag::basic_async_await[] def task = async { 21 * 2 } - assert await(task()) == 42 + assert await(task) == 42 // end::basic_async_await[] ''' } @Test - void testSequentialWorkflow() { + void testDrawCard() { assertScript ''' - // tag::sequential_workflow[] - def fetchUserId = { String token -> 'user-42' } - def fetchUserName = { String id -> 'Alice' } - def loadProfile = { String name -> [name: name, level: 10] } + // tag::draw_card[] + def deck = ['2♠', '3♥', 'K♦', 'A♣'] + def card = async { deck.shuffled()[0] } + println "You drew: ${await card}" + // end::draw_card[] + ''' + } - def task = async { - var userId = fetchUserId('token-abc') - var name = fetchUserName(userId) - var profile = loadProfile(name) - profile + @Test + void testExceptionHandling() { + assertScript ''' + // tag::exception_handling[] + def drawFromEmpty = async { + throw new IllegalStateException('deck is empty') } - def profile = await task() - assert profile.name == 'Alice' - assert profile.level == 10 - // end::sequential_workflow[] + try { + await drawFromEmpty + } catch (IllegalStateException e) { + // Original exception — no CompletionException wrapper + assert e.message == 'deck is empty' + } + // end::exception_handling[] + ''' + } + + @Test + void testCfInterop() { + assertScript ''' + import java.util.concurrent.CompletableFuture + + // tag::cf_interop[] + // await works with CompletableFuture from Java libraries + def future = CompletableFuture.supplyAsync { 'A♠' } + assert await(future) == 'A♠' + // end::cf_interop[] ''' } + // === Parallel tasks and combinators === + @Test - void testParallelTasks() { + void testDealHands() { assertScript ''' import groovy.concurrent.Awaitable - // tag::parallel_tasks[] - def stats = async { [hp: 100, mp: 50] } - def inventory = async { ['sword', 'shield'] } - def villain = async { [name: 'Dragon', level: 20] } + // tag::deal_hands[] + // Deal cards to three players concurrently + def deck = ('A'..'K').collectMany { r -> ['♠','♥','♦','♣'].collect { "$r$it" } }.shuffled() + int i = 0 + def draw5 = { -> deck[i..<(i+5)].tap { i += 5 } } + + def alice = async { draw5() } + def bob = async { draw5() } + def carol = async { draw5() } - def (s, inv, v) = await Awaitable.all(stats(), inventory(), villain()) - assert s.hp == 100 - assert inv.size() == 2 - assert v.name == 'Dragon' - // end::parallel_tasks[] + def (a, b, c) = await Awaitable.all(alice, bob, carol) + assert a.size() == 5 && b.size() == 5 && c.size() == 5 + // end::deal_hands[] ''' } @@ -80,25 +107,26 @@ class AsyncAwaitSpecTest { def b = async { 2 } def c = async { 3 } - // These three forms are equivalent: - def r1 = await(a(), b(), c()) // parenthesized multi-arg - assert r1 == [1, 2, 3] + // Parenthesized multi-arg await (sugar for Awaitable.all): + def results = await(a, b, c) + assert results == [1, 2, 3] // end::multi_arg_await[] ''' } @Test - void testRacingTasks() { + void testFastestServer() { assertScript ''' import groovy.concurrent.Awaitable - // tag::racing_tasks[] - def fast = async { 'fast' } - def slow = async { Thread.sleep(500); 'slow' } + // tag::fastest_server[] + // Race two servers — use whichever responds first + def primary = async { Thread.sleep(200); 'primary-response' } + def fallback = async { 'fallback-response' } - def winner = await Awaitable.any(fast(), slow()) - assert winner == 'fast' - // end::racing_tasks[] + def response = await Awaitable.any(primary, fallback) + assert response == 'fallback-response' + // end::fastest_server[] ''' } @@ -108,12 +136,12 @@ class AsyncAwaitSpecTest { import groovy.concurrent.Awaitable // tag::first_success[] - def failing = async { throw new RuntimeException('fail') } - def succeeding = async { 'ok' } + // Try multiple sources — use the first that succeeds + def failing = async { throw new RuntimeException('server down') } + def succeeding = async { 'card-data-from-cache' } - // Awaitable.first returns the first *successful* result - def result = await Awaitable.first(failing(), succeeding()) - assert result == 'ok' + def result = await Awaitable.first(failing, succeeding) + assert result == 'card-data-from-cache' // end::first_success[] ''' } @@ -124,91 +152,130 @@ class AsyncAwaitSpecTest { import groovy.concurrent.Awaitable // tag::all_settled[] - def ok = async { 42 } - def fail = async { throw new RuntimeException('boom') } + def save1 = async { 42 } + def save2 = async { throw new RuntimeException('db error') } - def results = await Awaitable.allSettled(ok(), fail()) + def results = await Awaitable.allSettled(save1, save2) assert results[0].success && results[0].value == 42 - assert !results[1].success && results[1].error.message == 'boom' + assert !results[1].success && results[1].error.message == 'db error' // end::all_settled[] ''' } + // === Generators and for await === + @Test - void testExceptionHandling() { + void testDeckGenerator() { assertScript ''' - // tag::exception_handling[] - def task = async { - throw new IOException('network error') - } - try { - await task() - } catch (IOException e) { - // Original exception is rethrown — no wrapper layers - assert e.message == 'network error' + // tag::deck_generator[] + def dealCards = async { + def suits = ['♠', '♥', '♦', '♣'] + def ranks = ['A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K'] + for (suit in suits) { + for (rank in ranks) { + yield return "$rank$suit" + } + } } - // end::exception_handling[] + def cards = dealCards.collect() + assert cards.size() == 52 + assert cards.first() == 'A♠' + assert cards.last() == 'K♣' + // end::deck_generator[] ''' } @Test - void testTimeout() { + void testForAwaitGenerator() { assertScript ''' - import groovy.concurrent.Awaitable - import java.util.concurrent.TimeoutException - - // tag::timeout[] - def slow = async { Thread.sleep(5000); 'done' } - try { - await Awaitable.orTimeoutMillis(slow(), 100) - } catch (TimeoutException e) { - assert true // timed out as expected + // tag::for_await_generator[] + def topCards = async { + for (card in ['A♠', 'K♥', 'Q♦']) { + yield return card + } } - // end::timeout[] + def hand = [] + for await (card in topCards) { + hand << card + } + assert hand == ['A♠', 'K♥', 'Q♦'] + // end::for_await_generator[] ''' } @Test - void testTimeoutWithFallback() { + void testForAwaitPlainCollection() { assertScript ''' - import groovy.concurrent.Awaitable + // tag::for_await_collection[] + def results = [] + for await (card in ['A♠', 'K♥', 'Q♦']) { + results << card + } + assert results == ['A♠', 'K♥', 'Q♦'] + // end::for_await_collection[] + ''' + } - // tag::timeout_fallback[] - def slow = async { Thread.sleep(5000); 'done' } - def result = await Awaitable.completeOnTimeoutMillis(slow(), 'fallback', 100) - assert result == 'fallback' - // end::timeout_fallback[] + @Test + void testGeneratorRegularFor() { + assertScript ''' + // tag::generator_regular_for[] + def scores = async { + for (s in [100, 250, 75]) { yield return s } + } + // Generators return Iterable — regular for and collect work + assert scores.collect { it * 2 } == [200, 500, 150] + // end::generator_regular_for[] ''' } + // === Channels === + @Test - void testDelay() { + void testChannel() { assertScript ''' - import groovy.concurrent.Awaitable + import groovy.concurrent.AsyncChannel - // tag::delay[] - long start = System.currentTimeMillis() - await Awaitable.delay(100) - long elapsed = System.currentTimeMillis() - start - assert elapsed >= 90 - // end::delay[] + // tag::channel[] + def cardStream = AsyncChannel.create(3) + + // Dealer — sends cards concurrently + async { + for (card in ['A♠', 'K♥', 'Q♦', 'J♣']) { + await cardStream.send(card) + } + cardStream.close() + } + + // Player — receives cards as they arrive + def hand = [] + for await (card in cardStream) { + hand << card + } + assert hand == ['A♠', 'K♥', 'Q♦', 'J♣'] + // end::channel[] ''' } + // === Defer === + @Test void testDeferBasic() { assertScript ''' // tag::defer_basic[] def log = [] def task = async { - defer { log << 'cleanup 1' } - defer { log << 'cleanup 2' } - log << 'work' - 'done' + log << 'open connection' + defer { log << 'close connection' } + log << 'open transaction' + defer { log << 'close transaction' } + log << 'save game state' + 'saved' } - assert await(task()) == 'done' - // Deferred actions run in LIFO order - assert log == ['work', 'cleanup 2', 'cleanup 1'] + assert await(task) == 'saved' + // Deferred actions run in LIFO order — last registered, first to run + assert log == ['open connection', 'open transaction', 'save game state', + 'close transaction', 'close connection'] // end::defer_basic[] ''' } @@ -220,58 +287,37 @@ class AsyncAwaitSpecTest { def cleaned = false def task = async { defer { cleaned = true } - throw new RuntimeException('oops') + throw new RuntimeException('save failed') } try { - await task() + await task } catch (RuntimeException e) { - assert e.message == 'oops' + assert e.message == 'save failed' } - // Deferred actions still run even when an exception occurs + // Deferred actions run even when an exception occurs assert cleaned // end::defer_exception[] ''' } - @Test - void testGoSpawn() { - assertScript ''' - import groovy.concurrent.Awaitable - - // tag::go_spawn[] - def task = Awaitable.go { 'spawned' } - assert await(task) == 'spawned' - // end::go_spawn[] - ''' - } + // === Structured concurrency === @Test - void testCompletableFutureInterop() { - assertScript ''' - import java.util.concurrent.CompletableFuture - - // tag::cf_interop[] - // await works with CompletableFuture from Java libraries - def future = CompletableFuture.supplyAsync { 'from Java' } - assert await(future) == 'from Java' - // end::cf_interop[] - ''' - } - - @Test - void testStructuredConcurrency() { + void testTournamentScope() { assertScript ''' import groovy.concurrent.AsyncScope // tag::structured_concurrency[] - def result = AsyncScope.withScope { scope -> - def user = scope.async { [name: 'Alice'] } - def orders = scope.async { ['order-1', 'order-2'] } - [user: await(user), orders: await(orders)] + // Run a tournament round — all tables play concurrently + def results = AsyncScope.withScope { scope -> + def table1 = scope.async { [winner: 'Alice', score: 320] } + def table2 = scope.async { [winner: 'Bob', score: 280] } + def table3 = scope.async { [winner: 'Carol', score: 410] } + [await(table1), await(table2), await(table3)] } - // Both tasks guaranteed complete when withScope returns - assert result.user.name == 'Alice' - assert result.orders.size() == 2 + // All tables guaranteed complete when withScope returns + assert results.size() == 3 + assert results.max { it.score }.winner == 'Carol' // end::structured_concurrency[] ''' } @@ -284,12 +330,12 @@ class AsyncAwaitSpecTest { // tag::scope_fail_fast[] try { AsyncScope.withScope { scope -> - scope.async { Thread.sleep(5000); 'slow' } - scope.async { throw new RuntimeException('fail!') } + scope.async { Thread.sleep(5000); 'still playing' } + scope.async { throw new RuntimeException('player disconnected') } } } catch (RuntimeException e) { - // The first failure cancels siblings and propagates - assert e.message == 'fail!' + // First failure cancels all siblings and propagates + assert e.message == 'player disconnected' } // end::scope_fail_fast[] ''' @@ -302,85 +348,59 @@ class AsyncAwaitSpecTest { import java.util.concurrent.atomic.AtomicInteger // tag::scope_waits[] - def counter = new AtomicInteger(0) + def completed = new AtomicInteger(0) AsyncScope.withScope { scope -> - 3.times { scope.async { Thread.sleep(50); counter.incrementAndGet() } } + 3.times { scope.async { Thread.sleep(50); completed.incrementAndGet() } } } - // All children have completed - assert counter.get() == 3 + // All children have completed — even without explicit await + assert completed.get() == 3 // end::scope_waits[] ''' } - @Test - void testYieldReturn() { - assertScript ''' - // tag::yield_return[] - def fibonacci = async { - long a = 0, b = 1 - for (i in 1..10) { - yield return a - (a, b) = [b, a + b] - } - } - assert fibonacci().collect() == [0, 1, 1, 2, 3, 5, 8, 13, 21, 34] - // end::yield_return[] - ''' - } + // === Timeouts and delays === @Test - void testGeneratorWithRegularFor() { + void testTimeout() { assertScript ''' - // tag::generator_regular_for[] - def countdown = async { - for (i in 5..1) { - yield return i - } - } - // Generators return Iterable — regular for loop works - def results = [] - for (n in countdown()) { - results << n - } - assert results == [5, 4, 3, 2, 1] + import groovy.concurrent.Awaitable + import java.util.concurrent.TimeoutException - // Collection methods work too - assert countdown().collect { it * 10 } == [50, 40, 30, 20, 10] - // end::generator_regular_for[] + // tag::timeout[] + def slowPlayer = async { Thread.sleep(5000); 'finally played' } + try { + await Awaitable.orTimeoutMillis(slowPlayer, 100) + } catch (TimeoutException e) { + // Player took too long — turn forfeited + assert true + } + // end::timeout[] ''' } @Test - void testForAwaitWithoutGenerator() { + void testTimeoutFallback() { assertScript ''' - import java.util.concurrent.CompletableFuture + import groovy.concurrent.Awaitable - // tag::for_await_no_generator[] - // for await works with any iterable — no generator needed - def results = [] - for await (item in ['alpha', 'beta', 'gamma']) { - results << item.toUpperCase() - } - assert results == ['ALPHA', 'BETA', 'GAMMA'] - // end::for_await_no_generator[] + // tag::timeout_fallback[] + def slowPlayer = async { Thread.sleep(5000); 'deliberate move' } + def move = await Awaitable.completeOnTimeoutMillis(slowPlayer, 'auto-pass', 100) + assert move == 'auto-pass' + // end::timeout_fallback[] ''' } @Test - void testForAwait() { + void testDelay() { assertScript ''' - // tag::for_await[] - def squares = async { - for (i in 1..5) { - yield return i * i - } - } - def results = [] - for await (n in squares()) { - results << n - } - assert results == [1, 4, 9, 16, 25] - // end::for_await[] + import groovy.concurrent.Awaitable + + // tag::delay[] + long start = System.currentTimeMillis() + await Awaitable.delay(100) // pause without blocking a thread + assert System.currentTimeMillis() - start >= 90 + // end::delay[] ''' } } diff --git a/src/test/groovy/groovy/AsyncAwaitTest.groovy b/src/test/groovy/groovy/AsyncAwaitTest.groovy index a96636daa4..ca2d53894f 100644 --- a/src/test/groovy/groovy/AsyncAwaitTest.groovy +++ b/src/test/groovy/groovy/AsyncAwaitTest.groovy @@ -28,12 +28,10 @@ final class AsyncAwaitTest { // === Layer 1: basic async/await === @Test - void testAsyncClosureAndAwait() { + void testAsyncAndAwait() { assertScript ''' - import groovy.concurrent.Awaitable - def task = async { 21 * 2 } - def result = await task() + def result = await task assert result == 42 ''' } @@ -54,7 +52,7 @@ final class AsyncAwaitTest { assertScript ''' def task = async { throw new IOException('test error') } try { - await task() + await task } catch (IOException e) { assert e.message == 'test error' return @@ -71,23 +69,14 @@ final class AsyncAwaitTest { def counter = new AtomicInteger(0) def task1 = async { Thread.sleep(50); counter.incrementAndGet(); 'a' } def task2 = async { Thread.sleep(50); counter.incrementAndGet(); 'b' } - def r1 = await task1() - def r2 = await task2() + def r1 = await task1 + def r2 = await task2 assert counter.get() == 2 assert r1 == 'a' assert r2 == 'b' ''' } - @Test - void testAsyncWithClosureArgs() { - assertScript ''' - def multiply = async { x, y -> x * y } - def result = await multiply(6, 7) - assert result == 42 - ''' - } - @Test void testAwaitNull() { assertScript ''' @@ -98,7 +87,6 @@ final class AsyncAwaitTest { @Test void testAsyncKeywordAsVariable() { - // 'async' can still be used as a variable name assertScript ''' def async = 'hello' assert async.toUpperCase() == 'HELLO' @@ -121,7 +109,7 @@ final class AsyncAwaitTest { def a = async { 1 } def b = async { 2 } def c = async { 3 } - def results = await(a(), b(), c()) + def results = await(a, b, c) assert results == [1, 2, 3] ''' } @@ -133,7 +121,7 @@ final class AsyncAwaitTest { def a = async { 'x' } def b = async { 'y' } - def results = await Awaitable.all(a(), b()) + def results = await Awaitable.all(a, b) assert results == ['x', 'y'] ''' } @@ -145,7 +133,7 @@ final class AsyncAwaitTest { def fast = async { 'fast' } def slow = async { Thread.sleep(500); 'slow' } - def result = await Awaitable.any(fast(), slow()) + def result = await Awaitable.any(fast, slow) assert result == 'fast' ''' } @@ -157,7 +145,7 @@ final class AsyncAwaitTest { def fail1 = async { throw new RuntimeException('fail') } def success = async { 'ok' } - def result = await Awaitable.first(fail1(), success()) + def result = await Awaitable.first(fail1, success) assert result == 'ok' ''' } @@ -169,7 +157,7 @@ final class AsyncAwaitTest { def ok = async { 42 } def fail = async { throw new RuntimeException('boom') } - def results = await Awaitable.allSettled(ok(), fail()) + def results = await Awaitable.allSettled(ok, fail) assert results.size() == 2 assert results[0].success assert results[0].value == 42 @@ -186,7 +174,7 @@ final class AsyncAwaitTest { long start = System.currentTimeMillis() await Awaitable.delay(100) long elapsed = System.currentTimeMillis() - start - assert elapsed >= 90 // allow small timing variance + assert elapsed >= 90 ''' } @@ -198,7 +186,7 @@ final class AsyncAwaitTest { def slow = async { Thread.sleep(5000); 'done' } try { - await Awaitable.orTimeoutMillis(slow(), 100) + await Awaitable.orTimeoutMillis(slow, 100) assert false : 'Should have timed out' } catch (TimeoutException e) { assert true @@ -212,7 +200,7 @@ final class AsyncAwaitTest { import groovy.concurrent.Awaitable def slow = async { Thread.sleep(5000); 'done' } - def result = await Awaitable.completeOnTimeoutMillis(slow(), 'fallback', 100) + def result = await Awaitable.completeOnTimeoutMillis(slow, 'fallback', 100) assert result == 'fallback' ''' } @@ -261,7 +249,6 @@ final class AsyncAwaitTest { void testScopeFailFastCancelsSiblings() { assertScript ''' import groovy.concurrent.AsyncScope - import java.util.concurrent.CancellationException def slowStarted = new java.util.concurrent.CountDownLatch(1) def result = null @@ -297,7 +284,6 @@ final class AsyncAwaitTest { scope.async { Thread.sleep(50); counter.incrementAndGet() } scope.async { Thread.sleep(50); counter.incrementAndGet() } } - // All three must have completed by the time withScope returns assert counter.get() == 3 ''' } @@ -325,8 +311,6 @@ final class AsyncAwaitTest { assert AsyncScope.current() == null AsyncScope.withScope { scope -> - // Inside the scope, current() returns the scope - // (but we're on the calling thread, not a child thread) def childSaw = scope.async { AsyncScope.current() != null } @@ -373,13 +357,13 @@ final class AsyncAwaitTest { @Test void testYieldReturnBasic() { assertScript ''' - def gen = async { + def items = async { yield return 1 yield return 2 yield return 3 } def results = [] - for (item in gen()) { + for (item in items) { results << item } assert results == [1, 2, 3] @@ -389,37 +373,121 @@ final class AsyncAwaitTest { @Test void testYieldReturnWithLoop() { assertScript ''' - def range = async { + def items = async { for (i in 1..5) { yield return i * 10 } } - assert range().collect() == [10, 20, 30, 40, 50] + assert items.collect() == [10, 20, 30, 40, 50] ''' } + // === channels === + @Test - void testYieldReturnWithArgs() { + void testChannelBasic() { assertScript ''' - def repeat = async { value, times -> - for (i in 1..times) { - yield return value + import groovy.concurrent.AsyncChannel + + def ch = AsyncChannel.create(2) + async { + await ch.send('a') + await ch.send('b') + ch.close() + } + def results = [] + for await (item in ch) { + results << item + } + assert results == ['a', 'b'] + ''' + } + + @Test + void testChannelUnbuffered() { + assertScript ''' + import groovy.concurrent.AsyncChannel + + def ch = AsyncChannel.create() // rendezvous + async { + await ch.send(1) + await ch.send(2) + await ch.send(3) + ch.close() + } + def results = [] + for (item in ch) { + results << item + } + assert results == [1, 2, 3] + ''' + } + + @Test + void testChannelProducerConsumer() { + assertScript ''' + import groovy.concurrent.AsyncChannel + + def ch = AsyncChannel.create(3) + // Producer + async { + for (i in 1..5) { + await ch.send(i * 10) } + ch.close() + } + // Consumer + def sum = 0 + for await (value in ch) { + sum += value + } + assert sum == 150 + ''' + } + + @Test + void testChannelClosedForSend() { + assertScript ''' + import groovy.concurrent.AsyncChannel + import groovy.concurrent.ChannelClosedException + + def ch = AsyncChannel.create(1) + ch.close() + try { + await ch.send('too late') + assert false : 'Should have thrown' + } catch (ChannelClosedException e) { + assert e.message.contains('closed') } - assert repeat('x', 3).collect() == ['x', 'x', 'x'] + ''' + } + + @Test + void testChannelQueryMethods() { + assertScript ''' + import groovy.concurrent.AsyncChannel + + def ch = AsyncChannel.create(5) + assert ch.capacity == 5 + assert ch.bufferedSize == 0 + assert !ch.closed + await ch.send('x') + assert ch.bufferedSize == 1 + ch.close() + assert ch.closed ''' } @Test void testYieldReturnExceptionPropagation() { assertScript ''' - def gen = async { + def items = async { yield return 1 throw new RuntimeException('generator error') } def results = [] try { - for (item in gen()) { + for (item in items) { results << item } assert false : 'Should have thrown' @@ -446,13 +514,13 @@ final class AsyncAwaitTest { @Test void testForAwaitWithGenerator() { assertScript ''' - def gen = async { + def items = async { yield return 'a' yield return 'b' yield return 'c' } def results = [] - for await (item in gen()) { + for await (item in items) { results << item.toUpperCase() } assert results == ['A', 'B', 'C'] @@ -471,19 +539,18 @@ final class AsyncAwaitTest { ''' } - // === regular for loop with generators (no for-await needed) === + // === regular for loop with generators === @Test void testRegularForLoopWithGenerator() { - // Generators return Iterable, so regular for loop works too assertScript ''' - def gen = async { + def items = async { yield return 'a' yield return 'b' yield return 'c' } def results = [] - for (item in gen()) { + for (item in items) { results << item } assert results == ['a', 'b', 'c'] @@ -492,15 +559,13 @@ final class AsyncAwaitTest { @Test void testCollectWithGenerator() { - // Standard Groovy collection methods work on generator output assertScript ''' def squares = async { for (i in 1..5) { yield return i * i } } - assert squares().collect() == [1, 4, 9, 16, 25] - assert squares().collect { it * 2 } == [2, 8, 18, 32, 50] + assert squares.collect() == [1, 4, 9, 16, 25] ''' } @@ -515,7 +580,7 @@ final class AsyncAwaitTest { log << 'work' 'result' } - def result = await task() + def result = await task assert result == 'result' assert log == ['work', 'cleanup'] ''' @@ -531,7 +596,7 @@ final class AsyncAwaitTest { log << 'body' 'done' } - await task() + await task assert log == ['body', 'second registered, first to run', 'first registered, last to run'] ''' } @@ -545,7 +610,7 @@ final class AsyncAwaitTest { throw new RuntimeException('oops') } try { - await task() + await task } catch (RuntimeException e) { assert e.message == 'oops' } @@ -562,7 +627,7 @@ final class AsyncAwaitTest { log << 'body' 42 } - assert await(task()) == 42 + assert await(task) == 42 assert log == ['body', 'deferred closure'] ''' } @@ -575,7 +640,6 @@ final class AsyncAwaitTest { import org.apache.groovy.runtime.async.AsyncSupport import java.util.concurrent.Executors - def threadNames = Collections.synchronizedList([]) def exec = Executors.newFixedThreadPool(2) { r -> def t = new Thread(r, 'custom-pool') t.daemon = true @@ -584,7 +648,7 @@ final class AsyncAwaitTest { try { AsyncSupport.setExecutor(exec) def task = async { Thread.currentThread().name } - def name = await task() + def name = await task assert name == 'custom-pool' } finally { AsyncSupport.resetExecutor() @@ -598,7 +662,6 @@ final class AsyncAwaitTest { assertScript ''' import org.apache.groovy.runtime.async.AsyncSupport - // Just verify this doesn't throw — actual value depends on JDK version def available = AsyncSupport.isVirtualThreadsAvailable() assert available instanceof Boolean '''
