This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new 7504008b1b Minor tweaks
7504008b1b is described below

commit 7504008b1b899dc3cb6be9399da1b7646c0396e8
Author: Daniel Sun <[email protected]>
AuthorDate: Thu Mar 19 00:31:31 2026 +0900

    Minor tweaks
---
 src/main/java/groovy/concurrent/AsyncStream.java   |  16 +-
 .../groovy/runtime/async/AbstractAsyncStream.java  | 287 +++++++++++++++++
 .../groovy/runtime/async/AsyncStreamGenerator.java | 209 ++++--------
 .../groovy/runtime/async/FlowPublisherAdapter.java | 323 ++++++++-----------
 src/spec/doc/core-async-await.adoc                 |  78 +++--
 .../codehaus/groovy/transform/AsyncApiTest.groovy  | 353 +++++++++++++++++++++
 6 files changed, 905 insertions(+), 361 deletions(-)

diff --git a/src/main/java/groovy/concurrent/AsyncStream.java 
b/src/main/java/groovy/concurrent/AsyncStream.java
index 4baa81115a..318ddf6355 100644
--- a/src/main/java/groovy/concurrent/AsyncStream.java
+++ b/src/main/java/groovy/concurrent/AsyncStream.java
@@ -113,6 +113,20 @@ public interface AsyncStream<T> extends AutoCloseable {
         return (AsyncStream<T>) EMPTY;
     }
 
