This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new c0c5c74607 Tweak AsyncSupport.allAsync and awaitAll
c0c5c74607 is described below
commit c0c5c746075e8cd5e5df1d0a9e610c0eb021f653
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 28 15:42:23 2026 +0900
Tweak AsyncSupport.allAsync and awaitAll
---
src/main/java/groovy/concurrent/Awaitable.java | 6 +-
.../apache/groovy/runtime/async/AsyncSupport.java | 75 ++++++++++++++------
src/spec/doc/core-async-await.adoc | 23 ++++--
src/spec/test/AsyncAwaitSpecTest.groovy | 31 +++++++++
.../groovy/runtime/async/AsyncApiTest.groovy | 81 ++++++++++++++++++++++
5 files changed, 187 insertions(+), 29 deletions(-)
diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java
index 378e403f5a..c68e4c4da4 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -397,7 +397,11 @@ public interface Awaitable<T> {
/**
* Returns an {@code Awaitable} that completes when all given sources
- * complete, with a list of their results in order.
+ * complete successfully, with a list of their results in order.
+ * <p>
+ * Like JavaScript's {@code Promise.all()}, the combined awaitable fails as
+ * soon as the first source fails. Remaining sources are not cancelled
+ * automatically; cancel them explicitly if that is required by your workflow.
* <p>
* Unlike blocking APIs, this returns immediately and the caller should
* {@code await} the result:
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index eb8d00d21d..bf0ad6e324 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -52,6 +52,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Internal runtime support for the {@code async}/{@code await} language feature
@@ -488,8 +489,9 @@ public class AsyncSupport {
/**
* Waits for all given awaitables to complete and returns their results
- * as a list, preserving order. If any awaitable fails, the exception is
- * rethrown immediately.
+ * as a list, preserving argument order. Like JavaScript's
+ * {@code Promise.all()}, the combined wait fails as soon as any input
+ * fails; slower siblings are not cancelled automatically.
* <p>
* Returns an empty list if {@code awaitables} is {@code null} or empty.
*
@@ -504,16 +506,7 @@ public class AsyncSupport {
return new ArrayList<>();
}
CompletableFuture<?>[] futures = toCompletableFutures(awaitables,
"awaitAll");
- try {
- CompletableFuture.allOf(futures).join();
- } catch (CompletionException e) {
- throw rethrowUnwrapped(e);
- }
- List<Object> results = new ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures) {
- results.add(f.join());
- }
- return results;
+ return await(collectAllFailFast(futures));
}
/**
@@ -596,27 +589,63 @@ public class AsyncSupport {
return futures;
}
+ /**
+ * Collects results in input order while completing exceptionally as soon as
+ * the first input fails. Mirrors {@code Promise.all()} early-rejection
+ * semantics without implicitly cancelling sibling work.
+ * <p>
+ * Failures are unwrapped <em>before</em> they are stored on the combined
+ * future so that {@code await Awaitable.all(...)} and
+ * {@link #awaitAll(Object...)} both observe the original throwable type.
+ * <p>
+ * Each input gets a {@code whenComplete} callback: a failure completes the
+ * combined future exceptionally right away, while a success stores its value
+ * in the slot matching its input position and decrements a shared counter;
+ * the callback that brings the counter to zero completes the combined future.
+ * <p>
+ * NOTE(review): with a zero-length {@code futures} array no callback is
+ * registered, so the returned future would never complete. The callers
+ * visible in this change appear to return early for empty input before
+ * delegating here - confirm before reusing this helper elsewhere.
+ *
+ * @param futures the inputs to combine, in the order their results should appear
+ * @return a future yielding the ordered results, or failing with an unwrapped input failure
+ */
+ private static CompletableFuture<List<Object>>
collectAllFailFast(CompletableFuture<?>[] futures) {
+ CompletableFuture<List<Object>> combined = new CompletableFuture<>();
+ Object[] results = new Object[futures.length]; // one slot per input, filled by position
+ AtomicInteger remaining = new AtomicInteger(futures.length); // successes still outstanding
+
+ for (int i = 0; i < futures.length; i++) {
+ final int index = i; // the lambda below needs an effectively final copy of the loop index
+ futures[i].whenComplete((value, error) -> {
+ if (error != null) {
+ completeWithUnwrappedFailure(combined, error);
+ return; // a failed input neither stores a value nor decrements the counter
+ }
+
+ results[index] = value;
+ if (remaining.decrementAndGet() == 0) { // true only for the last outstanding success
+ combined.complete(toOrderedResultList(results));
+ }
+ });
+ }
+
+ return combined;
+ }
+
+ private static void completeWithUnwrappedFailure(CompletableFuture<?>
future, Throwable error) { // fails the given future with the unwrapped form of error
+ future.completeExceptionally(deepUnwrap(error)); // deepUnwrap is defined outside this diff; presumably strips wrapper exceptions - confirm
+ }
+
+ private static List<Object> toOrderedResultList(Object[] results) { // copies the slot array into a new ArrayList, keeping index order
+ List<Object> ordered = new ArrayList<>(results.length); // pre-sized to the number of inputs
+ ordered.addAll(Arrays.asList(results)); // Arrays.asList is only a view over the array; addAll copies the elements
+ return ordered;
+ }
+
// ---- non-blocking combinators (return Awaitable) --------------------
/**
* Non-blocking variant of {@link #awaitAll(Object...)} that returns an
- * {@link Awaitable} instead of blocking. Used by
- * {@link Awaitable#all(Object...)}.
+ * {@link Awaitable} instead of blocking. Like JavaScript's
+ * {@code Promise.all()}, the returned awaitable rejects on the first
+ * observed failure while preserving ordered results on success.
+ * Used by {@link Awaitable#all(Object...)}.
*/
public static Awaitable<List<Object>> allAsync(Object... sources) {
if (sources == null || sources.length == 0) { // null or empty input short-circuits with an empty list
return Awaitable.of(new ArrayList<>());
}
CompletableFuture<?>[] futures = toCompletableFutures(sources,
"Awaitable.all");
- CompletableFuture<List<Object>> combined =
CompletableFuture.allOf(futures)
- .thenApply(v -> {
- List<Object> results = new ArrayList<>(futures.length);
- for (CompletableFuture<?> f : futures) {
- results.add(f.join());
- }
- return results;
- });
- return GroovyPromise.of(combined);
+ return GroovyPromise.of(collectAllFailFast(futures)); // delegates to the shared fail-fast collector and wraps its future via GroovyPromise
}
/**
diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc
index 39542a3680..3aea26ba7b 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -618,17 +618,27 @@
include::../test/AsyncAwaitSpecTest.groovy[tags=exception_multiple_tasks,indent=
The `groovy.concurrent.Awaitable` interface provides static combinator methods for common async
coordination patterns, analogous to JavaScript's `Promise.all()` / `Promise.any()` /
`Promise.allSettled()`, C#'s `Task.WhenAll()` / `Task.WhenAny()`, and Kotlin's coroutine
-coordination helpers.
+coordination helpers. In particular, `Awaitable.all(...)` follows `Promise.all()`'s
+early-rejection behavior: the combined awaitable fails as soon as one input fails.
=== `all` — Parallel Execution
-Wait for all tasks to complete and collect their results. Fails fast on the first error:
+Wait for all tasks to complete successfully and collect their results. Like JavaScript's
+`Promise.all()`, the combined wait fails immediately on the first error:
[source,groovy]
----
include::../test/AsyncAwaitSpecTest.groovy[tags=await_all,indent=0]
----
+The fail-fast behavior applies to the combined awaitable itself; slower inputs continue
+running unless you cancel them explicitly or place them in an `AsyncScope`:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_fail_fast,indent=0]
+----
+
=== `any` — Race Pattern
Return the result of the first task to complete:
@@ -1332,7 +1342,7 @@ _(supports `AsyncStream`, `Flow.Publisher`,
`AsyncChannel`, `Iterable`, Reactor
| `try`/`finally` / `use`
| `defer`
-| **Wait all**
+| **Wait all (fail-fast)**
| `Awaitable.all(a, b, c)`
| `Promise.all([a, b, c])`
| `Task.WhenAll(a, b, c)`
@@ -1817,8 +1827,11 @@ For full javadoc, see the API documentation for the
`groovy.concurrent` package.
| `from(source)`
| Adapts `CompletableFuture`, `Future`, `Flow.Publisher`, etc.
-| `all(a, b, ...)` / `allSettled(a, b, ...)`
-| Parallel wait (fail-fast / collect all outcomes)
+| `all(a, b, ...)`
+| Parallel wait; fail fast on the first failure
+
+| `allSettled(a, b, ...)`
+| Parallel wait; collect every outcome
| `any(a, b, ...)`
| Returns the first completed result; cancels the rest
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy b/src/spec/test/AsyncAwaitSpecTest.groovy
index daa6af3e09..58d6478358 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -581,6 +581,37 @@ assert results == ["Alice", "Order#42", 100.0]
'''
}
+ @Test
+ void testAwaitAllFailFast() { // spec example: Awaitable.all must surface the IOException in under 2 s even though slowCall() delays for 5 s
+ assertScript '''
+// tag::await_all_fail_fast[]
+import groovy.concurrent.Awaitable
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+async slowCall() {
+ await Awaitable.delay(5_000)
+ return "slow result"
+}
+
+async caller() {
+ def started = System.nanoTime()
+ try {
+ await Awaitable.all(slowCall(), Awaitable.failed(new
IOException("network error")))
+ assert false : "should fail fast"
+ } catch (IOException e) {
+ def elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
started)
+ assert elapsedMillis < 2_000
+ return e.message
+ }
+}
+
+assert await(caller()) == "network error"
+// end::await_all_fail_fast[]
+ '''
+ }
+
@Test
void testAwaitAny() {
assertScript '''
diff --git a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
index cd77c38e2b..19264c6142 100644
--- a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
+++ b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
@@ -499,6 +499,31 @@ class AsyncApiTest {
assert ex.message == 'all-fail'
}
+ @Test
+ void testAwaitAllFailsFastWithoutWaitingForSlowSiblings() { // awaitAll must rethrow the failure while the sibling future is still pending
+ def slow = new CompletableFuture<String>() // left incomplete until the finally block
+ def failure = CompletableFuture.<String>failedFuture(new
IOException('awaitAll-fail-fast'))
+ def outcome = new CompletableFuture<Throwable>() // carries what the worker observed: null for a normal return, else the throwable
+ def worker = Thread.start { // awaitAll is called on a separate thread so a hang cannot stall the test thread
+ try {
+ AsyncSupport.awaitAll(slow, failure)
+ outcome.complete(null)
+ } catch (Throwable t) {
+ outcome.complete(t)
+ }
+ }
+
+ try {
+ def thrown = outcome.get(1, TimeUnit.SECONDS) // bounded wait: times out if awaitAll is still waiting on 'slow'
+ assert thrown instanceof IOException // original type, not a wrapper
+ assert thrown.message == 'awaitAll-fail-fast'
+ assert !slow.isDone() // the slow sibling was neither completed nor cancelled
+ } finally {
+ slow.complete('slow-result') // release the pending future so the worker can finish in every case
+ worker.join(1_000)
+ }
+ }
+
@Test
void testAwaitAnyWithFailedCompletableFutureUnwrapsCause() {
def failed = new CompletableFuture<String>()
@@ -674,6 +699,62 @@ class AsyncApiTest {
assert aw.get() == ['cs', 'aw']
}
+ @Test
+ void testAllAsyncFailsFastWithoutWaitingForSlowSiblings() { // allAsync must fail as soon as one input has failed, with the other still pending
+ def slow = new CompletableFuture<String>() // stays incomplete until the last statement
+ def aw = AsyncSupport.allAsync(slow,
CompletableFuture.<String>failedFuture(new IOException('allAsync-fail-fast')))
+
+ def ex = shouldFail(ExecutionException) {
+ aw.get(1, TimeUnit.SECONDS) // bounded wait: a non-fail-fast implementation would time out here instead
+ }
+
+ assert ex.cause instanceof IOException // the cause is the original exception
+ assert ex.cause.message == 'allAsync-fail-fast'
+ assert aw.isDone()
+ assert aw.isCompletedExceptionally()
+ assert !slow.isDone() // the slow sibling was neither completed nor cancelled
+
+ slow.complete('slow-result') // tidy up the pending future
+ }
+
+ @Test
+ void testAwaitOnAllAsyncFailsFastWithOriginalCause() { // awaiting the allAsync result must throw the original IOException promptly
+ def slow = new CompletableFuture<String>() // left incomplete until the finally block
+ def aw = AsyncSupport.allAsync(slow,
CompletableFuture.<String>failedFuture(new
IOException('allAsync-await-fail-fast')))
+ def outcome = new CompletableFuture<Throwable>() // carries what the worker observed: null for a normal return, else the throwable
+ def worker = Thread.start { // await is called on a separate thread so a hang cannot stall the test thread
+ try {
+ AsyncSupport.await(aw)
+ outcome.complete(null)
+ } catch (Throwable t) {
+ outcome.complete(t)
+ }
+ }
+
+ try {
+ def thrown = outcome.get(1, TimeUnit.SECONDS) // bounded wait: times out if await is still blocked
+ assert thrown instanceof IOException // original type, not a wrapper
+ assert thrown.message == 'allAsync-await-fail-fast'
+ assert !slow.isDone() // the slow sibling was neither completed nor cancelled
+ } finally {
+ slow.complete('slow-result') // release the pending future in every case
+ worker.join(1_000)
+ }
+ }
+
+ @Test
+ void testAllAsyncPreservesInputOrderWhenSourcesResolveOutOfOrder() { // results follow argument order, not completion order
+ def first = new CompletableFuture<String>()
+ def second = new CompletableFuture<String>()
+ def aw = AsyncSupport.allAsync(first, second)
+
+ second.complete('second') // complete the later argument first
+ assert !aw.isDone() // one input is still pending, so the combined awaitable must not be done
+
+ first.complete('first')
+ assert aw.get(1, TimeUnit.SECONDS) == ['first', 'second']
+ }
+
// ================================================================
// AsyncSupport.delay
// ================================================================