This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 7dde6e5d71 Minor tweaks
7dde6e5d71 is described below
commit 7dde6e5d711aceb66296e38feb29207644426a46
Author: Daniel Sun <[email protected]>
AuthorDate: Wed Mar 11 00:56:06 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncStream.java | 8 +-
src/main/java/groovy/concurrent/AwaitResult.java | 9 ++
src/main/java/groovy/concurrent/Awaitable.java | 46 +++++++-
.../java/groovy/concurrent/AwaitableAdapter.java | 12 ++-
.../groovy/runtime/async/AsyncStreamGenerator.java | 29 +++--
src/spec/doc/core-async-await.adoc | 21 ++--
.../codehaus/groovy/transform/AsyncApiTest.groovy | 117 ++++++++++++++++++++-
.../groovy/transform/AsyncPatternsTest.groovy | 2 +-
8 files changed, 225 insertions(+), 19 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java b/src/main/java/groovy/concurrent/AsyncStream.java
index f9c0ca89c9..26c7abe316 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -83,7 +83,13 @@ public interface AsyncStream<T> extends AutoCloseable {
return (AsyncStream<T>) EMPTY;
}
- /** Singleton empty stream instance. */
+ /**
+ * Singleton empty stream instance.
+ * <p>
+ * This is an implementation detail backing {@link #empty()}.
+ * User code should call {@code AsyncStream.empty()} rather than
+ * referencing this field directly.
+ */
AsyncStream<Object> EMPTY = new AsyncStream<>() {
@Override public Awaitable<Boolean> moveNext() { return Awaitable.of(false); }
@Override public Object getCurrent() { return null; }
diff --git a/src/main/java/groovy/concurrent/AwaitResult.java b/src/main/java/groovy/concurrent/AwaitResult.java
index 31aedbee80..ea89d97dff 100644
--- a/src/main/java/groovy/concurrent/AwaitResult.java
+++ b/src/main/java/groovy/concurrent/AwaitResult.java
@@ -47,6 +47,10 @@ public final class AwaitResult<T> {
/**
* Creates a successful result with the given value.
+ *
+ * @param value the computation result (may be {@code null})
+ * @param <T> the value type
+ * @return a success result wrapping the value
*/
@SuppressWarnings("unchecked")
public static <T> AwaitResult<T> success(Object value) {
@@ -55,6 +59,11 @@ public final class AwaitResult<T> {
/**
* Creates a failure result with the given exception.
+ *
+ * @param error the exception that caused the failure; must not be {@code null}
+ * @param <T> the value type (never actually used, since the result is a failure)
+ * @return a failure result wrapping the exception
+ * @throws NullPointerException if {@code error} is {@code null}
*/
public static <T> AwaitResult<T> failure(Throwable error) {
return new AwaitResult<>(null, Objects.requireNonNull(error), false);
diff --git a/src/main/java/groovy/concurrent/Awaitable.java b/src/main/java/groovy/concurrent/Awaitable.java
index 4640bebcef..b64d556d87 100644
--- a/src/main/java/groovy/concurrent/Awaitable.java
+++ b/src/main/java/groovy/concurrent/Awaitable.java
@@ -94,16 +94,30 @@ public interface Awaitable<T> {
/**
* Blocks until the computation completes and returns the result.
+ *
+ * @return the computed result
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ * @throws ExecutionException if the computation completed exceptionally
*/
T get() throws InterruptedException, ExecutionException;
/**
* Blocks until the computation completes or the timeout expires.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the computed result
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ * @throws ExecutionException if the computation completed exceptionally
+ * @throws TimeoutException if the wait timed out
*/
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
- * Returns {@code true} if the computation has completed (normally or exceptionally).
+ * Returns {@code true} if the computation has completed (normally,
+ * exceptionally, or via cancellation).
+ *
+ * @return {@code true} if complete
*/
boolean isDone();
@@ -118,24 +132,37 @@ public interface Awaitable<T> {
/**
* Returns {@code true} if the computation was cancelled before completing normally.
+ *
+ * @return {@code true} if cancelled
*/
boolean isCancelled();
/**
* Returns {@code true} if this computation completed exceptionally
* (including cancellation).
+ *
+ * @return {@code true} if completed with an error or cancellation
*/
boolean isCompletedExceptionally();
/**
* Returns a new {@code Awaitable} whose result is obtained by applying the
* given function to this awaitable's result when it completes.
+ *
+ * @param fn the mapping function
+ * @param <U> the type of the mapped result
+ * @return a new awaitable holding the mapped result
*/
<U> Awaitable<U> then(Function<? super T, ? extends U> fn);
/**
- * Returns a new {@code Awaitable} produced by applying the given function
- * to this awaitable's result, flattening the nested {@code Awaitable}.
+ * Returns a new {@code Awaitable} produced by applying the given async
+ * function to this awaitable's result, flattening the nested {@code Awaitable}.
+ * This is the monadic {@code flatMap} operation for awaitables.
+ *
+ * @param fn the async mapping function that returns an {@code Awaitable}
+ * @param <U> the type of the inner awaitable's result
+ * @return a new awaitable holding the inner result
*/
<U> Awaitable<U> thenCompose(Function<? super T, ? extends Awaitable<U>> fn);
@@ -158,6 +185,11 @@ public interface Awaitable<T> {
/**
* Returns a new {@code Awaitable} that, if this one completes exceptionally,
* applies the given function to the exception to produce a recovery value.
+ * The throwable passed to the function is deeply unwrapped to strip JDK
+ * wrapper layers.
+ *
+ * @param fn the recovery function
+ * @return a new awaitable that recovers from failures
*/
Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
@@ -260,6 +292,8 @@ public interface Awaitable<T> {
/**
* Converts this {@code Awaitable} to a JDK {@link CompletableFuture}
* for interoperability with APIs that require it.
+ *
+ * @return a {@code CompletableFuture} representing this computation
*/
CompletableFuture<T> toCompletableFuture();
@@ -267,6 +301,12 @@ public interface Awaitable<T> {
/**
* Returns an already-completed {@code Awaitable} with the given value.
+ * Analogous to C#'s {@code Task.FromResult()} or JavaScript's
+ * {@code Promise.resolve()}.
+ *
+ * @param value the result value (may be {@code null})
+ * @param <T> the result type
+ * @return a completed awaitable
*/
static <T> Awaitable<T> of(T value) {
return new GroovyPromise<>(CompletableFuture.completedFuture(value));
diff --git a/src/main/java/groovy/concurrent/AwaitableAdapter.java b/src/main/java/groovy/concurrent/AwaitableAdapter.java
index 961ddbf5a7..148440a595 100644
--- a/src/main/java/groovy/concurrent/AwaitableAdapter.java
+++ b/src/main/java/groovy/concurrent/AwaitableAdapter.java
@@ -60,7 +60,12 @@ public interface AwaitableAdapter {
/**
* Returns {@code true} if this adapter can convert instances of the given
- * type to {@link AsyncStream}.
+ * type to {@link AsyncStream}. Defaults to {@code false}; override if
+ * this adapter supports multi-value stream conversion (e.g., for Reactor
+ * {@code Flux} or RxJava {@code Observable}).
+ *
+ * @param type the source class to check
+ * @return {@code true} if this adapter can produce an {@code AsyncStream}
*/
default boolean supportsAsyncStream(Class<?> type) {
return false;
@@ -69,6 +74,11 @@ public interface AwaitableAdapter {
/**
* Converts the given source object to an {@link AsyncStream}.
* Called only when {@link #supportsAsyncStream} returned {@code true}.
+ *
+ * @param source the source object (e.g., a Reactor {@code Flux})
+ * @param <T> the element type
+ * @return an async stream backed by the source
+ * @throws UnsupportedOperationException if not overridden (default)
*/
default <T> AsyncStream<T> toAsyncStream(Object source) {
throw new UnsupportedOperationException("AsyncStream conversion not supported by " + getClass().getName());
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 51396f4ef4..38c66e62a7 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -144,7 +144,13 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
/**
* Signals that the generator has completed (no more elements).
- * If interrupted, the completion signal is delivered on a best-effort basis.
+ * <p>
+ * If the blocking {@code queue.put()} is interrupted, a best-effort
+ * non-blocking {@code queue.offer()} is attempted. If that also fails
+ * (no consumer is currently blocked in {@code take()}), the stream is
+ * force-closed to prevent the consumer from blocking indefinitely on a
+ * subsequent {@link #moveNext()} call. This defensive close ensures no
+ * thread leak occurs even under unexpected interrupt timing.
*/
public void complete() {
if (closed.get()) {
@@ -155,15 +161,25 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
} catch (InterruptedException e) {
if (!closed.get()) {
Thread.currentThread().interrupt();
- // Best-effort delivery: use non-blocking offer as fallback
- queue.offer(DONE);
+ // Best-effort: non-blocking handoff to a waiting consumer.
+ // If no consumer is waiting, offer() returns false and the DONE
+ // signal is lost — force-close to unblock future moveNext() calls.
+ if (!queue.offer(DONE)) {
+ close();
+ }
}
}
}
/**
* Signals that the generator failed with an exception.
- * If interrupted, the error signal is delivered on a best-effort basis.
+ * <p>
+ * If the blocking {@code queue.put()} is interrupted, a best-effort
+ * non-blocking {@code queue.offer()} is attempted. If that also fails,
+ * the stream is force-closed (same rationale as {@link #complete()}).
+ * The original error is not propagated to the consumer in this edge case;
+ * instead the consumer sees a clean stream closure — this is acceptable
+ * because the interrupt itself indicates an external cancellation.
*/
public void error(Throwable t) {
if (closed.get()) {
@@ -175,8 +191,9 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
} catch (InterruptedException e) {
if (!closed.get()) {
Thread.currentThread().interrupt();
- // Best-effort delivery: use non-blocking offer as fallback
- queue.offer(item);
+ if (!queue.offer(item)) {
+ close();
+ }
}
}
}
diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc
index aa9e56bb80..a491c347bf 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -973,17 +973,26 @@ Several concurrency hazards are handled transparently:
allowing it to clean up promptly even if blocked in I/O or a long computation.
| **TOCTOU race prevention**
-| `moveNext()` uses a CAS-based double-check after taking from the queue: if the generator was
-closed between the call to `moveNext()` and the queue handoff, the taken value is discarded
-and `false` is returned, preventing stale values from leaking to the consumer.
+| `moveNext()` uses a double-check pattern: the `closed` flag is re-checked _after_
+registering the consumer thread. This closes a race window where `close()` could execute
+between the initial check and the `consumerThread.set()` call, which would leave the
+consumer stranded in `queue.take()` with no one to interrupt it.
-| **Single-consumer enforcement**
-| A single consumer is enforced by CAS on an internal flag. Attempting to call `moveNext()`
-from multiple threads concurrently throws `IllegalStateException`, catching misuse early.
+| **Thread-safe consumer tracking**
+| The consumer thread is tracked via `AtomicReference<Thread>` during `moveNext()`. This
+enables `close()` to interrupt a blocked consumer. Note: concurrent `moveNext()` calls
+from multiple threads are not supported and may produce unpredictable results — async
+generators are inherently single-consumer (just like C#'s `IAsyncEnumerator`).
| **Idempotent close**
| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to call multiple
times from any thread without side effects.
+
+| **Signal delivery under interrupt**
+| If the producer's `complete()` or `error()` signal is interrupted and the non-blocking
+fallback delivery fails (no consumer waiting), the generator force-closes itself. This
+prevents the consumer from blocking indefinitely on a subsequent `moveNext()` — a defensive
+measure against unexpected thread interruption outside the normal close path.
|===
These mechanisms ensure that `yield return` / `for await` code remains as simple as writing
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 2ed5ec1a64..7df7d1a152 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1906,7 +1906,7 @@ class AsyncApiTest {
@Test
void testGroovyPromiseIsCompletedExceptionally() {
- def cf = new java.util.concurrent.CompletableFuture<>()
+ def cf = new CompletableFuture<>()
def promise = GroovyPromise.of(cf)
assert !promise.isCompletedExceptionally()
cf.completeExceptionally(new RuntimeException())
@@ -1919,4 +1919,119 @@ class AsyncApiTest {
new GroovyPromise(null)
}
}
+
+ // ================================================================
+ // AsyncStreamGenerator: complete()/error() robustness under interrupt
+ // ================================================================
+
+ /**
+ * Verifies that when a producer's complete() signal is interrupted and
+ * the best-effort offer() fails (no consumer waiting), the generator
+ * force-closes so a subsequent moveNext() returns false instead of
+ * blocking indefinitely.
+ */
+ @Test
+ void testGeneratorCompleteForceClosesOnOfferFailure() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ def producerThread = Thread.currentThread()
+ gen.attachProducer(producerThread)
+
+ // Interrupt the current thread so that queue.put(DONE) inside
+ // complete() throws InterruptedException. Since no consumer is
+ // blocked in take(), the non-blocking offer(DONE) will also fail,
+ // triggering the force-close path.
+ producerThread.interrupt()
+ gen.complete()
+
+ // Clear the interrupt flag set by the force-close path
+ Thread.interrupted()
+
+ gen.detachProducer(producerThread)
+
+ // Consumer should see a cleanly closed stream — not block forever
+ def result = gen.moveNext()
+ assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
+ }
+
+ /**
+ * Verifies that when a producer's error() signal is interrupted and
+ * the best-effort offer() fails, the generator force-closes so the
+ * consumer does not hang.
+ */
+ @Test
+ void testGeneratorErrorForceClosesOnOfferFailure() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ def producerThread = Thread.currentThread()
+ gen.attachProducer(producerThread)
+
+ producerThread.interrupt()
+ gen.error(new RuntimeException("test error"))
+
+ Thread.interrupted()
+ gen.detachProducer(producerThread)
+
+ def result = gen.moveNext()
+ assert !AsyncSupport.await(result) : "moveNext() should return false after force-close"
+ }
+
+ /**
+ * Verifies that complete() is a no-op when the stream is already closed.
+ */
+ @Test
+ void testGeneratorCompleteAfterClose() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ gen.close()
+ // Should not throw or block
+ gen.complete()
+ def result = gen.moveNext()
+ assert !AsyncSupport.await(result)
+ }
+
+ /**
+ * Verifies that error() is a no-op when the stream is already closed.
+ */
+ @Test
+ void testGeneratorErrorAfterClose() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ gen.close()
+ // Should not throw or block
+ gen.error(new RuntimeException("ignored"))
+ def result = gen.moveNext()
+ assert !AsyncSupport.await(result)
+ }
+
+ /**
+ * Verifies that close() is idempotent — multiple calls do not throw or
+ * cause double-interrupt of threads.
+ */
+ @Test
+ void testGeneratorCloseIdempotent() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ gen.close()
+ gen.close()
+ gen.close()
+ // All should be no-ops; moveNext should still return false
+ assert !AsyncSupport.await(gen.moveNext())
+ }
+
+ /**
+ * Verifies that attachProducer on an already-closed stream immediately
+ * interrupts the producer thread, allowing the generator body to exit.
+ */
+ @Test
+ void testAttachProducerOnClosedStreamInterrupts() {
+ def gen = new AsyncStreamGenerator<Integer>()
+ gen.close()
+
+ def interrupted = new AtomicBoolean(false)
+ def latch = new CountDownLatch(1)
+ def t = new Thread({
+ gen.attachProducer(Thread.currentThread())
+ interrupted.set(Thread.currentThread().isInterrupted())
+ latch.countDown()
+ })
+ t.start()
+ latch.await(5, TimeUnit.SECONDS)
+ assert interrupted.get() : "Producer should be interrupted when attached to a closed stream"
+ }
}
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
index 76174a9bd5..5a0737f9cc 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncPatternsTest.groovy
@@ -696,7 +696,7 @@ class AsyncPatternsTest {
}
@Test
- void testCancellationPattern() {
+ void testCancellationPatternWithClosureExpression() {
assertScript '''
import java.util.concurrent.CancellationException
import groovy.concurrent.Awaitable