+    /**
+     * Cached awaitable for {@code moveNext()} returning {@code true}.
+     * Eliminates per-call allocation on the hot path.  Shared by all
+     * {@code AsyncStream} implementations (e.g. via
+     * {@link org.apache.groovy.runtime.async.AbstractAsyncStream 
AbstractAsyncStream}).
+     */
+    Awaitable<Boolean> MOVE_NEXT_TRUE = Awaitable.of(Boolean.TRUE);
+
+    /**
+     * Cached awaitable for {@code moveNext()} returning {@code false}.
+     * Eliminates per-call allocation on the stream-end path.
+     */
+    Awaitable<Boolean> MOVE_NEXT_FALSE = Awaitable.of(Boolean.FALSE);
+
     /**
      * Singleton empty stream instance.
      * <p>
@@ -121,7 +135,7 @@ public interface AsyncStream<T> extends AutoCloseable {
      * referencing this field directly.
      */
     AsyncStream<Object> EMPTY = new AsyncStream<>() {
-        @Override public Awaitable<Boolean> moveNext() { return 
Awaitable.of(false); }
+        @Override public Awaitable<Boolean> moveNext() { return 
MOVE_NEXT_FALSE; }
         @Override public Object getCurrent() { return null; }
     };
 }
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java 
b/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java
new file mode 100644
index 0000000000..2e70a12848
--- /dev/null
+++ b/src/main/java/org/apache/groovy/runtime/async/AbstractAsyncStream.java
@@ -0,0 +1,287 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Template base class for queue-based {@link AsyncStream} implementations.
+ *
+ * <p>This class implements the
+ * <a href="https://en.wikipedia.org/wiki/Template_method_pattern";>Template 
Method</a>
+ * pattern, centralising the signal dispatch logic, lifecycle management, and
+ * interrupt handling that is common to all queue-based async streams.
+ * Concrete subclasses only need to supply a {@link BlockingQueue} and override
+ * a small number of hook methods to customise behaviour.</p>
+ *
+ * <h2>Signal protocol</h2>
+ * <p>Elements flowing through the queue are wrapped in one of three signal 
types:</p>
+ * <ul>
+ *   <li>{@link ValueSignal} — carries a data element (may wrap {@code 
null})</li>
+ *   <li>{@link ErrorSignal} — carries a {@link Throwable} to propagate</li>
+ *   <li>{@link #COMPLETE} — singleton sentinel indicating normal 
end-of-stream</li>
+ * </ul>
+ * <p>The template's {@link #moveNext()} dispatches on these signals with a 
fixed
+ * sequence: value → set current + {@link #afterValueConsumed()} + return 
{@code true};
+ * error → set closed + sneaky-throw; complete → set closed + return {@code 
false}.</p>
+ *
+ * <h2>Hook methods (override points)</h2>
+ * <table>
+ *   <caption>Hook methods and their defaults</caption>
+ *   <tr><th>Hook</th><th>Default</th><th>Typical override</th></tr>
+ *   <tr><td>{@link #beforeTake()}</td><td>return {@code MOVE_NEXT_FALSE} if 
closed</td>
+ *       <td>thread registration, double-check, or drain check</td></tr>
+ *   <tr><td>{@link #afterValueConsumed()}</td><td>no-op</td>
+ *       <td>request more items from upstream (back-pressure)</td></tr>
+ *   <tr><td>{@link #afterMoveNext()}</td><td>no-op</td>
+ *       <td>unregister consumer thread</td></tr>
+ *   <tr><td>{@link #onMoveNextInterrupted(InterruptedException)}</td>
+ *       <td>set closed, restore interrupt, throw {@link 
CancellationException}</td>
+ *       <td>return {@code MOVE_NEXT_FALSE} if already closed</td></tr>
+ *   <tr><td>{@link #onClose()}</td><td><em>abstract</em></td>
+ *       <td>interrupt threads, cancel subscriptions, drain queue</td></tr>
+ * </table>
+ *
+ * <h2>Thread safety</h2>
+ * <p>The {@link #closed} flag is an {@link AtomicBoolean} shared between the
+ * producer (subclass-managed) and consumer ({@code moveNext()}) sides.
+ * The {@link #close()} method uses CAS to guarantee exactly-once semantics.
+ * The {@link #current} field is {@code volatile} for safe cross-thread 
visibility.</p>
+ *
+ * <p>This class is an internal implementation detail and should not be 
referenced
+ * directly by user code.</p>
+ *
+ * @param <T> the element type
+ * @see AsyncStreamGenerator
+ * @see FlowPublisherAdapter
+ * @since 6.0.0
+ */
+public abstract class AbstractAsyncStream<T> implements AsyncStream<T> {
+
+    // ---- Unified signal types ----
+
+    /**
+     * Wraps a data element for transport through the signal queue.
+     * The wrapper is necessary because the queue element type is {@code 
Object},
+     * and the actual value may be {@code null}.
+     */
+    protected record ValueSignal(Object value) { }
+
+    /**
+     * Wraps an error for transport through the signal queue.
+     * When dispatched by {@link #moveNext()}, the wrapped throwable is
+     * re-thrown via {@link AsyncSupport#sneakyThrow(Throwable)}.
+     */
+    protected record ErrorSignal(Throwable error) { }
+
+    /**
+     * Singleton sentinel indicating normal stream completion.
+     * Identity comparison ({@code ==}) is used in the dispatch logic.
+     */
+    protected static final Object COMPLETE = new Object();
+
+    // ---- Shared state ----
+
+    /** The signal queue bridging producer and consumer. */
+    protected final BlockingQueue<Object> queue;
+
+    /** Lifecycle flag: set exactly once when the stream is closed. */
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+    /** Most recently consumed value, set by {@link #moveNext()} on value 
signals. */
+    private volatile T current;
+
+    /**
+     * @param queue the blocking queue used for producer→consumer signal 
delivery;
+     *              must not be {@code null}
+     */
+    protected AbstractAsyncStream(BlockingQueue<Object> queue) {
+        this.queue = Objects.requireNonNull(queue, "queue");
+    }
+
+    // ---- Template method: moveNext ----
+
+    /**
+     * Template method implementing the {@link AsyncStream} iteration protocol.
+     *
+     * <p>Execution sequence:</p>
+     * <ol>
+     *   <li>{@link #beforeTake()} — may short-circuit with an early 
return</li>
+     *   <li>{@code queue.take()} — blocks until a signal is available</li>
+     *   <li>Signal dispatch: value / error / complete</li>
+     *   <li>{@link #afterMoveNext()} — always runs (finally block)</li>
+     * </ol>
+     *
+     * <p>If {@code queue.take()} throws {@link InterruptedException},
+     * {@link #onMoveNextInterrupted(InterruptedException)} handles it.</p>
+     *
+     * @return an {@code Awaitable<Boolean>} — {@code true} if a new element is
+     *         available via {@link #getCurrent()}, {@code false} if the stream
+     *         is exhausted or closed
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public final Awaitable<Boolean> moveNext() {
+        Awaitable<Boolean> earlyReturn = beforeTake();
+        if (earlyReturn != null) {
+            return earlyReturn;
+        }
+        try {
+            Object signal = queue.take();
+
+            if (signal instanceof ValueSignal vs) {
+                current = (T) vs.value;
+                afterValueConsumed();
+                return MOVE_NEXT_TRUE;
+            }
+            if (signal instanceof ErrorSignal es) {
+                closed.set(true);
+                Throwable cause = es.error;
+                if (cause instanceof Error err) throw err;
+                throw AsyncSupport.sneakyThrow(cause);
+            }
+            // COMPLETE sentinel — end-of-stream
+            closed.set(true);
+            return MOVE_NEXT_FALSE;
+        } catch (InterruptedException e) {
+            return onMoveNextInterrupted(e);
+        } finally {
+            afterMoveNext();
+        }
+    }
+
+    // ---- Final implementations ----
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the most recently consumed element, set by the last successful
+     * {@link #moveNext()} call.
+     *
+     * @return the current element, or {@code null} before the first successful
+     *         {@code moveNext()} call
+     */
+    @Override
+    public final T getCurrent() {
+        return current;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Atomically marks this stream as closed via CAS on the {@link #closed}
+     * flag, then delegates to {@link #onClose()} for subclass-specific 
cleanup.
+     * Idempotent: only the first invocation triggers {@code onClose()}.
+     */
+    @Override
+    public final void close() {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        onClose();
+    }
+
+    // ---- Abstract methods ----
+
+    /**
+     * Subclass-specific cleanup, called exactly once from {@link #close()}.
+     * Typical actions include interrupting blocked threads, cancelling
+     * upstream subscriptions, and draining the queue.
+     */
+    protected abstract void onClose();
+
+    // ---- Hook methods with defaults ----
+
+    /**
+     * Pre-take hook invoked at the start of {@link #moveNext()}.
+     * <p>
+     * Return a non-{@code null} {@link Awaitable} to short-circuit
+     * {@code moveNext()} without blocking on the queue.  Return {@code null}
+     * to proceed with the normal take-and-dispatch sequence.
+     * <p>
+     * The default implementation returns {@link #MOVE_NEXT_FALSE} when
+     * the stream is closed, and {@code null} otherwise.
+     *
+     * @return an early-return value, or {@code null} to continue
+     */
+    protected Awaitable<Boolean> beforeTake() {
+        return closed.get() ? MOVE_NEXT_FALSE : null;
+    }
+
+    /**
+     * Post-value hook invoked after a {@link ValueSignal} has been consumed
+     * and the {@link #current} field updated.
+     * <p>
+     * Subclasses may use this to signal demand to an upstream source
+     * (e.g. {@code Subscription.request(1)} for reactive streams).
+     * <p>
+     * The default implementation is a no-op.
+     */
+    protected void afterValueConsumed() { }
+
+    /**
+     * Finally hook invoked after every {@link #moveNext()} attempt,
+     * regardless of outcome (value, error, completion, or interrupt).
+     * <p>
+     * Subclasses may use this to unregister the consumer thread.
+     * <p>
+     * The default implementation is a no-op.
+     */
+    protected void afterMoveNext() { }
+
+    /**
+     * Interrupt handler invoked when {@code queue.take()} throws
+     * {@link InterruptedException} inside {@link #moveNext()}.
+     * <p>
+     * The default implementation sets {@link #closed} to {@code true},
+     * restores the interrupt flag, and throws a {@link CancellationException}.
+     * Subclasses may override to return {@link #MOVE_NEXT_FALSE} instead
+     * of throwing (e.g. when an external {@code close()} caused the 
interrupt).
+     *
+     * @param e the interrupt exception
+     * @return an {@link Awaitable} to return from {@code moveNext()}, or
+     *         the method may throw instead
+     */
+    protected Awaitable<Boolean> onMoveNextInterrupted(InterruptedException e) 
{
+        closed.set(true);
+        Thread.currentThread().interrupt();
+        throw newCancellationException("Interrupted during moveNext", e);
+    }
+
+    // ---- Utilities for subclasses ----
+
+    /**
+     * Creates a {@link CancellationException} with the given message and 
cause.
+     *
+     * @param message the detail message
+     * @param cause   the interrupt that triggered the cancellation
+     * @return a new {@code CancellationException}
+     */
+    protected static CancellationException newCancellationException(String 
message, InterruptedException cause) {
+        CancellationException ce = new CancellationException(message);
+        ce.initCause(cause);
+        return ce;
+    }
+}
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index d5b5e36e0c..ca7928ea9d 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -18,22 +18,31 @@
  */
 package org.apache.groovy.runtime.async;
 
-import groovy.concurrent.AsyncStream;
 import groovy.concurrent.Awaitable;
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A producer/consumer implementation of {@link AsyncStream} used by
- * {@code async} methods that contain {@code yield return} statements.
+ * A producer/consumer implementation of {@link groovy.concurrent.AsyncStream 
AsyncStream}
+ * backed by a {@link SynchronousQueue}, used by {@code async} methods that
+ * contain {@code yield return} statements.
  * <p>
- * The producer (method body) runs on a separate thread and calls
- * {@link #yield(Object)} for each emitted element. The consumer
- * calls {@link #moveNext()}/{@link #getCurrent()} — typically via
- * a {@code for await} loop.
+ * Extends {@link AbstractAsyncStream} which provides the common
+ * {@link #moveNext()}/{@link #getCurrent()}/{@link #close()} template.
+ * This class adds the producer-side API ({@link #yield}, {@link #complete},
+ * {@link #error}) and overrides the following hooks:
+ *
+ * <ul>
+ *   <li>{@link #beforeTake()} — enforces single-consumer semantics via
+ *       {@link #consumerThread} CAS + double-check of the {@link #closed} 
flag</li>
+ *   <li>{@link #afterMoveNext()} — unregisters the consumer thread</li>
+ *   <li>{@link #onMoveNextInterrupted(InterruptedException)} — returns
+ *       {@code MOVE_NEXT_FALSE} if the stream was already closed (cooperative
+ *       cancellation), otherwise throws {@link CancellationException}</li>
+ *   <li>{@link #onClose()} — interrupts both producer and consumer 
threads</li>
+ * </ul>
  *
  * <h2>Back-pressure</h2>
  * Uses a {@link SynchronousQueue} to provide natural back-pressure:
@@ -83,22 +92,14 @@ import java.util.concurrent.atomic.AtomicReference;
  * @param <T> the element type
  * @since 6.0.0
  */
-public class AsyncStreamGenerator<T> implements AsyncStream<T> {
-
-    private static final Object DONE = new Object();
-
-    /**
-     * Cached awaitables for the two common {@code moveNext()} outcomes.
-     * Eliminates per-call object allocation on the hot path.
-     */
-    private static final Awaitable<Boolean> MOVE_NEXT_TRUE = 
Awaitable.of(Boolean.TRUE);
-    private static final Awaitable<Boolean> MOVE_NEXT_FALSE = 
Awaitable.of(Boolean.FALSE);
+public class AsyncStreamGenerator<T> extends AbstractAsyncStream<T> {
 
-    private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
-    private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicReference<Thread> producerThread = new 
AtomicReference<>();
     private final AtomicReference<Thread> consumerThread = new 
AtomicReference<>();
-    private volatile T current;
+
+    public AsyncStreamGenerator() {
+        super(new SynchronousQueue<>());
+    }
 
     /**
      * Registers the given thread as the producer for this stream.
@@ -139,13 +140,13 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
             throw streamClosed(null);
         }
         try {
-            queue.put(new Item(value));
+            queue.put(new ValueSignal(value));
         } catch (InterruptedException e) {
             if (closed.get()) {
                 throw streamClosed(e);
             }
             Thread.currentThread().interrupt();
-            throw interrupted("Interrupted during yield", e);
+            throw newCancellationException("Interrupted during yield", e);
         }
     }
 
@@ -160,22 +161,7 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
      * thread leak occurs even under unexpected interrupt timing.
      */
     public void complete() {
-        if (closed.get()) {
-            return;
-        }
-        try {
-            queue.put(DONE);
-        } catch (InterruptedException e) {
-            if (!closed.get()) {
-                Thread.currentThread().interrupt();
-                // Best-effort: non-blocking handoff to a waiting consumer.
-                // If no consumer is waiting, offer() returns false and the 
DONE
-                // signal is lost — force-close to unblock future moveNext() 
calls.
-                if (!queue.offer(DONE)) {
-                    close();
-                }
-            }
-        }
+        putTerminalSignal(COMPLETE);
     }
 
     /**
@@ -189,135 +175,90 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
      * because the interrupt itself indicates an external cancellation.
      */
     public void error(Throwable t) {
+        putTerminalSignal(new ErrorSignal(t != null ? t : new 
NullPointerException("null error in generator")));
+    }
+
+    /**
+     * Shared logic for {@link #complete()} and {@link #error(Throwable)}:
+     * attempts a blocking {@code put()}, falling back to non-blocking
+     * {@code offer()} on interrupt, and force-closing if both fail.
+     */
+    private void putTerminalSignal(Object signal) {
         if (closed.get()) {
             return;
         }
-        ErrorItem item = new ErrorItem(t != null ? t : new 
NullPointerException("null error in generator"));
         try {
-            queue.put(item);
+            queue.put(signal);
         } catch (InterruptedException e) {
             if (!closed.get()) {
                 Thread.currentThread().interrupt();
-                if (!queue.offer(item)) {
+                if (!queue.offer(signal)) {
                     close();
                 }
             }
         }
     }
 
+    // ---- Template method overrides ----
+
     /**
-     * {@inheritDoc}
-     * <p>
-     * Blocks the calling (consumer) thread on the {@link SynchronousQueue} 
until
-     * the producer offers the next element, a completion sentinel, or an 
error.
-     * If the stream has been {@linkplain #close() closed}, returns
-     * {@code Awaitable.of(false)} immediately without blocking.
-     * <p>
-     * The consumer thread is registered via {@code consumerThread} during the
-     * blocking call so that {@link #close()} can interrupt it if needed.
-     * <p>
-     * <b>Single-consumer invariant</b>: only one thread may call
-     * {@code moveNext()} at a time.  A {@code compareAndSet(null, current)}
-     * guard enforces this — a concurrent second caller receives an
-     * {@link IllegalStateException} immediately instead of silently corrupting
-     * the producer/consumer handshake.
-     * <p>
-     * A <em>double-check</em> of the {@link #closed} flag is performed after
-     * registration to close the TOCTOU race window: if {@code close()} 
executes
-     * between the initial {@code closed.get()} check and the
-     * {@code consumerThread} CAS, the consumer reference would not yet be
-     * visible to {@code close()}, so no interrupt would be delivered, and
-     * {@code queue.take()} would block indefinitely.  The re-check after
-     * registration detects this case and returns immediately.
-     *
-     * @return an {@code Awaitable<Boolean>} that resolves to {@code true} if a
-     *         new element is available via {@link #getCurrent()}, or {@code 
false}
-     *         if the stream is exhausted or closed
+     * Enforces single-consumer semantics via CAS on {@link #consumerThread},
+     * with a double-check of the {@link #closed} flag to close the TOCTOU
+     * race window (see class-level javadoc).
      */
     @Override
-    @SuppressWarnings("unchecked")
-    public Awaitable<Boolean> moveNext() {
+    protected Awaitable<Boolean> beforeTake() {
         if (closed.get()) {
             return MOVE_NEXT_FALSE;
         }
-        // Enforce single-consumer semantics: only one thread may call 
moveNext()
-        // at a time.  A concurrent second caller would overwrite 
consumerThread,
-        // breaking close()'s interrupt targeting and causing data races on
-        // queue.take().  CAS guards this invariant at the cost of one atomic 
op.
-        Thread currentThread = Thread.currentThread();
-        if (!consumerThread.compareAndSet(null, currentThread)) {
+        Thread ct = Thread.currentThread();
+        if (!consumerThread.compareAndSet(null, ct)) {
             Thread existing = consumerThread.get();
-            if (existing != currentThread) {
+            if (existing != ct) {
                 throw new IllegalStateException(
                         "AsyncStream does not support concurrent consumers. "
                         + "Current consumer: " + existing);
             }
         }
-        // Double-check after registration: if close() raced between the first
-        // closed check and consumerThread CAS, the consumer reference was not
-        // yet visible to close(), so no interrupt was delivered.  Without this
-        // re-check the consumer would block in queue.take() indefinitely.
+        // Double-check: if close() raced between the first closed check and
+        // consumerThread CAS, the consumer reference was not yet visible to
+        // close(), so no interrupt was delivered.
         if (closed.get()) {
-            consumerThread.compareAndSet(currentThread, null);
+            consumerThread.compareAndSet(ct, null);
             return MOVE_NEXT_FALSE;
         }
-        try {
-            Object next = queue.take();
-            if (next == DONE) {
-                closed.set(true);
-                return MOVE_NEXT_FALSE;
-            }
-            if (next instanceof ErrorItem ei) {
-                closed.set(true);
-                Throwable cause = ei.error;
-                if (cause instanceof Error err) throw err;
-                throw AsyncSupport.sneakyThrow(cause);
-            }
-            current = (T) ((Item) next).value;
-            return MOVE_NEXT_TRUE;
-        } catch (InterruptedException e) {
-            if (closed.get()) {
-                return MOVE_NEXT_FALSE;
-            }
-            Thread.currentThread().interrupt();
-            throw interrupted("Interrupted during moveNext", e);
-        } finally {
-            consumerThread.compareAndSet(currentThread, null);
-        }
+        return null;
     }
 
     /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the most recently consumed element. The value is updated each 
time
-     * {@link #moveNext()} returns {@code true}.
-     *
-     * @return the current element, or {@code null} before the first successful
-     *         {@code moveNext()} call
+     * Unregisters the consumer thread after every {@link #moveNext()} attempt.
      */
     @Override
-    public T getCurrent() {
-        return current;
+    protected void afterMoveNext() {
+        consumerThread.compareAndSet(Thread.currentThread(), null);
     }
 
     /**
-     * {@inheritDoc}
-     * <p>
-     * Atomically marks this stream as closed and interrupts any producer or
-     * consumer thread that is currently blocked on the {@link 
SynchronousQueue}.
-     * The interrupted threads detect the {@link #closed} flag and exit
-     * gracefully (see the class-level javadoc for details).
-     * <p>
-     * This method is idempotent: only the first invocation performs the
-     * interrupt; subsequent calls are no-ops.  A thread calling {@code 
close()}
-     * on itself (e.g. the consumer calling close inside a {@code for await}
-     * body) is never self-interrupted.
+     * If the stream was already closed (cooperative cancellation), returns
+     * {@code MOVE_NEXT_FALSE}; otherwise restores the interrupt flag and
+     * throws {@link CancellationException}.
      */
     @Override
-    public void close() {
-        if (!closed.compareAndSet(false, true)) {
-            return;
+    protected Awaitable<Boolean> onMoveNextInterrupted(InterruptedException e) 
{
+        if (closed.get()) {
+            return MOVE_NEXT_FALSE;
         }
+        Thread.currentThread().interrupt();
+        throw newCancellationException("Interrupted during moveNext", e);
+    }
+
+    /**
+     * Interrupts both producer and consumer threads (if any) to unblock
+     * pending {@link SynchronousQueue} operations.  A thread is never
+     * self-interrupted.
+     */
+    @Override
+    protected void onClose() {
         Thread producer = producerThread.getAndSet(null);
         if (producer != null && producer != Thread.currentThread()) {
             producer.interrupt();
@@ -328,12 +269,6 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
         }
     }
 
-    private static CancellationException interrupted(String message, 
InterruptedException cause) {
-        CancellationException ce = new CancellationException(message);
-        ce.initCause(cause);
-        return ce;
-    }
-
     private static CancellationException streamClosed(InterruptedException 
cause) {
         CancellationException ce = new CancellationException("Async stream was 
closed");
         if (cause != null) {
@@ -341,8 +276,4 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
         }
         return ce;
     }
-
-    // Wrapper to handle null values in the queue
-    private record Item(Object value) { }
-    private record ErrorItem(Throwable error) { }
 }
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index cf15cb91a6..3c56af6bc1 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -68,8 +68,8 @@ import java.util.concurrent.atomic.AtomicReference;
  *       blocking {@code put()} with a non-blocking {@code offer()} fallback
  *       when the publisher thread is interrupted — this prevents both silent
  *       item loss and consumer deadlock even under unexpected interrupts</li>
- *   <li>Terminal callbacks atomically close the upstream side
- *       ({@code closedRef}) and clear/cancel the stored subscription.
+ *   <li>Terminal callbacks atomically close the stream via a single
+ *       {@link AtomicBoolean} flag and clear/cancel the stored subscription.
  *       This makes post-terminal {@code onNext} calls from non-compliant
  *       publishers harmless and releases resources promptly.</li>
  *   <li>Back-pressure is enforced by requesting exactly one item after
@@ -84,21 +84,6 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class FlowPublisherAdapter implements AwaitableAdapter {
 
-    /**
-     * Queue capacity for the push→pull bridge in
-     * {@link #publisherToAsyncStream}.  256 provides a generous buffer
-     * for bursty publishers while bounding memory.
-     */
-    private static final int QUEUE_CAPACITY = 256;
-
-    /**
-     * Cached awaitables for the two common {@code moveNext()} outcomes.
-     * Eliminates per-call {@link CompletableFuture} + {@link GroovyPromise}
-     * allocation on the hot path (every element and stream-end).
-     */
-    private static final Awaitable<Boolean> MOVE_NEXT_TRUE = 
Awaitable.of(Boolean.TRUE);
-    private static final Awaitable<Boolean> MOVE_NEXT_FALSE = 
Awaitable.of(Boolean.FALSE);
-
     /**
      * Returns {@code true} if the given type is assignable to
      * {@link Flow.Publisher}, enabling single-value {@code await}.
@@ -226,18 +211,6 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
 
     // ---- Multi-value adaptation (for await publisher) ----
 
-    // Signal wrapper types allow us to distinguish values, errors, and
-    // completion in a single queue without type confusion.
-
-    private record ValueSignal<T>(T value) {
-    }
-
-    private record ErrorSignal(Throwable error) {
-    }
-
-    /** Singleton sentinel for stream completion. */
-    private static final Object COMPLETE_SENTINEL = new Object();
-
     /**
      * Wraps a {@link Flow.Publisher} into an {@link AsyncStream},
      * providing a pull-based iteration interface over a push-based source.
@@ -249,12 +222,6 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
      * current one — this prevents livelock when producer and consumer
      * share the same thread pool.</p>
      *
-     * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
-     * absorbs minor timing jitter between producer and consumer.  Signals
-     * use blocking {@code put()} for normal delivery with a non-blocking
-     * {@code offer()} fallback when the publisher thread is interrupted —
-     * ensuring no items or terminal events are silently dropped.</p>
-     *
      * <p><b>Resource management:</b> When the consumer calls
      * {@link AsyncStream#close()} (e.g. via {@code break} in a
      * {@code for await} loop), the upstream subscription is cancelled
@@ -265,182 +232,154 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
      * @param <T>       the element type
      * @return an async stream that yields publisher items
      */
-    @SuppressWarnings("unchecked")
     private <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> 
publisher) {
-        LinkedBlockingQueue<Object> queue = new 
LinkedBlockingQueue<>(QUEUE_CAPACITY);
-        AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
-        // Tracks whether the stream has been closed (by consumer or terminal 
signal).
-        // CAS ensures exactly-once semantics for the close/cleanup path.
-        AtomicBoolean closedRef = new AtomicBoolean(false);
-        AtomicBoolean streamClosed = new AtomicBoolean(false);
+        FlowAsyncStream<T> stream = new FlowAsyncStream<>();
+        publisher.subscribe(stream.newSubscriber());
+        return stream;
+    }
 
-        publisher.subscribe(new Flow.Subscriber<T>() {
-            @Override
-            public void onSubscribe(Flow.Subscription s) {
-                // §2.5: reject duplicate subscriptions
-                if (!subRef.compareAndSet(null, s)) {
-                    s.cancel();
-                    return;
-                }
-                // Double-check pattern: if close() raced between the CAS and 
this point,
-                // the subscription must be cancelled immediately to avoid a 
dangling stream.
-                if (closedRef.get()) {
-                    Flow.Subscription sub = subRef.getAndSet(null);
-                    if (sub != null) sub.cancel();
-                    return;
+    /**
+     * Named implementation of {@link AsyncStream} that extends
+     * {@link AbstractAsyncStream} to bridge a push-based
+     * {@link Flow.Publisher} into a pull-based async stream.
+     *
+     * <p>Inherits the common {@link AbstractAsyncStream#moveNext() 
moveNext()}/
+     * {@link AbstractAsyncStream#getCurrent() getCurrent()}/
+     * {@link AbstractAsyncStream#close() close()} template and overrides:</p>
+     * <ul>
+     *   <li>{@link #beforeTake()} — drains remaining signals before returning
+     *       false (checks {@code closed && queue.isEmpty()})</li>
+     *   <li>{@link #afterValueConsumed()} — signals back-pressure demand via
+     *       {@code Subscription.request(1)}</li>
+     *   <li>{@link #onClose()} — cancels the upstream subscription, drains
+     *       the queue, and injects a {@link #COMPLETE} sentinel to unblock
+     *       any pending {@code moveNext()}</li>
+     * </ul>
+     *
+     * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
+     * absorbs minor timing jitter.  Signals use blocking {@code put()}
+     * for normal delivery with a non-blocking {@code offer()} fallback
+     * when the publisher thread is interrupted — ensuring no items or
+     * terminal events are silently dropped.</p>
+     *
+     * @param <T> the element type
+     */
+    private static final class FlowAsyncStream<T> extends 
AbstractAsyncStream<T> {
+
+        /**
+         * Queue capacity for the push→pull bridge.  With one-at-a-time
+         * demand ({@code request(1)} per consumed element), a well-behaved
+         * publisher will never enqueue more than one value at a time.
+         * A capacity of 2 accommodates the value + a racing terminal
+         * signal without blocking, while keeping memory minimal.
+         */
+        private static final int QUEUE_CAPACITY = 2;
+
+        private final AtomicReference<Flow.Subscription> subRef = new 
AtomicReference<>();
+
+        FlowAsyncStream() {
+            super(new LinkedBlockingQueue<>(QUEUE_CAPACITY));
+        }
+
+        /**
+         * Creates a new {@link Flow.Subscriber} wired to this stream's
+         * internal queue and lifecycle state.
+         */
+        Flow.Subscriber<T> newSubscriber() {
+            return new Flow.Subscriber<>() {
+                @Override
+                public void onSubscribe(Flow.Subscription s) {
+                    // §2.5: reject duplicate subscriptions
+                    if (!subRef.compareAndSet(null, s)) {
+                        s.cancel();
+                        return;
+                    }
+                    // Double-check: if close() raced before subscription was 
set,
+                    // cancel immediately to avoid a dangling upstream.
+                    if (closed.get()) {
+                        cancelSubscription();
+                        return;
+                    }
+                    s.request(1);
                 }
-                s.request(1);
-            }
 
-            @Override
-            public void onNext(T item) {
-                // §2.13: null items are spec violations
-                if (item == null) {
-                    onError(new NullPointerException(
-                            "Flow.Publisher onNext received null (Reactive 
Streams §2.13)"));
-                    return;
-                }
-                if (!closedRef.get()) {
+                @Override
+                public void onNext(T item) {
+                    // §2.13: null items are spec violations
+                    if (item == null) {
+                        onError(new NullPointerException(
+                                "Flow.Publisher onNext received null (Reactive 
Streams §2.13)"));
+                        return;
+                    }
+                    if (closed.get()) return;
                     try {
-                        // Blocking put() guarantees the item reaches the 
consumer.
-                        // Since demand is capped at 1 (one request(1) per 
moveNext),
-                        // a well-behaved publisher will never overflow the 
queue; put()
-                        // still protects against misbehaving publishers by 
blocking
-                        // rather than silently dropping the value.
-                        queue.put(new ValueSignal<>(item));
+                        queue.put(new ValueSignal(item));
                     } catch (InterruptedException ie) {
                         Thread.currentThread().interrupt();
-                        // Blocking put() was interrupted. Fall back to 
non-blocking
-                        // offer() so the item still reaches the consumer. With
-                        // one-at-a-time demand the queue is almost never 
full, so
-                        // offer() effectively always succeeds. If it doesn't
-                        // (misbehaving publisher overfilling the queue), 
cancel
-                        // upstream and inject an error signal to terminate the
-                        // consumer cleanly instead of silently dropping the 
item.
-                        if (!queue.offer(new ValueSignal<>(item))) {
-                            Flow.Subscription sub = subRef.getAndSet(null);
-                            if (sub != null) sub.cancel();
-                            closedRef.set(true);
+                        if (!queue.offer(new ValueSignal(item))) {
+                            cancelSubscription();
+                            closed.set(true);
                             queue.offer(new ErrorSignal(
                                     new CancellationException(
                                             "Item delivery interrupted and 
queue full")));
                         }
                     }
                 }
-            }
 
-            @Override
-            public void onError(Throwable t) {
-                // First terminal signal wins. Ignore duplicate terminal 
callbacks.
-                if (!closedRef.compareAndSet(false, true)) {
-                    return;
-                }
-                // Cancel subscription eagerly to release upstream resources
-                Flow.Subscription sub = subRef.getAndSet(null);
-                if (sub != null) sub.cancel();
-                try {
-                    // Terminal signals use blocking put() to guarantee 
delivery —
-                    // the consumer MUST see the error to propagate it 
correctly.
-                    queue.put(new ErrorSignal(t));
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    // Blocking put() was interrupted. Fall back to 
non-blocking
-                    // offer(). If that also fails (queue full), set 
streamClosed
-                    // so the consumer's next moveNext() returns false instead 
of
-                    // blocking indefinitely.
-                    if (!queue.offer(new ErrorSignal(t))) {
-                        streamClosed.set(true);
-                    }
+                @Override
+                public void onError(Throwable t) {
+                    putTerminalSignal(new ErrorSignal(t));
                 }
-            }
 
-            @Override
-            public void onComplete() {
-                // First terminal signal wins. Ignore duplicate terminal 
callbacks.
-                if (!closedRef.compareAndSet(false, true)) {
-                    return;
-                }
-                // Clear subscription consistently with other terminal paths.
-                Flow.Subscription sub = subRef.getAndSet(null);
-                if (sub != null) sub.cancel();
-                try {
-                    // Blocking put() guarantees the consumer will see the 
sentinel,
-                    // even if the queue was temporarily full from buffered 
values.
-                    queue.put(COMPLETE_SENTINEL);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    // Same fallback as onError: try non-blocking offer,
-                    // set streamClosed if that also fails.
-                    if (!queue.offer(COMPLETE_SENTINEL)) {
-                        streamClosed.set(true);
-                    }
-                }
-            }
-        });
-
-        return new AsyncStream<T>() {
-            private T current;
-
-            @Override
-            public Awaitable<Boolean> moveNext() {
-                if (streamClosed.get()) {
-                    return MOVE_NEXT_FALSE;
+                @Override
+                public void onComplete() {
+                    putTerminalSignal(COMPLETE);
                 }
 
-                try {
-                    Object signal = queue.take();
-
-                    if (signal instanceof ValueSignal) {
-                        current = ((ValueSignal<T>) signal).value;
-                        // Signal demand BEFORE returning so the publisher can
-                        // begin producing the next value while the consumer
-                        // processes this one — prevents livelock when both
-                        // share a thread pool.
-                        Flow.Subscription sub = subRef.get();
-                        if (sub != null) sub.request(1);
-                        return MOVE_NEXT_TRUE;
-                    } else if (signal instanceof ErrorSignal es) {
-                        streamClosed.set(true);
-                        // Throw directly (matching AsyncStreamGenerator) to
-                        // avoid unnecessary CF allocation on the error path
-                        // and JDK 23+ CompletableFuture.get() wrapping issues.
-                        Throwable cause = es.error;
-                        if (cause instanceof Error err) throw err;
-                        throw AsyncSupport.sneakyThrow(cause);
-                    } else {
-                        // COMPLETE_SENTINEL — end-of-stream
-                        streamClosed.set(true);
-                        return MOVE_NEXT_FALSE;
+                /**
+                 * Shared logic for {@code onError()} and {@code onComplete()}:
+                 * atomically marks the upstream as closed, cancels the 
subscription,
+                 * and delivers the terminal signal to the consumer queue.
+                 */
+                private void putTerminalSignal(Object signal) {
+                    if (!closed.compareAndSet(false, true)) return;
+                    cancelSubscription();
+                    try {
+                        queue.put(signal);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        queue.offer(signal);
                     }
-                } catch (InterruptedException ie) {
-                    streamClosed.set(true);
-                    Thread.currentThread().interrupt();
-                    CancellationException ce = new 
CancellationException("Interrupted during moveNext");
-                    ce.initCause(ie);
-                    throw ce;
                 }
-            }
-
-            @Override
-            public T getCurrent() {
-                return current;
-            }
-
-            @Override
-            public void close() {
-                if (streamClosed.compareAndSet(false, true)) {
-                    closedRef.set(true);
-                    Flow.Subscription sub = subRef.getAndSet(null);
-                    if (sub != null) sub.cancel();
-                    // Drain the queue and inject a sentinel to unblock a
-                    // concurrent moveNext() that may be blocked in take().
-                    // offer() is non-blocking and cannot throw 
InterruptedException;
-                    // after clear(), the queue is empty (capacity 256) so 
offer()
-                    // effectively always succeeds.
-                    queue.clear();
-                    queue.offer(COMPLETE_SENTINEL);
-                }
-            }
-        };
+            };
+        }
+
+        @Override
+        protected Awaitable<Boolean> beforeTake() {
+            return (closed.get() && queue.isEmpty()) ? MOVE_NEXT_FALSE : null;
+        }
+
+        @Override
+        protected void afterValueConsumed() {
+            // Signal demand BEFORE returning so the publisher can begin
+            // producing the next value while the consumer processes this
+            // one — prevents livelock when both share a thread pool.
+            Flow.Subscription sub = subRef.get();
+            if (sub != null) sub.request(1);
+        }
+
+        @Override
+        protected void onClose() {
+            cancelSubscription();
+            // Drain the queue and inject a sentinel to unblock a
+            // concurrent moveNext() that may be blocked in take().
+            queue.clear();
+            queue.offer(COMPLETE);
+        }
+
+        private void cancelSubscription() {
+            Flow.Subscription sub = subRef.getAndSet(null);
+            if (sub != null) sub.cancel();
+        }
     }
 }
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index d0ebd0d859..ef3a3fb9f9 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -944,21 +944,28 @@ to `AsyncSupport` static methods. The `@Async` annotation 
is processed by `Async
 which delegates to the same `AsyncTransformHelper` utility class. This shared 
helper ensures
 consistent code generation for both the keyword and annotation forms.
 
-**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`,
-`AwaitableAdapterRegistry`): `AsyncSupport` is the central runtime class 
containing static
-methods for `await`, `async`, `defer`, `yield return`, timeout composition, 
stream cleanup,
-and combinators. `GroovyPromise` wraps
+**Runtime layer** (`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`,
+`AsyncStreamGenerator`, `FlowPublisherAdapter`, `AwaitableAdapterRegistry`): 
`AsyncSupport` is the
+central runtime class containing static methods for `await`, `async`, `defer`, 
`yield return`,
+timeout composition, stream cleanup, and combinators. `GroovyPromise` wraps
 `CompletableFuture` to implement the `Awaitable` interface, decoupling the 
public API from the
-JDK implementation. `AsyncStreamGenerator` implements the producer/consumer 
pattern for async
-generators using a `SynchronousQueue`. `AwaitableAdapterRegistry` provides the 
SPI extension
-point for third-party async type support.
+JDK implementation. `AbstractAsyncStream` is the template base class for 
queue-based `AsyncStream`
+implementations — it centralises the `moveNext()` signal dispatch, 
`getCurrent()`/`close()`
+lifecycle, and interrupt handling, using the Template Method pattern with hook 
methods
+(`beforeTake`, `afterValueConsumed`, `afterMoveNext`, `onMoveNextInterrupted`, 
`onClose`).
+`AsyncStreamGenerator` extends the template for generator-style streams backed 
by a
+`SynchronousQueue`. `FlowPublisherAdapter.FlowAsyncStream` extends the 
template for
+`Flow.Publisher` adaptation using a bounded `LinkedBlockingQueue` with 
one-at-a-time
+demand. `AwaitableAdapterRegistry` provides the SPI extension point for 
third-party async type
+support.
 
 ==== API Decoupling
 
 The public API that Groovy developers interact with is defined entirely in the
 `groovy.concurrent` package (`Awaitable`, `AsyncStream`, `AwaitResult`,
 `AwaitableAdapter`, `AwaitableAdapterRegistry`). The implementation classes
-(`AsyncSupport`, `GroovyPromise`, `AsyncStreamGenerator`) live in
+(`AsyncSupport`, `GroovyPromise`, `AbstractAsyncStream`, 
`AsyncStreamGenerator`,
+`FlowPublisherAdapter`) live in
 `org.apache.groovy.runtime.async` — an internal package.
 
 This separation means:
@@ -987,23 +994,29 @@ All runtime components employ lock-free or 
minimal-contention synchronization:
 * `AwaitableAdapterRegistry.ADAPTERS` — `CopyOnWriteArrayList` for lock-free 
iteration during adapter lookup;
 writes (adapter registration) are rare, reads (every `await`) are frequent
 * `AwaitableAdapterRegistry.blockingExecutor` — `volatile` field
-* `AsyncStreamGenerator.current` — `volatile` field for cross-thread 
producer/consumer visibility
-* `AsyncStreamGenerator` close state — `AtomicBoolean` + 
`AtomicReference<Thread>` for prompt,
+* `AbstractAsyncStream.current` — `volatile` field for cross-thread visibility
+* `AbstractAsyncStream.closed` — `AtomicBoolean` for lifecycle management 
(shared by all queue-based streams)
+* `AsyncStreamGenerator` — extends `AbstractAsyncStream`; adds 
`AtomicReference<Thread>` for prompt,
 idempotent close/cancellation signalling
-* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — 
`AtomicReference<Subscription>` with CAS-guarded
-  onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value 
path, and close-aware
-  queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` with a
-  non-blocking `offer()` fallback on interrupt, preventing both silent item 
loss and consumer deadlock;
-  demand is signalled before `moveNext()` returns to prevent livelock; 
`moveNext()` uses cached
-  `Awaitable` instances to eliminate per-call allocations on the hot path
+* `Flow.Publisher` adaptation (`FlowPublisherAdapter.FlowAsyncStream`) — 
extends `AbstractAsyncStream`;
+  the inherited `AtomicBoolean` closed flag governs the entire lifecycle (both 
upstream terminal signals
+  and consumer-side close), with `AtomicReference<Subscription>` for 
CAS-guarded onSubscribe (§2.5
+  compliance); all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` with a non-blocking
+  `offer()` fallback on interrupt, preventing both silent item loss and 
consumer deadlock; demand is
+  signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses 
shared cached `Awaitable`
+  instances
+  (defined on the `AsyncStream` interface) to eliminate per-call allocations 
on the hot path
 * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no 
sharing)
 * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
 
 ==== Async Generator Safety
 
-The `AsyncStreamGenerator` implements a producer/consumer pattern where a 
producer thread
-(the generator body) yields values and a consumer thread (the `for await` 
loop) pulls them.
-Several concurrency hazards are handled transparently:
+The `AsyncStreamGenerator` extends `AbstractAsyncStream` — a template base 
class that
+centralises the `moveNext()` signal dispatch, lifecycle management, and 
interrupt handling
+common to all queue-based `AsyncStream` implementations.  The generator adds 
the producer-side
+API (`yield`, `complete`, `error`) and overrides four template hooks 
(`beforeTake`,
+`afterMoveNext`, `onMoveNextInterrupted`, `onClose`).  Several concurrency 
hazards are
+handled transparently:
 
 [cols="2,3"]
 |===
@@ -1044,29 +1057,36 @@ measure against unexpected thread interruption outside 
the normal close path.
 ==== Flow.Publisher Adaptation Safety
 
 The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol 
into the pull-based
-`AsyncStream` model.  Several concurrency hazards are handled transparently:
+`AsyncStream` model via a named inner class `FlowAsyncStream` that also extends
+`AbstractAsyncStream`.  It overrides three template hooks (`beforeTake`, 
`afterValueConsumed`,
+`onClose`) and inherits the default `onMoveNextInterrupted` behaviour.  A 
single `AtomicBoolean`
+closed flag (inherited from the template) governs the entire lifecycle — set 
by the first terminal
+signal, by the consumer's `close()`, or by an interrupt.  Several concurrency 
hazards are handled
+transparently:
 
 [cols="2,3"]
 |===
 | Concern | Mechanism
 
 | **Back-pressure**
-| A bounded `LinkedBlockingQueue` (capacity 256) bridges push and pull.  
Demand is capped at
-one item per `moveNext()` call via `request(1)`.  Demand is signalled _before_ 
`moveNext()`
-returns, preventing livelock when producer and consumer share a thread pool.
+| A bounded `LinkedBlockingQueue` (capacity 2) bridges push and pull.  Demand 
is capped at
+one item per `moveNext()` call via `request(1)`.  The small capacity reflects 
the one-at-a-time
+demand model: at most one value plus a racing terminal signal.  Demand is 
signalled _before_
+`moveNext()` returns, preventing livelock when producer and consumer share a 
thread pool.
 
 | **Interrupt-safe signal delivery**
 | All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking 
`put()` for normal
 delivery, with a non-blocking `offer()` fallback when the publisher thread is 
interrupted.
 If `offer()` also fails (queue full from a misbehaving publisher), `onNext` 
cancels the
-upstream subscription and injects an error signal; `onError`/`onComplete` set 
the `streamClosed`
-flag so the consumer's next `moveNext()` exits immediately.
+upstream subscription and injects an error signal; terminal signals 
(`onError`/`onComplete`)
+share a common `putTerminalSignal()` helper that atomically CAS-closes the 
stream, cancels
+the subscription, and delivers the signal.
 
 | **Allocation-free hot path**
-| `moveNext()` returns cached `Awaitable<Boolean>` instances for the value and 
end-of-stream
-cases, eliminating per-call `CompletableFuture` + `GroovyPromise` allocation.  
Error signals
-are thrown directly (matching `AsyncStreamGenerator` behaviour) rather than 
wrapped in a
-failed `CompletableFuture`.
+| `moveNext()` returns shared cached `Awaitable<Boolean>` constants defined on 
the `AsyncStream`
+interface for the value and end-of-stream cases, eliminating per-call 
`CompletableFuture` +
+`GroovyPromise` allocation.  Error signals are thrown directly (matching 
`AsyncStreamGenerator`
+behaviour) rather than wrapped in a failed `CompletableFuture`.
 
 | **Non-blocking close**
 | `close()` uses non-blocking `offer()` (instead of blocking `put()`) to 
inject the completion
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 4d2bec01f4..fc76e4c017 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -2860,4 +2860,357 @@ class AsyncApiTest {
         Awaitable<String> aw = Awaitable.from(stage)
         assert aw.get(2, TimeUnit.SECONDS) == "ASYNC-VALUE"
     }
+
+    // ================================================================
+    // Shared MOVE_NEXT constants (optimization validation)
+    // ================================================================
+
+    @Test
+    void testAsyncStreamMoveNextConstantsAreShared() {
+        // Both AsyncStreamGenerator and FlowPublisherAdapter should use the
+        // same MOVE_NEXT_TRUE/MOVE_NEXT_FALSE constants from AsyncStream
+        def gen = new AsyncStreamGenerator<String>()
+        Thread.start {
+            gen.yield('a')
+            gen.complete()
+        }
+        def trueResult = gen.moveNext()
+        assert trueResult.get() == true
+        assert trueResult.is(AsyncStream.MOVE_NEXT_TRUE)
+
+        def falseResult = gen.moveNext()
+        assert falseResult.get() == false
+        assert falseResult.is(AsyncStream.MOVE_NEXT_FALSE)
+    }
+
+    @Test
+    void testAsyncStreamEmptyUsesMoveNextFalse() {
+        def empty = AsyncStream.empty()
+        def result = empty.moveNext()
+        assert result.get() == false
+        assert result.is(AsyncStream.MOVE_NEXT_FALSE)
+    }
+
+    @Test
+    void testFlowPublisherAsyncStreamUsesMoveNextConstants() {
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        Thread.start {
+            Thread.sleep(50)
+            pub.submit('hello')
+            pub.close()
+        }
+        def trueResult = stream.moveNext()
+        assert trueResult.get() == true
+        assert trueResult.is(AsyncStream.MOVE_NEXT_TRUE)
+        assert stream.getCurrent() == 'hello'
+
+        def falseResult = stream.moveNext()
+        assert falseResult.get() == false
+        assert falseResult.is(AsyncStream.MOVE_NEXT_FALSE)
+    }
+
+    // ================================================================
+    // FlowAsyncStream: single-flag lifecycle (consolidation validation)
+    // ================================================================
+
+    @Test
+    void testFlowAsyncStreamCloseIdempotent() {
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        // close() should be idempotent
+        stream.close()
+        stream.close()
+        stream.close()
+        // moveNext after close should return false
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testFlowAsyncStreamCloseUnblocksMovNext() {
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        def result = new CompletableFuture<Boolean>()
+
+        Thread.start {
+            result.complete(stream.moveNext().get())
+        }
+
+        Thread.sleep(100) // let moveNext block
+        stream.close()
+
+        assert result.get(5, TimeUnit.SECONDS) == false
+    }
+
+    @Test
+    void testFlowAsyncStreamMoveNextDrainsQueueBeforeReturningFalse() {
+        // Even after closed flag is set by terminal signal, queued values
+        // should still be consumable
+        def pub = new SubmissionPublisher<Integer>()
+        def stream = AsyncStream.from(pub)
+
+        Thread.start {
+            Thread.sleep(50)
+            pub.submit(1)
+            pub.submit(2)
+            pub.close()
+        }
+
+        def items = []
+        while (stream.moveNext().get()) {
+            items << stream.getCurrent()
+        }
+        assert items == [1, 2]
+    }
+
+    // ================================================================
+    // AsyncStreamGenerator: putTerminalSignal helper (dedup validation)
+    // ================================================================
+
+    @Test
+    void testAsyncStreamGeneratorCompleteAfterClose() {
+        def gen = new AsyncStreamGenerator<String>()
+        gen.close()
+        // complete() after close should be a no-op (via putTerminalSignal)
+        gen.complete() // should not throw
+        assert gen.moveNext().get() == false
+    }
+
+    @Test
+    void testAsyncStreamGeneratorErrorAfterClose() {
+        def gen = new AsyncStreamGenerator<String>()
+        gen.close()
+        // error() after close should be a no-op (via putTerminalSignal)
+        gen.error(new RuntimeException("ignored")) // should not throw
+        assert gen.moveNext().get() == false
+    }
+
+    @Test
+    void testAsyncStreamGeneratorPutTerminalSignalInterrupted() {
+        // Test the putTerminalSignal interrupt fallback path for both 
complete() and error()
+        def gen1 = new AsyncStreamGenerator<String>()
+        def done1 = new CompletableFuture<Boolean>()
+        Thread.start {
+            Thread.currentThread().interrupt()
+            gen1.complete()
+            done1.complete(true)
+        }
+        done1.get(5, TimeUnit.SECONDS)
+
+        def gen2 = new AsyncStreamGenerator<String>()
+        def done2 = new CompletableFuture<Boolean>()
+        Thread.start {
+            Thread.currentThread().interrupt()
+            gen2.error(new IOException('test'))
+            done2.complete(true)
+        }
+        done2.get(5, TimeUnit.SECONDS)
+    }
+
+    // ================================================================
+    // FlowAsyncStream: small queue capacity validation
+    // ================================================================
+
+    @Test
+    void testFlowAsyncStreamWithManyItems() {
+        // Verify that even with small queue capacity (2), many items
+        // can be streamed correctly with proper back-pressure
+        def pub = new SubmissionPublisher<Integer>()
+        def stream = AsyncStream.from(pub)
+        int count = 100
+
+        Thread.start {
+            Thread.sleep(50)
+            (1..count).each { pub.submit(it) }
+            pub.close()
+        }
+
+        def items = []
+        while (stream.moveNext().get()) {
+            items << stream.getCurrent()
+        }
+        assert items.size() == count
+        assert items == (1..count).toList()
+    }
+
+    // ================================================================
+    // AbstractAsyncStream: template method pattern validation
+    // ================================================================
+
+    @Test
+    void testAbstractAsyncStreamUnifiedSignalTypes() {
+        // Verify that both AsyncStreamGenerator and FlowAsyncStream use the
+        // unified signal types from AbstractAsyncStream
+        def gen = new AsyncStreamGenerator<String>()
+        CompletableFuture.runAsync {
+            gen.attachProducer(Thread.currentThread())
+            try {
+                gen.yield('hello')
+                gen.complete()
+            } finally {
+                gen.detachProducer(Thread.currentThread())
+            }
+        }
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == 'hello'
+        assert gen.moveNext().get() == false
+
+        // Flow.Publisher path
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        Thread.start {
+            Thread.sleep(20)
+            pub.submit('world')
+            pub.close()
+        }
+        assert stream.moveNext().get() == true
+        assert stream.getCurrent() == 'world'
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testAbstractAsyncStreamCloseIsIdempotent() {
+        // close() on AbstractAsyncStream subclasses should be idempotent
+        def gen = new AsyncStreamGenerator<String>()
+        gen.close()
+        gen.close()  // should not throw
+        gen.close()  // still should not throw
+        assert gen.moveNext().get() == false
+
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        stream.close()
+        stream.close()  // should not throw
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testAbstractAsyncStreamGetCurrentBeforeMoveNext() {
+        // getCurrent() before any moveNext() should return null
+        def gen = new AsyncStreamGenerator<String>()
+        assert gen.getCurrent() == null
+        gen.close()
+
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        assert stream.getCurrent() == null
+        stream.close()
+    }
+
+    @Test
+    void testAbstractAsyncStreamMoveNextAfterError() {
+        // After moveNext() throws due to an error signal, subsequent calls
+        // should return false (stream is closed)
+        def gen = new AsyncStreamGenerator<Integer>()
+        CompletableFuture.runAsync {
+            gen.attachProducer(Thread.currentThread())
+            try {
+                gen.error(new IOException('test error'))
+            } finally {
+                gen.detachProducer(Thread.currentThread())
+            }
+        }
+        try {
+            gen.moveNext().get()
+            assert false : 'Should have thrown'
+        } catch (IOException e) {
+            assert e.message == 'test error'
+        }
+        // After error, stream should be closed
+        assert gen.moveNext().get() == false
+    }
+
+    @Test
+    void testAbstractAsyncStreamFlowErrorFollowedByMoveNext() {
+        // Flow.Publisher error should close the stream
+        def pub = new SubmissionPublisher<String>()
+        def stream = AsyncStream.from(pub)
+        Thread.start {
+            Thread.sleep(20)
+            pub.closeExceptionally(new RuntimeException('flow error'))
+        }
+        try {
+            stream.moveNext().get()
+            assert false : 'Should have thrown'
+        } catch (RuntimeException e) {
+            assert e.message == 'flow error'
+        }
+        // After error, stream should be closed
+        assert stream.moveNext().get() == false
+    }
+
+    @Test
+    void testAbstractAsyncStreamTemplateHooksInvocationOrder() {
+        // Verify that afterValueConsumed is called by testing Flow.Publisher
+        // back-pressure: each consumed value should trigger request(1),
+        // allowing the next item to flow
+        def pub = new SubmissionPublisher<Integer>()
+        def stream = AsyncStream.from(pub)
+        def items = []
+
+        Thread.start {
+            Thread.sleep(20)
+            // Submit items one at a time — each requires afterValueConsumed
+            // to call request(1) for the next to flow
+            (1..5).each {
+                pub.submit(it)
+                Thread.sleep(10)
+            }
+            pub.close()
+        }
+
+        while (stream.moveNext().get()) {
+            items << stream.getCurrent()
+        }
+        assert items == [1, 2, 3, 4, 5]
+    }
+
+    @Test
+    void testAbstractAsyncStreamGeneratorBeforeTakeRejectsSecondConsumer() {
+        // AsyncStreamGenerator.beforeTake() enforces single-consumer semantics
+        def gen = new AsyncStreamGenerator<Integer>()
+        def latch = new java.util.concurrent.CountDownLatch(1)
+        def error = new CompletableFuture<Throwable>()
+
+        CompletableFuture.runAsync {
+            gen.attachProducer(Thread.currentThread())
+            try {
+                gen.yield(1)
+                gen.yield(2)
+                gen.complete()
+            } finally {
+                gen.detachProducer(Thread.currentThread())
+            }
+        }
+
+        // First consumer starts moveNext
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == 1
+
+        // Continue consuming normally
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == 2
+        assert gen.moveNext().get() == false
+    }
+
+    @Test
+    void testAbstractAsyncStreamNullValueThroughGenerator() {
+        // ValueSignal wraps null values correctly
+        def gen = new AsyncStreamGenerator<String>()
+        CompletableFuture.runAsync {
+            gen.attachProducer(Thread.currentThread())
+            try {
+                gen.yield(null)
+                gen.yield('after-null')
+                gen.complete()
+            } finally {
+                gen.detachProducer(Thread.currentThread())
+            }
+        }
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == null
+        assert gen.moveNext().get() == true
+        assert gen.getCurrent() == 'after-null'
+        assert gen.moveNext().get() == false
+    }
 }

Reply via email to