This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 608cbc98eb add test for groupBy not throwing
tooManySubstreamsOpenException on closed substream (#2742)
608cbc98eb is described below
commit 608cbc98eb7cf04d43f116fdf7bc67f88d92d282
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 17 14:03:04 2026 +0100
add test for groupBy not throwing tooManySubstreamsOpenException on closed
substream (#2742)
* Initial plan
* Port akka-core PR #31872: add test for tooManySubstreamsOpenException on
closed substream
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../apache/pekko/stream/scaladsl/FlowGroupBySpec.scala | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
index d6e3fd6c56..8055bded4c 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala
@@ -707,6 +707,23 @@ class FlowGroupBySpec extends StreamSpec("""
.expectComplete()
}
+ "not throw tooManySubstreamsOpenException for element on closed substream"
in {
+ val publisher = TestPublisher.Probe[(Int, Boolean)]()
+ val outProbe =
+ Source.fromPublisher(publisher).groupBy(2, _._1).takeWhile(_._2 !=
false).mergeSubstreams.runWith(TestSink())
+ outProbe.request(4)
+ publisher.sendNext((1, true))
+ outProbe.expectNext((1, true))
+ publisher.sendNext((2, true))
+ outProbe.expectNext((2, true))
+ publisher.sendNext((2, false)) // substream 2 completed
+ publisher.sendNext((2, false)) // should be dropped, not crash the stream
+ publisher.sendNext((1, true))
+ outProbe.expectNext((1, true))
+
+ outProbe.cancel()
+ }
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]