This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 6529bbeaaa Minor tweaks
6529bbeaaa is described below
commit 6529bbeaaabf745b0300492cecbb0a563bf098ed
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 10:52:54 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncStream.java | 30 ++
src/main/java/groovy/concurrent/Awaitable.java | 39 +-
.../concurrent/AwaitableAdapterRegistry.java | 14 +-
src/main/java/groovy/transform/Async.java | 4 +-
.../apache/groovy/runtime/async/AsyncSupport.java | 17 +-
src/spec/doc/core-async-await.adoc | 24 +-
src/spec/test/AsyncAwaitSpecTest.groovy | 38 +-
.../codehaus/groovy/transform/AsyncApiTest.groovy | 438 +++++++++++++++++++--
.../transform/AsyncFrameworkIntegrationTest.groovy | 20 +-
.../groovy/transform/AsyncTransformTest.groovy | 18 +-
10 files changed, 578 insertions(+), 64 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java
b/src/main/java/groovy/concurrent/AsyncStream.java
index 26c7abe316..4baa81115a 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -75,6 +75,36 @@ public interface AsyncStream<T> extends AutoCloseable {
default void close() {
}
+ /**
+ * Converts the given source to an {@code AsyncStream}.
+ * <p>
+ * If the source is already an {@code AsyncStream}, it is returned as-is.
+ * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
+ * suitable adapter. Built-in adapters handle {@link Iterable} and
+ * {@link java.util.Iterator}; the auto-discovered {@code
FlowPublisherAdapter}
+ * handles {@link java.util.concurrent.Flow.Publisher}; third-party
frameworks
+ * can register additional adapters via the registry.
+ * <p>
+ * This is the recommended entry point for converting external collection
or
+ * reactive types to {@code AsyncStream}:
+ * <pre>
+ * AsyncStream<String> stream = AsyncStream.from(myList)
+ * AsyncStream<Integer> stream2 = AsyncStream.from(myFlowPublisher)
+ * </pre>
+ *
+ * @param source the source object; must not be {@code null}
+ * @param <T> the element type
+ * @return an async stream backed by the source
+ * @throws IllegalArgumentException if {@code source} is {@code null}
+ * or no adapter supports the source type
+ * @see AwaitableAdapterRegistry#toAsyncStream(Object)
+ * @since 6.0.0
+ */
+ @SuppressWarnings("unchecked")
+ static <T> AsyncStream<T> from(Object source) {
+ return AwaitableAdapterRegistry.toAsyncStream(source);
+ }
+
/**
* Returns an empty {@code AsyncStream} that completes immediately.
*/
diff --git a/src/main/java/groovy/concurrent/Awaitable.java
b/src/main/java/groovy/concurrent/Awaitable.java
index ce41b7f947..d7bf8fb689 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -54,14 +54,18 @@ import java.util.function.Function;
* {@code Promise.allSettled()}</li>
* <li>{@link #delay(long) Awaitable.delay(ms)} — like
* {@code Task.Delay()} / {@code setTimeout}</li>
- * <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like
+ * <li>{@link #orTimeoutMillis(Object, long) Awaitable.orTimeoutMillis(task,
ms)} — like
* Kotlin's {@code withTimeout} or a JavaScript promise raced against a
timer</li>
- * <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task,
fallback, ms)} —
+ * <li>{@link #completeOnTimeoutMillis(Object, Object, long)
+ * Awaitable.completeOnTimeoutMillis(task, fallback, ms)} —
* like a timeout with fallback/default value</li>
* </ul>
* <p>
- * <b>Static factories:</b>
+ * <b>Static factories and conversion:</b>
* <ul>
+ * <li>{@link #from(Object) Awaitable.from(source)} — converts any supported
+ * async type (CompletableFuture, CompletionStage, Future,
Flow.Publisher, etc.)
+ * to an {@code Awaitable}</li>
* <li>{@link #of(Object) Awaitable.of(value)} — like
* {@code Task.FromResult()} / {@code Promise.resolve()}</li>
* <li>{@link #failed(Throwable) Awaitable.failed(error)} — like
@@ -297,6 +301,35 @@ public interface Awaitable<T> {
// ---- Static factories ----
+ /**
+ * Converts the given source to an {@code Awaitable}.
+ * <p>
+ * If the source is already an {@code Awaitable}, it is returned as-is.
+ * Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
+ * suitable adapter. Built-in adapters handle {@link CompletableFuture},
+ * {@link java.util.concurrent.CompletionStage}, {@link
java.util.concurrent.Future},
+ * and {@link java.util.concurrent.Flow.Publisher}; third-party frameworks
+ * can register additional adapters via the registry.
+ * <p>
+ * This is the recommended entry point for converting external async types
+ * to {@code Awaitable}:
+ * <pre>
+ * Awaitable<String> aw = Awaitable.from(someCompletableFuture)
+ * Awaitable<Integer> aw2 = Awaitable.from(someReactorMono)
+ * </pre>
+ *
+ * @param source the source object; must not be {@code null}
+ * @param <T> the result type
+ * @return an awaitable backed by the source
+ * @throws IllegalArgumentException if {@code source} is {@code null}
+ * or no adapter supports the source type
+ * @see AwaitableAdapterRegistry#toAwaitable(Object)
+ * @since 6.0.0
+ */
+ static <T> Awaitable<T> from(Object source) {
+ return AwaitableAdapterRegistry.toAwaitable(source);
+ }
+
/**
* Returns an already-completed {@code Awaitable} with the given value.
* Analogous to C#'s {@code Task.FromResult()} or JavaScript's
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
index a3500a13d3..61398d154d 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java
@@ -111,13 +111,18 @@ public class AwaitableAdapterRegistry {
/**
* Converts the given source to an {@link Awaitable}.
* If the source is already an {@code Awaitable}, it is returned as-is.
+ * <p>
+ * <b>Tip:</b> user code should generally prefer {@link
Awaitable#from(Object)},
+ * which delegates to this method but is more discoverable from the
+ * {@code Awaitable} type itself.
*
* @param source the source object; must not be {@code null}
* @throws IllegalArgumentException if {@code source} is {@code null}
* or no adapter supports the source type
+ * @see Awaitable#from(Object)
*/
@SuppressWarnings("unchecked")
- public static <T> Awaitable<T> toAwaitable(Object source) {
+ static <T> Awaitable<T> toAwaitable(Object source) {
if (source == null) {
throw new IllegalArgumentException("Cannot convert null to
Awaitable");
}
@@ -136,13 +141,18 @@ public class AwaitableAdapterRegistry {
/**
* Converts the given source to an {@link AsyncStream}.
* If the source is already an {@code AsyncStream}, it is returned as-is.
+ * <p>
+ * <b>Tip:</b> user code should generally prefer {@link
AsyncStream#from(Object)},
+ * which delegates to this method but is more discoverable from the
+ * {@code AsyncStream} type itself.
*
* @param source the source object; must not be {@code null}
* @throws IllegalArgumentException if {@code source} is {@code null}
* or no adapter supports the source type
+ * @see AsyncStream#from(Object)
*/
@SuppressWarnings("unchecked")
- public static <T> AsyncStream<T> toAsyncStream(Object source) {
+ static <T> AsyncStream<T> toAsyncStream(Object source) {
if (source == null) {
throw new IllegalArgumentException("Cannot convert null to
AsyncStream");
}
diff --git a/src/main/java/groovy/transform/Async.java
b/src/main/java/groovy/transform/Async.java
index 90e9cdca4b..2e09ba2a46 100644
--- a/src/main/java/groovy/transform/Async.java
+++ b/src/main/java/groovy/transform/Async.java
@@ -107,7 +107,9 @@ import java.lang.annotation.Target;
* <ul>
* <li>Cannot be applied to abstract methods</li>
* <li>Cannot be applied to constructors</li>
- * <li>Cannot be applied to methods that already return {@code
Awaitable}</li>
+ * <li>Cannot be applied to methods that already return an async type
+ * ({@code Awaitable}, {@code AsyncStream}, {@code CompletableFuture},
+ * {@code CompletionStage}, or {@code Future})</li>
* </ul>
*
* @see groovy.concurrent.Awaitable
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
index 5d8f4aaaf4..96658f81e8 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java
@@ -21,7 +21,6 @@ package org.apache.groovy.runtime.async;
import groovy.concurrent.AsyncStream;
import groovy.concurrent.AwaitResult;
import groovy.concurrent.Awaitable;
-import groovy.concurrent.AwaitableAdapterRegistry;
import groovy.lang.Closure;
import java.lang.invoke.MethodHandle;
@@ -64,8 +63,9 @@ import java.util.concurrent.TimeoutException;
* <ul>
* <li><b>Async execution</b> — {@code executeAsync()}, {@code
executeAsyncVoid()},
* and {@code wrapAsync()} run closures on the configured executor</li>
- * <li><b>Await</b> — all {@code await()} overloads go through the
- * {@link AwaitableAdapterRegistry} so that third-party async types
+ * <li><b>Await</b> — all {@code await()} overloads use
+ * {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so
+ * that third-party async types
* (RxJava {@code Single}, Reactor {@code Mono}, etc.) are supported
* transparently once an adapter is registered</li>
* <li><b>Async generators</b> — {@code generateAsyncStream()} manages the
@@ -112,7 +112,6 @@ import java.util.concurrent.TimeoutException;
*
* @see groovy.concurrent.Awaitable
* @see groovy.transform.Async
- * @see Awaitable
* @since 6.0.0
*/
public class AsyncSupport {
@@ -267,8 +266,8 @@ public class AsyncSupport {
}
/**
- * Awaits an arbitrary object by adapting it to {@link Awaitable} via the
- * {@link AwaitableAdapterRegistry}. This is the fallback overload called
+ * Awaits an arbitrary object by adapting it to {@link Awaitable} via
+ * {@link Awaitable#from(Object)}. This is the fallback overload called
* by the {@code await} expression when the compile-time type is not one
* of the other supported types. Returns {@code null} for a {@code null}
* argument.
@@ -285,7 +284,7 @@ public class AsyncSupport {
if (source instanceof CompletionStage) return
await((CompletionStage<T>) source);
if (source instanceof Future) return await((Future<T>) source);
if (source instanceof Closure) return awaitClosure((Closure<?>)
source);
- return await(AwaitableAdapterRegistry.<T>toAwaitable(source));
+ return await(Awaitable.<T>from(source));
}
/**
@@ -543,7 +542,7 @@ public class AsyncSupport {
if (source instanceof CompletableFuture<?> cf) return cf;
if (source instanceof Awaitable<?> a) return a.toCompletableFuture();
if (source instanceof CompletionStage<?> cs) return
cs.toCompletableFuture();
- return
AwaitableAdapterRegistry.toAwaitable(source).toCompletableFuture();
+ return Awaitable.from(source).toCompletableFuture();
}
// ---- non-blocking combinators (return Awaitable) --------------------
@@ -813,7 +812,7 @@ public class AsyncSupport {
public static <T> AsyncStream<T> toAsyncStream(Object source) {
if (source == null) return AsyncStream.empty();
if (source instanceof AsyncStream) return (AsyncStream<T>) source;
- return AwaitableAdapterRegistry.toAsyncStream(source);
+ return AsyncStream.from(source);
}
/**
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index cb9c88ee2c..76b6a8b328 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -638,6 +638,24 @@ This is conceptually similar to JavaScript async
iterators' `return()`, C#'s
[[adapter-registry]]
== Adapter Registry
+=== Converting External Types with `from()`
+
+The `Awaitable.from(source)` and `AsyncStream.from(source)` static methods are
the recommended
+entry points for converting external async types to Groovy's native
abstractions. They delegate
+to the `AwaitableAdapterRegistry` internally but provide a more discoverable,
user-friendly API:
+
+[source,groovy]
+----
+include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0]
+----
+
+`Awaitable.from()` accepts any type supported by a registered adapter:
`CompletableFuture`,
+`CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a
custom adapter.
+If the source is already an `Awaitable`, it is returned as-is.
`AsyncStream.from()` works
+similarly for multi-value sources such as `Iterable`, `Iterator`, or
`Flow.Publisher`.
+
+=== Built-in Adapters
+
The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to
support additional
asynchronous types from third-party frameworks. The built-in adapter handles:
@@ -1131,7 +1149,8 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| task priority / executor chosen by runtime
| **Third-party support**
-| `AwaitableAdapterRegistry` SPI
+| `AwaitableAdapterRegistry` SPI +
+`Awaitable.from()` / `AsyncStream.from()`
| _(native `thenable` protocol)_
| Custom awaiters via `GetAwaiter()`
| coroutine adapters / bridges
@@ -1195,6 +1214,9 @@ JavaScript, C#, Kotlin, and Swift, for developers
familiar with those languages.
| Pre-computed result
| `Awaitable.of(value)` / `Awaitable.failed(error)`
+| Type conversion
+| `Awaitable.from(source)` / `AsyncStream.from(source)`
+
| Annotation form
| `@Async def methodName() { ... }`
diff --git a/src/spec/test/AsyncAwaitSpecTest.groovy
b/src/spec/test/AsyncAwaitSpecTest.groovy
index fd3636ec80..5040424dae 100644
--- a/src/spec/test/AsyncAwaitSpecTest.groovy
+++ b/src/spec/test/AsyncAwaitSpecTest.groovy
@@ -1205,7 +1205,43 @@ assert task.isDone()
}
//
=========================================================================
- // 19. Custom adapter registration
+ // 19. Type conversion with from()
+ //
=========================================================================
+
+ @Test
+ void testFromConversion() {
+ assertScript '''
+// tag::from_conversion[]
+import groovy.concurrent.Awaitable
+import groovy.concurrent.AsyncStream
+import java.util.concurrent.CompletableFuture
+
+// Convert a CompletableFuture to Awaitable
+def cf = CompletableFuture.completedFuture("hello")
+Awaitable<String> awaitable = Awaitable.from(cf)
+assert awaitable.get() == "hello"
+
+// If the source is already an Awaitable, it is returned as-is
+def original = Awaitable.of(42)
+assert Awaitable.from(original).is(original)
+
+// Convert an Iterable to AsyncStream
+AsyncStream<String> stream = AsyncStream.from(["a", "b", "c"])
+def items = []
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == true
+items << stream.current
+assert stream.moveNext().get() == false
+assert items == ["a", "b", "c"]
+// end::from_conversion[]
+ '''
+ }
+
+ //
=========================================================================
+ // 20. Custom adapter registration
//
=========================================================================
@Test
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 34886a564d..7dabc83a79 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1154,13 +1154,13 @@ class AsyncApiTest {
}
// ================================================================
- // AwaitableAdapterRegistry
+ // Awaitable.from / AsyncStream.from / AwaitableAdapterRegistry
// ================================================================
@Test
void testToAwaitableNull() {
try {
- AwaitableAdapterRegistry.toAwaitable(null)
+ Awaitable.from(null)
assert false
} catch (IllegalArgumentException e) {
assert e.message.contains('null')
@@ -1170,7 +1170,7 @@ class AsyncApiTest {
@Test
void testToAsyncStreamNull() {
try {
- AwaitableAdapterRegistry.toAsyncStream(null)
+ AsyncStream.from(null)
assert false
} catch (IllegalArgumentException e) {
assert e.message.contains('null')
@@ -1180,13 +1180,13 @@ class AsyncApiTest {
@Test
void testToAsyncStreamPassthrough() {
AsyncStream<String> stream = AsyncStream.empty()
- assert AwaitableAdapterRegistry.toAsyncStream(stream).is(stream)
+ assert AsyncStream.from(stream).is(stream)
}
@Test
void testToAwaitableUnsupportedType() {
try {
- AwaitableAdapterRegistry.toAwaitable(new Object())
+ Awaitable.from(new Object())
assert false
} catch (IllegalArgumentException e) {
assert e.message.contains('No AwaitableAdapter found')
@@ -1196,7 +1196,7 @@ class AsyncApiTest {
@Test
void testToAsyncStreamUnsupportedType() {
try {
- AwaitableAdapterRegistry.toAsyncStream(42)
+ AsyncStream.from(42)
assert false
} catch (IllegalArgumentException e) {
assert e.message.contains('No AsyncStream adapter')
@@ -1206,7 +1206,7 @@ class AsyncApiTest {
@Test
void testToAwaitableNullThrows() {
def ex = shouldFail(IllegalArgumentException) {
- AwaitableAdapterRegistry.toAwaitable(null)
+ Awaitable.from(null)
}
assert ex.message.contains('Cannot convert null to Awaitable')
}
@@ -1214,7 +1214,7 @@ class AsyncApiTest {
@Test
void testToAwaitableUnknownTypeThrows() {
def ex = shouldFail(IllegalArgumentException) {
- AwaitableAdapterRegistry.toAwaitable(new StringBuilder("test"))
+ Awaitable.from(new StringBuilder("test"))
}
assert ex.message.contains('No AwaitableAdapter found for type')
assert ex.message.contains('StringBuilder')
@@ -1223,7 +1223,7 @@ class AsyncApiTest {
@Test
void testToAsyncStreamNullThrows() {
def ex = shouldFail(IllegalArgumentException) {
- AwaitableAdapterRegistry.toAsyncStream(null)
+ AsyncStream.from(null)
}
assert ex.message.contains('Cannot convert null to AsyncStream')
}
@@ -1231,7 +1231,7 @@ class AsyncApiTest {
@Test
void testToAsyncStreamUnknownTypeThrows() {
def ex = shouldFail(IllegalArgumentException) {
- AwaitableAdapterRegistry.toAsyncStream(new StringBuilder("test"))
+ AsyncStream.from(new StringBuilder("test"))
}
assert ex.message.contains('No AsyncStream adapter found for type')
assert ex.message.contains('StringBuilder')
@@ -1240,7 +1240,7 @@ class AsyncApiTest {
@Test
void testToAwaitablePassthrough() {
Awaitable<String> aw = Awaitable.of('pass')
- assert AwaitableAdapterRegistry.toAwaitable(aw).is(aw)
+ assert Awaitable.from(aw).is(aw)
}
@Test
@@ -1254,6 +1254,144 @@ class AsyncApiTest {
assert !AwaitableAdapterRegistry.unregister(adapter) // already removed
}
+ // ================================================================
+ // Awaitable.from() and AsyncStream.from()
+ // ================================================================
+
+ @Test
+ void testAwaitableFromCompletableFuture() {
+ def cf = CompletableFuture.completedFuture("hello")
+ Awaitable<String> aw = Awaitable.from(cf)
+ assert aw.get() == "hello"
+ }
+
+ @Test
+ void testAwaitableFromCompletionStage() {
+ CompletionStage<String> stage =
CompletableFuture.completedFuture("stage-value")
+ Awaitable<String> aw = Awaitable.from(stage)
+ assert aw.get() == "stage-value"
+ }
+
+ @Test
+ void testAwaitableFromFuture() {
+ def ft = new FutureTask<>({ -> "future-result" } as Callable)
+ ft.run()
+ Awaitable<String> aw = Awaitable.from(ft)
+ assert aw.get() == "future-result"
+ }
+
+ @Test
+ void testAwaitableFromPassthrough() {
+ Awaitable<String> original = Awaitable.of("original")
+ assert Awaitable.from(original).is(original)
+ }
+
+ @Test
+ void testAwaitableFromNull() {
+ def ex = shouldFail(IllegalArgumentException) {
+ Awaitable.from(null)
+ }
+ assert ex.message.contains('null')
+ }
+
+ @Test
+ void testAwaitableFromUnsupportedType() {
+ def ex = shouldFail(IllegalArgumentException) {
+ Awaitable.from(new Object())
+ }
+ assert ex.message.contains('No AwaitableAdapter found')
+ }
+
+ @Test
+ void testAwaitableFromFailedFuture() {
+ def cf = CompletableFuture.<String>failedFuture(new
IOException("broken"))
+ Awaitable<String> aw = Awaitable.from(cf)
+ assert aw.isCompletedExceptionally()
+ try {
+ aw.get()
+ assert false
+ } catch (ExecutionException e) {
+ assert e.cause instanceof IOException
+ assert e.cause.message == "broken"
+ }
+ }
+
+ @Test
+ void testAsyncStreamFromIterable() {
+ AsyncStream<String> stream = AsyncStream.from(["x", "y", "z"])
+ def items = []
+ while (stream.moveNext().get()) {
+ items << stream.current
+ }
+ assert items == ["x", "y", "z"]
+ }
+
+ @Test
+ void testAsyncStreamFromIterator() {
+ def iter = ["a", "b"].iterator()
+ AsyncStream<String> stream = AsyncStream.from(iter)
+ assert stream.moveNext().get() == true
+ assert stream.current == "a"
+ assert stream.moveNext().get() == true
+ assert stream.current == "b"
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testAsyncStreamFromPassthrough() {
+ AsyncStream<String> original = AsyncStream.empty()
+ assert AsyncStream.from(original).is(original)
+ }
+
+ @Test
+ void testAsyncStreamFromNull() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncStream.from(null)
+ }
+ assert ex.message.contains('null')
+ }
+
+ @Test
+ void testAsyncStreamFromUnsupportedType() {
+ def ex = shouldFail(IllegalArgumentException) {
+ AsyncStream.from(42)
+ }
+ assert ex.message.contains('No AsyncStream adapter')
+ }
+
+ @Test
+ void testAsyncStreamFromEmptyIterable() {
+ AsyncStream<String> stream = AsyncStream.from([])
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testAsyncStreamFromFlowPublisher() {
+ def pub = new SubmissionPublisher<String>()
+ Thread.start {
+ Thread.sleep(50)
+ pub.submit("item1")
+ pub.submit("item2")
+ pub.close()
+ }
+ AsyncStream<String> stream = AsyncStream.from(pub)
+ def items = []
+ while (stream.moveNext().get(2, TimeUnit.SECONDS)) {
+ items << stream.current
+ }
+ stream.close()
+ assert items == ["item1", "item2"]
+ }
+
+ @Test
+ void testAwaitableFromFlowPublisher() {
+ def pub = new SubmissionPublisher<String>()
+ Awaitable<String> aw = Awaitable.from(pub)
+ pub.submit("first")
+ pub.close()
+ assert aw.get(2, TimeUnit.SECONDS) == "first"
+ }
+
// ================================================================
// AwaitableAdapter default methods
// ================================================================
@@ -1288,7 +1426,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherToAwaitableOnNext() {
def pub = new SubmissionPublisher<String>()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+ Awaitable<String> aw = Awaitable.from(pub)
pub.submit('hello')
pub.close()
assert aw.get() == 'hello'
@@ -1305,7 +1443,7 @@ class AsyncApiTest {
s.onError(new IOException('pub-error'))
}
}
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+ Awaitable<String> aw = Awaitable.from(pub)
try {
aw.get()
assert false
@@ -1326,14 +1464,14 @@ class AsyncApiTest {
s.onComplete()
}
}
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+ Awaitable<String> aw = Awaitable.from(pub)
assert aw.get() == null
}
@Test
void testFlowPublisherToAsyncStreamDirect() {
def pub = new SubmissionPublisher<String>()
- AsyncStream<String> stream =
AwaitableAdapterRegistry.toAsyncStream(pub)
+ AsyncStream<String> stream = AsyncStream.from(pub)
Thread.start {
Thread.sleep(50)
pub.submit('a')
@@ -1362,7 +1500,7 @@ class AsyncApiTest {
})
}
}
- AsyncStream<String> stream =
AwaitableAdapterRegistry.toAsyncStream(pub)
+ AsyncStream<String> stream = AsyncStream.from(pub)
assert stream.moveNext().get() == true
assert stream.getCurrent() == 'item1'
try {
@@ -1376,7 +1514,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherOnCompleteNoItems() {
def pub = new SubmissionPublisher<String>()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+ Awaitable<String> aw = Awaitable.from(pub)
// Close immediately — triggers onComplete with no onNext
pub.close()
assert aw.get() == null
@@ -1385,7 +1523,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherOnError() {
def pub = new SubmissionPublisher<String>()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(pub)
+ Awaitable<String> aw = Awaitable.from(pub)
Thread.start {
Thread.sleep(50)
pub.closeExceptionally(new IOException('pub-err'))
@@ -1402,7 +1540,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherAsyncStreamMoveNextInterrupted() {
def pub = new SubmissionPublisher<String>()
- AsyncStream<String> stream =
AwaitableAdapterRegistry.toAsyncStream(pub)
+ AsyncStream<String> stream = AsyncStream.from(pub)
// Interrupt before moveNext to trigger InterruptedException in
queue.take()
Thread.currentThread().interrupt()
try {
@@ -1424,7 +1562,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherAsyncStreamOnError() {
def pub = new SubmissionPublisher<String>()
- AsyncStream<String> stream =
AwaitableAdapterRegistry.toAsyncStream(pub)
+ AsyncStream<String> stream = AsyncStream.from(pub)
Thread.start {
Thread.sleep(50)
pub.closeExceptionally(new IOException('stream-err'))
@@ -1440,7 +1578,7 @@ class AsyncApiTest {
@Test
void testFlowPublisherAsyncStreamOnComplete() {
def pub = new SubmissionPublisher<String>()
- AsyncStream<String> stream =
AwaitableAdapterRegistry.toAsyncStream(pub)
+ AsyncStream<String> stream = AsyncStream.from(pub)
Thread.start {
Thread.sleep(50)
pub.submit('item1')
@@ -1460,14 +1598,14 @@ class AsyncApiTest {
void testFutureAdapterAlreadyDone() {
def ft = new FutureTask<String>({ 'already-done' } as Callable<String>)
ft.run()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
assert aw.get() == 'already-done'
}
@Test
void testFutureAdapterNotYetDone() {
def ft = new FutureTask<String>({ 'async-done' } as Callable<String>)
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
Thread.start { Thread.sleep(50); ft.run() }
assert aw.get() == 'async-done'
}
@@ -1476,7 +1614,7 @@ class AsyncApiTest {
void testFutureAdapterWithException() {
def ft = new FutureTask<String>({ throw new
ArithmeticException('div-zero') } as Callable<String>)
ft.run()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
try {
aw.get()
assert false
@@ -1491,7 +1629,7 @@ class AsyncApiTest {
def ft = new FutureTask<String>({ 'with-executor' } as
Callable<String>)
AwaitableAdapterRegistry.setBlockingExecutor(AsyncSupport.getExecutor())
try {
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
Thread.start { Thread.sleep(50); ft.run() }
assert aw.get() == 'with-executor'
} finally {
@@ -1503,7 +1641,7 @@ class AsyncApiTest {
void testFutureAdapterCompleteFromException() {
def ft = new FutureTask<String>({ throw new IOException('ft-err') } as
Callable<String>)
ft.run()
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
try {
aw.get()
assert false
@@ -1516,7 +1654,7 @@ class AsyncApiTest {
@Test
void testFutureAdapterNotDone() {
def ft = new FutureTask<String>({ 'delayed' } as Callable<String>)
- Awaitable<String> aw = AwaitableAdapterRegistry.toAwaitable(ft)
+ Awaitable<String> aw = Awaitable.from(ft)
// Run the FutureTask after adapter wraps it
Thread.start {
Thread.sleep(50)
@@ -2034,4 +2172,250 @@ class AsyncApiTest {
latch.await(5, TimeUnit.SECONDS)
assert interrupted.get() : "Producer should be interrupted when
attached to a closed stream"
}
+
+ // ================================================================
+ // AwaitResult: additional edge cases
+ // ================================================================
+
+ @Test
+ void testAwaitResultSuccessWithNull() {
+ def r = AwaitResult.success(null)
+ assert r.isSuccess()
+ assert r.value == null
+ assert r.getOrElse({ 'fallback' }) == null
+ assert r.toString() == 'AwaitResult.Success[null]'
+ }
+
+ @Test
+ void testAwaitResultFailureNullThrows() {
+ shouldFail(NullPointerException) {
+ AwaitResult.failure(null)
+ }
+ }
+
+ @Test
+ void testAwaitResultToStringFormats() {
+ assert AwaitResult.success(42).toString() == 'AwaitResult.Success[42]'
+ assert AwaitResult.failure(new
RuntimeException('oops')).toString().contains('Failure')
+ assert AwaitResult.failure(new
RuntimeException('oops')).toString().contains('oops')
+ }
+
+ // ================================================================
+ // Awaitable: thenAccept default method
+ // ================================================================
+
+ @Test
+ void testAwaitableThenAcceptSuccess() {
+ def captured = []
+ def aw = Awaitable.of('hello').thenAccept { captured << it }
+ aw.get()
+ assert captured == ['hello']
+ }
+
+ @Test
+ void testAwaitableThenAcceptReturnsVoid() {
+ def aw = Awaitable.of('hello').thenAccept {}
+ assert aw.get() == null
+ }
+
+ // ================================================================
+ // Awaitable: delay with TimeUnit
+ // ================================================================
+
+ @Test
+ void testAwaitableDelayWithTimeUnit() {
+ def start = System.currentTimeMillis()
+ def aw = Awaitable.delay(50, TimeUnit.MILLISECONDS)
+ aw.get()
+ assert System.currentTimeMillis() - start >= 40
+ }
+
+ // ================================================================
+ // Awaitable: static timeout combinators
+ // ================================================================
+
+ @Test
+ void testAwaitableOrTimeoutMillisStaticSuccess() {
+ def cf = CompletableFuture.completedFuture('fast')
+ def aw = Awaitable.orTimeoutMillis(cf, 5000)
+ assert aw.get() == 'fast'
+ }
+
+ @Test
+ void testAwaitableOrTimeoutStaticExpires() {
+ def cf = new CompletableFuture<String>()
+ def aw = Awaitable.orTimeout(cf, 50, TimeUnit.MILLISECONDS)
+ def ex = shouldFail(ExecutionException) {
+ aw.get()
+ }
+ assert ex.cause instanceof java.util.concurrent.TimeoutException
+ }
+
+ @Test
+ void testAwaitableCompleteOnTimeoutMillisStaticSuccess() {
+ def cf = CompletableFuture.completedFuture('fast')
+ def aw = Awaitable.completeOnTimeoutMillis(cf, 'fallback', 5000)
+ assert aw.get() == 'fast'
+ }
+
+ @Test
+ void testAwaitableCompleteOnTimeoutStaticExpires() {
+ def cf = new CompletableFuture<String>()
+ def aw = Awaitable.completeOnTimeout(cf, 'default', 50,
TimeUnit.MILLISECONDS)
+ assert aw.get() == 'default'
+ }
+
+ // ================================================================
+ // Awaitable: executor configuration
+ // ================================================================
+
+ @Test
+ void testAwaitableGetSetExecutor() {
+ def original = Awaitable.getExecutor()
+ assert original != null
+ try {
+ def custom =
java.util.concurrent.Executors.newSingleThreadExecutor()
+ Awaitable.setExecutor(custom)
+ assert Awaitable.getExecutor().is(custom)
+ custom.shutdown()
+ } finally {
+ Awaitable.setExecutor(null) // reset to default
+ }
+ assert Awaitable.getExecutor() != null
+ }
+
+ @Test
+ void testAwaitableIsVirtualThreadsAvailable() {
+ // Just verify it returns a boolean and doesn't throw
+ def result = Awaitable.isVirtualThreadsAvailable()
+ assert result instanceof Boolean
+ }
+
+ // ================================================================
+ // AsyncStream.empty()
+ // ================================================================
+
+ @Test
+ void testAsyncStreamEmpty() {
+ def stream = AsyncStream.empty()
+ assert stream.moveNext().get() == false
+ assert stream.current == null
+ }
+
+ @Test
+ void testAsyncStreamEmptySingleton() {
+ assert AsyncStream.empty().is(AsyncStream.empty())
+ }
+
+ @Test
+ void testAsyncStreamCloseDefaultNoOp() {
+ def stream = AsyncStream.empty()
+ stream.close() // Should not throw
+ }
+
+ // ================================================================
+ // Awaitable.from() with custom adapter
+ // ================================================================
+
+ @Test
+ void testAwaitableFromWithCustomAdapter() {
+ def adapter = new AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) { type == StringBuilder }
+ def <T> Awaitable<T> toAwaitable(Object source) {
+ Awaitable.of((T) source.toString())
+ }
+ }
+ def handle = AwaitableAdapterRegistry.register(adapter)
+ try {
+ Awaitable<String> aw = Awaitable.from(new StringBuilder("custom"))
+ assert aw.get() == "custom"
+ } finally {
+ handle.close()
+ }
+ }
+
+ @Test
+ void testAsyncStreamFromWithCustomAdapter() {
+ def adapter = new AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) { false }
+ def <T> Awaitable<T> toAwaitable(Object source) { null }
+ boolean supportsAsyncStream(Class<?> type) { type == StringBuilder
}
+ def <T> AsyncStream<T> toAsyncStream(Object source) {
+ AsyncStream.from(source.toString().toList())
+ }
+ }
+ def handle = AwaitableAdapterRegistry.register(adapter)
+ try {
+ AsyncStream<String> stream = AsyncStream.from(new
StringBuilder("ab"))
+ def items = []
+ while (stream.moveNext().get()) {
+ items << stream.current
+ }
+ assert items == ["a", "b"]
+ } finally {
+ handle.close()
+ }
+ }
+
+ // ================================================================
+ // Awaitable.all/any/allSettled: edge cases
+ // ================================================================
+
+ @Test
+ void testAwaitableAllEmpty() {
+ def aw = Awaitable.all()
+ assert aw.get() == []
+ }
+
+ @Test
+ void testAwaitableAllSingleItem() {
+ def aw = Awaitable.all(Awaitable.of(42))
+ assert aw.get() == [42]
+ }
+
+ @Test
+ void testAwaitableAllSettledAllSuccess() {
+ def results = Awaitable.allSettled(Awaitable.of('a'),
Awaitable.of('b')).get()
+ assert results.every { it.isSuccess() }
+ assert results*.value == ['a', 'b']
+ }
+
+ @Test
+ void testAwaitableAllSettledAllFailure() {
+ def results = Awaitable.allSettled(
+ Awaitable.failed(new IOException('e1')),
+ Awaitable.failed(new RuntimeException('e2'))
+ ).get()
+ assert results.every { it.isFailure() }
+ assert results[0].error instanceof IOException
+ assert results[1].error instanceof RuntimeException
+ }
+
+ // ================================================================
+ // GroovyPromise: exceptionally unwraps CompletionException
+ // ================================================================
+
+ @Test
+ void testGroovyPromiseExceptionallyUnwrapsCompletionException() {
+ def cf = new CompletableFuture<String>()
+ cf.completeExceptionally(new CompletionException(new
IOException('inner')))
+ def promise = GroovyPromise.of(cf)
+ def recovered = promise.exceptionally { t ->
+ assert t instanceof IOException
+ "recovered: ${t.message}"
+ }
+ assert recovered.get() == 'recovered: inner'
+ }
+
+ // ================================================================
+ // Awaitable.from() with CompletionStage not backed by CompletableFuture
+ // ================================================================
+
+ @Test
+ void testAwaitableFromCompletionStageMinimal() {
+ CompletionStage<String> stage = CompletableFuture.supplyAsync {
"async-value" }
+ .thenApply { it.toUpperCase() }
+ Awaitable<String> aw = Awaitable.from(stage)
+ assert aw.get(2, TimeUnit.SECONDS) == "ASYNC-VALUE"
+ }
}
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
index 10b73de428..9a6bd1298a 100644
---
a/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
+++
b/src/test/groovy/org/codehaus/groovy/transform/AsyncFrameworkIntegrationTest.groovy
@@ -224,7 +224,7 @@ class SpringWebFluxStyleController {
void testReactorMonoToAwaitableApi() {
assertScript REACTOR_PREAMBLE + '''
def mono = Mono.just(42)
- Awaitable<Integer> awaitable =
AwaitableAdapterRegistry.toAwaitable(mono)
+ Awaitable<Integer> awaitable = Awaitable.from(mono)
assert awaitable.get() == 42
assert awaitable.isDone()
'''
@@ -234,7 +234,7 @@ class SpringWebFluxStyleController {
void testReactorMonoAwaitableThen() {
assertScript REACTOR_PREAMBLE + '''
def mono = Mono.just(5)
- Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono)
+ Awaitable<Integer> a = Awaitable.from(mono)
Awaitable<Integer> doubled = a.then { it * 2 }
assert doubled.get() == 10
'''
@@ -333,7 +333,7 @@ class SpringWebFluxStyleController {
void testRxJavaSingleToAwaitableApi() {
assertScript RXJAVA_PREAMBLE + '''
def single = Single.just("adapted")
- Awaitable<String> awaitable =
AwaitableAdapterRegistry.toAwaitable(single)
+ Awaitable<String> awaitable = Awaitable.from(single)
assert awaitable.get() == "adapted"
assert awaitable.isDone()
'''
@@ -418,7 +418,7 @@ class SpringWebFluxStyleController {
void testSpringStyleCompletionStageAdapter() {
assertScript REACTOR_PREAMBLE + '''
CompletionStage<String> stage = CompletableFuture.supplyAsync {
"stage-value" }
- Awaitable<String> awaitable =
AwaitableAdapterRegistry.toAwaitable(stage)
+ Awaitable<String> awaitable = Awaitable.from(stage)
assert awaitable.get() == "stage-value"
'''
}
@@ -444,7 +444,7 @@ class SpringWebFluxStyleController {
void testReactorToRxJavaInterop() {
assertScript REACTOR_PREAMBLE + '''
def mono = Mono.just("from-reactor")
- Awaitable<String> awaitable =
AwaitableAdapterRegistry.toAwaitable(mono)
+ Awaitable<String> awaitable = Awaitable.from(mono)
CompletableFuture<String> cf = awaitable.toCompletableFuture()
assert cf.get() == "from-reactor"
'''
@@ -454,7 +454,7 @@ class SpringWebFluxStyleController {
void testAwaitableExceptionallyWithReactor() {
assertScript REACTOR_PREAMBLE + '''
def mono = Mono.error(new RuntimeException("fail"))
- Awaitable<String> awaitable =
AwaitableAdapterRegistry.toAwaitable(mono)
+ Awaitable<String> awaitable = Awaitable.from(mono)
Awaitable<String> recovered = awaitable.exceptionally {
"recovered" }
assert recovered.get() == "recovered"
'''
@@ -464,7 +464,7 @@ class SpringWebFluxStyleController {
void testAwaitableThenComposeAcrossFrameworks() {
assertScript REACTOR_PREAMBLE + '''
def mono = Mono.just(5)
- Awaitable<Integer> a = AwaitableAdapterRegistry.toAwaitable(mono)
+ Awaitable<Integer> a = Awaitable.from(mono)
Awaitable<Integer> composed = a.thenCompose { val ->
GroovyPromise.of(CompletableFuture.supplyAsync { val * 10 })
}
@@ -689,7 +689,7 @@ class SpringWebFluxStyleController {
// Register and verify it works
def handle = AwaitableAdapterRegistry.register(adapter)
try {
- def result = AwaitableAdapterRegistry.toAwaitable(new
CustomResult(data: "hello"))
+ def result = Awaitable.from(new CustomResult(data: "hello"))
assert await(result) == "hello"
} finally {
handle.close()
@@ -697,7 +697,7 @@ class SpringWebFluxStyleController {
// After close, the adapter should be removed
try {
- AwaitableAdapterRegistry.toAwaitable(new CustomResult(data:
"fail"))
+ Awaitable.from(new CustomResult(data: "fail"))
assert false : "Should throw"
} catch (IllegalArgumentException e) {
// expected
@@ -740,7 +740,7 @@ class SpringWebFluxStyleController {
def future = new FutureTask<String>({ "from-blocking-future" })
pool.submit(future)
- def aw = AwaitableAdapterRegistry.toAwaitable(future)
+ def aw = Awaitable.from(future)
assert await(aw) == "from-blocking-future"
} finally {
AwaitableAdapterRegistry.setBlockingExecutor(null)
diff --git
a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
index 58f645672f..e3ab3a87d6 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy
@@ -1176,11 +1176,10 @@ class AsyncTransformTest {
void testAdapterRegistryWithCompletableFuture() {
assertScript '''
import groovy.concurrent.Awaitable
- import groovy.concurrent.AwaitableAdapterRegistry
import java.util.concurrent.CompletableFuture
def cf = CompletableFuture.completedFuture("adapted")
- Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(cf)
+ Awaitable<String> a = Awaitable.from(cf)
assert a.get() == "adapted"
'''
}
@@ -1189,10 +1188,9 @@ class AsyncTransformTest {
void testAdapterRegistryPassthroughAwaitable() {
assertScript '''
import groovy.concurrent.Awaitable
- import groovy.concurrent.AwaitableAdapterRegistry
def original = Awaitable.of(42)
- def adapted = AwaitableAdapterRegistry.toAwaitable(original)
+ def adapted = Awaitable.from(original)
assert adapted.is(original)
'''
}
@@ -1221,7 +1219,7 @@ class AsyncTransformTest {
})
def custom = new CustomPromise("custom-value")
- Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(custom)
+ Awaitable<String> a = Awaitable.from(custom)
assert a.get() == "custom-value"
'''
}
@@ -1448,7 +1446,7 @@ class AsyncTransformTest {
assert !AwaitableAdapterRegistry.unregister(adapter) // already
removed
try {
- AwaitableAdapterRegistry.toAwaitable(new FakePromise(val: "x"))
+ Awaitable.from(new FakePromise(val: "x"))
assert false : "should have thrown"
} catch (IllegalArgumentException expected) {
assert expected.message.contains("No AwaitableAdapter")
@@ -1472,7 +1470,7 @@ class AsyncTransformTest {
handle.close() // unregister via AutoCloseable
try {
- AwaitableAdapterRegistry.toAwaitable(new Token(id: 1))
+ Awaitable.from(new Token(id: 1))
assert false
} catch (IllegalArgumentException expected) { }
'''
@@ -1508,7 +1506,7 @@ class AsyncTransformTest {
// Test with Iterator directly
def iter = [10, 20, 30].iterator()
- AsyncStream stream = AwaitableAdapterRegistry.toAsyncStream(iter)
+ AsyncStream stream = AsyncStream.from(iter)
def results = []
while (await(stream.moveNext())) {
results << stream.getCurrent()
@@ -1523,7 +1521,7 @@ class AsyncTransformTest {
import groovy.concurrent.*
try {
- AwaitableAdapterRegistry.toAsyncStream("a string")
+ AsyncStream.from("a string")
assert false : "should throw"
} catch (IllegalArgumentException expected) {
assert expected.message.contains("No AsyncStream adapter")
@@ -1537,7 +1535,7 @@ class AsyncTransformTest {
import groovy.concurrent.*
try {
- AwaitableAdapterRegistry.toAwaitable("plain string")
+ Awaitable.from("plain string")
assert false : "should throw"
} catch (IllegalArgumentException expected) {
assert expected.message.contains("No AwaitableAdapter")