This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit 6eb7c2294a74c0bb9d3ef601d366eafa31872011 Author: Daniel Sun <[email protected]> AuthorDate: Sun Mar 8 15:22:10 2026 +0900 Minor tweaks --- src/main/java/groovy/concurrent/AwaitResult.java | 4 + .../apache/groovy/parser/antlr4/AstBuilder.java | 8 + .../groovy/parser/antlr4/ModifierManager.java | 3 +- .../groovy/runtime/async/AsyncStreamGenerator.java | 45 ++ .../apache/groovy/runtime/async/GroovyPromise.java | 51 ++ src/spec/doc/core-async-await.adoc | 62 ++ src/spec/test/AsyncAwaitSpecTest.groovy | 101 ++- .../groovy/transform/AsyncAwaitSyntaxTest.groovy | 60 +- .../groovy/transform/AsyncCoverageTest.groovy | 739 +++++++++++++++++++++ .../groovy/transform/AsyncVirtualThreadTest.groovy | 55 +- 10 files changed, 1103 insertions(+), 25 deletions(-) diff --git a/src/main/java/groovy/concurrent/AwaitResult.java b/src/main/java/groovy/concurrent/AwaitResult.java index 9fba81d1e9..31aedbee80 100644 --- a/src/main/java/groovy/concurrent/AwaitResult.java +++ b/src/main/java/groovy/concurrent/AwaitResult.java @@ -100,6 +100,10 @@ public final class AwaitResult<T> { return success ? value : fallback.apply(error); } + /** + * Returns a human-readable representation of this result: + * {@code AwaitResult.Success[value]} or {@code AwaitResult.Failure[error]}. + */ @Override public String toString() { return success diff --git a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java index 3f221e35d8..55fd359bb0 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java @@ -1285,6 +1285,10 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { throw createParsingFailedException("only sealed type declarations should have `permits` clause", ctx); } + if (modifierManager.containsAny(ASYNC)) { + throw createParsingFailedException("modifier `async` is not allowed for type declarations", modifierManager.get(ASYNC).get()); + } + int modifiers = modifierManager.getClassModifiersOpValue(); boolean syntheticPublic = ((modifiers & Opcodes.ACC_SYNTHETIC) != 0); @@ -2013,6 +2017,10 @@ public class AstBuilder extends GroovyParserBaseVisitor<Object> { asBoolean(ctx.modifiers()) ? this.visitModifiers(ctx.modifiers()) : Collections.emptyList() ); + if (modifierManager.containsAny(ASYNC)) { + throw createParsingFailedException("modifier `async` is not allowed for variable declarations", modifierManager.get(ASYNC).get()); + } + if (asBoolean(ctx.typeNamePairs())) { // e.g. def (int a, int b) = [1, 2] return this.createMultiAssignmentDeclarationListStatement(ctx, modifierManager); } diff --git a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java index 4d86449b8d..729eb14cf2 100644 --- a/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java +++ b/src/main/java/org/apache/groovy/parser/antlr4/ModifierManager.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.stream.Collectors; import static org.apache.groovy.parser.antlr4.GroovyLangParser.ABSTRACT; +import static org.apache.groovy.parser.antlr4.GroovyLangParser.ASYNC; import static org.apache.groovy.parser.antlr4.GroovyLangParser.FINAL; import static org.apache.groovy.parser.antlr4.GroovyLangParser.NATIVE; import static org.apache.groovy.parser.antlr4.GroovyLangParser.STATIC; @@ -47,7 +48,7 @@ import static org.apache.groovy.parser.antlr4.GroovyLangParser.VOLATILE; */ class ModifierManager { private static final Map<Class, List<Integer>> INVALID_MODIFIERS_MAP = Maps.of( - ConstructorNode.class, Arrays.asList(STATIC, FINAL, ABSTRACT, NATIVE), + ConstructorNode.class, Arrays.asList(STATIC, FINAL, ABSTRACT, NATIVE, ASYNC), MethodNode.class, Arrays.asList(VOLATILE/*, TRANSIENT*/) // Transient is left open for properties for legacy reasons but should be removed before ClassCompletionVerifier runs (CLASSGEN) ); private AstBuilder astBuilder; diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java index c3ba7176c3..51396f4ef4 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java @@ -66,6 +66,12 @@ import java.util.concurrent.atomic.AtomicReference; * {@link java.util.concurrent.CancellationException}, while the consumer's * {@link #moveNext()} returns {@code Awaitable.of(false)}. * <p> + * A <em>double-check</em> pattern in {@link #moveNext()} closes a TOCTOU race + * window: the {@link #closed} flag is re-checked <em>after</em> registering the + * consumer thread, ensuring that an externally-triggered {@code close()} cannot + * slip between the initial check and the registration, which would otherwise + * leave the consumer blocked forever in {@code queue.take()}. + * <p> * The {@link #attachProducer}/{@link #detachProducer} lifecycle methods are * called by the async runtime (in {@link AsyncSupport}) to register and * unregister the producer thread. The consumer thread is tracked @@ -175,6 +181,28 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { } } + /** + * {@inheritDoc} + * <p> + * Blocks the calling (consumer) thread on the {@link SynchronousQueue} until + * the producer offers the next element, a completion sentinel, or an error. + * If the stream has been {@linkplain #close() closed}, returns + * {@code Awaitable.of(false)} immediately without blocking. + * <p> + * The consumer thread is registered via {@code consumerThread} during the + * blocking call so that {@link #close()} can interrupt it if needed. + * A <em>double-check</em> of the {@link #closed} flag is performed after + * registration to close the TOCTOU race window: if {@code close()} executes + * between the initial {@code closed.get()} check and the + * {@code consumerThread.set()} call, the consumer reference would not yet be + * visible to {@code close()}, so no interrupt would be delivered, and + * {@code queue.take()} would block indefinitely. The re-check after + * registration detects this case and returns immediately. + * + * @return an {@code Awaitable<Boolean>} that resolves to {@code true} if a + * new element is available via {@link #getCurrent()}, or {@code false} + * if the stream is exhausted or closed + */ @Override @SuppressWarnings("unchecked") public Awaitable<Boolean> moveNext() { @@ -183,6 +211,14 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { } Thread currentThread = Thread.currentThread(); consumerThread.set(currentThread); + // Double-check after registration: if close() raced between the first + // closed check and consumerThread.set(), the consumer reference was not + // yet visible to close(), so no interrupt was delivered. Without this + // re-check the consumer would block in queue.take() indefinitely. + if (closed.get()) { + consumerThread.compareAndSet(currentThread, null); + return Awaitable.of(false); + } try { Object next = queue.take(); if (next == DONE) { @@ -208,6 +244,15 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> { } } + /** + * {@inheritDoc} + * <p> + * Returns the most recently consumed element. The value is updated each time + * {@link #moveNext()} returns {@code true}. + * + * @return the current element, or {@code null} before the first successful + * {@code moveNext()} call + */ @Override public T getCurrent() { return current; diff --git a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java index df22416b2d..5d03000748 100644 --- a/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java +++ b/src/main/java/org/apache/groovy/runtime/async/GroovyPromise.java @@ -47,6 +47,12 @@ public class GroovyPromise<T> implements Awaitable<T> { private final CompletableFuture<T> future; + /** + * Creates a new {@code GroovyPromise} wrapping the given {@link CompletableFuture}. + * + * @param future the backing future; must not be {@code null} + * @throws NullPointerException if {@code future} is {@code null} + */ public GroovyPromise(CompletableFuture<T> future) { this.future = Objects.requireNonNull(future, "future must not be null"); } @@ -58,6 +64,13 @@ public class GroovyPromise<T> implements Awaitable<T> { return new GroovyPromise<>(future); } + /** + * {@inheritDoc} + * <p> + * Waits if necessary for the computation to complete, then retrieves its result. + * If the future was cancelled, the original {@link CancellationException} is + * unwrapped from the JDK 23+ wrapper for cross-version consistency. + */ @Override public T get() throws InterruptedException, ExecutionException { try { @@ -67,6 +80,12 @@ public class GroovyPromise<T> implements Awaitable<T> { } } + /** + * {@inheritDoc} + * <p> + * Waits at most the given time for the computation to complete. + * Unwraps JDK 23+ {@link CancellationException} wrappers for consistency. + */ @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { @@ -76,6 +95,7 @@ public class GroovyPromise<T> implements Awaitable<T> { } } + /** {@inheritDoc} */ @Override public boolean isDone() { return future.isDone(); @@ -98,26 +118,47 @@ public class GroovyPromise<T> implements Awaitable<T> { return future.cancel(true); } + /** {@inheritDoc} */ @Override public boolean isCancelled() { return future.isCancelled(); } + /** {@inheritDoc} */ @Override public boolean isCompletedExceptionally() { return future.isCompletedExceptionally(); } + /** + * {@inheritDoc} + * <p> + * Returns a new {@code GroovyPromise} whose result is obtained by applying + * the given function to this promise's result. + */ @Override public <U> Awaitable<U> then(Function<? super T, ? extends U> fn) { return new GroovyPromise<>(future.thenApply(fn)); } + /** + * {@inheritDoc} + * <p> + * Returns a new {@code GroovyPromise} that is the result of composing this + * promise with the async function, enabling flat-mapping of awaitables. + */ @Override public <U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>> fn) { return new GroovyPromise<>(future.thenCompose(t -> fn.apply(t).toCompletableFuture())); } + /** + * {@inheritDoc} + * <p> + * Returns a new {@code GroovyPromise} that handles exceptions thrown by this promise. + * The throwable passed to the handler is deeply unwrapped to strip JDK + * wrapper layers ({@code CompletionException}, {@code ExecutionException}). + */ @Override public Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn) { return new GroovyPromise<>(future.exceptionally(t -> { @@ -127,6 +168,11 @@ public class GroovyPromise<T> implements Awaitable<T> { })); } + /** + * {@inheritDoc} + * <p> + * Returns the underlying {@link CompletableFuture} for interop with JDK APIs. + */ @Override public CompletableFuture<T> toCompletableFuture() { return future; @@ -143,6 +189,11 @@ public class GroovyPromise<T> implements Awaitable<T> { return cause instanceof CancellationException ce ? ce : exception; } + /** + * Returns a human-readable representation showing the promise state: + * {@code GroovyPromise{pending}}, {@code GroovyPromise{completed}}, or + * {@code GroovyPromise{failed}}. + */ @Override public String toString() { if (future.isDone()) { diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index 0ad22f6d37..844b33de98 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -382,6 +382,23 @@ each result is wrapped in an `AwaitResult` with `isSuccess()`, `isFailure()`, `v include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_settled,indent=0] ---- +[[await-result]] +=== `AwaitResult` — Outcome Wrapper + +Each element returned by `allSettled` is an `AwaitResult<T>`. This immutable value type +carries either a success value or a failure throwable, and provides safe accessors: + +* `isSuccess()` / `isFailure()` — check outcome type +* `value` — the result (throws `IllegalStateException` on failure) +* `error` — the exception (throws `IllegalStateException` on success) +* `getOrElse(Function)` — recover from failures with a fallback function +* `toString()` — returns `AwaitResult.Success[value]` or `AwaitResult.Failure[error]` + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=await_result_api,indent=0] +---- + === Continuation Helpers Use `thenAccept`, `whenComplete`, and `handle` when you want continuation-style @@ -415,6 +432,16 @@ provide the same capability on an existing awaitable. include::../test/AsyncAwaitSpecTest.groovy[tags=timeout_combinators,indent=0] ---- +==== Instance Forms: `orTimeout` and `completeOnTimeout` + +The instance methods `orTimeout(millis)` and `completeOnTimeout(fallback, millis)` provide +a fluent alternative to the static `Awaitable.timeout()` and `Awaitable.timeoutOr()` forms: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=instance_timeout_methods,indent=0] +---- + [[flow-publisher]] == `Flow.Publisher` Integration @@ -638,6 +665,31 @@ lose exceptions silently: include::../test/AsyncAwaitSpecTest.groovy[tags=fire_and_forget,indent=0] ---- +[[inspection]] +== Inspecting Awaitable State + +An `Awaitable` provides non-blocking inspection methods for checking its state without +blocking the calling thread: + +* `isDone()` — `true` when the awaitable has completed (successfully, exceptionally, or via cancellation) +* `isCancelled()` — `true` when the awaitable was cancelled via `cancel()` +* `isCompletedExceptionally()` — `true` when the awaitable completed with an exception (including cancellation) + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=inspection_methods,indent=0] +---- + +[[async-modifier-restrictions]] +== `async` Modifier Restrictions + +The `async` keyword is valid only on **method declarations** (including script-level methods) +and **closure/lambda expressions**. Applying `async` to other declarations is a compile-time error: + +* **Class/interface declarations**: `async class Foo {}` → parser error +* **Field/variable declarations**: `async def x = 1` → compilation error ("async not allowed for variable declarations") +* **Constructor declarations**: `async Foo() {}` → compilation error ("async not allowed for constructors") + [[implementation-details]] == Implementation Notes @@ -899,4 +951,14 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages. | Annotation form | `@Async def methodName() { ... }` + +| Inspection +| `awaitable.isDone()` / `isCancelled()` / `isCompletedExceptionally()` + +| Timeout (instance) +| `awaitable.orTimeout(ms)` / `awaitable.completeOnTimeout(fallback, ms)` + +| AwaitResult +| `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }` |=== + diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index 66f5735d06..1c1a05e962 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -1106,7 +1106,106 @@ assert results == [1, 2, 3] } // ========================================================================= - // 16. Custom adapter registration + // 16. AwaitResult API + // ========================================================================= + + @Test + void testAwaitResultApi() { + assertScript ''' +// tag::await_result_api[] +import groovy.concurrent.Awaitable +import groovy.concurrent.AwaitResult + +async caller() { + def a = Awaitable.of(42) + def b = Awaitable.failed(new IOException("disk full")) + return await(Awaitable.allSettled(a, b)) +} + +def results = await(caller()) + +// Inspect each outcome +AwaitResult success = results[0] +assert success.isSuccess() +assert success.value == 42 +assert !success.isFailure() + +AwaitResult failure = results[1] +assert failure.isFailure() +assert failure.error.message == "disk full" +assert !failure.isSuccess() + +// getOrElse: recover from failures with a fallback function +assert success.getOrElse { -1 } == 42 +assert failure.getOrElse { err -> "recovered: ${err.message}" } == "recovered: disk full" + +// toString for debugging +assert success.toString() == "AwaitResult.Success[42]" +assert failure.toString().startsWith("AwaitResult.Failure[") +// end::await_result_api[] + ''' + } + + // ========================================================================= + // 17. Instance timeout methods + // ========================================================================= + + @Test + void testInstanceTimeoutMethods() { + assertScript ''' +// tag::instance_timeout_methods[] +import groovy.concurrent.Awaitable +import java.util.concurrent.TimeoutException + +async slowTask() { + await(Awaitable.delay(5_000)) + return "done" +} + +// orTimeout: fails with TimeoutException if not completed in time +try { + await(slowTask().orTimeout(50)) + assert false : "should have timed out" +} catch (TimeoutException e) { + assert e.message.contains("50") +} + +// completeOnTimeout: completes with a fallback value instead of failing +assert await(slowTask().completeOnTimeout("fallback", 50)) == "fallback" +// end::instance_timeout_methods[] + ''' + } + + // ========================================================================= + // 18. Inspection methods + // ========================================================================= + + @Test + void testInspectionMethods() { + assertScript ''' +// tag::inspection_methods[] +import groovy.concurrent.Awaitable + +// Check state without blocking +def completed = Awaitable.of(42) +assert completed.isDone() +assert !completed.isCancelled() + +def failed = Awaitable.failed(new RuntimeException("oops")) +assert failed.isDone() +assert failed.isCompletedExceptionally() + +// Cancel an awaitable +def task = Awaitable.delay(10_000) +task.cancel() +assert task.isCancelled() +assert task.isDone() +// end::inspection_methods[] + ''' + } + + // ========================================================================= + // 19. Custom adapter registration // ========================================================================= @Test diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy index 59e46116eb..0c15f94f63 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy @@ -18,6 +18,7 @@ */ package org.codehaus.groovy.transform +import org.codehaus.groovy.control.CompilationFailedException import org.junit.jupiter.api.Test import static groovy.test.GroovyAssert.assertScript @@ -1644,28 +1645,30 @@ class AsyncAwaitSyntaxTest { @Test void testAsyncOnAbstractMethodFails() { - shouldFail ''' + def err = shouldFail CompilationFailedException, ''' abstract class Svc { @groovy.transform.Async abstract def compute() } ''' + assert err.message.contains('cannot be applied to abstract method') } @Test void testAsyncOnAwaitableReturnTypeFails() { - shouldFail ''' + def err = shouldFail CompilationFailedException, ''' import groovy.concurrent.Awaitable class Svc { @groovy.transform.Async Awaitable<String> compute() { Awaitable.of("x") } } ''' + assert err.message.contains('already returns an async type') } @Test void testForAwaitWithTraditionalForSyntaxFails() { - shouldFail ''' + def err = shouldFail CompilationFailedException, ''' class Svc { @groovy.transform.Async def work() { @@ -1675,6 +1678,57 @@ class AsyncAwaitSyntaxTest { } } ''' + assert err.message.contains('for await') + } + + @Test + void testAsyncOnClassDeclarationFails() { + def err = shouldFail CompilationFailedException, ''' + async class Svc {} + ''' + // Parser rejects 'async' on top-level class declarations (ASYNC not in classOrInterfaceModifier) + assert err.message.contains('Unexpected input') + } + + @Test + void testAsyncOnInnerClassDeclarationFails() { + def err = shouldFail CompilationFailedException, ''' + class Outer { + async class Inner {} + } + ''' + assert err.message.contains('not allowed for type declarations') + } + + @Test + void testAsyncOnFieldDeclarationFails() { + def err = shouldFail CompilationFailedException, ''' + class Svc { + async int x = 42 + } + ''' + assert err.message.contains('async') && err.message.contains('not allowed') + } + + @Test + void testAsyncOnLocalVariableFails() { + def err = shouldFail CompilationFailedException, ''' + def foo() { + async int x = 42 + } + ''' + assert err.message.contains('async') && err.message.contains('not allowed') + } + + @Test + void testAsyncOnConstructorFails() { + def err = shouldFail CompilationFailedException, ''' + class Svc { + @groovy.transform.Async + Svc() {} + } + ''' + assert err.message.contains('incorrect modifier') || err.message.contains('Async') } // ===================================================================== diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy index 49ab2fdbdf..4f72cc29fa 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncCoverageTest.groovy @@ -34,6 +34,7 @@ import java.util.concurrent.CancellationException import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionException import java.util.concurrent.CompletionStage +import java.util.concurrent.CountDownLatch import java.util.concurrent.ExecutionException import java.util.concurrent.Flow import java.util.concurrent.FutureTask @@ -42,6 +43,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import static groovy.test.GroovyAssert.assertScript +import static groovy.test.GroovyAssert.shouldFail /** * Comprehensive coverage tests for Groovy's async/await API surface. @@ -2716,4 +2718,741 @@ class AsyncCoverageTest { assert stream.getCurrent() == 'item1' assert stream.moveNext().get() == false } + + // ===================================================================== + // Coverage: AsyncSupport edge cases + // ===================================================================== + + @Test + void testAwaitClosureDirectlyThrowsIllegalArgument() { + def ex = shouldFail(IllegalArgumentException) { + AsyncSupport.await((Object) { -> 42 }) + } + assert ex.message.contains('Cannot await a Closure directly') + assert ex.message.contains('Call the closure first') + } + + @Test + void testAwaitRawFutureNonCompletableFuture() { + // Use a FutureTask (implements Future but is not CompletableFuture) + def ft = new FutureTask<>({ 'raw-future-result' } as Callable) + Thread.start { ft.run() } + def result = AsyncSupport.await(ft) + assert result == 'raw-future-result' + } + + @Test + void testAwaitRawFutureExecutionException() { + def ft = new FutureTask<>({ + throw new IOException('raw future error') + } as Callable) + Thread.start { ft.run() } + def ex = shouldFail(IOException) { + AsyncSupport.await(ft) + } + assert ex.message == 'raw future error' + } + + @Test + void testAwaitRawFutureCancellation() { + def ft = new FutureTask<>({ Thread.sleep(10000); 'never' } as Callable) + Thread.start { ft.run() } + ft.cancel(true) + shouldFail(CancellationException) { + AsyncSupport.await(ft) + } + } + + @Test + void testDelayZeroDurationReturnsImmediately() { + long start = System.nanoTime() + def result = AsyncSupport.delay(0, TimeUnit.MILLISECONDS) + assert result.get() == null + long elapsed = (System.nanoTime() - start) / 1_000_000 + assert elapsed < 500 // should be nearly instant + } + + @Test + void testDelayNullUnitThrows() { + def ex = shouldFail(IllegalArgumentException) { + AsyncSupport.delay(100, null) + } + assert ex.message.contains('TimeUnit must not be null') + } + + @Test + void testDelayNegativeDurationThrows() { + def ex = shouldFail(IllegalArgumentException) { + AsyncSupport.delay(-1, TimeUnit.MILLISECONDS) + } + assert ex.message.contains('must not be negative') + } + + @Test + void testDeepUnwrapNestedExceptions() { + def root = new RuntimeException('root cause') + def wrapped = new CompletionException( + new ExecutionException(root)) + assert AsyncSupport.deepUnwrap(wrapped) == root + } + + @Test + void testDeepUnwrapUnrelatedExceptionNotUnwrapped() { + def ex = new IOException('not wrapped') + assert AsyncSupport.deepUnwrap(ex) == ex + } + + @Test + void testRethrowUnwrappedThrowsError() { + // Test indirectly via await(Future) — errors are rethrown directly + def ft = new FutureTask<>({ + throw new StackOverflowError('test error') + } as Callable) + Thread.start { ft.run() } + Thread.sleep(50) + shouldFail(StackOverflowError) { + AsyncSupport.await(ft) + } + } + + // ===================================================================== + // Coverage: AwaitableAdapterRegistry fallback paths + // ===================================================================== + + @Test + void testToAwaitableNullThrows() { + def ex = shouldFail(IllegalArgumentException) { + AwaitableAdapterRegistry.toAwaitable(null) + } + assert ex.message.contains('Cannot convert null to Awaitable') + } + + @Test + void testToAwaitableUnknownTypeThrows() { + def ex = shouldFail(IllegalArgumentException) { + AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test")) + } + assert ex.message.contains('No AwaitableAdapter found for type') + assert ex.message.contains('StringBuilder') + } + + @Test + void testToAsyncStreamNullThrows() { + def ex = shouldFail(IllegalArgumentException) { + AwaitableAdapterRegistry.toAsyncStream(null) + } + assert ex.message.contains('Cannot convert null to AsyncStream') + } + + @Test + void testToAsyncStreamUnknownTypeThrows() { + def ex = shouldFail(IllegalArgumentException) { + AwaitableAdapterRegistry.toAsyncStream(new StringBuilder("test")) + } + assert ex.message.contains('No AsyncStream adapter found for type') + assert ex.message.contains('StringBuilder') + } + + // ===================================================================== + // Coverage: Awaitable default methods + // ===================================================================== + + @Test + void testAwaitableWhenCompleteSuccess() { + def awaitable = Awaitable.of(42) + def captured = [] + def result = awaitable.whenComplete { value, error -> + captured << value + captured << error + } + assert result.get() == 42 + assert captured == [42, null] + } + + @Test + void testAwaitableWhenCompleteFailure() { + def cf = new CompletableFuture<>() + cf.completeExceptionally(new IOException('fail')) + def awaitable = GroovyPromise.of(cf) + def captured = [] + def result = awaitable.whenComplete { value, error -> + captured << value + captured << error?.class?.simpleName + } + try { result.get() } catch (ignored) {} + Thread.sleep(50) + assert captured.contains('IOException') + } + + @Test + void testAwaitableHandleSuccess() { + def awaitable = Awaitable.of('hello') + def result = awaitable.handle { value, error -> + error == null ? value.toUpperCase() : 'fallback' + } + assert result.get() == 'HELLO' + } + + @Test + void testAwaitableHandleFailure() { + def cf = new CompletableFuture<String>() + cf.completeExceptionally(new IOException('broken')) + def awaitable = GroovyPromise.of(cf) + def result = awaitable.handle { value, error -> + error != null ? "recovered: ${error.message}" : value + } + assert result.get() == 'recovered: broken' + } + + @Test + void testAwaitableOrTimeoutInstanceMethod() { + def cf = new CompletableFuture<>() + def awaitable = GroovyPromise.of(cf) + def withTimeout = awaitable.orTimeout(50) + def ex = shouldFail(ExecutionException) { + withTimeout.get() + } + assert ex.cause instanceof java.util.concurrent.TimeoutException + } + + @Test + void testAwaitableOrTimeoutWithUnitInstanceMethod() { + def cf = new CompletableFuture<>() + def awaitable = GroovyPromise.of(cf) + def withTimeout = awaitable.orTimeout(50, TimeUnit.MILLISECONDS) + def ex = shouldFail(ExecutionException) { + withTimeout.get() + } + assert ex.cause instanceof java.util.concurrent.TimeoutException + } + + @Test + void testAwaitableCompleteOnTimeoutInstanceMethod() { + def cf = new CompletableFuture<>() + def awaitable = GroovyPromise.of(cf) + def withFallback = awaitable.completeOnTimeout('default', 50) + assert withFallback.get() == 'default' + } + + @Test + void testAwaitableCompleteOnTimeoutWithUnitInstanceMethod() { + def cf = new CompletableFuture<>() + def awaitable = GroovyPromise.of(cf) + def withFallback = awaitable.completeOnTimeout('default', 50, TimeUnit.MILLISECONDS) + assert withFallback.get() == 'default' + } + + // ===================================================================== + // Coverage: GroovyPromise toString() and get(timeout) + // ===================================================================== + + @Test + void testGroovyPromiseToStringPending() { + def cf = new CompletableFuture<>() + def promise = GroovyPromise.of(cf) + assert promise.toString() == 'GroovyPromise{pending}' + } + + @Test + void testGroovyPromiseToStringCompleted() { + def cf = CompletableFuture.completedFuture(42) + def promise = GroovyPromise.of(cf) + assert promise.toString() == 'GroovyPromise{completed}' + } + + @Test + void testGroovyPromiseToStringFailed() { + def cf = new CompletableFuture<>() + cf.completeExceptionally(new RuntimeException('oops')) + def promise = GroovyPromise.of(cf) + assert promise.toString() == 'GroovyPromise{failed}' + } + + @Test + void testGroovyPromiseGetWithTimeout() { + def cf = CompletableFuture.completedFuture('ok') + def promise = GroovyPromise.of(cf) + assert promise.get(1, TimeUnit.SECONDS) == 'ok' + } + + @Test + void testGroovyPromiseGetWithTimeoutExpired() { + def cf = new CompletableFuture<>() + def promise = GroovyPromise.of(cf) + shouldFail(java.util.concurrent.TimeoutException) { + promise.get(50, TimeUnit.MILLISECONDS) + } + } + + @Test + void testGroovyPromiseGetWithTimeoutCancelled() { + def cf = new CompletableFuture<>() + cf.cancel(true) + def promise = GroovyPromise.of(cf) + shouldFail(CancellationException) { + promise.get(1, TimeUnit.SECONDS) + } + } + + @Test + void testGroovyPromiseIsCompletedExceptionally() { + def cf = new java.util.concurrent.CompletableFuture<>() + def promise = GroovyPromise.of(cf) + assert !promise.isCompletedExceptionally() + cf.completeExceptionally(new RuntimeException()) + assert promise.isCompletedExceptionally() + } + + @Test + void testGroovyPromiseConstructorNullThrows() { + shouldFail(NullPointerException) { + new GroovyPromise(null) + } + } + + // ===================================================================== + // Coverage: AsyncStreamGenerator error path + // ===================================================================== + + @Test + void testAsyncStreamGeneratorErrorWithNullCoversNPECreation() { + // Tests the null-guarding branch: t != null ? t : new NullPointerException(...) + def gen = new AsyncStreamGenerator<String>() + Thread.start { + gen.error(null) + } + try { + gen.moveNext() + assert false : 'expected NullPointerException' + } catch (NullPointerException e) { + assert e != null + } + } + + @Test + void testAsyncStreamGeneratorYieldAfterClose() { + def gen = new AsyncStreamGenerator<String>() + def yieldFailed = new CompletableFuture<Boolean>() + def readyToClose = new CountDownLatch(1) + Thread.start { + gen.attachProducer(Thread.currentThread()) + try { + AsyncSupport.yieldReturn(gen, 'first') + // Signal that producer is past the first yield and ready for close + readyToClose.countDown() + try { + // This will either block in queue.put() (if close hasn't happened + // yet — close() will interrupt it) or throw immediately (if closed) + AsyncSupport.yieldReturn(gen, 'second') + yieldFailed.complete(false) + } catch (ignored) { + yieldFailed.complete(true) + } + } catch (ignored) { + // yield of 'first' was interrupted by close — still counts as failure + yieldFailed.complete(true) + } finally { + gen.detachProducer(Thread.currentThread()) + } + } + assert gen.moveNext().get() == true + assert gen.getCurrent() == 'first' + // Consumer closes the stream — close() never self-interrupts the calling thread + readyToClose.await(2, TimeUnit.SECONDS) + gen.close() + assert yieldFailed.get(2, TimeUnit.SECONDS) == true + } + + // ===================================================================== + // High-concurrency real-world scenarios + // ===================================================================== + + /** + * Verifies the moveNext() double-check-after-registration fix: + * an external thread calling close() between the initial closed.get() + * and consumerThread.set() must not cause moveNext() to block forever. + */ + @Test + void testMoveNextTOCTOURaceWithExternalClose() { + for (int iteration = 0; iteration < 200; iteration++) { + def gen = new AsyncStreamGenerator<Integer>() + def producerStarted = new CountDownLatch(1) + + // Producer that yields one item, then waits to be interrupted + Thread.start { + gen.attachProducer(Thread.currentThread()) + try { + producerStarted.countDown() + AsyncSupport.yieldReturn(gen, 1) + // Block indefinitely — close() should interrupt us + Thread.sleep(60_000) + gen.complete() + } catch (ignored) { + // Expected: CancellationException or InterruptedException from close() + } finally { + gen.detachProducer(Thread.currentThread()) + } + } + + producerStarted.await(2, TimeUnit.SECONDS) + // Consume the first item + assert gen.moveNext().get() == true + assert gen.getCurrent() == 1 + + // Close from an external thread while consumer is about to call moveNext() + def closeThread = Thread.start { gen.close() } + closeThread.join(2000) + + // Without the double-check fix, this would block forever + assert gen.moveNext().get(2, TimeUnit.SECONDS) == false + } + } + + /** + * Multiple independent async tasks producing results concurrently. + * Simulates a real scenario: parallel data fetches aggregated together. + */ + @Test + void testParallelAsyncGeneratorsUnderConcurrency() { + assertScript ''' +import groovy.concurrent.Awaitable + +async List<Integer> produce(int genId, int count) { + def items = [] + for (int i = 0; i < count; i++) { + items << await(Awaitable.of(genId * 1000 + i)) + } + return items +} + +int numTasks = 20 +int itemsPerTask = 50 + +// Launch all tasks concurrently and await all results +def tasks = (0..<numTasks).collect { produce(it, itemsPerTask) } +def allResults = await(Awaitable.all(*tasks)) + +assert allResults.size() == numTasks +for (int g = 0; g < numTasks; g++) { + def expected = (0..<itemsPerTask).collect { g * 1000 + it } + assert allResults[g] == expected +} + ''' + } + + /** + * Verifies back-pressure: a fast producer is naturally throttled + * by a slow consumer through the SynchronousQueue handoff. + */ + @Test + void testBackPressureFastProducerSlowConsumer() { + assertScript ''' +import groovy.concurrent.Awaitable + +async generateFast() { + for (int i = 0; i < 100; i++) { + yield return i + } +} + +def results = [] +for await (item in generateFast()) { + results << item + // Slow consumer: the producer cannot get far ahead because + // SynchronousQueue blocks until consumer takes each item + if (item < 5) { + Thread.sleep(10) + } +} +assert results == (0..<100).toList() + ''' + } + + /** + * Consumer breaks out of for-await mid-stream under high load. + * Verifies the producer is interrupted and cleaned up promptly. + */ + @Test + void testMidStreamCancelUnderLoad() { + assertScript ''' +import groovy.concurrent.Awaitable + +async generateMany() { + for (int i = 0; i < 1_000_000; i++) { + yield return i + } +} + +def consumed = [] +for await (item in generateMany()) { + consumed << item + if (item == 9) break // Cancel after 10 items +} + +assert consumed == (0..9).toList() + ''' + } + + /** + * Nested async chains: async method A calls async method B which calls C. + * All running concurrently with multiple callers via Awaitable.all(). + */ + @Test + void testNestedAwaitChainsUnderConcurrency() { + assertScript ''' +import groovy.concurrent.Awaitable + +async int fetchValue(int id) { + await(Awaitable.delay(1)) + return id * 10 +} + +async int transform(int id) { + def raw = await(fetchValue(id)) + return raw + 1 +} + +async int pipeline(int id) { + def transformed = await(transform(id)) + return transformed + 100 +} + +int numCallers = 50 +def tasks = (0..<numCallers).collect { pipeline(it) } +def results = await(Awaitable.all(*tasks)) + +assert results.size() == numCallers +for (int i = 0; i < numCallers; i++) { + assert results[i] == i * 10 + 1 + 100 +} + ''' + } + + /** + * Retry pattern under concurrent failures: + * multiple tasks experience transient errors and are retried. + */ + @Test + void testRetryPatternUnderConcurrentFailures() { + assertScript ''' +import groovy.concurrent.Awaitable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import groovy.transform.Field + +@Field def attemptCounts = new ConcurrentHashMap<Integer, AtomicInteger>() + +async int unstableService(int taskId) { + def counter = attemptCounts.computeIfAbsent(taskId, { new AtomicInteger(0) }) + int attempt = counter.incrementAndGet() + if (attempt <= 2) { + throw new IOException("Transient failure #${attempt} for task ${taskId}") + } + return taskId * 100 +} + +async int retryWithBackoff(int taskId, int maxRetries) { + for (int i = 0; i <= maxRetries; i++) { + try { + return await(unstableService(taskId)) + } catch (IOException e) { + if (i == maxRetries) throw e + await(Awaitable.delay(1)) + } + } + throw new AssertionError("unreachable") +} + +int numTasks = 20 +def tasks = (0..<numTasks).collect { retryWithBackoff(it, 3) } +def results = await(Awaitable.all(*tasks)) + +assert results.size() == numTasks +for (int i = 0; i < numTasks; i++) { + assert results[i] == i * 100 + assert attemptCounts[i].get() == 3 // 2 failures + 1 success +} + ''' + } + + /** + * Concurrent allSettled with mixed success and failure tasks. + * Verifies thread-safe result collection under contention. + */ + @Test + void testConcurrentAllSettledMixedResults() { + assertScript ''' +import groovy.concurrent.Awaitable +import groovy.concurrent.AwaitResult +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +async int succeedOrFail(int id) { + await(Awaitable.delay(1)) + if (id % 3 == 0) throw new RuntimeException("fail-${id}") + return id +} + +async List<AwaitResult> runBatch(int batchSize) { + def tasks = (0..<batchSize).collect { succeedOrFail(it) } + return await(Awaitable.allSettled(*tasks)) +} + +int batchSize = 30 +def results = await(runBatch(batchSize)) +assert results.size() == batchSize + +int successes = 0, failures = 0 +for (int i = 0; i < batchSize; i++) { + if (i % 3 == 0) { + assert results[i].isFailure() + assert results[i].error.message == "fail-${i}" + failures++ + } else { + assert results[i].isSuccess() + assert results[i].value == i + successes++ + } +} +assert successes == 20 +assert failures == 10 + ''' + } + + /** + * Multiple for-await loops consuming generators concurrently via Awaitable.all(). + * Each task runs its own generator and collects results independently. + */ + @Test + void testConcurrentForAwaitLoops() { + assertScript ''' +import groovy.concurrent.Awaitable + +async generateSequence(int start, int count) { + for (int i = start; i < start + count; i++) { + yield return i + } +} + +async List<Integer> consumeStream(int loopId, int itemsPerLoop) { + def items = [] + for await (n in generateSequence(loopId * 1000, itemsPerLoop)) { + items << n + } + return items +} + +int numLoops = 20 +int itemsPerLoop = 25 +def tasks = (0..<numLoops).collect { consumeStream(it, itemsPerLoop) } +def allResults = await(Awaitable.all(*tasks)) + +assert allResults.size() == numLoops +for (int loop = 0; loop < numLoops; loop++) { + def expected = (loop * 1000 ..< loop * 1000 + itemsPerLoop).toList() + assert allResults[loop] == expected +} + ''' + } + + /** + * Simulates a real-world fan-out/fan-in pattern: + * one coordinator dispatches work to multiple async workers, + * collects results, and aggregates them. + */ + @Test + void testFanOutFanInPattern() { + assertScript ''' +import groovy.concurrent.Awaitable + +async int processChunk(List<Integer> chunk) { + await(Awaitable.delay(1)) + return chunk.sum() +} + +async int distributedSum(List<Integer> data, int chunkSize) { + def chunks = data.collate(chunkSize) + def tasks = chunks.collect { processChunk(it) } + def results = await(Awaitable.all(*tasks)) + return results.sum() +} + +def data = (1..1000).toList() +def total = await(distributedSum(data, 50)) +assert total == (1..1000).sum() // 500500 + ''' + } + + /** + * Verifies that exceptions thrown in async generators propagate + * correctly to the consumer even under concurrent usage. + */ + @Test + void testGeneratorExceptionPropagationUnderConcurrency() { + assertScript ''' +import groovy.concurrent.Awaitable +import groovy.concurrent.AwaitResult + +async generateWithError(int failAt) { + for (int i = 0; i < 100; i++) { + if (i == failAt) throw new IllegalStateException("boom at ${i}") + yield return i + } +} + +async Map consumeWithErrorHandling(int consumerId, int failPoint) { + int count = 0 + String error = null + try { + for await (n in generateWithError(failPoint)) { + count++ + } + } catch (IllegalStateException e) { + error = e.message + } + return [count: count, error: error] +} + +int numConsumers = 15 +def tasks = (0..<numConsumers).collect { consumeWithErrorHandling(it, 5 + it) } +def allResults = await(Awaitable.all(*tasks)) + +assert allResults.size() == numConsumers +for (int c = 0; c < numConsumers; c++) { + assert allResults[c].error == "boom at ${5 + c}" + assert allResults[c].count == 5 + c // Items consumed before the error +} + ''' + } + + /** + * Stress test: many short-lived async tasks created and awaited rapidly. + * Verifies executor handles rapid task churn without thread leaks. + */ + @Test + void testRapidAsyncTaskChurn() { + assertScript ''' +import groovy.concurrent.Awaitable +import java.util.concurrent.atomic.AtomicInteger + +def completed = new AtomicInteger(0) + +async int quickTask(int n) { + return n + 1 +} + +// Create and await many small tasks in rapid succession +for (int batch = 0; batch < 10; batch++) { + def tasks = (0..<100).collect { quickTask(it) } + def results = await(Awaitable.all(*tasks)) + assert results.size() == 100 + for (int i = 0; i < 100; i++) { + assert results[i] == i + 1 + } + completed.addAndGet(100) +} + +assert completed.get() == 1000 + ''' + } } diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy index 50d24a8ebe..a9926ff4d0 100644 --- a/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy +++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy @@ -146,15 +146,17 @@ final class AsyncVirtualThreadTest { assertScript ''' import groovy.concurrent.Awaitable import java.util.concurrent.Executors + import java.util.concurrent.ExecutorService import java.util.concurrent.atomic.AtomicReference def savedExecutor = Awaitable.getExecutor() + ExecutorService customPool = Executors.newFixedThreadPool(2, { r -> + def t = new Thread(r) + t.setName("custom-async-" + t.getId()) + t.setDaemon(true) + t + }) try { - def customPool = Executors.newFixedThreadPool(2, { r -> - def t = new Thread(r) - t.setName("custom-async-" + t.getId()) - t - }) Awaitable.setExecutor(customPool) def asyncName = async { @@ -165,6 +167,7 @@ final class AsyncVirtualThreadTest { assert threadName.startsWith("custom-async-") } finally { Awaitable.setExecutor(savedExecutor) + customPool.shutdownNow() } ''' } @@ -174,20 +177,26 @@ final class AsyncVirtualThreadTest { assertScript ''' import groovy.concurrent.Awaitable import java.util.concurrent.Executors + import java.util.concurrent.ExecutorService def originalExecutor = Awaitable.getExecutor() // Set a custom executor - Awaitable.setExecutor(Executors.newSingleThreadExecutor()) - assert Awaitable.getExecutor() != originalExecutor - // Reset to null — should restore default - Awaitable.setExecutor(null) - def restored = Awaitable.getExecutor() - assert restored != null - // Verify it works - def task = async { 42 }; def awaitable = task() - assert await(awaitable) == 42 - // Restore original - Awaitable.setExecutor(originalExecutor) + ExecutorService tempPool = Executors.newSingleThreadExecutor() + try { + Awaitable.setExecutor(tempPool) + assert Awaitable.getExecutor() != originalExecutor + // Reset to null — should restore default + Awaitable.setExecutor(null) + def restored = Awaitable.getExecutor() + assert restored != null + // Verify it works + def task = async { 42 }; def awaitable = task() + assert await(awaitable) == 42 + // Restore original + Awaitable.setExecutor(originalExecutor) + } finally { + tempPool.shutdownNow() + } ''' } @@ -196,12 +205,14 @@ final class AsyncVirtualThreadTest { assertScript ''' import groovy.transform.Async import java.util.concurrent.Executor + import java.util.concurrent.ExecutorService import java.util.concurrent.Executors class CustomExecutorService { - static Executor myPool = Executors.newFixedThreadPool(1, { r -> + static ExecutorService myPool = Executors.newFixedThreadPool(1, { r -> def t = new Thread(r) t.setName("my-pool-thread") + t.setDaemon(true) t }) @@ -211,9 +222,13 @@ final class AsyncVirtualThreadTest { } } - def svc = new CustomExecutorService() - def result = svc.doWork().get() - assert result.startsWith("my-pool-thread") + try { + def svc = new CustomExecutorService() + def result = svc.doWork().get() + assert result.startsWith("my-pool-thread") + } finally { + CustomExecutorService.myPool.shutdownNow() + } ''' }
