This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 9950b4aa67 Fix Java 25 stream test timeout failures (#2573) (#2728)
9950b4aa67 is described below
commit 9950b4aa673ff2165e04ce76147acef71bf23b27
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 20:52:32 2026 +0800
Fix Java 25 stream test timeout failures (#2573) (#2728)
Motivation: JDK 25 ForkJoinPool scheduling changes cause nightly CI
stream tests to fail with timeouts — ~100+ failures per night.
Modifications:
- Increase timefactor from 2 to 3 for JDK 25+ in nightly CI workflow
- Add 30s PatienceConfig to FlowFlatMapConcatParallelismSpec (100K elements)
- Add 30s PatienceConfig to HubSpec (20K element long-stream tests)
- Increase MapAsyncPartitionedSpec patience from 5s to 15s
- Increase AggregateWithBoundarySpec Await timeouts from 10s to 30s
Result: Timing-sensitive stream tests have sufficient timeout headroom
on JDK 25, reducing false failures in nightly CI.
References: https://github.com/apache/pekko/issues/2573
Co-authored-by: Copilot <[email protected]>
---
.github/workflows/nightly-builds.yml | 10 ++++++++--
.../pekko/stream/scaladsl/AggregateWithBoundarySpec.scala | 10 +++++-----
.../stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala | 8 ++++++++
.../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 7 +++++++
.../org/apache/pekko/stream/MapAsyncPartitionedSpec.scala | 4 +++-
5 files changed, 31 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/nightly-builds.yml
b/.github/workflows/nightly-builds.yml
index 4117bb7e6b..5168d398a9 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -148,12 +148,18 @@ jobs:
- name: Compile and Test
# note that this is not running any multi-jvm tests because
multi-in-test=false
+ # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see
#2573)
run: |-
+ if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
+ TIMEFACTOR=3
+ else
+ TIMEFACTOR=2
+ fi
sbt \
-Dpekko.cluster.assert=on \
-Dpekko.log.timestamps=true \
- -Dpekko.test.timefactor=2 \
- -Dpekko.actor.testkit.typed.timefactor=2 \
+ -Dpekko.test.timefactor=$TIMEFACTOR \
+ -Dpekko.actor.testkit.typed.timefactor=$TIMEFACTOR \
-Dpekko.test.tags.exclude=gh-exclude,timing \
-Dpekko.test.multi-in-test=false \
-Dio.netty.leakDetection.level=PARANOID \
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
index 0cda1362b6..f56c1fcabb 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
@@ -41,7 +41,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
.runWith(Sink.collection)
- Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq)
+ Await.result(result, 30.seconds) should be(stream.grouped(groupSize).toSeq)
}
@@ -58,7 +58,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
emitOnTimer = None)
.runWith(Sink.collection)
- Await.result(result, 10.seconds) should
be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
+ Await.result(result, 30.seconds) should
be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
}
@@ -73,7 +73,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
}, harvest = buffer => buffer.toSeq, emitOnTimer = None)
.runWith(Sink.collection)
- Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6),
Seq(7)))
+ Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6),
Seq(7)))
}
}
@@ -186,7 +186,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends
AnyWordSpecLike with
p.sendNext(7)
p.sendComplete()
- Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4),
Seq(5, 6, 7)))
+ Await.result(result, 30.seconds) should be(Seq(Seq(1, 2), Seq(3, 4),
Seq(5, 6, 7)))
}
@@ -227,7 +227,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends
AnyWordSpecLike with
p.sendNext(7)
p.sendComplete()
- Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6,
7)))
+ Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6,
7)))
}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index 2a43334b06..befee24c08 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -33,9 +33,17 @@ import pekko.stream._
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
import pekko.stream.testkit.scaladsl.TestSink
+import org.scalatest.time.{ Seconds, Span }
+
class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest with FutureTimeoutSupport {
+
+ // 100K-element tests need extra headroom, especially on JDK 25+ where
+ // ForkJoinPool scheduling changes slow down highly-parallel workloads
(#2573)
+ override implicit val patience: PatienceConfig =
+ PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
class BoomException extends RuntimeException("BOOM~~") with NoStackTrace
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index da98c24f49..ce7d19c48f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -30,9 +30,16 @@ import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource
import pekko.testkit.EventFilter
+import org.scalatest.time.{ Seconds, Span }
+
class HubSpec extends StreamSpec {
implicit val ec: ExecutionContext = system.dispatcher
+ // Long-stream tests (20K elements) need extra headroom on JDK 25+
+ // where ForkJoinPool scheduling changes cause slower throughput (#2573)
+ override implicit val patience: PatienceConfig =
+ PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+
"MergeHub" must {
"work in the happy case" in {
diff --git
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 765b335a78..7ccd38e28e 100644
---
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -107,8 +107,10 @@ class MapAsyncPartitionedSpec
import MapAsyncPartitionedSpec.TestData._
+ // Property-based tests with blocking operations need extra headroom,
+ // especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
override implicit def patienceConfig: PatienceConfig = PatienceConfig(
- timeout = 5.seconds,
+ timeout = 15.seconds,
interval = 100.millis)
private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty,
"test-system")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]