This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new c0c5c74607 Tweak AsyncSupport.allAsync and awaitAll
c0c5c74607 is described below

commit c0c5c746075e8cd5e5df1d0a9e610c0eb021f653
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 28 15:42:23 2026 +0900

    Tweak AsyncSupport.allAsync and awaitAll
---
 src/main/java/groovy/concurrent/Awaitable.java     |  6 +-
 .../apache/groovy/runtime/async/AsyncSupport.java  | 75 ++++++++++++++------
 src/spec/doc/core-async-await.adoc                 | 23 ++++--
 src/spec/test/AsyncAwaitSpecTest.groovy            | 31 +++++++++
 .../groovy/runtime/async/AsyncApiTest.groovy       | 81 ++++++++++++++++++++++
 5 files changed, 187 insertions(+), 29 deletions(-)

diff --git a/src/main/java/groovy/concurrent/Awaitable.java 
b/src/main/java/groovy/concurrent/Awaitable.java
index 378e403f5a..c68e4c4da4 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -397,7 +397,11 @@ public interface Awaitable<T> {
 
     /**
      * Returns an {@code Awaitable} that completes when all given sources
-     * complete, with a list of their results in order.
+     * complete successfully, with a list of their results in order.
+     * <p>
+     * Like JavaScript's {@code Promise.all()}, the combined awaitable fails as
+     * soon as the first source fails. Remaining sources are not cancelled
+     * automatically; cancel them explicitly if that is required by your 
workflow.
      * <p>
      * Unlike blocking APIs, this returns immediately and the caller should
      * {@code await} the result:
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index eb8d00d21d..bf0ad6e324 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -52,6 +52,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Internal runtime support for the {@code async}/{@code await} language 
feature
@@ -488,8 +489,9 @@ public class AsyncSupport {
 
     /**
      * Waits for all given awaitables to complete and returns their results
-     * as a list, preserving order.  If any awaitable fails, the exception is
-     * rethrown immediately.
+     * as a list, preserving argument order.  Like JavaScript's
+     * {@code Promise.all()}, the combined wait fails as soon as any input
+     * fails; slower siblings are not cancelled automatically.
      * <p>
      * Returns an empty list if {@code awaitables} is {@code null} or empty.
      *
@@ -504,16 +506,7 @@ public class AsyncSupport {
             return new ArrayList<>();
         }
         CompletableFuture<?>[] futures = toCompletableFutures(awaitables, 
"awaitAll");
-        try {
-            CompletableFuture.allOf(futures).join();
-        } catch (CompletionException e) {
-            throw rethrowUnwrapped(e);
-        }
-        List<Object> results = new ArrayList<>(futures.length);
-        for (CompletableFuture<?> f : futures) {
-            results.add(f.join());
-        }
-        return results;
+        return await(collectAllFailFast(futures));
     }
 
     /**
@@ -596,27 +589,63 @@ public class AsyncSupport {
         return futures;
     }
 
+    /**
+     * Collects results in input order while completing exceptionally as soon 
as
+     * the first input fails.  Mirrors {@code Promise.all()} early-rejection
+     * semantics without implicitly cancelling sibling work.
+     * <p>
+     * Failures are unwrapped <em>before</em> they are stored on the combined
+     * future so that {@code await Awaitable.all(...)} and
+     * {@link #awaitAll(Object...)} both observe the original throwable type.
+     */
+    private static CompletableFuture<List<Object>> 
collectAllFailFast(CompletableFuture<?>[] futures) {
+        CompletableFuture<List<Object>> combined = new CompletableFuture<>();
+        Object[] results = new Object[futures.length];
+        AtomicInteger remaining = new AtomicInteger(futures.length);
+
+        for (int i = 0; i < futures.length; i++) {
+            final int index = i;
+            futures[i].whenComplete((value, error) -> {
+                if (error != null) {
+                    completeWithUnwrappedFailure(combined, error);
+                    return;
+                }
+
+                results[index] = value;
+                if (remaining.decrementAndGet() == 0) {
+                    combined.complete(toOrderedResultList(results));
+                }
+            });
+        }
+
+        return combined;
+    }
+
+    private static void completeWithUnwrappedFailure(CompletableFuture<?> 
future, Throwable error) {
+        future.completeExceptionally(deepUnwrap(error));
+    }
+
+    private static List<Object> toOrderedResultList(Object[] results) {
+        List<Object> ordered = new ArrayList<>(results.length);
+        ordered.addAll(Arrays.asList(results));
+        return ordered;
+    }
+
     // ---- non-blocking combinators (return Awaitable) --------------------
 
     /**
      * Non-blocking variant of {@link #awaitAll(Object...)} that returns an
-     * {@link Awaitable} instead of blocking.  Used by
-     * {@link Awaitable#all(Object...)}.
+     * {@link Awaitable} instead of blocking.  Like JavaScript's
+     * {@code Promise.all()}, the returned awaitable rejects on the first
+     * observed failure while preserving ordered results on success.
+     * Used by {@link Awaitable#all(Object...)}.
      */
     public static Awaitable<List<Object>> allAsync(Object... sources) {
         if (sources == null || sources.length == 0) {
             return Awaitable.of(new ArrayList<>());
         }
         CompletableFuture<?>[] futures = toCompletableFutures(sources, 
"Awaitable.all");
-        CompletableFuture<List<Object>> combined = 
CompletableFuture.allOf(futures)
-                .thenApply(v -> {
-                    List<Object> results = new ArrayList<>(futures.length);
-                    for (CompletableFuture<?> f : futures) {
-                        results.add(f.join());
-                    }
-                    return results;
-                });
-        return GroovyPromise.of(combined);
+        return GroovyPromise.of(collectAllFailFast(futures));
     }
 
     /**
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index 39542a3680..3aea26ba7b 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -618,17 +618,27 @@ 
include::../test/AsyncAwaitSpecTest.groovy[tags=exception_multiple_tasks,indent=
 The `groovy.concurrent.Awaitable` interface provides static combinator methods 
for common async
 coordination patterns, analogous to JavaScript's `Promise.all()` / 
`Promise.any()` /
 `Promise.allSettled()`, C#'s `Task.WhenAll()` / `Task.WhenAny()`, and Kotlin's 
coroutine
-coordination helpers.
+coordination helpers. In particular, `Awaitable.all(...)` follows 
`Promise.all()`'s
+early-rejection behavior: the combined awaitable fails as soon as one input 
fails.
 
 === `all` — Parallel Execution
 
-Wait for all tasks to complete and collect their results. Fails fast on the 
first error:
+Wait for all tasks to complete successfully and collect their results. Like 
JavaScript's
+`Promise.all()`, the combined wait fails immediately on the first error:
 
 [source,groovy]
 ----
 include::../test/AsyncAwaitSpecTest.groovy[tags=await_all,indent=0]
 ----
 
+The fail-fast behavior applies to the combined awaitable itself; slower inputs 
continue
+running unless you cancel them explicitly or place them in an `AsyncScope`:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=await_all_fail_fast,indent=0]
+----
+
 === `any` — Race Pattern
 
 Return the result of the first task to complete:
@@ -1332,7 +1342,7 @@ _(supports `AsyncStream`, `Flow.Publisher`, 
`AsyncChannel`, `Iterable`, Reactor
 | `try`/`finally` / `use`
 | `defer`
 
-| **Wait all**
+| **Wait all (fail-fast)**
 | `Awaitable.all(a, b, c)`
 | `Promise.all([a, b, c])`
 | `Task.WhenAll(a, b, c)`
@@ -1817,8 +1827,11 @@ For full javadoc, see the API documentation for the 
`groovy.concurrent` package.
 | `from(source)`
 | Adapts `CompletableFuture`, `Future`, `Flow.Publisher`, etc.
 
-| `all(a, b, ...)` / `allSettled(a, b, ...)`
-| Parallel wait (fail-fast / collect all outcomes)
+| `all(a, b, ...)`
+| Parallel wait; fail fast on the first failure
+
+| `allSettled(a, b, ...)`
+| Parallel wait; collect every outcome
 
 | `any(a, b, ...)`
 | Returns the first completed result; cancels the rest
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy 
b/src/spec/test/AsyncAwaitSpecTest.groovy
index daa6af3e09..58d6478358 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -581,6 +581,37 @@ assert results == ["Alice", "Order#42", 100.0]
         '''
     }
 
+    @Test
+    void testAwaitAllFailFast() {
+        assertScript '''
+// tag::await_all_fail_fast[]
+import groovy.concurrent.Awaitable
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+async slowCall() {
+    await Awaitable.delay(5_000)
+    return "slow result"
+}
+
+async caller() {
+    def started = System.nanoTime()
+    try {
+        await Awaitable.all(slowCall(), Awaitable.failed(new 
IOException("network error")))
+        assert false : "should fail fast"
+    } catch (IOException e) {
+        def elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
started)
+        assert elapsedMillis < 2_000
+        return e.message
+    }
+}
+
+assert await(caller()) == "network error"
+// end::await_all_fail_fast[]
+        '''
+    }
+
     @Test
     void testAwaitAny() {
         assertScript '''
diff --git 
a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy 
b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
index cd77c38e2b..19264c6142 100644
--- a/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
+++ b/src/test/groovy/org/apache/groovy/runtime/async/AsyncApiTest.groovy
@@ -499,6 +499,31 @@ class AsyncApiTest {
         assert ex.message == 'all-fail'
     }
 
+    @Test
+    void testAwaitAllFailsFastWithoutWaitingForSlowSiblings() {
+        def slow = new CompletableFuture<String>()
+        def failure = CompletableFuture.<String>failedFuture(new 
IOException('awaitAll-fail-fast'))
+        def outcome = new CompletableFuture<Throwable>()
+        def worker = Thread.start {
+            try {
+                AsyncSupport.awaitAll(slow, failure)
+                outcome.complete(null)
+            } catch (Throwable t) {
+                outcome.complete(t)
+            }
+        }
+
+        try {
+            def thrown = outcome.get(1, TimeUnit.SECONDS)
+            assert thrown instanceof IOException
+            assert thrown.message == 'awaitAll-fail-fast'
+            assert !slow.isDone()
+        } finally {
+            slow.complete('slow-result')
+            worker.join(1_000)
+        }
+    }
+
     @Test
     void testAwaitAnyWithFailedCompletableFutureUnwrapsCause() {
         def failed = new CompletableFuture<String>()
@@ -674,6 +699,62 @@ class AsyncApiTest {
         assert aw.get() == ['cs', 'aw']
     }
 
+    @Test
+    void testAllAsyncFailsFastWithoutWaitingForSlowSiblings() {
+        def slow = new CompletableFuture<String>()
+        def aw = AsyncSupport.allAsync(slow, 
CompletableFuture.<String>failedFuture(new IOException('allAsync-fail-fast')))
+
+        def ex = shouldFail(ExecutionException) {
+            aw.get(1, TimeUnit.SECONDS)
+        }
+
+        assert ex.cause instanceof IOException
+        assert ex.cause.message == 'allAsync-fail-fast'
+        assert aw.isDone()
+        assert aw.isCompletedExceptionally()
+        assert !slow.isDone()
+
+        slow.complete('slow-result')
+    }
+
+    @Test
+    void testAwaitOnAllAsyncFailsFastWithOriginalCause() {
+        def slow = new CompletableFuture<String>()
+        def aw = AsyncSupport.allAsync(slow, 
CompletableFuture.<String>failedFuture(new 
IOException('allAsync-await-fail-fast')))
+        def outcome = new CompletableFuture<Throwable>()
+        def worker = Thread.start {
+            try {
+                AsyncSupport.await(aw)
+                outcome.complete(null)
+            } catch (Throwable t) {
+                outcome.complete(t)
+            }
+        }
+
+        try {
+            def thrown = outcome.get(1, TimeUnit.SECONDS)
+            assert thrown instanceof IOException
+            assert thrown.message == 'allAsync-await-fail-fast'
+            assert !slow.isDone()
+        } finally {
+            slow.complete('slow-result')
+            worker.join(1_000)
+        }
+    }
+
+    @Test
+    void testAllAsyncPreservesInputOrderWhenSourcesResolveOutOfOrder() {
+        def first = new CompletableFuture<String>()
+        def second = new CompletableFuture<String>()
+        def aw = AsyncSupport.allAsync(first, second)
+
+        second.complete('second')
+        assert !aw.isDone()
+
+        first.complete('first')
+        assert aw.get(1, TimeUnit.SECONDS) == ['first', 'second']
+    }
+
     // ================================================================
     // AsyncSupport.delay
     // ================================================================

Reply via email to