This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch GROOVY-9381_3
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/GROOVY-9381_3 by this push:
new 98c29c03ca Minor tweaks
98c29c03ca is described below
commit 98c29c03ca1a6e6fc3524dfc6c49769eaeed673a
Author: Daniel Sun <[email protected]>
AuthorDate: Sat Mar 14 15:28:20 2026 +0900
Minor tweaks
---
.../groovy/runtime/async/AsyncStreamGenerator.java | 17 ++--
.../groovy/runtime/async/FlowPublisherAdapter.java | 100 +++++++++++++--------
src/spec/doc/core-async-await.adoc | 43 ++++++++-
.../codehaus/groovy/transform/AsyncApiTest.groovy | 8 +-
4 files changed, 122 insertions(+), 46 deletions(-)
diff --git a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
index 59299f9fb3..d5b5e36e0c 100644
--- a/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
+++ b/src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java
@@ -87,6 +87,13 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
private static final Object DONE = new Object();
+ /**
+ * Cached awaitables for the two common {@code moveNext()} outcomes.
+ * Eliminates per-call object allocation on the hot path.
+ */
+ private static final Awaitable<Boolean> MOVE_NEXT_TRUE =
Awaitable.of(Boolean.TRUE);
+ private static final Awaitable<Boolean> MOVE_NEXT_FALSE =
Awaitable.of(Boolean.FALSE);
+
private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicReference<Thread> producerThread = new
AtomicReference<>();
@@ -231,7 +238,7 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
@SuppressWarnings("unchecked")
public Awaitable<Boolean> moveNext() {
if (closed.get()) {
- return Awaitable.of(false);
+ return MOVE_NEXT_FALSE;
}
// Enforce single-consumer semantics: only one thread may call moveNext()
// at a time. A concurrent second caller would overwrite consumerThread,
@@ -252,13 +259,13 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
// re-check the consumer would block in queue.take() indefinitely.
if (closed.get()) {
consumerThread.compareAndSet(currentThread, null);
- return Awaitable.of(false);
+ return MOVE_NEXT_FALSE;
}
try {
Object next = queue.take();
if (next == DONE) {
closed.set(true);
- return Awaitable.of(false);
+ return MOVE_NEXT_FALSE;
}
if (next instanceof ErrorItem ei) {
closed.set(true);
@@ -267,10 +274,10 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
throw AsyncSupport.sneakyThrow(cause);
}
current = (T) ((Item) next).value;
- return Awaitable.of(true);
+ return MOVE_NEXT_TRUE;
} catch (InterruptedException e) {
if (closed.get()) {
- return Awaitable.of(false);
+ return MOVE_NEXT_FALSE;
}
Thread.currentThread().interrupt();
throw interrupted("Interrupted during moveNext", e);
diff --git a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
index 777a897433..fa94f02ac2 100644
--- a/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
+++ b/src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java
@@ -65,12 +65,13 @@ import java.util.concurrent.atomic.AtomicReference;
* <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li>
* <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li>
* <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use
- * blocking {@code put()} to guarantee delivery even under queue
- * contention</li>
+ * blocking {@code put()} with a non-blocking {@code offer()} fallback
+ * when the publisher thread is interrupted — this prevents both silent
+ * item loss and consumer deadlock even under unexpected interrupts</li>
* <li>Back-pressure is enforced by requesting exactly one item after
- * each consumed element; demand is signalled <em>before</em> the
- * consumer's {@code moveNext()} awaitable completes, preventing
- * livelock when producer and consumer share the same thread pool</li>
+ * each consumed element; demand is signalled <em>before</em>
+ * {@code moveNext()} returns, preventing livelock when producer and
+ * consumer share the same thread pool</li>
* </ul>
*
* @see groovy.concurrent.AwaitableAdapterRegistry
@@ -86,6 +87,14 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
*/
private static final int QUEUE_CAPACITY = 256;
+ /**
+ * Cached awaitables for the two common {@code moveNext()} outcomes.
+ * Eliminates per-call {@link CompletableFuture} + {@link GroovyPromise}
+ * allocation on the hot path (every element and stream-end).
+ */
+ private static final Awaitable<Boolean> MOVE_NEXT_TRUE =
Awaitable.of(Boolean.TRUE);
+ private static final Awaitable<Boolean> MOVE_NEXT_FALSE =
Awaitable.of(Boolean.FALSE);
+
/**
* Returns {@code true} if the given type is assignable to
* {@link Flow.Publisher}, enabling single-value {@code await}.
@@ -234,9 +243,10 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
* share the same thread pool.</p>
*
* <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
- * absorbs minor timing jitter between producer and consumer. All
- * signals use blocking {@code put()}, ensuring no items or terminal
- * events are silently dropped.</p>
+ * absorbs minor timing jitter between producer and consumer. Signals
+ * use blocking {@code put()} for normal delivery with a non-blocking
+ * {@code offer()} fallback when the publisher thread is interrupted —
+ * ensuring no items or terminal events are silently dropped.</p>
*
* <p><b>Resource management:</b> When the consumer calls
* {@link AsyncStream#close()} (e.g. via {@code break} in a
@@ -293,6 +303,21 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
queue.put(new ValueSignal<>(item));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
+ // Blocking put() was interrupted. Fall back to non-blocking
+ // offer() so the item still reaches the consumer. With
+ // one-at-a-time demand the queue is almost never full, so
+ // offer() effectively always succeeds. If it doesn't
+ // (misbehaving publisher overfilling the queue), cancel
+ // upstream and inject an error signal to terminate the
+ // consumer cleanly instead of silently dropping the item.
+ if (!queue.offer(new ValueSignal<>(item))) {
+ Flow.Subscription sub = subRef.getAndSet(null);
+ if (sub != null) sub.cancel();
+ closedRef.set(true);
+ queue.offer(new ErrorSignal(
+ new CancellationException(
+ "Item delivery interrupted and
queue full")));
+ }
}
}
}
@@ -308,6 +333,13 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
queue.put(new ErrorSignal(t));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
+ // Blocking put() was interrupted. Fall back to non-blocking
+ // offer(). If that also fails (queue full), set streamClosed
+ // so the consumer's next moveNext() returns false instead of
+ // blocking indefinitely.
+ if (!queue.offer(new ErrorSignal(t))) {
+ streamClosed.set(true);
+ }
}
}
@@ -319,6 +351,11 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
queue.put(COMPLETE_SENTINEL);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
+ // Same fallback as onError: try non-blocking offer,
+ // set streamClosed if that also fails.
+ if (!queue.offer(COMPLETE_SENTINEL)) {
+ streamClosed.set(true);
+ }
}
}
});
@@ -328,47 +365,42 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
@Override
public Awaitable<Boolean> moveNext() {
- // Fast path: stream already closed — no CF allocation needed
if (streamClosed.get()) {
- return Awaitable.of(Boolean.FALSE);
+ return MOVE_NEXT_FALSE;
}
- CompletableFuture<Boolean> cf = new CompletableFuture<>();
try {
Object signal = queue.take();
if (signal instanceof ValueSignal) {
current = ((ValueSignal<T>) signal).value;
- // Signal demand for the next item BEFORE completing
- // the awaitable, so the publisher can begin producing
- // the next value while the consumer processes this one.
- // Ordering here is critical: if request(1) were called
- // after cf.complete(), the consumer could re-enter
- // moveNext() and block in take() before demand was
- // signalled, creating a livelock.
+ // Signal demand BEFORE returning so the publisher can
+ // begin producing the next value while the consumer
+ // processes this one — prevents livelock when both
+ // share a thread pool.
Flow.Subscription sub = subRef.get();
if (sub != null) sub.request(1);
- cf.complete(Boolean.TRUE);
- } else if (signal instanceof ErrorSignal) {
+ return MOVE_NEXT_TRUE;
+ } else if (signal instanceof ErrorSignal es) {
streamClosed.set(true);
- cf.completeExceptionally(((ErrorSignal) signal).error);
+ // Throw directly (matching AsyncStreamGenerator) to
+ // avoid unnecessary CF allocation on the error path
+ // and JDK 23+ CompletableFuture.get() wrapping issues.
+ Throwable cause = es.error;
+ if (cause instanceof Error err) throw err;
+ throw AsyncSupport.sneakyThrow(cause);
} else {
- // COMPLETE_SENTINEL or unknown — treat as end-of-stream
+ // COMPLETE_SENTINEL — end-of-stream
streamClosed.set(true);
- cf.complete(Boolean.FALSE);
+ return MOVE_NEXT_FALSE;
}
} catch (InterruptedException ie) {
- // Consumer thread was interrupted — throw directly as
- // CancellationException (matching AsyncStreamGenerator behaviour
- // and avoiding JDK 23+ CompletableFuture.get() wrapping issues)
streamClosed.set(true);
Thread.currentThread().interrupt();
CancellationException ce = new
CancellationException("Interrupted during moveNext");
ce.initCause(ie);
throw ce;
}
-
- return new GroovyPromise<>(cf);
}
@Override
@@ -380,19 +412,15 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
public void close() {
if (streamClosed.compareAndSet(false, true)) {
closedRef.set(true);
- // Cancel the upstream subscription
Flow.Subscription sub = subRef.getAndSet(null);
if (sub != null) sub.cancel();
// Drain the queue and inject a sentinel to unblock a
// concurrent moveNext() that may be blocked in take().
- // Using blocking put() after clear() guarantees delivery;
- // since the queue is freshly cleared, put() will not block.
+ // offer() is non-blocking and cannot throw InterruptedException;
+ // after clear(), the queue is empty (capacity 256) so offer()
+ // effectively always succeeds.
queue.clear();
- try {
- queue.put(COMPLETE_SENTINEL);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ queue.offer(COMPLETE_SENTINEL);
}
}
};
diff --git a/src/spec/doc/core-async-await.adoc b/src/spec/doc/core-async-await.adoc
index 5d38e1b7e3..fa43090a1e 100644
--- a/src/spec/doc/core-async-await.adoc
+++ b/src/spec/doc/core-async-await.adoc
@@ -985,8 +985,10 @@ writes (adapter registration) are rare, reads (every `await`) are frequent
idempotent close/cancellation signalling
* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — `AtomicReference<Subscription>` with CAS-guarded
onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware
- queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` for
- guaranteed delivery; demand is signalled before the consumer's awaitable completes to prevent livelock
+ queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` with a
+ non-blocking `offer()` fallback on interrupt, preventing both silent item loss and consumer deadlock;
+ demand is signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses cached
+ `Awaitable` instances to eliminate per-call allocations on the hot path
* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no sharing)
* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
@@ -1032,6 +1034,43 @@ prevents the consumer from blocking indefinitely on a subsequent `moveNext()`
measure against unexpected thread interruption outside the normal close path.
|===
+==== Flow.Publisher Adaptation Safety
+
+The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol into the pull-based
+`AsyncStream` model. Several concurrency hazards are handled transparently:
+
+[cols="2,3"]
+|===
+| Concern | Mechanism
+
+| **Back-pressure**
+| A bounded `LinkedBlockingQueue` (capacity 256) bridges push and pull. Demand is capped at
+one item per `moveNext()` call via `request(1)`. Demand is signalled _before_ `moveNext()`
+returns, preventing livelock when producer and consumer share a thread pool.
+
+| **Interrupt-safe signal delivery**
+| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking `put()` for normal
+delivery, with a non-blocking `offer()` fallback when the publisher thread is interrupted.
+If `offer()` also fails (queue full from a misbehaving publisher), `onNext` cancels the
+upstream subscription and injects an error signal; `onError`/`onComplete` set the `streamClosed`
+flag so the consumer's next `moveNext()` exits immediately.
+
+| **Allocation-free hot path**
+| `moveNext()` returns cached `Awaitable<Boolean>` instances for the value and end-of-stream
+cases, eliminating per-call `CompletableFuture` + `GroovyPromise` allocation. Error signals
+are thrown directly (matching `AsyncStreamGenerator` behaviour) rather than wrapped in a
+failed `CompletableFuture`.
+
+| **Non-blocking close**
+| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to inject the completion
+sentinel after clearing the queue — this cannot throw `InterruptedException` and effectively
+always succeeds because the queue was just drained.
+
+| **Idempotent close**
+| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to call multiple
+times from any thread without side effects.
+|===
+
These mechanisms ensure that `yield return` / `for await` code remains as simple as writing
a synchronous loop, while the runtime handles all cross-thread coordination.
diff --git a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
index 7dabc83a79..feaa8d1a89 100644
--- a/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
+++ b/src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy
@@ -1503,11 +1503,13 @@ class AsyncApiTest {
AsyncStream<String> stream = AsyncStream.from(pub)
assert stream.moveNext().get() == true
assert stream.getCurrent() == 'item1'
+ // moveNext() throws the error directly (matching AsyncStreamGenerator
+ // behaviour) rather than wrapping it in a failed Awaitable
try {
- stream.moveNext().get()
+ stream.moveNext()
assert false
- } catch (ExecutionException e) {
- assert e.cause instanceof IOException
+ } catch (IOException e) {
+ assert e.message == 'stream-err'
}
}