This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
     new 98c29c03ca Minor tweaks
98c29c03ca is described below

commit 98c29c03ca1a6e6fc3524dfc6c49769eaeed673a
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 15:28:20 2026 +0900

    Minor tweaks
---
 .../groovy/runtime/async/AsyncStreamGenerator.java |  17 ++--
 .../groovy/runtime/async/FlowPublisherAdapter.java | 100 +++++++++++++--------
 src/spec/doc/core-async-await.adoc                 |  43 ++++++++-
 .../codehaus/groovy/transform/AsyncApiTest.groovy  |   8 +-
 4 files changed, 122 insertions(+), 46 deletions(-)

diff --git 
a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java 
b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 59299f9fb3..d5b5e36e0c 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -87,6 +87,13 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
 
     private static final Object DONE = new Object();
 
+    /**
+     * Cached awaitables for the two common {@code moveNext()} outcomes.
+     * Eliminates per-call object allocation on the hot path.
+     */
+    private static final Awaitable<Boolean> MOVE_NEXT_TRUE = 
Awaitable.of(Boolean.TRUE);
+    private static final Awaitable<Boolean> MOVE_NEXT_FALSE = 
Awaitable.of(Boolean.FALSE);
+
     private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicReference<Thread> producerThread = new 
AtomicReference<>();
@@ -231,7 +238,7 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
     @SuppressWarnings("unchecked")
     public Awaitable<Boolean> moveNext() {
         if (closed.get()) {
-            return Awaitable.of(false);
+            return MOVE_NEXT_FALSE;
         }
         // Enforce single-consumer semantics: only one thread may call 
moveNext()
         // at a time.  A concurrent second caller would overwrite 
consumerThread,
@@ -252,13 +259,13 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
         // re-check the consumer would block in queue.take() indefinitely.
         if (closed.get()) {
             consumerThread.compareAndSet(currentThread, null);
-            return Awaitable.of(false);
+            return MOVE_NEXT_FALSE;
         }
         try {
             Object next = queue.take();
             if (next == DONE) {
                 closed.set(true);
-                return Awaitable.of(false);
+                return MOVE_NEXT_FALSE;
             }
             if (next instanceof ErrorItem ei) {
                 closed.set(true);
@@ -267,10 +274,10 @@ public class AsyncStreamGenerator<T> implements 
AsyncStream<T> {
                 throw AsyncSupport.sneakyThrow(cause);
             }
             current = (T) ((Item) next).value;
-            return Awaitable.of(true);
+            return MOVE_NEXT_TRUE;
         } catch (InterruptedException e) {
             if (closed.get()) {
-                return Awaitable.of(false);
+                return MOVE_NEXT_FALSE;
             }
             Thread.currentThread().interrupt();
             throw interrupted("Interrupted during moveNext", e);
diff --git 
a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java 
b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index 777a897433..fa94f02ac2 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -65,12 +65,13 @@ import java.util.concurrent.atomic.AtomicReference;
  *   <li>§2.5 — duplicate {@code onSubscribe} cancels the second 
subscription</li>
  *   <li>§2.13 — {@code null} items in {@code onNext} are rejected 
immediately</li>
  *   <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use
- *       blocking {@code put()} to guarantee delivery even under queue
- *       contention</li>
+ *       blocking {@code put()} with a non-blocking {@code offer()} fallback
+ *       when the publisher thread is interrupted — this prevents both silent
+ *       item loss and consumer deadlock even under unexpected interrupts</li>
  *   <li>Back-pressure is enforced by requesting exactly one item after
- *       each consumed element; demand is signalled <em>before</em> the
- *       consumer's {@code moveNext()} awaitable completes, preventing
- *       livelock when producer and consumer share the same thread pool</li>
+ *       each consumed element; demand is signalled <em>before</em>
+ *       {@code moveNext()} returns, preventing livelock when producer and
+ *       consumer share the same thread pool</li>
  * </ul>
  *
  * @see groovy.concurrent.AwaitableAdapterRegistry
@@ -86,6 +87,14 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
      */
     private static final int QUEUE_CAPACITY = 256;
 
+    /**
+     * Cached awaitables for the two common {@code moveNext()} outcomes.
+     * Eliminates per-call {@link CompletableFuture} + {@link GroovyPromise}
+     * allocation on the hot path (every element and stream-end).
+     */
+    private static final Awaitable<Boolean> MOVE_NEXT_TRUE = 
Awaitable.of(Boolean.TRUE);
+    private static final Awaitable<Boolean> MOVE_NEXT_FALSE = 
Awaitable.of(Boolean.FALSE);
+
     /**
      * Returns {@code true} if the given type is assignable to
      * {@link Flow.Publisher}, enabling single-value {@code await}.
@@ -234,9 +243,10 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
      * share the same thread pool.</p>
      *
      * <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
-     * absorbs minor timing jitter between producer and consumer.  All
-     * signals use blocking {@code put()}, ensuring no items or terminal
-     * events are silently dropped.</p>
+     * absorbs minor timing jitter between producer and consumer.  Signals
+     * use blocking {@code put()} for normal delivery with a non-blocking
+     * {@code offer()} fallback when the publisher thread is interrupted —
+     * ensuring no items or terminal events are silently dropped.</p>
      *
      * <p><b>Resource management:</b> When the consumer calls
      * {@link AsyncStream#close()} (e.g. via {@code break} in a
@@ -293,6 +303,21 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                         queue.put(new ValueSignal<>(item));
                     } catch (InterruptedException ie) {
                         Thread.currentThread().interrupt();
+                        // Blocking put() was interrupted. Fall back to 
non-blocking
+                        // offer() so the item still reaches the consumer. With
+                        // one-at-a-time demand the queue is almost never 
full, so
+                        // offer() effectively always succeeds. If it doesn't
+                        // (misbehaving publisher overfilling the queue), 
cancel
+                        // upstream and inject an error signal to terminate the
+                        // consumer cleanly instead of silently dropping the 
item.
+                        if (!queue.offer(new ValueSignal<>(item))) {
+                            Flow.Subscription sub = subRef.getAndSet(null);
+                            if (sub != null) sub.cancel();
+                            closedRef.set(true);
+                            queue.offer(new ErrorSignal(
+                                    new CancellationException(
+                                            "Item delivery interrupted and 
queue full")));
+                        }
                     }
                 }
             }
@@ -308,6 +333,13 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                     queue.put(new ErrorSignal(t));
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
+                    // Blocking put() was interrupted. Fall back to 
non-blocking
+                    // offer(). If that also fails (queue full), set 
streamClosed
+                    // so the consumer's next moveNext() returns false instead 
of
+                    // blocking indefinitely.
+                    if (!queue.offer(new ErrorSignal(t))) {
+                        streamClosed.set(true);
+                    }
                 }
             }
 
@@ -319,6 +351,11 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
                     queue.put(COMPLETE_SENTINEL);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
+                    // Same fallback as onError: try non-blocking offer,
+                    // set streamClosed if that also fails.
+                    if (!queue.offer(COMPLETE_SENTINEL)) {
+                        streamClosed.set(true);
+                    }
                 }
             }
         });
@@ -328,47 +365,42 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
 
             @Override
             public Awaitable<Boolean> moveNext() {
-                // Fast path: stream already closed — no CF allocation needed
                 if (streamClosed.get()) {
-                    return Awaitable.of(Boolean.FALSE);
+                    return MOVE_NEXT_FALSE;
                 }
 
-                CompletableFuture<Boolean> cf = new CompletableFuture<>();
                 try {
                     Object signal = queue.take();
 
                     if (signal instanceof ValueSignal) {
                         current = ((ValueSignal<T>) signal).value;
-                        // Signal demand for the next item BEFORE completing
-                        // the awaitable, so the publisher can begin producing
-                        // the next value while the consumer processes this 
one.
-                        // Ordering here is critical: if request(1) were called
-                        // after cf.complete(), the consumer could re-enter
-                        // moveNext() and block in take() before demand was
-                        // signalled, creating a livelock.
+                        // Signal demand BEFORE returning so the publisher can
+                        // begin producing the next value while the consumer
+                        // processes this one — prevents livelock when both
+                        // share a thread pool.
                         Flow.Subscription sub = subRef.get();
                         if (sub != null) sub.request(1);
-                        cf.complete(Boolean.TRUE);
-                    } else if (signal instanceof ErrorSignal) {
+                        return MOVE_NEXT_TRUE;
+                    } else if (signal instanceof ErrorSignal es) {
                         streamClosed.set(true);
-                        cf.completeExceptionally(((ErrorSignal) signal).error);
+                        // Throw directly (matching AsyncStreamGenerator) to
+                        // avoid unnecessary CF allocation on the error path
+                        // and JDK 23+ CompletableFuture.get() wrapping issues.
+                        Throwable cause = es.error;
+                        if (cause instanceof Error err) throw err;
+                        throw AsyncSupport.sneakyThrow(cause);
                     } else {
-                        // COMPLETE_SENTINEL or unknown — treat as 
end-of-stream
+                        // COMPLETE_SENTINEL — end-of-stream
                         streamClosed.set(true);
-                        cf.complete(Boolean.FALSE);
+                        return MOVE_NEXT_FALSE;
                     }
                 } catch (InterruptedException ie) {
-                    // Consumer thread was interrupted — throw directly as
-                    // CancellationException (matching AsyncStreamGenerator 
behaviour
-                    // and avoiding JDK 23+ CompletableFuture.get() wrapping 
issues)
                     streamClosed.set(true);
                     Thread.currentThread().interrupt();
                     CancellationException ce = new 
CancellationException("Interrupted during moveNext");
                     ce.initCause(ie);
                     throw ce;
                 }
-
-                return new GroovyPromise<>(cf);
             }
 
             @Override
@@ -380,19 +412,15 @@ public class FlowPublisherAdapter implements 
AwaitableAdapter {
             public void close() {
                 if (streamClosed.compareAndSet(false, true)) {
                     closedRef.set(true);
-                    // Cancel the upstream subscription
                     Flow.Subscription sub = subRef.getAndSet(null);
                     if (sub != null) sub.cancel();
                     // Drain the queue and inject a sentinel to unblock a
                     // concurrent moveNext() that may be blocked in take().
-                    // Using blocking put() after clear() guarantees delivery;
-                    // since the queue is freshly cleared, put() will not 
block.
+                    // offer() is non-blocking and cannot throw 
InterruptedException;
+                    // after clear(), the queue is empty (capacity 256) so 
offer()
+                    // effectively always succeeds.
                     queue.clear();
-                    try {
-                        queue.put(COMPLETE_SENTINEL);
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    }
+                    queue.offer(COMPLETE_SENTINEL);
                 }
             }
         };
diff --git a/src/spec/doc/core-async-await.adoc 
b/src/spec/doc/core-async-await.adoc
index 5d38e1b7e3..fa43090a1e 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -985,8 +985,10 @@ writes (adapter registration) are rare, reads (every 
`await`) are frequent
 idempotent close/cancellation signalling
 * `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — 
`AtomicReference<Subscription>` with CAS-guarded
   onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value 
path, and close-aware
-  queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` for
-  guaranteed delivery; demand is signalled before the consumer's awaitable 
completes to prevent livelock
+  queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking 
`put()` with a
+  non-blocking `offer()` fallback on interrupt, preventing both silent item 
loss and consumer deadlock;
+  demand is signalled before `moveNext()` returns to prevent livelock; 
`moveNext()` uses cached
+  `Awaitable` instances to eliminate per-call allocations on the hot path
 * Defer scopes — per-task `ArrayDeque`, confined to a single thread (no 
sharing)
 * `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
 
@@ -1032,6 +1034,43 @@ prevents the consumer from blocking indefinitely on a 
subsequent `moveNext()` 
 measure against unexpected thread interruption outside the normal close path.
 |===
 
+==== Flow.Publisher Adaptation Safety
+
+The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol 
into the pull-based
+`AsyncStream` model.  Several concurrency hazards are handled transparently:
+
+[cols="2,3"]
+|===
+| Concern | Mechanism
+
+| **Back-pressure**
+| A bounded `LinkedBlockingQueue` (capacity 256) bridges push and pull.  
Demand is capped at
+one item per `moveNext()` call via `request(1)`.  Demand is signalled _before_ 
`moveNext()`
+returns, preventing livelock when producer and consumer share a thread pool.
+
+| **Interrupt-safe signal delivery**
+| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking 
`put()` for normal
+delivery, with a non-blocking `offer()` fallback when the publisher thread is 
interrupted.
+If `offer()` also fails (queue full from a misbehaving publisher), `onNext` 
cancels the
+upstream subscription and injects an error signal; `onError`/`onComplete` set 
the `streamClosed`
+flag so the consumer's next `moveNext()` exits immediately.
+
+| **Allocation-free hot path**
+| `moveNext()` returns cached `Awaitable<Boolean>` instances for the value and 
end-of-stream
+cases, eliminating per-call `CompletableFuture` + `GroovyPromise` allocation.  
Error signals
+are thrown directly (matching `AsyncStreamGenerator` behaviour) rather than 
wrapped in a
+failed `CompletableFuture`.
+
+| **Non-blocking close**
+| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to 
inject the completion
+sentinel after clearing the queue — this cannot throw `InterruptedException` 
and effectively
+always succeeds because the queue was just drained.
+
+| **Idempotent close**
+| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to 
call multiple
+times from any thread without side effects.
+|===
+
 These mechanisms ensure that `yield return` / `for await` code remains as 
simple as writing
 a synchronous loop, while the runtime handles all cross-thread coordination.
 
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy 
b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 7dabc83a79..feaa8d1a89 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1503,11 +1503,13 @@ class AsyncApiTest {
         AsyncStream<String> stream = AsyncStream.from(pub)
         assert stream.moveNext().get() == true
         assert stream.getCurrent() == 'item1'
+        // moveNext() throws the error directly (matching AsyncStreamGenerator
+        // behaviour) rather than wrapping it in a failed Awaitable
         try {
-            stream.moveNext().get()
+            stream.moveNext()
             assert false
-        } catch (ExecutionException e) {
-            assert e.cause instanceof IOException
+        } catch (IOException e) {
+            assert e.message == 'stream-err'
         }
     }
 

Reply via email to