Assuming the test code and my reading of the mapConcurrent() code haven't misled me, I think this observation suggests that if an *upstream exception* terminates the stream, mapConcurrent() may not be able to properly cancel or interrupt the virtual threads it has already started. This could potentially lead to *resource leaks* or "zombie" threads that continue to run or block indefinitely in the background, consuming resources unnecessarily.
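To make the scenario concrete, here is a minimal, self-contained sketch of the behavior described above (it mirrors the tests quoted later in this thread, and assumes a JDK with Gatherers.mapConcurrent, finalized in JDK 24; the class name, event strings, and sleep durations are mine, and the timing-based setup makes this illustrative rather than deterministic):

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class UpstreamExceptionDemo {
    static ConcurrentLinkedQueue<String> run() throws InterruptedException {
        ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
        try {
            Stream.of(1, 2)
                .peek(n -> {
                    if (n == 2) {
                        try {
                            Thread.sleep(100); // let element 1's virtual thread get going
                        } catch (InterruptedException e) {
                            throw new AssertionError(e);
                        }
                        // Fails *upstream* of gather(), outside the gatherer's control.
                        throw new IllegalStateException("upstream failure");
                    }
                })
                .gather(Gatherers.mapConcurrent(2, n -> {
                    events.add("started " + n);
                    try {
                        Thread.sleep(60_000); // simulate a slow backend call
                    } catch (InterruptedException e) {
                        events.add("interrupted " + n);
                    }
                    return n;
                }))
                .toList();
        } catch (IllegalStateException e) {
            events.add("caught: " + e.getMessage());
        }
        Thread.sleep(200); // an interrupt, if one were delivered, would be recorded by now
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
        // No "interrupted 1" event appears: the virtual thread mapping element 1
        // is still parked in Thread.sleep() when the pipeline has already failed.
    }
}
```

The caller catches the upstream exception while the virtual thread started for element 1 keeps sleeping in the background, which is exactly the "zombie" behavior at issue.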
This behavior might be surprising to developers. For instance, Stream.parallel(), which leverages ForkJoinPool, effectively manages its entire pipeline: it can interrupt running threads and join() them to enforce *happens-before guarantees* even when exceptions occur upstream. Users might naturally expect a similar level of robust resource management and memory visibility from mapConcurrent(). While explicit documentation could clarify this behavior, such a fundamental difference might still lead to confusion and unintended issues.

------------------------------
A Potential Design Adjustment for More Robustness

To address these concerns while maintaining mapConcurrent()'s core benefits, I'd like to propose a strawman design adjustment. Instead of mapConcurrent() being an intermediary Gatherer (which has limited control over upstream events), consider exposing its functionality as a *terminal Collector* (let's tentatively call it concurrently()).

For example, instead of:

  inputs.stream().filter(...).gather(mapConcurrent(10, input -> callBackend(input))).toList();

we could use a Collector-based approach:

  inputs.stream().filter(...).collect(concurrently(10, input -> callBackend(input))).toList();

Here's how this Collector-based design could mitigate the issues:

- *Pre-emptive Upstream Processing:* The concurrently() Collector could internally use collectingAndThen(toList(), ...) or a similar mechanism. This would ensure that *all upstream elements are eagerly consumed*, and that any upstream exception is thrown and handled, *before* the concurrent virtual threads are ever launched for the mapConcurrent-like operations.
- *Controlled Concurrent Operations:* Only once the upstream processing is complete and validated would the concurrent mapping begin. The mapping would still be *lazy and concurrency-limited*, but that laziness would now be confined to the operations started by concurrently() itself.
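As a rough sketch of what such a Collector might look like (concurrently() is a hypothetical name; this assumes JDK 24's Gatherers.mapConcurrent as the underlying engine, and for simplicity returns the List directly rather than a Stream):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class Concurrently {
    // Hypothetical terminal Collector: drain the upstream into a List first,
    // so any upstream exception surfaces while the pipeline is still
    // single-threaded, then run the concurrency-limited mapping on the snapshot.
    static <T, R> Collector<T, ?, List<R>> concurrently(
            int maxConcurrency, Function<? super T, ? extends R> mapper) {
        return Collectors.collectingAndThen(
                Collectors.toList(),
                inputs -> inputs.stream()
                        .gather(Gatherers.mapConcurrent(maxConcurrency, mapper))
                        .toList());
    }

    public static void main(String[] args) {
        // All upstream work (filter, peek, ...) completes before any fan-out begins.
        List<Integer> result = Stream.of(1, 2, 3)
                .filter(n -> n > 0)
                .collect(concurrently(2, n -> n * 10));
        System.out.println(result); // [10, 20, 30] (mapConcurrent preserves order)
    }
}
```

Because no virtual thread is launched until the upstream has been fully drained, an upstream exception can never leave started-but-abandoned threads behind; the obvious cost is that all inputs are buffered first.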
- *Resolved Concerns:* This design would effectively render the *happens-before* and *resource leak* concerns caused by upstream exceptions moot, as such exceptions would occur and be dealt with at an earlier, more controlled stage, with no concurrency complications.

The most apparent trade-off is that this Collector approach would eagerly consume all input elements, potentially increasing memory usage and sacrificing the full laziness a Gatherer can offer. However, mapConcurrent()'s primary use cases typically involve I/O-intensive fan-outs, which generally don't operate on an extremely large number of inputs. For the rare instances involving infinite or very large streams, the limitation becomes a clear "*not designed for this*" scenario, which is simpler than navigating the current nuanced caveats around happens-before enforcement and thread cancellation.

Just my 2c,

Cheers

On Fri, Jul 4, 2025 at 8:55 AM Jige Yu <yuj...@gmail.com> wrote:

> Yes.
>
> I got two tests that show the behavior difference between a downstream
> exception and an upstream exception:
>
> @Test public void mapConcurrently_upstreamFailureDoesNotInterrupt() {
>   ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
>   ConcurrentLinkedQueue<Integer> interrupted = new ConcurrentLinkedQueue<>();
>   RuntimeException thrown = assertThrows(
>       RuntimeException.class,
>       () -> Stream.of(1, 10, 3, 0)
>           .peek(n -> {
>             if (n == 3) {
>               try { // give 1 and 10 some time to have at least started
>                 Thread.sleep(100);
>               } catch (InterruptedException e) {
>                 interrupted.add(n);
>               }
>               throw new IllegalArgumentException(String.valueOf(n));
>             }
>           })
>           .gather(mapConcurrently(3, n -> {
>             started.add(n);
>             try {
>               Thread.sleep(n * 10000);
>             } catch (InterruptedException e) {
>               interrupted.add(n);
>             }
>             return n;
>           }))
>           .findAny());
>   assertThat(started).containsExactly(10, 1);
>   assertThat(interrupted).isEmpty();
>   assertThat(thrown).hasMessageThat().isEqualTo("3");
> }
>
> @Test public void mapConcurrently_downstreamFailurePropagated() {
>   ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
>   ConcurrentLinkedQueue<Integer> interrupted = new ConcurrentLinkedQueue<>();
>   RuntimeException thrown = assertThrows(
>       RuntimeException.class,
>       () -> Stream.of(10, 1, 3, 0)
>           .gather(mapConcurrently(3, n -> {
>             started.add(n);
>             try {
>               Thread.sleep(n * 1000);
>             } catch (InterruptedException e) {
>               interrupted.add(n);
>             }
>             return n;
>           }))
>           .peek(n -> {
>             throw new IllegalArgumentException(String.valueOf(n));
>           })
>           .findAny());
>   assertThat(started).containsExactly(10, 1, 3);
>   assertThat(interrupted).containsExactly(3, 10);
>   assertThat(thrown).hasMessageThat().isEqualTo("1");
> }
>
> Both run with maxConcurrency = 3.
>
> When the downstream peek() throws on the first element "1" it gets, it
> will interrupt (and join) the other two pending threads (3 and 10).
> The interrupted ConcurrentLinkedQueue is guaranteed to see [3, 10] because
> of the happens-before guarantee.
>
> When the upstream peek() throws on 3, [1, 10] are also already running
> concurrently. But no thread is interrupted.
>
> On Thu, Jul 3, 2025 at 10:29 AM David Alayachew <davidalayac...@gmail.com>
> wrote:
>
>> These questions necessitate runnable examples. Do you have any?
>>
>> On Thu, Jul 3, 2025, 10:37 AM Jige Yu <yuj...@gmail.com> wrote:
>>
>>> Hi JDK Core Devs,
>>>
>>> I'm writing to you today with a question about the behavior of
>>> mapConcurrent() and its interaction with unchecked exceptions. I've
>>> been experimenting with the API and observed that mapConcurrent()
>>> blocks and joins all virtual threads upon an unchecked exception before
>>> propagating it.
>>>
>>> Initially, I thought this design choice might provide a strong
>>> happens-before guarantee.
>>> My assumption was that an application catching a
>>> RuntimeException would be able to *observe all side effects* from the
>>> virtual threads, even though this practice is generally discouraged. This
>>> seemed like a potentially significant advantage, outweighing the risk of a
>>> virtual thread failing to respond to interruption or responding slowly.
>>>
>>> However, I've since realized that mapConcurrent() cannot fully
>>> guarantee a strong happens-before relationship when an unchecked exception
>>> occurs *somewhere* in the stream pipeline. While it can block and wait
>>> for exceptions thrown by the mapper function or downstream operations, it
>>> appears unable to intercept unchecked exceptions *thrown by an upstream*
>>> source.
>>>
>>> Consider a scenario with two input elements: if the first element starts
>>> a virtual thread, and then the second element causes an unchecked exception
>>> from the upstream *before* reaching the gather() call, the virtual
>>> thread initiated by the first element would not be interrupted. This makes
>>> the "happens-before" guarantee quite nuanced in practice.
>>>
>>> This brings me to my core questions:
>>>
>>> 1. Is providing a happens-before guarantee upon an unchecked exception
>>>    a design goal for mapConcurrent()?
>>> 2. If not, would it be more desirable to *not* join on virtual threads
>>>    when unchecked exceptions occur? This would allow the application code
>>>    to catch the exception sooner and avoid the risk of being blocked
>>>    indefinitely.
>>>
>>> Thank you for your time and insights.
>>>
>>> Best regards,
>>>
>>> Ben Yu