Yes. I have two tests that show the behavior difference between a downstream exception and an upstream exception:

@Test public void mapConcurrently_upstreamFailureDoesNotInterrupt() {
  ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
  ConcurrentLinkedQueue<Integer> interrupted = new ConcurrentLinkedQueue<>();
  RuntimeException thrown = assertThrows(
      RuntimeException.class,
      () -> Stream.of(1, 10, 3, 0)
          .peek(n -> {
            if (n == 3) {
              try {
                // give 1 and 10 some time to have at least started
                Thread.sleep(100);
              } catch (InterruptedException e) {
                interrupted.add(n);
              }
              throw new IllegalArgumentException(String.valueOf(n));
            }
          })
          .gather(mapConcurrently(3, n -> {
            started.add(n);
            try {
              Thread.sleep(n * 10000);
            } catch (InterruptedException e) {
              interrupted.add(n);
            }
            return n;
          }))
          .findAny());
  assertThat(started).containsExactly(10, 1);
  assertThat(interrupted).isEmpty();
  assertThat(thrown).hasMessageThat().isEqualTo("3");
}

@Test public void mapConcurrently_downstreamFailurePropagated() {
  ConcurrentLinkedQueue<Integer> started = new ConcurrentLinkedQueue<>();
  ConcurrentLinkedQueue<Integer> interrupted = new ConcurrentLinkedQueue<>();
  RuntimeException thrown = assertThrows(
      RuntimeException.class,
      () -> Stream.of(10, 1, 3, 0)
          .gather(mapConcurrently(3, n -> {
            started.add(n);
            try {
              Thread.sleep(n * 1000);
            } catch (InterruptedException e) {
              interrupted.add(n);
            }
            return n;
          }))
          .peek(n -> {
            throw new IllegalArgumentException(String.valueOf(n));
          })
          .findAny());
  assertThat(started).containsExactly(10, 1, 3);
  assertThat(interrupted).containsExactly(3, 10);
  assertThat(thrown).hasMessageThat().isEqualTo("1");
}

Both use maxConcurrency=3.

When the downstream peek() throws on the first element it receives ("1"), mapConcurrently interrupts (and joins) the other two pending threads (3 and 10). The interrupted ConcurrentLinkedQueue is guaranteed to see [3, 10] because of the happens-before guarantee.

When the upstream peek() throws on 3, [1, 10] are likewise already running concurrently, but no thread is interrupted.

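To make the happens-before point concrete, here is a minimal sketch of the interrupt-then-join pattern on its own, outside of any stream. It is not the mapConcurrently implementation: the class name InterruptAndJoinSketch and the method interruptAndJoin are hypothetical, and it uses plain platform threads so that it runs on older JDKs. The only thing it relies on is that Thread.join() establishes happens-before with everything the joined thread did, so the caller sees the workers' catch-block side effects once the joins return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

public class InterruptAndJoinSketch {

  // Hypothetical helper: starts one sleeping worker per id, then interrupts
  // and joins all of them, and returns the ids that observed the interrupt.
  static List<Integer> interruptAndJoin(int... ids) throws InterruptedException {
    ConcurrentLinkedQueue<Integer> interrupted = new ConcurrentLinkedQueue<>();
    CountDownLatch allStarted = new CountDownLatch(ids.length);
    List<Thread> workers = new ArrayList<>();

    for (int id : ids) {
      Thread worker = new Thread(() -> {
        allStarted.countDown();
        try {
          Thread.sleep(60_000); // stands in for a slow mapper call
        } catch (InterruptedException e) {
          interrupted.add(id); // side effect performed in the worker thread
        }
      });
      workers.add(worker);
      worker.start();
    }

    // Wait until every worker is running, as in the "started" assertions.
    allStarted.await();

    // Roughly what happens on a downstream failure: interrupt, then join.
    for (Thread worker : workers) {
      worker.interrupt();
    }
    for (Thread worker : workers) {
      worker.join(); // everything the worker did happens-before this returns
    }

    // After the joins, every worker's catch block has run and is visible.
    return new ArrayList<>(interrupted);
  }

  public static void main(String[] args) throws InterruptedException {
    // Prints both ids; the order depends on thread scheduling.
    System.out.println(interruptAndJoin(3, 10));
  }
}
```

Without the join loop, the caller could read the queue before either worker reached its catch block. That is the situation the upstream-failure test is in: the exception escapes without anyone interrupting or joining the threads that were already started.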
On Thu, Jul 3, 2025 at 10:29 AM David Alayachew <davidalayac...@gmail.com> wrote:

> These questions necessitate runnable examples. Do you have any
>
> On Thu, Jul 3, 2025, 10:37 AM Jige Yu <yuj...@gmail.com> wrote:
>
>> Hi JDK Core Devs,
>>
>> I'm writing to you today with a question about the behavior of
>> mapConcurrent() and its interaction with unchecked exceptions. I've been
>> experimenting with the API and observed that mapConcurrent() blocks and
>> joins all virtual threads upon an unchecked exception before propagating it.
>>
>> Initially, I thought this design choice might provide a strong
>> happens-before guarantee. My assumption was that an application catching a
>> RuntimeException would be able to *observe all side effects* from the
>> virtual threads, even though this practice is generally discouraged. This
>> seemed like a potentially significant advantage, outweighing the risk of a
>> virtual thread failing to respond to interruption or responding slowly.
>>
>> However, I've since realized that mapConcurrent() cannot fully guarantee
>> a strong happens-before relationship when an unchecked exception occurs
>> *somewhere* in the stream pipeline. While it can block and wait for
>> exceptions thrown by the mapper function or downstream operations, it
>> appears unable to intercept unchecked exceptions *thrown by an upstream*
>> source.
>>
>> Consider a scenario with two input elements: if the first element starts
>> a virtual thread, and then the second element causes an unchecked exception
>> from the upstream *before* reaching the gather() call, the virtual
>> thread initiated by the first element would not be interrupted. This makes
>> the "happens-before" guarantee quite nuanced in practice.
>>
>> This brings me to my core questions:
>>
>> 1. Is providing a happens-before guarantee upon an unchecked exception a
>>    design goal for mapConcurrent()?
>> 2. If not, would it be more desirable to *not* join on virtual threads
>>    when unchecked exceptions occur? This would allow the application code to
>>    catch the exception sooner and avoid the risk of being blocked
>>    indefinitely.
>>
>> Thank you for your time and insights.
>>
>> Best regards,
>>
>> Ben Yu
>>
>