This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 608cbc98eb add test for groupBy not throwing 
tooManySubstreamsOpenException on closed substream (#2742)
608cbc98eb is described below

commit 608cbc98eb7cf04d43f116fdf7bc67f88d92d282
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 17 14:03:04 2026 +0100

    add test for groupBy not throwing tooManySubstreamsOpenException on closed 
substream (#2742)
    
    * Initial plan
    
    * Port akka-core PR #31872: add test for tooManySubstreamsOpenException on 
closed substream
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../apache/pekko/stream/scaladsl/FlowGroupBySpec.scala  | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
index d6e3fd6c56..8055bded4c 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
@@ -707,6 +707,23 @@ class FlowGroupBySpec extends StreamSpec("""
         .expectComplete()
     }
 
+    "not throw tooManySubstreamsOpenException for element on closed substream" 
in {
+      val publisher = TestPublisher.Probe[(Int, Boolean)]()
+      val outProbe =
+        Source.fromPublisher(publisher).groupBy(2, _._1).takeWhile(_._2 != 
false).mergeSubstreams.runWith(TestSink())
+      outProbe.request(4)
+      publisher.sendNext((1, true))
+      outProbe.expectNext((1, true))
+      publisher.sendNext((2, true))
+      outProbe.expectNext((2, true))
+      publisher.sendNext((2, false)) // substream 2 completed
+      publisher.sendNext((2, false)) // should be dropped, not crash the stream
+      publisher.sendNext((1, true))
+      outProbe.expectNext((1, true))
+
+      outProbe.cancel()
+    }
+
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to