Assuming the test code and my reading of the mapConcurrent() code haven't
misled me, I think this observation suggests that if an *upstream exception*
terminates the stream, mapConcurrent() may not be able to properly cancel
or interrupt the virtual threads it has already started. This could
potentially lead to *resource leaks* or "zombie" threads that continue to
run or block indefinitely in the background, consuming resources
unnecessarily.

This behavior might be surprising to developers. For instance,
Stream.parallel(), which leverages ForkJoinPool, effectively manages its
entire pipeline. It can interrupt running threads and join() them to
enforce *happens-before guarantees* even when exceptions occur upstream.
Users might naturally expect a similar level of robust resource management
and memory visibility from mapConcurrent(). While explicit documentation
could clarify this behavior, such a fundamental difference might still lead
to confusion and unintended issues.
------------------------------

A Potential Design Adjustment for More Robustness

To address these concerns while maintaining mapConcurrent()'s core
benefits, I'd like to propose a strawman design adjustment.

Instead of mapConcurrent() being an intermediary Gatherer (which has
limited control over upstream events), consider exposing its functionality
as a *terminal Collector *(let's tentatively call it concurrently())

For example, instead of:

inputs.stream().filter(...).gather(mapConcurrent(10, input ->
callBackend(input))).toList();

We could use a Collector-based approach:

inputs.stream().filter(...).collect(concurrently(10, input ->
callBackend(input))).toList();

Here's how this Collector-based design could mitigate the issues:

   -

   *Pre-emptive Upstream Processing:* The concurrently() Collector could
   internally use collectingAndThen(toList(), ...) or a similar mechanism.
   This would ensure that *all upstream elements are eagerly consumed* and
   any upstream exceptions are thrown and handled *before* the concurrent
   virtual threads are ever launched for the mapConcurrent-like operations.
   -

   *Controlled Concurrent Operations:* Only once the upstream processing is
   complete and validated would the concurrent mapping begin. The concurrent
   operations would still be *lazy and concurrency-limited*, but they would
   only apply to the concurrent operations started by concurrently().
   -

   *Resolved Concerns:* This design would effectively make the
   *happens-before* and *resource leak* concerns caused by upstream
   exceptions moot, as such exceptions would occur and be dealt with at an
   earlier, more controlled stage with zero concurrency complications.

The most apparent trade-off is that this Collector approach would eagerly
consume all input elements, potentially increasing memory usage and
sacrificing the full laziness a Gatherer might offer. However,
mapConcurrent()'s primary use cases typically involve I/O-intensive
fan-outs, which generally don't operate on an extremely large number of
inputs. For the rare instances involving infinite or very large streams,
the limitation becomes a clear "*not designed for this*" scenario, simpler
than navigating the current nuanced caveats around happens-before
enforcement and thread cancellation.
Just my 2c,

Cheers


On Fri, Jul 4, 2025 at 8:55 AM Jige Yu <yuj...@gmail.com> wrote:

> Yes.
>
> I got two tests that show the behavior difference between downstream
> exception and upstream exception:
>
>
> @Test public void mapConcurrently_upstreamFailureDoesNotInterrupt() {
>
> ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
>
> ConcurrentLinkedQueue<Integer> interrupted = new
> ConcurrentLinkedQueue<>();
>
> RuntimeException thrown = assertThrows(
>
> RuntimeException.class,
>
> () -> Stream.of(1, 10, 3, 0)
>
> .peek(n -> {
>
> if (n == 3) {
>
> try { // give 1 and 3 some time to have at least started
>
> Thread.sleep(100);
>
> } catch (InterruptedException e) {
>
> interrupted.add(n);
>
> }
>
> throw new IllegalArgumentException(String.valueOf(n));
>
> }
>
> })
>
> .gather(mapConcurrently(3, n -> {
>
> started.add(n);
>
> try {
>
> Thread.sleep(n * 10000);
>
> } catch (InterruptedException e) {
>
> interrupted.add(n);
>
> }
>
> return n;
>
> }))
>
> .findAny());
>
> assertThat(started).containsExactly(10, 1);
>
> assertThat(interrupted).isEmpty();
>
> assertThat(thrown).hasMessageThat().isEqualTo("3");
>
> }
>
>
> @Test public void mapConcurrently_downstreamFailurePropagated() {
>
> ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
>
> ConcurrentLinkedQueue<Integer> interrupted = new
> ConcurrentLinkedQueue<>();
>
> RuntimeException thrown = assertThrows(
>
> RuntimeException.class,
>
> () -> Stream.of(10, 1, 3, 0)
>
> .gather(mapConcurrently(3, n -> {
>
> started.add(n);
>
> try {
>
> Thread.sleep(n * 1000);
>
> } catch (InterruptedException e) {
>
> interrupted.add(n);
>
> }
>
> return n;
>
> }))
>
> .peek(n -> {
>
> throw new IllegalArgumentException(String.valueOf(n));
>
> })
>
> .findAny());
>
> assertThat(started).containsExactly(10, 1, 3);
>
> assertThat(interrupted).containsExactly(3, 10);
>
> assertThat(thrown).hasMessageThat().isEqualTo("1");
>
> }
>
>
> Both with maxConcurrenc=3.
>
> When the downstream peek() throws on the first element "1" it gets, it
> will interrupt (and join) the other two pending threads (3 and 10).
> The interrupted ConcurrentLinkedQueue is guaranteed to see [3, 10] because
> of the happens-before guarantee.
>
> When the upstream peek() throws on 3, [1, 10] are also already running
> concurrently. But no thread is interrupted.
>
>
>
> On Thu, Jul 3, 2025 at 10:29 AM David Alayachew <davidalayac...@gmail.com>
> wrote:
>
>> These questions necessitate runnable examples. Do you have any
>>
>> On Thu, Jul 3, 2025, 10:37 AM Jige Yu <yuj...@gmail.com> wrote:
>>
>>> Hi JDK Core Devs,
>>>
>>> I'm writing to you today with a question about the behavior of
>>> mapConcurrent() and its interaction with unchecked exceptions. I've
>>> been experimenting with the API and observed that mapConcurrent()
>>> blocks and joins all virtual threads upon an unchecked exception before
>>> propagating it.
>>>
>>> Initially, I thought this design choice might provide a strong
>>> happens-before guarantee. My assumption was that an application catching a
>>> RuntimeException would be able to *observe all side effects* from the
>>> virtual threads, even though this practice is generally discouraged. This
>>> seemed like a potentially significant advantage, outweighing the risk of a
>>> virtual thread failing to respond to interruption or responding slowly.
>>>
>>> However, I've since realized that mapConcurrent() cannot fully
>>> guarantee a strong happens-before relationship when an unchecked exception
>>> occurs *somewhere* in the stream pipeline. While it can block and wait
>>> for exceptions thrown by the mapper function or downstream operations, it
>>> appears unable to intercept unchecked exceptions *thrown by an upstream*
>>> source.
>>>
>>> Consider a scenario with two input elements: if the first element starts
>>> a virtual thread, and then the second element causes an unchecked exception
>>> from the upstream *before* reaching the gather() call, the virtual
>>> thread initiated by the first element would not be interrupted. This makes
>>> the "happens-before" guarantee quite nuanced in practice.
>>>
>>> This brings me to my core questions:
>>>
>>>    1.
>>>
>>>    Is providing a happens-before guarantee upon an unchecked exception
>>>    a design goal for mapConcurrent()?
>>>    2.
>>>
>>>    If not, would it be more desirable to *not* join on virtual threads
>>>    when unchecked exceptions occur? This would allow the application code to
>>>    catch the exception sooner and avoid the risk of being blocked 
>>> indefinitely.
>>>
>>> Thank you for your time and insights.
>>>
>>> Best regards,
>>>
>>> Ben Yu
>>>
>>

Reply via email to