This is an automated email from the ASF dual-hosted git repository. sunlan pushed a commit to branch GROOVY-9381_3 in repository https://gitbox.apache.org/repos/asf/groovy.git
commit 61da08eabe7a111b6d6c7e81cba84d7d92312f22 Author: Daniel Sun <[email protected]> AuthorDate: Sat Mar 28 15:42:23 2026 +0900 Tweak AsyncSupport.allAsync and awaitAll --- src/main/java/groovy/concurrent/Awaitable.java | 6 +- .../apache/groovy/runtime/async/AsyncSupport.java | 65 +++++++++++------ src/spec/doc/core-async-await.adoc | 23 ++++-- src/spec/test/AsyncAwaitSpecTest.groovy | 31 +++++++++ .../groovy/runtime/async/AsyncApiTest.groovy | 81 ++++++++++++++++++++++ 5 files changed, 177 insertions(+), 29 deletions(-) diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java index 378e403f5a..c68e4c4da4 100644 --- a/src/main/java/groovy/concurrent/Awaitable.java +++ b/src/main/java/groovy/concurrent/Awaitable.java @@ -397,7 +397,11 @@ public interface Awaitable<T> { /** * Returns an {@code Awaitable} that completes when all given sources - * complete, with a list of their results in order. + * complete successfully, with a list of their results in order. + * <p> + * Like JavaScript's {@code Promise.all()}, the combined awaitable fails as + * soon as the first source fails. Remaining sources are not cancelled + * automatically; cancel them explicitly if that is required by your workflow. * <p> * Unlike blocking APIs, this returns immediately and the caller should * {@code await} the result: diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java index eb8d00d21d..29cd6f7450 100644 --- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java +++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java @@ -52,6 +52,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; /** * Internal runtime support for the {@code async}/{@code await} language feature @@ -488,8 +489,9 @@ public class AsyncSupport { /** * Waits for all given awaitables to complete and returns their results - * as a list, preserving order. If any awaitable fails, the exception is - * rethrown immediately. + * as a list, preserving argument order. Like JavaScript's + * {@code Promise.all()}, the combined wait fails as soon as any input + * fails; slower siblings are not cancelled automatically. * <p> * Returns an empty list if {@code awaitables} is {@code null} or empty. * @@ -504,16 +506,7 @@ public class AsyncSupport { return new ArrayList<>(); } CompletableFuture<?>[] futures = toCompletableFutures(awaitables, "awaitAll"); - try { - CompletableFuture.allOf(futures).join(); - } catch (CompletionException e) { - throw rethrowUnwrapped(e); - } - List<Object> results = new ArrayList<>(futures.length); - for (CompletableFuture<?> f : futures) { - results.add(f.join()); - } - return results; + return await(collectAllFailFast(futures)); } /** @@ -596,27 +589,53 @@ public class AsyncSupport { return futures; } + /** + * Collects results in input order while completing exceptionally as soon as + * the first input fails. Mirrors {@code Promise.all()} early-rejection + * semantics without implicitly cancelling sibling work. + * <p> + * Failures are unwrapped <em>before</em> they are stored on the combined + * future so that {@code await Awaitable.all(...)} and + * {@link #awaitAll(Object...)} both observe the original throwable type. + */ + private static CompletableFuture<List<Object>> collectAllFailFast(CompletableFuture<?>[] futures) { + CompletableFuture<List<Object>> combined = new CompletableFuture<>(); + Object[] results = new Object[futures.length]; + AtomicInteger remaining = new AtomicInteger(futures.length); + + for (int i = 0; i < futures.length; i++) { + final int index = i; + futures[i].whenComplete((value, error) -> { + if (error != null) { + combined.completeExceptionally(deepUnwrap(error)); + return; + } + + results[index] = value; + if (remaining.decrementAndGet() == 0) { + combined.complete(new ArrayList<Object>(Arrays.asList(results))); + } + }); + } + + return combined; + } + // ---- non-blocking combinators (return Awaitable) -------------------- /** * Non-blocking variant of {@link #awaitAll(Object...)} that returns an - * {@link Awaitable} instead of blocking. Used by - * {@link Awaitable#all(Object...)}. + * {@link Awaitable} instead of blocking. Like JavaScript's + * {@code Promise.all()}, the returned awaitable rejects on the first + * observed failure while preserving ordered results on success. + * Used by {@link Awaitable#all(Object...)}. */ public static Awaitable<List<Object>> allAsync(Object... sources) { if (sources == null || sources.length == 0) { return Awaitable.of(new ArrayList<>()); } CompletableFuture<?>[] futures = toCompletableFutures(sources, "Awaitable.all"); - CompletableFuture<List<Object>> combined = CompletableFuture.allOf(futures) - .thenApply(v -> { - List<Object> results = new ArrayList<>(futures.length); - for (CompletableFuture<?> f : futures) { - results.add(f.join()); - } - return results; - }); - return GroovyPromise.of(combined); + return GroovyPromise.of(collectAllFailFast(futures)); } /** diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc index 39542a3680..3aea26ba7b 100644 --- a/src/spec/doc/core-async-await.adoc +++ b/src/spec/doc/core-async-await.adoc @@ -618,17 +618,27 @@ include::../test/AsyncAwaitSpecTest.groovy[tags=exception_multiple_tasks,indent= The `groovy.concurrent.Awaitable` interface provides static combinator methods for common async coordination patterns, analogous to JavaScript's `Promise.all()` / `Promise.any()` / `Promise.allSettled()`, C#'s `Task.WhenAll()` / `Task.WhenAny()`, and Kotlin's coroutine -coordination helpers. +coordination helpers. In particular, `Awaitable.all(...)` follows `Promise.all()`'s +early-rejection behavior: the combined awaitable fails as soon as one input fails. === `all` — Parallel Execution -Wait for all tasks to complete and collect their results. Fails fast on the first error: +Wait for all tasks to complete successfully and collect their results. Like JavaScript's +`Promise.all()`, the combined wait fails immediately on the first error: [source,groovy] ---- include::../test/AsyncAwaitSpecTest.groovy[tags=await_all,indent=0] ---- +The fail-fast behavior applies to the combined awaitable itself; slower inputs continue +running unless you cancel them explicitly or place them in an `AsyncScope`: + +[source,groovy] +---- +include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_fail_fast,indent=0] +---- + === `any` — Race Pattern Return the result of the first task to complete: @@ -1332,7 +1342,7 @@ _(supports `AsyncStream`, `Flow.Publisher`, `AsyncChannel`, `Iterable`, Reactor | `try`/`finally` / `use` | `defer` -| **Wait all** +| **Wait all (fail-fast)** | `Awaitable.all(a, b, c)` | `Promise.all([a, b, c])` | `Task.WhenAll(a, b, c)` @@ -1817,8 +1827,11 @@ For full javadoc, see the API documentation for the `groovy.concurrent` package. | `from(source)` | Adapts `CompletableFuture`, `Future`, `Flow.Publisher`, etc. -| `all(a, b, ...)` / `allSettled(a, b, ...)` -| Parallel wait (fail-fast / collect all outcomes) +| `all(a, b, ...)` +| Parallel wait; fail fast on the first failure + +| `allSettled(a, b, ...)` +| Parallel wait; collect every outcome | `any(a, b, ...)` | Returns the first completed result; cancels the rest diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy index daa6af3e09..58d6478358 100644 --- a/src/spec/test/AsyncAwaitSpecTest.groovy +++ b/src/spec/test/AsyncAwaitSpecTest.groovy @@ -581,6 +581,37 @@ assert results == ["Alice", "Order#42", 100.0] ''' } + @Test + void testAwaitAllFailFast() { + assertScript ''' +// tag::await_all_fail_fast[] +import groovy.concurrent.Awaitable + +import java.io.IOException +import java.util.concurrent.TimeUnit + +async slowCall() { + await Awaitable.delay(5_000) + return "slow result" +} + +async caller() { + def started = System.nanoTime() + try { + await Awaitable.all(slowCall(), Awaitable.failed(new IOException("network error"))) + assert false : "should fail fast" + } catch (IOException e) { + def elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - started) + assert elapsedMillis < 2_000 + return e.message + } +} + +assert await(caller()) == "network error" +// end::await_all_fail_fast[] + ''' + } + @Test void testAwaitAny() { assertScript ''' diff --git a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy index cd77c38e2b..19264c6142 100644 --- a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy +++ b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy @@ -499,6 +499,31 @@ class AsyncApiTest { assert ex.message == 'all-fail' } + @Test + void testAwaitAllFailsFastWithoutWaitingForSlowSiblings() { + def slow = new CompletableFuture<String>() + def failure = CompletableFuture.<String>failedFuture(new IOException('awaitAll-fail-fast')) + def outcome = new CompletableFuture<Throwable>() + def worker = Thread.start { + try { + AsyncSupport.awaitAll(slow, failure) + outcome.complete(null) + } catch (Throwable t) { + outcome.complete(t) + } + } + + try { + def thrown = outcome.get(1, TimeUnit.SECONDS) + assert thrown instanceof IOException + assert thrown.message == 'awaitAll-fail-fast' + assert !slow.isDone() + } finally { + slow.complete('slow-result') + worker.join(1_000) + } + } + @Test void testAwaitAnyWithFailedCompletableFutureUnwrapsCause() { def failed = new CompletableFuture<String>() @@ -674,6 +699,62 @@ class AsyncApiTest { assert aw.get() == ['cs', 'aw'] } + @Test + void testAllAsyncFailsFastWithoutWaitingForSlowSiblings() { + def slow = new CompletableFuture<String>() + def aw = AsyncSupport.allAsync(slow, CompletableFuture.<String>failedFuture(new IOException('allAsync-fail-fast'))) + + def ex = shouldFail(ExecutionException) { + aw.get(1, TimeUnit.SECONDS) + } + + assert ex.cause instanceof IOException + assert ex.cause.message == 'allAsync-fail-fast' + assert aw.isDone() + assert aw.isCompletedExceptionally() + assert !slow.isDone() + + slow.complete('slow-result') + } + + @Test + void testAwaitOnAllAsyncFailsFastWithOriginalCause() { + def slow = new CompletableFuture<String>() + def aw = AsyncSupport.allAsync(slow, CompletableFuture.<String>failedFuture(new IOException('allAsync-await-fail-fast'))) + def outcome = new CompletableFuture<Throwable>() + def worker = Thread.start { + try { + AsyncSupport.await(aw) + outcome.complete(null) + } catch (Throwable t) { + outcome.complete(t) + } + } + + try { + def thrown = outcome.get(1, TimeUnit.SECONDS) + assert thrown instanceof IOException + assert thrown.message == 'allAsync-await-fail-fast' + assert !slow.isDone() + } finally { + slow.complete('slow-result') + worker.join(1_000) + } + } + + @Test + void testAllAsyncPreservesInputOrderWhenSourcesResolveOutOfOrder() { + def first = new CompletableFuture<String>() + def second = new CompletableFuture<String>() + def aw = AsyncSupport.allAsync(first, second) + + second.complete('second') + assert !aw.isDone() + + first.complete('first') + assert aw.get(1, TimeUnit.SECONDS) == ['first', 'second'] + } + // ================================================================ // AsyncSupport.delay // ================================================================
