This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit 92f450c013a2eef06ae03e15ea96127bd1ef295d Author: Daniel Sun <[email protected]> AuthorDate: Tue Mar 3 00:08:40 2026 +0900 Minor tweaks --- src/antlr/GroovyParser.g4 | 2 +- .../concurrent/AwaitableAdapterRegistry.java | 74 ++++++++++++++-------- .../apache/groovy/parser/antlr4/AstBuilder.java | 27 +++++--- .../groovy/runtime/async/AsyncStreamGenerator.java | 15 +++-- .../apache/groovy/runtime/async/AsyncSupport.java | 67 ++++++++++++++------ .../apache/groovy/runtime/async/GroovyPromise.java | 10 +-- src/spec/doc/core-async-await.adoc | 27 ++++++++ .../groovy/transform/AsyncAwaitSyntaxTest.groovy | 3 +- .../groovy/transform/AsyncPatternsTest.groovy | 2 +- .../groovy/transform/AsyncVirtualThreadTest.groovy | 3 +- 10 files changed, 156 insertions(+), 74 deletions(-) diff --git a/src/antlr/GroovyParser.g4 b/src/antlr/GroovyParser.g4 index a9a28a991e..d78dcdb649 100644 --- a/src/antlr/GroovyParser.g4 +++ b/src/antlr/GroovyParser.g4 @@ -644,7 +644,7 @@ statement | { inSwitchExpressionLevel > 0 }? yieldStatement #yieldStmtAlt | YIELD RETURN nls expression #yieldReturnStmtAlt - | DEFER nls (closureOrLambdaExpression | statementExpression) #deferStmtAlt + | DEFER nls statementExpression #deferStmtAlt | identifier COLON nls statement #labeledStmtAlt | assertStatement #assertStmtAlt | localVariableDeclaration #localVariableDeclarationStmtAlt diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java index 8ee415dd27..c9e3c734e7 100644 --- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java +++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java @@ -23,11 +23,16 @@ import org.apache.groovy.runtime.async.GroovyPromise; import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.Flow; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; /** * Central registry for {@link AwaitableAdapter} instances. @@ -38,7 +43,7 @@ import java.util.concurrent.Future; * <ul> * <li>{@link CompletableFuture} and {@link CompletionStage}</li> * <li>{@link Future} (adapted via a blocking wrapper)</li> - * <li>JDK {@link java.util.concurrent.Flow.Publisher} — single-value + * <li>JDK {@link Flow.Publisher} — single-value * ({@link #toAwaitable}) and multi-value ({@link #toAsyncStream}) * with backpressure support</li> * </ul> @@ -107,10 +112,15 @@ public class AwaitableAdapterRegistry { * Converts the given source to an {@link Awaitable}. * If the source is already an {@code Awaitable}, it is returned as-is. * - * @throws IllegalArgumentException if no adapter supports the source type + * @param source the source object; must not be {@code null} + * @throws IllegalArgumentException if {@code source} is {@code null} + * or no adapter supports the source type */ @SuppressWarnings("unchecked") public static <T> Awaitable<T> toAwaitable(Object source) { + if (source == null) { + throw new IllegalArgumentException("Cannot convert null to Awaitable"); + } if (source instanceof Awaitable) return (Awaitable<T>) source; Class<?> type = source.getClass(); for (AwaitableAdapter adapter : ADAPTERS) { @@ -127,10 +137,15 @@ public class AwaitableAdapterRegistry { * Converts the given source to an {@link AsyncStream}. * If the source is already an {@code AsyncStream}, it is returned as-is. * - * @throws IllegalArgumentException if no adapter supports the source type + * @param source the source object; must not be {@code null} + * @throws IllegalArgumentException if {@code source} is {@code null} + * or no adapter supports the source type */ @SuppressWarnings("unchecked") public static <T> AsyncStream<T> toAsyncStream(Object source) { + if (source == null) { + throw new IllegalArgumentException("Cannot convert null to AsyncStream"); + } if (source instanceof AsyncStream) return (AsyncStream<T>) source; Class<?> type = source.getClass(); for (AwaitableAdapter adapter : ADAPTERS) { @@ -145,14 +160,14 @@ public class AwaitableAdapterRegistry { /** * Built-in adapter handling JDK {@link CompletableFuture}, {@link CompletionStage}, - * {@link Future}, {@link java.util.concurrent.Flow.Publisher}, + * {@link Future}, {@link Flow.Publisher}, * and {@link Iterable}/{@link Iterator} (for async stream bridging). * <p> * {@link CompletionStage} support enables seamless integration with frameworks * that return {@code CompletionStage} (e.g., Spring's async APIs, Reactor's * {@code Mono.toFuture()}, etc.) without any additional adapter registration. * <p> - * {@link java.util.concurrent.Flow.Publisher} support enables seamless + * {@link Flow.Publisher} support enables seamless * consumption of reactive streams via {@code for await} without any adapter * registration. This covers any reactive library that implements the JDK * standard reactive-streams interface (Reactor, RxJava via adapters, etc.). @@ -163,7 +178,7 @@ public class AwaitableAdapterRegistry { public boolean supportsAwaitable(Class<?> type) { return CompletionStage.class.isAssignableFrom(type) || Future.class.isAssignableFrom(type) - || java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type); + || Flow.Publisher.class.isAssignableFrom(type); } @Override @@ -172,7 +187,7 @@ public class AwaitableAdapterRegistry { if (source instanceof CompletionStage) { return new GroovyPromise<>(((CompletionStage<T>) source).toCompletableFuture()); } - if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) { + if (source instanceof Flow.Publisher<?> pub) { return publisherToAwaitable(pub); } if (source instanceof Future) { @@ -197,14 +212,14 @@ public class AwaitableAdapterRegistry { public boolean supportsAsyncStream(Class<?> type) { return Iterable.class.isAssignableFrom(type) || Iterator.class.isAssignableFrom(type) - || java.util.concurrent.Flow.Publisher.class.isAssignableFrom(type); + || Flow.Publisher.class.isAssignableFrom(type); } @Override @SuppressWarnings("unchecked") public <T> AsyncStream<T> toAsyncStream(Object source) { - if (source instanceof java.util.concurrent.Flow.Publisher<?> pub) { - return publisherToAsyncStream((java.util.concurrent.Flow.Publisher<T>) pub); + if (source instanceof Flow.Publisher<?> pub) { + return publisherToAsyncStream((Flow.Publisher<T>) pub); } final Iterator<T> iterator; if (source instanceof Iterable) { @@ -232,23 +247,28 @@ public class AwaitableAdapterRegistry { } /** - * Adapts a {@link java.util.concurrent.Flow.Publisher} to an + * Adapts a {@link Flow.Publisher} to an * {@link AsyncStream} using a blocking queue to bridge the push-based * reactive-streams protocol to the pull-based {@code moveNext}/{@code getCurrent} * pattern. Backpressure is managed by requesting one item at a time: * each {@code moveNext()} call requests the next item from the upstream * subscription only after the previous item has been consumed. + * <p> + * {@link Throwable} instances are delivered as exceptional completions. + * Thread interruption during {@code queue.take()} is converted to a + * {@link CancellationException} with the interrupt + * flag preserved, consistent with the cancellation semantics used elsewhere. */ @SuppressWarnings("unchecked") - private static <T> AsyncStream<T> publisherToAsyncStream(java.util.concurrent.Flow.Publisher<T> publisher) { - java.util.concurrent.LinkedBlockingQueue<Object> queue = new java.util.concurrent.LinkedBlockingQueue<>(); + private static <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) { + BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); Object COMPLETE_SENTINEL = new Object(); - java.util.concurrent.atomic.AtomicReference<java.util.concurrent.Flow.Subscription> subRef = - new java.util.concurrent.atomic.AtomicReference<>(); + AtomicReference<Flow.Subscription> subRef = + new AtomicReference<>(); - publisher.subscribe(new java.util.concurrent.Flow.Subscriber<T>() { + publisher.subscribe(new Flow.Subscriber<T>() { @Override - public void onSubscribe(java.util.concurrent.Flow.Subscription s) { + public void onSubscribe(Flow.Subscription s) { subRef.set(s); s.request(1); } @@ -260,7 +280,9 @@ public class AwaitableAdapterRegistry { @Override public void onError(Throwable t) { - queue.add(t instanceof Exception ? t : new RuntimeException(t)); + // Preserve the original Throwable type (including Error) so that + // consumers see the exact exception/error from the publisher. + queue.add(t); } @Override @@ -285,12 +307,14 @@ public class AwaitableAdapterRegistry { current = (T) item; cf.complete(true); // Request next item for the subsequent moveNext() call - java.util.concurrent.Flow.Subscription sub = subRef.get(); + Flow.Subscription sub = subRef.get(); if (sub != null) sub.request(1); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - cf.completeExceptionally(e); + CancellationException ce = new CancellationException("Interrupted while waiting for next item"); + ce.initCause(e); + cf.completeExceptionally(ce); } return new GroovyPromise<>(cf); } @@ -303,17 +327,17 @@ public class AwaitableAdapterRegistry { } /** - * Adapts a single-value {@link java.util.concurrent.Flow.Publisher} to + * Adapts a single-value {@link Flow.Publisher} to * an {@link Awaitable}. Subscribes and takes the first emitted value. */ @SuppressWarnings("unchecked") - private static <T> Awaitable<T> publisherToAwaitable(java.util.concurrent.Flow.Publisher<?> publisher) { + private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<?> publisher) { CompletableFuture<T> cf = new CompletableFuture<>(); - publisher.subscribe(new java.util.concurrent.Flow.Subscriber<Object>() { - private java.util.concurrent.Flow.Subscription subscription; + publisher.subscribe(new Flow.Subscriber<Object>() { + private Flow.Subscription subscription; @Override - public void onSubscribe(java.util.concurrent.Flow.Subscription s) { + public void onSubscribe(Flow.Subscription s) { this.subscription = s; s.request(1); } diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java index 2e4ece72be..ddcf2838b8 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java @@ -45,7 +45,6 @@ import org.apache.groovy.parser.antlr4.util.StringUtils; import org.apache.groovy.util.Maps; import org.apache.groovy.util.SystemUtil; import org.codehaus.groovy.GroovyBugError; -import org.codehaus.groovy.transform.AsyncTransformHelper; import org.codehaus.groovy.antlr.EnumHelper; import org.codehaus.groovy.ast.ASTNode; import org.codehaus.groovy.ast.AnnotationNode; @@ -98,7 +97,6 @@ import org.codehaus.groovy.ast.expr.PropertyExpression; import org.codehaus.groovy.ast.expr.RangeExpression; import org.codehaus.groovy.ast.expr.SpreadExpression; import org.codehaus.groovy.ast.expr.SpreadMapExpression; -import org.codehaus.groovy.ast.expr.StaticMethodCallExpression; import org.codehaus.groovy.ast.expr.TernaryExpression; import org.codehaus.groovy.ast.expr.TupleExpression; import org.codehaus.groovy.ast.expr.UnaryMinusExpression; @@ -133,6 +131,7 @@ import org.codehaus.groovy.runtime.StringGroovyMethods; import org.codehaus.groovy.syntax.Numbers; import org.codehaus.groovy.syntax.SyntaxException; import org.codehaus.groovy.syntax.Types; +import org.codehaus.groovy.transform.AsyncTransformHelper; import org.objectweb.asm.Opcodes; import java.io.BufferedReader; @@ -485,6 +484,11 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { * over an {@link groovy.concurrent.AsyncStream}: the source expression is * adapted via {@code AsyncSupport.toAsyncStream()}, then repeatedly polled * with {@code moveNext()} / {@code getCurrent()}. + * <p> + * Variable modifiers (e.g. {@code final}) from the enhanced-for declaration + * are applied to the synthesised loop variable, consistent with the + * standard {@code for (... in ...)} handling in + * {@link #visitEnhancedForControl}. */ private Statement visitForAwait(final ForStmtAltContext ctx) { ForControlContext forCtrl = ctx.forControl(); @@ -498,6 +502,12 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { Expression source = (Expression) this.visit(enhCtrl.expression()); Statement loopBody = this.unpackStatement((Statement) this.visit(ctx.statement())); + // Apply variable modifiers (e.g. final) to the loop variable + VariableExpression loopVar = varX(varName, varType); + ModifierManager modifierManager = new ModifierManager(this, + this.visitVariableModifiersOpt(enhCtrl.variableModifiersOpt())); + modifierManager.processVariableExpression(loopVar); + String streamVar = "$__asyncStream__" + (asyncStreamCounter++); // def $__asyncStream__N = AsyncSupport.toAsyncStream(source) @@ -513,7 +523,7 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { // def <varName> = $__asyncStream__N.getCurrent() Expression getCurrentCall = callX(varX(streamVar), "getCurrent"); - ExpressionStatement getItemStmt = new ExpressionStatement(declX(varX(varName, varType), getCurrentCall)); + ExpressionStatement getItemStmt = new ExpressionStatement(declX(loopVar, getCurrentCall)); BlockStatement whileBody = block(getItemStmt, loopBody); WhileStatement whileStmt = new WhileStatement(condition, whileBody); @@ -925,13 +935,14 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { @Override public ExpressionStatement visitDeferStmtAlt(final DeferStmtAltContext ctx) { Expression action; - if (ctx.closureOrLambdaExpression() != null) { - action = this.visitClosureOrLambdaExpression(ctx.closureOrLambdaExpression()); + ExpressionStatement stmtExprStmt = (ExpressionStatement) this.visit(ctx.statementExpression()); + Expression expr = stmtExprStmt.getExpression(); + if (expr instanceof ClosureExpression) { + // Already a closure/lambda — use directly as the defer action + action = expr; } else { // Wrap the statement expression in a closure: { -> expr } - ExpressionStatement stmtExprStmt = (ExpressionStatement) this.visit(ctx.statementExpression()); - ClosureExpression wrapper = closureX(Parameter.EMPTY_ARRAY, - block(stmtExprStmt)); + ClosureExpression wrapper = closureX(Parameter.EMPTY_ARRAY, block(stmtExprStmt)); wrapper.setSourcePosition(stmtExprStmt); action = wrapper; } diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java index 3bac2d95ca..e341c30def 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java @@ -66,23 +66,29 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { /** * Signals that the generator has completed (no more elements). + * If interrupted, the completion signal is delivered on a best-effort basis. */ public void complete() { try { queue.put(DONE); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + // Best-effort delivery: use non-blocking offer as fallback + queue.offer(DONE); } } /** * Signals that the generator failed with an exception. + * If interrupted, the error signal is delivered on a best-effort basis. */ public void error(Throwable t) { try { - queue.put(new ErrorItem(t)); + queue.put(new ErrorItem(t != null ? t : new NullPointerException("null error in generator"))); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + // Best-effort delivery: use non-blocking offer as fallback + queue.offer(new ErrorItem(t != null ? t : new NullPointerException("null error in generator"))); } } @@ -97,7 +103,7 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { if (next instanceof ErrorItem ei) { Throwable cause = ei.error; if (cause instanceof Error err) throw err; - throw sneakyThrow(cause); + throw AsyncSupport.sneakyThrow(cause); } current = (T) ((Item) next).value; return Awaitable.of(true); @@ -115,9 +121,4 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { // Wrapper to handle null values in the queue private record Item(Object value) { } private record ErrorItem(Throwable error) { } - - @SuppressWarnings("unchecked") - private static <T extends Throwable> RuntimeException sneakyThrow(Throwable t) throws T { - throw (T) t; - } } diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java index 4ceca8c5a1..a7ba01d775 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java @@ -31,6 +31,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -42,6 +44,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -136,10 +140,10 @@ public class AsyncSupport { // reaped after 60 s) to avoid thread starvation. Async/await relies // on blocking operations (SynchronousQueue in generators, Future.get // in await) — a fixed-size pool can deadlock when all threads block. - FALLBACK_EXECUTOR = new java.util.concurrent.ThreadPoolExecutor( + FALLBACK_EXECUTOR = new ThreadPoolExecutor( 0, FALLBACK_MAX_THREADS, 60L, TimeUnit.SECONDS, - new java.util.concurrent.SynchronousQueue<>(), + new SynchronousQueue<>(), r -> { Thread t = new Thread(r); t.setDaemon(true); @@ -422,10 +426,11 @@ public class AsyncSupport { */ @SuppressWarnings("unchecked") public static List<Object> awaitAll(Object... awaitables) { - CompletableFuture<?>[] futures = new CompletableFuture[awaitables.length]; - for (int i = 0; i < awaitables.length; i++) { - futures[i] = toCompletableFuture(awaitables[i]); + if (awaitables == null || awaitables.length == 0) { + return new ArrayList<>(); } + CompletableFuture<?>[] futures = + Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new); try { CompletableFuture.allOf(futures).join(); } catch (CompletionException e) { @@ -447,10 +452,11 @@ public class AsyncSupport { */ @SuppressWarnings("unchecked") public static Object awaitAny(Object... awaitables) { - CompletableFuture<?>[] futures = new CompletableFuture[awaitables.length]; - for (int i = 0; i < awaitables.length; i++) { - futures[i] = toCompletableFuture(awaitables[i]); + if (awaitables == null || awaitables.length == 0) { + throw new IllegalArgumentException("awaitAny requires at least one awaitable"); } + CompletableFuture<?>[] futures = + Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new); try { return CompletableFuture.anyOf(futures).join(); } catch (CompletionException e) { @@ -469,12 +475,13 @@ public class AsyncSupport { */ @SuppressWarnings("unchecked") public static List<AwaitResult<Object>> awaitAllSettled(Object... awaitables) { - CompletableFuture<?>[] futures = new CompletableFuture[awaitables.length]; - for (int i = 0; i < awaitables.length; i++) { - futures[i] = toCompletableFuture(awaitables[i]); + if (awaitables == null || awaitables.length == 0) { + return new ArrayList<>(); } + CompletableFuture<?>[] futures = + Arrays.stream(awaitables).map(AsyncSupport::toCompletableFuture).toArray(CompletableFuture[]::new); CompletableFuture.allOf( - java.util.Arrays.stream(futures) + Arrays.stream(futures) .map(f -> f.handle((v, t) -> null)) .toArray(CompletableFuture[]::new) ).join(); @@ -535,11 +542,15 @@ public class AsyncSupport { * the specified delay in the given time unit. * * @param duration the delay duration (must be ≥ 0) - * @param unit the time unit + * @param unit the time unit (must not be {@code null}) * @return an awaitable that completes after the delay - * @throws IllegalArgumentException if {@code duration} is negative + * @throws IllegalArgumentException if {@code duration} is negative or + * {@code unit} is {@code null} */ public static Awaitable<Void> delay(long duration, TimeUnit unit) { + if (unit == null) { + throw new IllegalArgumentException("TimeUnit must not be null"); + } if (duration < 0) { throw new IllegalArgumentException("delay duration must not be negative: " + duration); } @@ -670,7 +681,7 @@ public class AsyncSupport { * * @return a new, empty defer scope (a mutable list of closures) */ - public static ArrayDeque<Closure<?>> createDeferScope() { + public static Deque<Closure<?>> createDeferScope() { return new ArrayDeque<>(); } @@ -682,10 +693,21 @@ public class AsyncSupport { * This method is the runtime entry point for the {@code defer { ... }} * statement, inspired by Go's {@code defer} keyword. * - * @param scope the defer scope (created by {@link #createDeferScope()}) - * @param action the closure to execute on method exit - */ - public static void defer(ArrayDeque<Closure<?>> scope, Closure<?> action) { + * @param scope the defer scope (created by {@link #createDeferScope()}); + * must not be {@code null} + * @param action the closure to execute on method exit; + * must not be {@code null} + * @throws IllegalStateException if {@code scope} is {@code null} + * (indicates {@code defer} used outside an async context) + * @throws IllegalArgumentException if {@code action} is {@code null} + */ + public static void defer(Deque<Closure<?>> scope, Closure<?> action) { + if (scope == null) { + throw new IllegalStateException("defer must be used inside an async method or closure"); + } + if (action == null) { + throw new IllegalArgumentException("defer action must not be null"); + } scope.push(action); } @@ -698,10 +720,13 @@ public class AsyncSupport { * <p> * Called by compiler-generated code in the {@code finally} block of * methods that contain {@code defer} statements. + * <p> + * A {@code null} or empty scope is treated as a no-op for robustness. * - * @param scope the defer scope to execute + * @param scope the defer scope to execute; may be {@code null} */ - public static void executeDeferScope(ArrayDeque<Closure<?>> scope) { + public static void executeDeferScope(Deque<Closure<?>> scope) { + if (scope == null || scope.isEmpty()) return; Throwable firstError = null; while (!scope.isEmpty()) { try { diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java index 8bb173291a..0b3a85a03e 100644 --- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java +++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java @@ -22,7 +22,6 @@ import groovy.concurrent.Awaitable; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -114,14 +113,7 @@ public class GroovyPromise<T> implements Awaitable<T> { public Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn) { return new GroovyPromise<>(future.exceptionally(t -> { // Unwrap all wrapper layers so handler sees the original exception - Throwable cause = t; - while (cause.getCause() != null - && (cause instanceof CompletionException - || cause instanceof ExecutionException - || cause instanceof java.lang.reflect.UndeclaredThrowableException - || cause instanceof java.lang.reflect.InvocationTargetException)) { - cause = cause.getCause(); - } + Throwable cause = AsyncSupport.deepUnwrap(t); return fn.apply(cause); })); } diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index 1fb316eccd..a45aa2735b 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -503,6 +503,33 @@ def result = AsyncUtils.awaitAny( ) ---- +[[cancellation]] +=== Cancellation + +An `Awaitable` can be cancelled via the `cancel()` method. Once cancelled, any subsequent +`await` on the task throws a `java.util.concurrent.CancellationException`: + +[source,groovy] +---- +import java.util.concurrent.CancellationException + +def task = longRunningTask() +task.cancel() + +try { + await(task) +} catch (CancellationException e) { + // task was cancelled +} +---- + +[NOTE] +==== +Cancellation sets the `Awaitable` state to cancelled but does _not_ reliably interrupt the +underlying thread (a limitation of `CompletableFuture`). For cooperative cancellation in +long-running async bodies, check `Thread.currentThread().isInterrupted()` periodically. +==== + [[summary]] == Summary diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy index b31b1d0263..9bc87564dd 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy @@ -1177,7 +1177,8 @@ class AsyncAwaitSyntaxTest { a.get(50, TimeUnit.MILLISECONDS) assert false : "expected timeout" } catch (TimeoutException e) { - // expected + // expected — cancel to avoid leaving the task running + a.cancel() } ''' } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy index ed52945a65..8c765aa266 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy @@ -702,7 +702,7 @@ class AsyncPatternsTest { import groovy.concurrent.AsyncUtils def longRunning = async { - await(AsyncUtils.delay(5000)) + await(AsyncUtils.delay(2000)) return "completed" } def longTask = longRunning() diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy index 76671d4dc4..7210c5744d 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy @@ -960,7 +960,8 @@ final class AsyncVirtualThreadTest { awaitable.get(50, TimeUnit.MILLISECONDS) assert false : "Should have timed out" } catch (TimeoutException e) { - // expected + // expected — cancel to avoid leaving the task running + awaitable.cancel() } ''' }
