This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 7504008b1b Minor tweaks
7504008b1b is described below
commit 7504008b1b899dc3cb6be9399da1b7646c0396e8
Author: Daniel Sun <[email protected]>
AuthorDate: Thu Mar 19 00:31:31 2026 +0900
Minor tweaks
---
src/main/java/groovy/concurrent/AsyncStream.java | 16 +-
.../groovy/runtime/async/AbstractAsyncStream.java | 287 +++++++++++++++++
.../groovy/runtime/async/AsyncStreamGenerator.java | 209 ++++--------
.../groovy/runtime/async/FlowPublisherAdapter.java | 323 ++++++++-----------
src/spec/doc/core-async-await.adoc | 78 +++--
.../codehaus/groovy/transform/AsyncApiTest.groovy | 353 +++++++++++++++++++++
6 files changed, 905 insertions(+), 361 deletions(-)
diff --git a/src/main/java/groovy/concurrent/AsyncStream.java
b/src/main/java/groovy/concurrent/AsyncStream.java
index 4baa81115a..318ddf6355 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -113,6 +113,20 @@ public interface AsyncStream<T> extends AutoCloseable {
return (AsyncStream<T>) EMPTY;
}
+ /**
+ * Cached awaitable for {@code moveNext()} returning {@code true}.
+ * Eliminates per-call allocation on the hot path. Shared by all
+ * {@code AsyncStream} implementations (e.g. via
+ * {@link org.apache.groovy.runtime.async.AbstractAsyncStream AbstractAsyncStream}).
+ */
+ Awaitable<Boolean> MOVE_NEXT_TRUE = Awaitable.of(Boolean.TRUE);
+
+ /**
+ * Cached awaitable for {@code moveNext()} returning {@code false}.
+ * Eliminates per-call allocation on the stream-end path.
+ */
+ Awaitable<Boolean> MOVE_NEXT_FALSE = Awaitable.of(Boolean.FALSE);
+
/**
* Singleton empty stream instance.
* <p>
@@ -121,7 +135,7 @@ public interface AsyncStream<T> extends AutoCloseable {
* referencing this field directly.
*/
AsyncStream<Object> EMPTY = new AsyncStream<>() {
- @Override public Awaitable<Boolean> moveNext() { return
Awaitable.of(false); }
+ @Override public Awaitable<Boolean> moveNext() { return
MOVE_NEXT_FALSE; }
@Override public Object getCurrent() { return null; }
};
}
diff --git
a/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java
b/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java
new file mode 100644
index 0000000000..2e70a12848
--- /dev/null
+++ b/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Template base class for queue-based {@link AsyncStream} implementations.
+ *
+ * <p>This class implements the
+ * <a href="https://en.wikipedia.org/wiki/Template_method_pattern">Template Method</a>
+ * pattern, centralising the signal dispatch logic, lifecycle management, and
+ * interrupt handling that is common to all queue-based async streams.
+ * Concrete subclasses only need to supply a {@link BlockingQueue} and override
+ * a small number of hook methods to customise behaviour.</p>
+ *
+ * <h2>Signal protocol</h2>
+ * <p>Elements flowing through the queue are wrapped in one of three signal types:</p>
+ * <ul>
+ * <li>{@link ValueSignal} — carries a data element (may wrap {@code null})</li>
+ * <li>{@link ErrorSignal} — carries a {@link Throwable} to propagate</li>
+ * <li>{@link #COMPLETE} — singleton sentinel indicating normal end-of-stream</li>
+ * </ul>
+ * <p>The template's {@link #moveNext()} dispatches on these signals with a fixed
+ * sequence: value → set current + {@link #afterValueConsumed()} + return {@code true};
+ * error → set closed + sneaky-throw; complete → set closed + return {@code false}.</p>
+ *
+ * <h2>Hook methods (override points)</h2>
+ * <table>
+ * <caption>Hook methods and their defaults</caption>
+ * <tr><th>Hook</th><th>Default</th><th>Typical override</th></tr>
+ * <tr><td>{@link #beforeTake()}</td><td>return {@code MOVE_NEXT_FALSE} if closed</td>
+ * <td>thread registration, double-check, or drain check</td></tr>
+ * <tr><td>{@link #afterValueConsumed()}</td><td>no-op</td>
+ * <td>request more items from upstream (back-pressure)</td></tr>
+ * <tr><td>{@link #afterMoveNext()}</td><td>no-op</td>
+ * <td>unregister consumer thread</td></tr>
+ * <tr><td>{@link #onMoveNextInterrupted(InterruptedException)}</td>
+ * <td>set closed, restore interrupt, throw {@link CancellationException}</td>
+ * <td>return {@code MOVE_NEXT_FALSE} if already closed</td></tr>
+ * <tr><td>{@link #onClose()}</td><td><em>abstract</em></td>
+ * <td>interrupt threads, cancel subscriptions, drain queue</td></tr>
+ * </table>
+ *
+ * <h2>Thread safety</h2>
+ * <p>The {@link #closed} flag is an {@link AtomicBoolean} shared between the
+ * producer (subclass-managed) and consumer ({@code moveNext()}) sides.
+ * The {@link #close()} method uses CAS to guarantee exactly-once semantics.
+ * The {@link #current} field is {@code volatile} for safe cross-thread visibility.</p>
+ *
+ * <p>This class is an internal implementation detail and should not be referenced
+ * directly by user code.</p>
+ *
+ * @param <T> the element type
+ * @see AsyncStreamGenerator
+ * @see FlowPublisherAdapter
+ * @since 6.0.0
+ */
+public abstract class AbstractAsyncStream<T> implements AsyncStream<T> {
+
+ // ---- Unified signal types ----
+
+ /**
+ * Wraps a data element for transport through the signal queue.
+ * The wrapper is necessary because the queue element type is {@code Object} and a
+ * {@link BlockingQueue} rejects {@code null}, while the actual value may be {@code null}.
+ */
+ protected record ValueSignal(Object value) { }
+
+ /**
+ * Wraps an error for transport through the signal queue.
+ * When dispatched by {@link #moveNext()}, the wrapped throwable is
+ * re-thrown via {@link AsyncSupport#sneakyThrow(Throwable)}.
+ */
+ protected record ErrorSignal(Throwable error) { }
+
+ /**
+ * Singleton sentinel indicating normal stream completion.
+ * Identity comparison ({@code ==}) is used in the dispatch logic.
+ */
+ protected static final Object COMPLETE = new Object();
+
+ // ---- Shared state ----
+
+ /** The signal queue bridging producer and consumer. */
+ protected final BlockingQueue<Object> queue;
+
+ /** Lifecycle flag: set exactly once when the stream is closed. */
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /** Most recently consumed value, set by {@link #moveNext()} on value signals. */
+ private volatile T current;
+
+ /**
+ * @param queue the blocking queue used for producer→consumer signal delivery;
+ * must not be {@code null}
+ */
+ protected AbstractAsyncStream(BlockingQueue<Object> queue) {
+ this.queue = Objects.requireNonNull(queue, "queue");
+ }
+
+ // ---- Template method: moveNext ----
+
+ /**
+ * Template method implementing the {@link AsyncStream} iteration protocol.
+ *
+ * <p>Execution sequence:</p>
+ * <ol>
+ * <li>{@link #beforeTake()} — may short-circuit with an early return</li>
+ * <li>{@code queue.take()} — blocks until a signal is available</li>
+ * <li>Signal dispatch: value / error / complete</li>
+ * <li>{@link #afterMoveNext()} — always runs (finally block)</li>
+ * </ol>
+ *
+ * <p>If {@code queue.take()} throws {@link InterruptedException},
+ * {@link #onMoveNextInterrupted(InterruptedException)} handles it.</p>
+ *
+ * @return an {@code Awaitable<Boolean>} — {@code true} if a new element is
+ * available via {@link #getCurrent()}, {@code false} if the stream
+ * is exhausted or closed
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public final Awaitable<Boolean> moveNext() {
+ Awaitable<Boolean> earlyReturn = beforeTake();
+ if (earlyReturn != null) {
+ return earlyReturn;
+ }
+ try {
+ Object signal = queue.take();
+
+ if (signal instanceof ValueSignal vs) {
+ current = (T) vs.value;
+ afterValueConsumed();
+ return MOVE_NEXT_TRUE;
+ }
+ if (signal instanceof ErrorSignal es) {
+ closed.set(true);
+ Throwable cause = es.error;
+ if (cause instanceof Error err) throw err;
+ throw AsyncSupport.sneakyThrow(cause);
+ }
+ // COMPLETE sentinel — end-of-stream
+ closed.set(true);
+ return MOVE_NEXT_FALSE;
+ } catch (InterruptedException e) {
+ return onMoveNextInterrupted(e);
+ } finally {
+ afterMoveNext();
+ }
+ }
+
+ // ---- Final implementations ----
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the most recently consumed element, set by the last successful
+ * {@link #moveNext()} call.
+ *
+ * @return the current element, or {@code null} before the first successful
+ * {@code moveNext()} call
+ */
+ @Override
+ public final T getCurrent() {
+ return current;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Atomically marks this stream as closed via CAS on the {@link #closed}
+ * flag, then delegates to {@link #onClose()} for subclass-specific cleanup.
+ * Idempotent: only the first invocation triggers {@code onClose()}.
+ */
+ @Override
+ public final void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ onClose();
+ }
+
+ // ---- Abstract methods ----
+
+ /**
+ * Subclass-specific cleanup, called at most once: by the first {@link #close()} that
+ * wins the CAS on {@link #closed} (never if {@code moveNext()} already set the flag).
+ * Typical actions: interrupt blocked threads, cancel subscriptions, drain the queue.
+ */
+ protected abstract void onClose();
+
+ // ---- Hook methods with defaults ----
+
+ /**
+ * Pre-take hook invoked at the start of {@link #moveNext()}.
+ * <p>
+ * Return a non-{@code null} {@link Awaitable} to short-circuit
+ * {@code moveNext()} without blocking on the queue. Return {@code null}
+ * to proceed with the normal take-and-dispatch sequence.
+ * <p>
+ * The default implementation returns {@link #MOVE_NEXT_FALSE} when
+ * the stream is closed, and {@code null} otherwise.
+ *
+ * @return an early-return value, or {@code null} to continue
+ */
+ protected Awaitable<Boolean> beforeTake() {
+ return closed.get() ? MOVE_NEXT_FALSE : null;
+ }
+
+ /**
+ * Post-value hook invoked after a {@link ValueSignal} has been consumed
+ * and the {@link #current} field updated.
+ * <p>
+ * Subclasses may use this to signal demand to an upstream source
+ * (e.g. {@code Subscription.request(1)} for reactive streams).
+ * <p>
+ * The default implementation is a no-op.
+ */
+ protected void afterValueConsumed() { }
+
+ /**
+ * Finally hook invoked after every {@link #moveNext()} attempt,
+ * regardless of outcome (value, error, completion, or interrupt).
+ * <p>
+ * Subclasses may use this to unregister the consumer thread.
+ * <p>
+ * The default implementation is a no-op.
+ */
+ protected void afterMoveNext() { }
+
+ /**
+ * Interrupt handler invoked when {@code queue.take()} throws
+ * {@link InterruptedException} inside {@link #moveNext()}.
+ * <p>
+ * The default implementation sets {@link #closed} to {@code true},
+ * restores the interrupt flag, and throws a {@link CancellationException}.
+ * Subclasses may override to return {@link #MOVE_NEXT_FALSE} instead
+ * of throwing (e.g. when an external {@code close()} caused the interrupt).
+ *
+ * @param e the interrupt exception
+ * @return an {@link Awaitable} to return from {@code moveNext()}, or
+ * the method may throw instead
+ */
+ protected Awaitable<Boolean> onMoveNextInterrupted(InterruptedException e)
{
+ closed.set(true);
+ Thread.currentThread().interrupt();
+ throw newCancellationException("Interrupted during moveNext", e);
+ }
+
+ // ---- Utilities for subclasses ----
+
+ /**
+ * Creates a {@link CancellationException} with the given message and cause.
+ *
+ * @param message the detail message
+ * @param cause the interrupt that triggered the cancellation
+ * @return a new {@code CancellationException}
+ */
+ protected static CancellationException newCancellationException(String
message, InterruptedException cause) {
+ CancellationException ce = new CancellationException(message);
+ ce.initCause(cause);
+ return ce;
+ }
+}
diff --git
a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index d5b5e36e0c..ca7928ea9d 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -18,22 +18,31 @@
*/
package org.apache.groovy.runtime.async;
-import groovy.concurrent.AsyncStream;
import groovy.concurrent.Awaitable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * A producer/consumer implementation of {@link AsyncStream} used by
- * {@code async} methods that contain {@code yield return} statements.
+ * A producer/consumer implementation of {@link groovy.concurrent.AsyncStream AsyncStream}
+ * backed by a {@link SynchronousQueue}, used by {@code async} methods that
+ * contain {@code yield return} statements.
* <p>
- * The producer (method body) runs on a separate thread and calls
- * {@link #yield(Object)} for each emitted element. The consumer
- * calls {@link #moveNext()}/{@link #getCurrent()} — typically via
- * a {@code for await} loop.
+ * Extends {@link AbstractAsyncStream} which provides the common
+ * {@link #moveNext()}/{@link #getCurrent()}/{@link #close()} template.
+ * This class adds the producer-side API ({@link #yield}, {@link #complete},
+ * {@link #error}) and overrides the following hooks:
+ *
+ * <ul>
+ * <li>{@link #beforeTake()} — enforces single-consumer semantics via
+ * {@link #consumerThread} CAS + double-check of the {@link #closed} flag</li>
+ * <li>{@link #afterMoveNext()} — unregisters the consumer thread</li>
+ * <li>{@link #onMoveNextInterrupted(InterruptedException)} — returns
+ * {@code MOVE_NEXT_FALSE} if the stream was already closed (cooperative
+ * cancellation), otherwise throws {@link CancellationException}</li>
+ * <li>{@link #onClose()} — interrupts both producer and consumer threads</li>
+ * </ul>
*
* <h2>Back-pressure</h2>
* Uses a {@link SynchronousQueue} to provide natural back-pressure:
@@ -83,22 +92,14 @@ import java.util.concurrent.atomic.AtomicReference;
* @param <T> the element type
* @since 6.0.0
*/
-public class AsyncStreamGenerator<T> implements AsyncStream<T> {
-
- private static final Object DONE = new Object();
-
- /**
- * Cached awaitables for the two common {@code moveNext()} outcomes.
- * Eliminates per-call object allocation on the hot path.
- */
- private static final Awaitable<Boolean> MOVE_NEXT_TRUE =
Awaitable.of(Boolean.TRUE);
- private static final Awaitable<Boolean> MOVE_NEXT_FALSE =
Awaitable.of(Boolean.FALSE);
+public class AsyncStreamGenerator<T> extends AbstractAsyncStream<T> {
- private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
- private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicReference<Thread> producerThread = new
AtomicReference<>();
private final AtomicReference<Thread> consumerThread = new
AtomicReference<>();
- private volatile T current;
+
+ public AsyncStreamGenerator() {
+ super(new SynchronousQueue<>());
+ }
/**
* Registers the given thread as the producer for this stream.
@@ -139,13 +140,13 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
throw streamClosed(null);
}
try {
- queue.put(new Item(value));
+ queue.put(new ValueSignal(value));
} catch (InterruptedException e) {
if (closed.get()) {
throw streamClosed(e);
}
Thread.currentThread().interrupt();
- throw interrupted("Interrupted during yield", e);
+ throw newCancellationException("Interrupted during yield", e);
}
}
@@ -160,22 +161,7 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
* thread leak occurs even under unexpected interrupt timing.
*/
public void complete() {
- if (closed.get()) {
- return;
- }
- try {
- queue.put(DONE);
- } catch (InterruptedException e) {
- if (!closed.get()) {
- Thread.currentThread().interrupt();
- // Best-effort: non-blocking handoff to a waiting consumer.
- // If no consumer is waiting, offer() returns false and the
DONE
- // signal is lost — force-close to unblock future moveNext()
calls.
- if (!queue.offer(DONE)) {
- close();
- }
- }
- }
+ putTerminalSignal(COMPLETE);
}
/**
@@ -189,135 +175,90 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
* because the interrupt itself indicates an external cancellation.
*/
public void error(Throwable t) {
+ putTerminalSignal(new ErrorSignal(t != null ? t : new
NullPointerException("null error in generator")));
+ }
+
+ /**
+ * Shared logic for {@link #complete()} and {@link #error(Throwable)}:
+ * attempts a blocking {@code put()}, falling back to non-blocking
+ * {@code offer()} on interrupt, and force-closing if both fail.
+ */
+ private void putTerminalSignal(Object signal) {
if (closed.get()) {
return;
}
- ErrorItem item = new ErrorItem(t != null ? t : new
NullPointerException("null error in generator"));
try {
- queue.put(item);
+ queue.put(signal);
} catch (InterruptedException e) {
if (!closed.get()) {
Thread.currentThread().interrupt();
- if (!queue.offer(item)) {
+ if (!queue.offer(signal)) {
close();
}
}
}
}
+ // ---- Template method overrides ----
+
/**
- * {@inheritDoc}
- * <p>
- * Blocks the calling (consumer) thread on the {@link SynchronousQueue}
until
- * the producer offers the next element, a completion sentinel, or an
error.
- * If the stream has been {@linkplain #close() closed}, returns
- * {@code Awaitable.of(false)} immediately without blocking.
- * <p>
- * The consumer thread is registered via {@code consumerThread} during the
- * blocking call so that {@link #close()} can interrupt it if needed.
- * <p>
- * <b>Single-consumer invariant</b>: only one thread may call
- * {@code moveNext()} at a time. A {@code compareAndSet(null, current)}
- * guard enforces this — a concurrent second caller receives an
- * {@link IllegalStateException} immediately instead of silently corrupting
- * the producer/consumer handshake.
- * <p>
- * A <em>double-check</em> of the {@link #closed} flag is performed after
- * registration to close the TOCTOU race window: if {@code close()}
executes
- * between the initial {@code closed.get()} check and the
- * {@code consumerThread} CAS, the consumer reference would not yet be
- * visible to {@code close()}, so no interrupt would be delivered, and
- * {@code queue.take()} would block indefinitely. The re-check after
- * registration detects this case and returns immediately.
- *
- * @return an {@code Awaitable<Boolean>} that resolves to {@code true} if a
- * new element is available via {@link #getCurrent()}, or {@code
false}
- * if the stream is exhausted or closed
+ * Enforces single-consumer semantics via CAS on {@link #consumerThread},
+ * with a double-check of the {@link #closed} flag to close the TOCTOU
+ * race window (see class-level javadoc).
*/
@Override
- @SuppressWarnings("unchecked")
- public Awaitable<Boolean> moveNext() {
+ protected Awaitable<Boolean> beforeTake() {
if (closed.get()) {
return MOVE_NEXT_FALSE;
}
- // Enforce single-consumer semantics: only one thread may call
moveNext()
- // at a time. A concurrent second caller would overwrite
consumerThread,
- // breaking close()'s interrupt targeting and causing data races on
- // queue.take(). CAS guards this invariant at the cost of one atomic
op.
- Thread currentThread = Thread.currentThread();
- if (!consumerThread.compareAndSet(null, currentThread)) {
+ Thread ct = Thread.currentThread();
+ if (!consumerThread.compareAndSet(null, ct)) {
Thread existing = consumerThread.get();
- if (existing != currentThread) {
+ if (existing != ct) {
throw new IllegalStateException(
"AsyncStream does not support concurrent consumers. "
+ "Current consumer: " + existing);
}
}
- // Double-check after registration: if close() raced between the first
- // closed check and consumerThread CAS, the consumer reference was not
- // yet visible to close(), so no interrupt was delivered. Without this
- // re-check the consumer would block in queue.take() indefinitely.
+ // Double-check: if close() raced between the first closed check and
+ // consumerThread CAS, the consumer reference was not yet visible to
+ // close(), so no interrupt was delivered.
if (closed.get()) {
- consumerThread.compareAndSet(currentThread, null);
+ consumerThread.compareAndSet(ct, null);
return MOVE_NEXT_FALSE;
}
- try {
- Object next = queue.take();
- if (next == DONE) {
- closed.set(true);
- return MOVE_NEXT_FALSE;
- }
- if (next instanceof ErrorItem ei) {
- closed.set(true);
- Throwable cause = ei.error;
- if (cause instanceof Error err) throw err;
- throw AsyncSupport.sneakyThrow(cause);
- }
- current = (T) ((Item) next).value;
- return MOVE_NEXT_TRUE;
- } catch (InterruptedException e) {
- if (closed.get()) {
- return MOVE_NEXT_FALSE;
- }
- Thread.currentThread().interrupt();
- throw interrupted("Interrupted during moveNext", e);
- } finally {
- consumerThread.compareAndSet(currentThread, null);
- }
+ return null;
}
/**
- * {@inheritDoc}
- * <p>
- * Returns the most recently consumed element. The value is updated each
time
- * {@link #moveNext()} returns {@code true}.
- *
- * @return the current element, or {@code null} before the first successful
- * {@code moveNext()} call
+ * Unregisters the consumer thread after every {@link #moveNext()} attempt.
*/
@Override
- public T getCurrent() {
- return current;
+ protected void afterMoveNext() {
+ consumerThread.compareAndSet(Thread.currentThread(), null);
}
/**
- * {@inheritDoc}
- * <p>
- * Atomically marks this stream as closed and interrupts any producer or
- * consumer thread that is currently blocked on the {@link
SynchronousQueue}.
- * The interrupted threads detect the {@link #closed} flag and exit
- * gracefully (see the class-level javadoc for details).
- * <p>
- * This method is idempotent: only the first invocation performs the
- * interrupt; subsequent calls are no-ops. A thread calling {@code
close()}
- * on itself (e.g. the consumer calling close inside a {@code for await}
- * body) is never self-interrupted.
+ * If the stream was already closed (cooperative cancellation), returns
+ * {@code MOVE_NEXT_FALSE}; otherwise restores the interrupt flag and
+ * throws {@link CancellationException}.
*/
@Override
- public void close() {
- if (!closed.compareAndSet(false, true)) {
- return;
+ protected Awaitable<Boolean> onMoveNextInterrupted(InterruptedException e)
{
+ if (closed.get()) {
+ return MOVE_NEXT_FALSE;
}
+ Thread.currentThread().interrupt();
+ throw newCancellationException("Interrupted during moveNext", e);
+ }
+
+ /**
+ * Interrupts both producer and consumer threads (if any) to unblock
+ * pending {@link SynchronousQueue} operations. A thread is never
+ * self-interrupted.
+ */
+ @Override
+ protected void onClose() {
Thread producer = producerThread.getAndSet(null);
if (producer != null && producer != Thread.currentThread()) {
producer.interrupt();
@@ -328,12 +269,6 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
}
}
- private static CancellationException interrupted(String message,
InterruptedException cause) {
- CancellationException ce = new CancellationException(message);
- ce.initCause(cause);
- return ce;
- }
-
private static CancellationException streamClosed(InterruptedException
cause) {
CancellationException ce = new CancellationException("Async stream was
closed");
if (cause != null) {
@@ -341,8 +276,4 @@ public class AsyncStreamGenerator<T> implements
AsyncStream<T> {
}
return ce;
}
-
- // Wrapper to handle null values in the queue
- private record Item(Object value) { }
- private record ErrorItem(Throwable error) { }
}
diff --git
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index cf15cb91a6..3c56af6bc1 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -68,8 +68,8 @@ import java.util.concurrent.atomic.AtomicReference;
* blocking {@code put()} with a non-blocking {@code offer()} fallback
* when the publisher thread is interrupted — this prevents both silent
* item loss and consumer deadlock even under unexpected interrupts</li>
- * <li>Terminal callbacks atomically close the upstream side
- * ({@code closedRef}) and clear/cancel the stored subscription.
+ * <li>Terminal callbacks atomically close the stream via a single
+ * {@link AtomicBoolean} flag and clear/cancel the stored subscription.
* This makes post-terminal {@code onNext} calls from non-compliant
* publishers harmless and releases resources promptly.</li>
* <li>Back-pressure is enforced by requesting exactly one item after
@@ -84,21 +84,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class FlowPublisherAdapter implements AwaitableAdapter {
- /**
- * Queue capacity for the push→pull bridge in
- * {@link #publisherToAsyncStream}. 256 provides a generous buffer
- * for bursty publishers while bounding memory.
- */
- private static final int QUEUE_CAPACITY = 256;
-
- /**
- * Cached awaitables for the two common {@code moveNext()} outcomes.
- * Eliminates per-call {@link CompletableFuture} + {@link GroovyPromise}
- * allocation on the hot path (every element and stream-end).
- */
- private static final Awaitable<Boolean> MOVE_NEXT_TRUE =
Awaitable.of(Boolean.TRUE);
- private static final Awaitable<Boolean> MOVE_NEXT_FALSE =
Awaitable.of(Boolean.FALSE);
-
/**
* Returns {@code true} if the given type is assignable to
* {@link Flow.Publisher}, enabling single-value {@code await}.
@@ -226,18 +211,6 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
// ---- Multi-value adaptation (for await publisher) ----
- // Signal wrapper types allow us to distinguish values, errors, and
- // completion in a single queue without type confusion.
-
- private record ValueSignal<T>(T value) {
- }
-
- private record ErrorSignal(Throwable error) {
- }
-
- /** Singleton sentinel for stream completion. */
- private static final Object COMPLETE_SENTINEL = new Object();
-
/**
* Wraps a {@link Flow.Publisher} into an {@link AsyncStream},
* providing a pull-based iteration interface over a push-based source.
@@ -249,12 +222,6 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
* current one — this prevents livelock when producer and consumer
* share the same thread pool.</p>
*
- * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
- * absorbs minor timing jitter between producer and consumer. Signals
- * use blocking {@code put()} for normal delivery with a non-blocking
- * {@code offer()} fallback when the publisher thread is interrupted —
- * ensuring no items or terminal events are silently dropped.</p>
- *
* <p><b>Resource management:</b> When the consumer calls
* {@link AsyncStream#close()} (e.g. via {@code break} in a
* {@code for await} loop), the upstream subscription is cancelled
@@ -265,182 +232,154 @@ public class FlowPublisherAdapter implements
AwaitableAdapter {
* @param <T> the element type
* @return an async stream that yields publisher items
*/
- @SuppressWarnings("unchecked")
private <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T>
publisher) {
- LinkedBlockingQueue<Object> queue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
- AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
- // Tracks whether the stream has been closed (by consumer or terminal
signal).
- // CAS ensures exactly-once semantics for the close/cleanup path.
- AtomicBoolean closedRef = new AtomicBoolean(false);
- AtomicBoolean streamClosed = new AtomicBoolean(false);
+ FlowAsyncStream<T> stream = new FlowAsyncStream<>();
+ publisher.subscribe(stream.newSubscriber());
+ return stream;
+ }
- publisher.subscribe(new Flow.Subscriber<T>() {
- @Override
- public void onSubscribe(Flow.Subscription s) {
- // §2.5: reject duplicate subscriptions
- if (!subRef.compareAndSet(null, s)) {
- s.cancel();
- return;
- }
- // Double-check pattern: if close() raced between the CAS and
this point,
- // the subscription must be cancelled immediately to avoid a
dangling stream.
- if (closedRef.get()) {
- Flow.Subscription sub = subRef.getAndSet(null);
- if (sub != null) sub.cancel();
- return;
+ /**
+ * Named implementation of {@link AsyncStream} that extends
+ * {@link AbstractAsyncStream} to bridge a push-based
+ * {@link Flow.Publisher} into a pull-based async stream.
+ *
+ * <p>Inherits the common {@link AbstractAsyncStream#moveNext()
moveNext()}/
+ * {@link AbstractAsyncStream#getCurrent() getCurrent()}/
+ * {@link AbstractAsyncStream#close() close()} template and overrides:</p>
+ * <ul>
+ * <li>{@link #beforeTake()} — short-circuits to {@code false} only when
+ * {@code closed && queue.isEmpty()}, so already-queued signals are
+ * delivered before the stream reports its end</li>
+ * <li>{@link #afterValueConsumed()} — signals back-pressure demand via
+ * {@code Subscription.request(1)}</li>
+ * <li>{@link #onClose()} — cancels the upstream subscription, drains
+ * the queue, and injects a {@link #COMPLETE} sentinel to unblock
+ * any pending {@code moveNext()}</li>
+ * </ul>
+ *
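+ * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
+ * absorbs minor timing jitter. Signals use blocking {@code put()}
+ * for normal delivery with a non-blocking {@code offer()} fallback
+ * when the publisher thread is interrupted. If that fallback fails
+ * for a value, the upstream is cancelled and an error signal is
+ * offered in its place; for a terminal signal the {@code offer()}
+ * result is not checked, so delivery is best-effort in that case.</p>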
+ *
+ * @param <T> the element type
+ */
+ private static final class FlowAsyncStream<T> extends
AbstractAsyncStream<T> {
+
+ /**
+ * Queue capacity for the push→pull bridge. With one-at-a-time
+ * demand ({@code request(1)} per consumed element), a well-behaved
+ * publisher will never enqueue more than one value at a time.
+ * A capacity of 2 accommodates the value + a racing terminal
+ * signal without blocking, while keeping memory minimal.
+ */
+ private static final int QUEUE_CAPACITY = 2;
+
+ private final AtomicReference<Flow.Subscription> subRef = new
AtomicReference<>();
+
+ FlowAsyncStream() {
+ super(new LinkedBlockingQueue<>(QUEUE_CAPACITY));
+ }
+
+ /**
+ * Creates a new {@link Flow.Subscriber} wired to this stream's
+ * internal queue and lifecycle state.
+ */
+ Flow.Subscriber<T> newSubscriber() {
+ return new Flow.Subscriber<>() {
+ @Override
+ public void onSubscribe(Flow.Subscription s) {
+ // §2.5: reject duplicate subscriptions
+ if (!subRef.compareAndSet(null, s)) {
+ s.cancel();
+ return;
+ }
+ // Double-check: if close() raced before subscription was
set,
+ // cancel immediately to avoid a dangling upstream.
+ if (closed.get()) {
+ cancelSubscription();
+ return;
+ }
+ s.request(1);
}
- s.request(1);
- }
- @Override
- public void onNext(T item) {
- // §2.13: null items are spec violations
- if (item == null) {
- onError(new NullPointerException(
- "Flow.Publisher onNext received null (Reactive
Streams §2.13)"));
- return;
- }
- if (!closedRef.get()) {
+ @Override
+ public void onNext(T item) {
+ // §2.13: null items are spec violations
+ if (item == null) {
+ onError(new NullPointerException(
+ "Flow.Publisher onNext received null (Reactive
Streams §2.13)"));
+ return;
+ }
+ if (closed.get()) return;
try {
- // Blocking put() guarantees the item reaches the
consumer.
- // Since demand is capped at 1 (one request(1) per
moveNext),
- // a well-behaved publisher will never overflow the
queue; put()
- // still protects against misbehaving publishers by
blocking
- // rather than silently dropping the value.
- queue.put(new ValueSignal<>(item));
+ queue.put(new ValueSignal(item));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- // Blocking put() was interrupted. Fall back to
non-blocking
- // offer() so the item still reaches the consumer. With
- // one-at-a-time demand the queue is almost never
full, so
- // offer() effectively always succeeds. If it doesn't
- // (misbehaving publisher overfilling the queue),
cancel
- // upstream and inject an error signal to terminate the
- // consumer cleanly instead of silently dropping the
item.
- if (!queue.offer(new ValueSignal<>(item))) {
- Flow.Subscription sub = subRef.getAndSet(null);
- if (sub != null) sub.cancel();
- closedRef.set(true);
+ if (!queue.offer(new ValueSignal(item))) {
+ cancelSubscription();
+ closed.set(true);
queue.offer(new ErrorSignal(
new CancellationException(
"Item delivery interrupted and
queue full")));
}
}
}
- }
- @Override
- public void onError(Throwable t) {
- // First terminal signal wins. Ignore duplicate terminal
callbacks.
- if (!closedRef.compareAndSet(false, true)) {
- return;
- }
- // Cancel subscription eagerly to release upstream resources
- Flow.Subscription sub = subRef.getAndSet(null);
- if (sub != null) sub.cancel();
- try {
- // Terminal signals use blocking put() to guarantee
delivery —
- // the consumer MUST see the error to propagate it
correctly.
- queue.put(new ErrorSignal(t));
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- // Blocking put() was interrupted. Fall back to
non-blocking
- // offer(). If that also fails (queue full), set
streamClosed
- // so the consumer's next moveNext() returns false instead
of
- // blocking indefinitely.
- if (!queue.offer(new ErrorSignal(t))) {
- streamClosed.set(true);
- }
+ @Override
+ public void onError(Throwable t) {
+ putTerminalSignal(new ErrorSignal(t));
}
- }
- @Override
- public void onComplete() {
- // First terminal signal wins. Ignore duplicate terminal
callbacks.
- if (!closedRef.compareAndSet(false, true)) {
- return;
- }
- // Clear subscription consistently with other terminal paths.
- Flow.Subscription sub = subRef.getAndSet(null);
- if (sub != null) sub.cancel();
- try {
- // Blocking put() guarantees the consumer will see the
sentinel,
- // even if the queue was temporarily full from buffered
values.
- queue.put(COMPLETE_SENTINEL);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- // Same fallback as onError: try non-blocking offer,
- // set streamClosed if that also fails.
- if (!queue.offer(COMPLETE_SENTINEL)) {
- streamClosed.set(true);
- }
- }
- }
- });
-
- return new AsyncStream<T>() {
- private T current;
-
- @Override
- public Awaitable<Boolean> moveNext() {
- if (streamClosed.get()) {
- return MOVE_NEXT_FALSE;
+ @Override
+ public void onComplete() {
+ putTerminalSignal(COMPLETE);
}
- try {
- Object signal = queue.take();
-
- if (signal instanceof ValueSignal) {
- current = ((ValueSignal<T>) signal).value;
- // Signal demand BEFORE returning so the publisher can
- // begin producing the next value while the consumer
- // processes this one — prevents livelock when both
- // share a thread pool.
- Flow.Subscription sub = subRef.get();
- if (sub != null) sub.request(1);
- return MOVE_NEXT_TRUE;
- } else if (signal instanceof ErrorSignal es) {
- streamClosed.set(true);
- // Throw directly (matching AsyncStreamGenerator) to
- // avoid unnecessary CF allocation on the error path
- // and JDK 23+ CompletableFuture.get() wrapping issues.
- Throwable cause = es.error;
- if (cause instanceof Error err) throw err;
- throw AsyncSupport.sneakyThrow(cause);
- } else {
- // COMPLETE_SENTINEL — end-of-stream
- streamClosed.set(true);
- return MOVE_NEXT_FALSE;
+ /**
+ * Shared logic for {@code onError()} and {@code onComplete()}:
+ * the first terminal callback wins the {@code closed} CAS, cancels
+ * the subscription, and enqueues the terminal signal; later
+ * terminal callbacks return without doing anything.
+ */
+ private void putTerminalSignal(Object signal) {
+ if (!closed.compareAndSet(false, true)) return;
+ cancelSubscription();
+ try {
+ queue.put(signal);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ queue.offer(signal);
}
- } catch (InterruptedException ie) {
- streamClosed.set(true);
- Thread.currentThread().interrupt();
- CancellationException ce = new
CancellationException("Interrupted during moveNext");
- ce.initCause(ie);
- throw ce;
}
- }
-
- @Override
- public T getCurrent() {
- return current;
- }
-
- @Override
- public void close() {
- if (streamClosed.compareAndSet(false, true)) {
- closedRef.set(true);
- Flow.Subscription sub = subRef.getAndSet(null);
- if (sub != null) sub.cancel();
- // Drain the queue and inject a sentinel to unblock a
- // concurrent moveNext() that may be blocked in take().
- // offer() is non-blocking and cannot throw
InterruptedException;
- // after clear(), the queue is empty (capacity 256) so
offer()
- // effectively always succeeds.
- queue.clear();
- queue.offer(COMPLETE_SENTINEL);
- }
- }
- };
+ };
+ }
+
+ @Override
+ protected Awaitable<Boolean> beforeTake() {
+ return (closed.get() && queue.isEmpty()) ? MOVE_NEXT_FALSE : null;
+ }
+
+ @Override
+ protected void afterValueConsumed() {
+ // Signal demand BEFORE returning so the publisher can begin
+ // producing the next value while the consumer processes this
+ // one — prevents livelock when both share a thread pool.
+ Flow.Subscription sub = subRef.get();
+ if (sub != null) sub.request(1);
+ }
+
+ @Override
+ protected void onClose() {
+ cancelSubscription();
+ // Drain the queue and inject a sentinel to unblock a
+ // concurrent moveNext() that may be blocked in take().
+ queue.clear();
+ queue.offer(COMPLETE);
+ }
+
+ private void cancelSubscription() {
+ Flow.Subscription sub = subRef.getAndSet(null);
+ if (sub != null) sub.cancel();
+ }
}
}
diff --git a/src/spec/doc/core-async-await.adoc
b/src/spec/doc/core-async-await.adoc
index d0ebd0d859..ef3a3fb9f9 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -944,21 +944,28 @@ to `AsyncSupport` static methods. The `@Async` annotation
is processed by `Async
which delegates to the same `AsyncTransformHelper` utility class. This shared
helper ensures
consistent code generation for both the keyword and annotation forms.
-**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`,
-`AwaitableAdapterRegistry`): `AsyncSupport` is the central runtime class
containing static
-methods for `await`, `async`, `defer`, `yield return`, timeout composition,
stream cleanup,
-and combinators. `GroovyPromise` wraps
+**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
+`AsyncStreamGenerator`, `FlowPublisherAdapter`, `AwaitableAdapterRegistry`):
`AsyncSupport` is the
+central runtime class containing static methods for `await`, `async`, `defer`,
`yield return`,
+timeout composition, stream cleanup, and combinators. `GroovyPromise` wraps
`CompletableFuture` to implement the `Awaitable` interface, decoupling the
public API from the
-JDK implementation. `AsyncStreamGenerator` implements the producer/consumer
pattern for async
-generators using a `SynchronousQueue`. `AwaitableAdapterRegistry` provides the
SPI extension
-point for third-party async type support.
+JDK implementation. `AbstractAsyncStream` is the template base class for
queue-based `AsyncStream`
+implementations — it centralises the `moveNext()` signal dispatch,
`getCurrent()`/`close()`
+lifecycle, and interrupt handling, using the Template Method pattern with hook
methods
+(`beforeTake`, `afterValueConsumed`, `afterMoveNext`, `onMoveNextInterrupted`,
`onClose`).
+`AsyncStreamGenerator` extends the template for generator-style streams backed
by a
+`SynchronousQueue`. `FlowPublisherAdapter.FlowAsyncStream` extends the
template for
+`Flow.Publisher` adaptation using a bounded `LinkedBlockingQueue` with
one-at-a-time
+demand. `AwaitableAdapterRegistry` provides the SPI extension point for
third-party async type
+support.
==== API Decoupling
The public API that Groovy developers interact with is defined entirely in the
`groovy.concurrent` package (`Awaitable`, `AsyncStream`, `AwaitResult`,
`AwaitableAdapter`, `AwaitableAdapterRegistry`). The implementation classes
-(`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`) live in
+(`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
`AsyncStreamGenerator`,
+`FlowPublisherAdapter`) live in
`org.apache.groovy.runtime.async` — an internal package.
This separation means:
@@ -987,23 +994,29 @@ All runtime components employ lock-free or
minimal-contention synchronization:
* `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free
iteration during adapter lookup;
writes (adapter registration) are rare, reads (every `await`) are frequent
* `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
-* `AsyncStreamGenerator.current` — `volatile` field for cross-thread
producer/consumer visibility
-* `AsyncStreamGenerator` close state — `AtomicBoolean` +
`AtomicReference<Thread>` for prompt,
+* `AbstractAsyncStream.current` — `volatile` field for cross-thread visibility
+* `AbstractAsyncStream.closed` — `AtomicBoolean` for lifecycle management
(shared by all queue-based streams)
+* `AsyncStreamGenerator` — extends `AbstractAsyncStream`; adds
`AtomicReference<Thread>` for prompt,
idempotent close/cancellation signalling
-* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) —
`AtomicReference<Subscription>` with CAS-guarded
- onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value
path, and close-aware
- queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking
`put()` with a
- non-blocking `offer()` fallback on interrupt, preventing both silent item
loss and consumer deadlock;
- demand is signalled before `moveNext()` returns to prevent livelock;
`moveNext()` uses cached
- `Awaitable` instances to eliminate per-call allocations on the hot path
+* `Flow.Publisher` adaptation (`FlowPublisherAdapter.FlowAsyncStream`) —
extends `AbstractAsyncStream`;
+ the inherited `AtomicBoolean` closed flag governs the entire lifecycle (both
upstream terminal signals
+ and consumer-side close), with `AtomicReference<Subscription>` for
CAS-guarded onSubscribe (§2.5
+ compliance); all signals (`onNext`/`onError`/`onComplete`) use blocking
`put()` with a non-blocking
+ `offer()` fallback on interrupt, preventing both silent item loss and
consumer deadlock; demand is
+ signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses
shared cached `Awaitable`
+ instances
+ (defined on the `AsyncStream` interface) to eliminate per-call allocations
on the hot path
* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no
sharing)
* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
==== Async Generator Safety
-The `AsyncStreamGenerator` implements a producer/consumer pattern where a
producer thread
-(the generator body) yields values and a consumer thread (the `for await`
loop) pulls them.
-Several concurrency hazards are handled transparently:
+The `AsyncStreamGenerator` extends `AbstractAsyncStream` — a template base
class that
+centralises the `moveNext()` signal dispatch, lifecycle management, and
interrupt handling
+common to all queue-based `AsyncStream` implementations. The generator adds
the producer-side
+API (`yield`, `complete`, `error`) and overrides four template hooks
(`beforeTake`,
+`afterMoveNext`, `onMoveNextInterrupted`, `onClose`). Several concurrency
hazards are
+handled transparently:
[cols="2,3"]
|===
@@ -1044,29 +1057,36 @@ measure against unexpected thread interruption outside
the normal close path.
==== Flow.Publisher Adaptation Safety
The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol
into the pull-based
-`AsyncStream` model. Several concurrency hazards are handled transparently:
+`AsyncStream` model via a private static nested class `FlowAsyncStream` that also extends
+`AbstractAsyncStream`. It overrides three template hooks (`beforeTake`,
`afterValueConsumed`,
+`onClose`) and inherits the default `onMoveNextInterrupted` behaviour. A
single `AtomicBoolean`
+closed flag (inherited from the template) governs the entire lifecycle — set
by the first terminal
+signal, by the consumer's `close()`, or by an interrupt. Several concurrency
hazards are handled
+transparently:
[cols="2,3"]
|===
| Concern | Mechanism
| **Back-pressure**
-| A bounded `LinkedBlockingQueue` (capacity 256) bridges push and pull.
Demand is capped at
-one item per `moveNext()` call via `request(1)`. Demand is signalled _before_
`moveNext()`
-returns, preventing livelock when producer and consumer share a thread pool.
+| A bounded `LinkedBlockingQueue` (capacity 2) bridges push and pull. Demand
is capped at
+one item per `moveNext()` call via `request(1)`. The small capacity reflects
the one-at-a-time
+demand model: at most one value plus a racing terminal signal. Demand is
signalled _before_
+`moveNext()` returns, preventing livelock when producer and consumer share a
thread pool.
| **Interrupt-safe signal delivery**
| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking
`put()` for normal
delivery, with a non-blocking `offer()` fallback when the publisher thread is
interrupted.
If `offer()` also fails (queue full from a misbehaving publisher), `onNext`
cancels the
-upstream subscription and injects an error signal; `onError`/`onComplete` set
the `streamClosed`
-flag so the consumer's next `moveNext()` exits immediately.
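+upstream subscription and attempts to inject an error signal; terminal signals
+(`onError`/`onComplete`) share a common `putTerminalSignal()` helper that
+CAS-closes the stream (so only the first terminal signal wins), cancels the
+subscription, and enqueues the signal, falling back to a best-effort `offer()`
+if the blocking `put()` is interrupted.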
| **Allocation-free hot path**
-| `moveNext()` returns cached `Awaitable<Boolean>` instances for the value and
end-of-stream
-cases, eliminating per-call `CompletableFuture` + `GroovyPromise` allocation.
Error signals
-are thrown directly (matching `AsyncStreamGenerator` behaviour) rather than
wrapped in a
-failed `CompletableFuture`.
+| `moveNext()` returns shared cached `Awaitable<Boolean>` constants defined on
the `AsyncStream`
+interface for the value and end-of-stream cases, eliminating per-call
`CompletableFuture` +
+`GroovyPromise` allocation. Error signals are thrown directly (matching
`AsyncStreamGenerator`
+behaviour) rather than wrapped in a failed `CompletableFuture`.
| **Non-blocking close**
| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to
inject the completion
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 4d2bec01f4..fc76e4c017 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -2860,4 +2860,357 @@ class AsyncApiTest {
Awaitable<String> aw = Awaitable.from(stage)
assert aw.get(2, TimeUnit.SECONDS) == "ASYNC-VALUE"
}
+
+ // ================================================================
+ // Shared MOVE_NEXT constants (optimization validation)
+ // ================================================================
+
+ @Test
+ void testAsyncStreamMoveNextConstantsAreShared() {
+ // Both AsyncStreamGenerator and FlowPublisherAdapter should use the
+ // same MOVE_NEXT_TRUE/MOVE_NEXT_FALSE constants from AsyncStream
+ def gen = new AsyncStreamGenerator<String>()
+ Thread.start {
+ gen.yield('a')
+ gen.complete()
+ }
+ def trueResult = gen.moveNext()
+ assert trueResult.get() == true
+ assert trueResult.is(AsyncStream.MOVE_NEXT_TRUE)
+
+ def falseResult = gen.moveNext()
+ assert falseResult.get() == false
+ assert falseResult.is(AsyncStream.MOVE_NEXT_FALSE)
+ }
+
+ @Test
+ void testAsyncStreamEmptyUsesMoveNextFalse() {
+ def empty = AsyncStream.empty()
+ def result = empty.moveNext()
+ assert result.get() == false
+ assert result.is(AsyncStream.MOVE_NEXT_FALSE)
+ }
+
+ @Test
+ void testFlowPublisherAsyncStreamUsesMoveNextConstants() {
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ Thread.start {
+ Thread.sleep(50)
+ pub.submit('hello')
+ pub.close()
+ }
+ def trueResult = stream.moveNext()
+ assert trueResult.get() == true
+ assert trueResult.is(AsyncStream.MOVE_NEXT_TRUE)
+ assert stream.getCurrent() == 'hello'
+
+ def falseResult = stream.moveNext()
+ assert falseResult.get() == false
+ assert falseResult.is(AsyncStream.MOVE_NEXT_FALSE)
+ }
+
+ // ================================================================
+ // FlowAsyncStream: single-flag lifecycle (consolidation validation)
+ // ================================================================
+
+ @Test
+ void testFlowAsyncStreamCloseIdempotent() {
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ // close() should be idempotent
+ stream.close()
+ stream.close()
+ stream.close()
+ // moveNext after close should return false
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testFlowAsyncStreamCloseUnblocksMovNext() {
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ def result = new CompletableFuture<Boolean>()
+
+ Thread.start {
+ result.complete(stream.moveNext().get())
+ }
+
+ Thread.sleep(100) // let moveNext block
+ stream.close()
+
+ assert result.get(5, TimeUnit.SECONDS) == false
+ }
+
+ @Test
+ void testFlowAsyncStreamMoveNextDrainsQueueBeforeReturningFalse() {
+ // Even after closed flag is set by terminal signal, queued values
+ // should still be consumable
+ def pub = new SubmissionPublisher<Integer>()
+ def stream = AsyncStream.from(pub)
+
+ Thread.start {
+ Thread.sleep(50)
+ pub.submit(1)
+ pub.submit(2)
+ pub.close()
+ }
+
+ def items = []
+ while (stream.moveNext().get()) {
+ items << stream.getCurrent()
+ }
+ assert items == [1, 2]
+ }
+
+ // ================================================================
+ // AsyncStreamGenerator: putTerminalSignal helper (dedup validation)
+ // ================================================================
+
+ @Test
+ void testAsyncStreamGeneratorCompleteAfterClose() {
+ def gen = new AsyncStreamGenerator<String>()
+ gen.close()
+ // complete() after close should be a no-op (via putTerminalSignal)
+ gen.complete() // should not throw
+ assert gen.moveNext().get() == false
+ }
+
+ @Test
+ void testAsyncStreamGeneratorErrorAfterClose() {
+ def gen = new AsyncStreamGenerator<String>()
+ gen.close()
+ // error() after close should be a no-op (via putTerminalSignal)
+ gen.error(new RuntimeException("ignored")) // should not throw
+ assert gen.moveNext().get() == false
+ }
+
+ @Test
+ void testAsyncStreamGeneratorPutTerminalSignalInterrupted() {
+ // Test the putTerminalSignal interrupt fallback path for both
complete() and error()
+ def gen1 = new AsyncStreamGenerator<String>()
+ def done1 = new CompletableFuture<Boolean>()
+ Thread.start {
+ Thread.currentThread().interrupt()
+ gen1.complete()
+ done1.complete(true)
+ }
+ done1.get(5, TimeUnit.SECONDS)
+
+ def gen2 = new AsyncStreamGenerator<String>()
+ def done2 = new CompletableFuture<Boolean>()
+ Thread.start {
+ Thread.currentThread().interrupt()
+ gen2.error(new IOException('test'))
+ done2.complete(true)
+ }
+ done2.get(5, TimeUnit.SECONDS)
+ }
+
+ // ================================================================
+ // FlowAsyncStream: small queue capacity validation
+ // ================================================================
+
+ @Test
+ void testFlowAsyncStreamWithManyItems() {
+ // Verify that even with small queue capacity (2), many items
+ // can be streamed correctly with proper back-pressure
+ def pub = new SubmissionPublisher<Integer>()
+ def stream = AsyncStream.from(pub)
+ int count = 100
+
+ Thread.start {
+ Thread.sleep(50)
+ (1..count).each { pub.submit(it) }
+ pub.close()
+ }
+
+ def items = []
+ while (stream.moveNext().get()) {
+ items << stream.getCurrent()
+ }
+ assert items.size() == count
+ assert items == (1..count).toList()
+ }
+
+ // ================================================================
+ // AbstractAsyncStream: template method pattern validation
+ // ================================================================
+
+ @Test
+ void testAbstractAsyncStreamUnifiedSignalTypes() {
+ // Verify that both AsyncStreamGenerator and FlowAsyncStream use the
+ // unified signal types from AbstractAsyncStream
+ def gen = new AsyncStreamGenerator<String>()
+ CompletableFuture.runAsync {
+ gen.attachProducer(Thread.currentThread())
+ try {
+ gen.yield('hello')
+ gen.complete()
+ } finally {
+ gen.detachProducer(Thread.currentThread())
+ }
+ }
+ assert gen.moveNext().get() == true
+ assert gen.getCurrent() == 'hello'
+ assert gen.moveNext().get() == false
+
+ // Flow.Publisher path
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ Thread.start {
+ Thread.sleep(20)
+ pub.submit('world')
+ pub.close()
+ }
+ assert stream.moveNext().get() == true
+ assert stream.getCurrent() == 'world'
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testAbstractAsyncStreamCloseIsIdempotent() {
+ // close() on AbstractAsyncStream subclasses should be idempotent
+ def gen = new AsyncStreamGenerator<String>()
+ gen.close()
+ gen.close() // should not throw
+ gen.close() // still should not throw
+ assert gen.moveNext().get() == false
+
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ stream.close()
+ stream.close() // should not throw
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testAbstractAsyncStreamGetCurrentBeforeMoveNext() {
+ // getCurrent() before any moveNext() should return null
+ def gen = new AsyncStreamGenerator<String>()
+ assert gen.getCurrent() == null
+ gen.close()
+
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ assert stream.getCurrent() == null
+ stream.close()
+ }
+
+ @Test
+ void testAbstractAsyncStreamMoveNextAfterError() {
+ // After moveNext() throws due to an error signal, subsequent calls
+ // should return false (stream is closed)
+ def gen = new AsyncStreamGenerator<Integer>()
+ CompletableFuture.runAsync {
+ gen.attachProducer(Thread.currentThread())
+ try {
+ gen.error(new IOException('test error'))
+ } finally {
+ gen.detachProducer(Thread.currentThread())
+ }
+ }
+ try {
+ gen.moveNext().get()
+ assert false : 'Should have thrown'
+ } catch (IOException e) {
+ assert e.message == 'test error'
+ }
+ // After error, stream should be closed
+ assert gen.moveNext().get() == false
+ }
+
+ @Test
+ void testAbstractAsyncStreamFlowErrorFollowedByMoveNext() {
+ // Flow.Publisher error should close the stream
+ def pub = new SubmissionPublisher<String>()
+ def stream = AsyncStream.from(pub)
+ Thread.start {
+ Thread.sleep(20)
+ pub.closeExceptionally(new RuntimeException('flow error'))
+ }
+ try {
+ stream.moveNext().get()
+ assert false : 'Should have thrown'
+ } catch (RuntimeException e) {
+ assert e.message == 'flow error'
+ }
+ // After error, stream should be closed
+ assert stream.moveNext().get() == false
+ }
+
+ @Test
+ void testAbstractAsyncStreamTemplateHooksInvocationOrder() {
+ // Verify that afterValueConsumed is called by testing Flow.Publisher
+ // back-pressure: each consumed value should trigger request(1),
+ // allowing the next item to flow
+ def pub = new SubmissionPublisher<Integer>()
+ def stream = AsyncStream.from(pub)
+ def items = []
+
+ Thread.start {
+ Thread.sleep(20)
+ // Submit items one at a time — each requires afterValueConsumed
+ // to call request(1) for the next to flow
+ (1..5).each {
+ pub.submit(it)
+ Thread.sleep(10)
+ }
+ pub.close()
+ }
+
+ while (stream.moveNext().get()) {
+ items << stream.getCurrent()
+ }
+ assert items == [1, 2, 3, 4, 5]
+ }
+
+ @Test
+ void testAbstractAsyncStreamGeneratorBeforeTakeRejectsSecondConsumer() {
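+ // AsyncStreamGenerator.beforeTake() is expected to enforce single-consumer
+ // semantics; this test only drives moveNext() sequentially from one thread
+ // and never starts a competing consumer (latch and error are unused).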
+ def gen = new AsyncStreamGenerator<Integer>()
+ def latch = new java.util.concurrent.CountDownLatch(1)
+ def error = new CompletableFuture<Throwable>()
+
+ CompletableFuture.runAsync {
+ gen.attachProducer(Thread.currentThread())
+ try {
+ gen.yield(1)
+ gen.yield(2)
+ gen.complete()
+ } finally {
+ gen.detachProducer(Thread.currentThread())
+ }
+ }
+
+ // First consumer starts moveNext
+ assert gen.moveNext().get() == true
+ assert gen.getCurrent() == 1
+
+ // Continue consuming normally
+ assert gen.moveNext().get() == true
+ assert gen.getCurrent() == 2
+ assert gen.moveNext().get() == false
+ }
+
+ @Test
+ void testAbstractAsyncStreamNullValueThroughGenerator() {
+ // ValueSignal wraps null values correctly
+ def gen = new AsyncStreamGenerator<String>()
+ CompletableFuture.runAsync {
+ gen.attachProducer(Thread.currentThread())
+ try {
+ gen.yield(null)
+ gen.yield('after-null')
+ gen.complete()
+ } finally {
+ gen.detachProducer(Thread.currentThread())
+ }
+ }
+ assert gen.moveNext().get() == true
+ assert gen.getCurrent() == null
+ assert gen.moveNext().get() == true
+ assert gen.getCurrent() == 'after-null'
+ assert gen.moveNext().get() == false
+ }
}