This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 9950b4aa67 Fix Java 25 stream test timeout failures (#2573) (#2728)
9950b4aa67 is described below

commit 9950b4aa673ff2165e04ce76147acef71bf23b27
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 20:52:32 2026 +0800

    Fix Java 25 stream test timeout failures (#2573) (#2728)
    
    Motivation: JDK 25 ForkJoinPool scheduling changes cause nightly CI
    stream tests to fail with timeouts — ~100+ failures per night.
    
    Modifications:
    - Increase timefactor from 2 to 3 for JDK 25+ in nightly CI workflow
    - Add 30s PatienceConfig to FlowFlatMapConcatParallelismSpec (100K elements)
    - Add 30s PatienceConfig to HubSpec (20K element long-stream tests)
    - Increase MapAsyncPartitionedSpec patience from 5s to 15s
    - Increase AggregateWithBoundarySpec Await timeouts from 10s to 30s
    
    Result: Timing-sensitive stream tests have sufficient timeout headroom
    on JDK 25, reducing false failures in nightly CI.
    
    References: https://github.com/apache/pekko/issues/2573
    
    Co-authored-by: Copilot <[email protected]>
---
 .github/workflows/nightly-builds.yml                           | 10 ++++++++--
 .../pekko/stream/scaladsl/AggregateWithBoundarySpec.scala      | 10 +++++-----
 .../stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala     |  8 ++++++++
 .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala  |  7 +++++++
 .../org/apache/pekko/stream/MapAsyncPartitionedSpec.scala      |  4 +++-
 5 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml
index 4117bb7e6b..5168d398a9 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -148,12 +148,18 @@ jobs:
 
       - name: Compile and Test
         # note that this is not running any multi-jvm tests because multi-in-test=false
+        # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573)
         run: |-
+          if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
+            TIMEFACTOR=3
+          else
+            TIMEFACTOR=2
+          fi
           sbt \
             -Dpekko.cluster.assert=on \
             -Dpekko.log.timestamps=true \
-            -Dpekko.test.timefactor=2 \
-            -Dpekko.actor.testkit.typed.timefactor=2 \
+            -Dpekko.test.timefactor=$TIMEFACTOR \
+            -Dpekko.actor.testkit.typed.timefactor=$TIMEFACTOR \
             -Dpekko.test.tags.exclude=gh-exclude,timing \
             -Dpekko.test.multi-in-test=false \
             -Dio.netty.leakDetection.level=PARANOID \
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
index 0cda1362b6..f56c1fcabb 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala
@@ -41,7 +41,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
         }, harvest = buffer => buffer.toSeq, emitOnTimer = None)
       .runWith(Sink.collection)
 
-    Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq)
+    Await.result(result, 30.seconds) should be(stream.grouped(groupSize).toSeq)
 
   }
 
@@ -58,7 +58,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
         emitOnTimer = None)
       .runWith(Sink.collection)
 
-    Await.result(result, 10.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
+    Await.result(result, 30.seconds) should be(stream.grouped(groupSize).toSeq.map(seq => seq :+ -1))
 
   }
 
@@ -73,7 +73,7 @@ class AggregateWithBoundarySpec extends StreamSpec {
         }, harvest = buffer => buffer.toSeq, emitOnTimer = None)
       .runWith(Sink.collection)
 
-    Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
+    Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
   }
 
 }
@@ -186,7 +186,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
     p.sendNext(7)
     p.sendComplete()
 
-    Await.result(result, 10.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
+    Await.result(result, 30.seconds) should be(Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7)))
 
   }
 
@@ -227,7 +227,7 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
     p.sendNext(7)
     p.sendComplete()
 
-    Await.result(result, 10.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
+    Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)))
 
   }
 
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index 2a43334b06..befee24c08 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -33,9 +33,17 @@ import pekko.stream._
 import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
 import pekko.stream.testkit.scaladsl.TestSink
 
+import org.scalatest.time.{ Seconds, Span }
+
 class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
     pekko.stream.materializer.initial-input-buffer-size = 2
   """) with ScriptedTest with FutureTimeoutSupport {
+
+  // 100K-element tests need extra headroom, especially on JDK 25+ where
+  // ForkJoinPool scheduling changes slow down highly-parallel workloads (#2573)
+  override implicit val patience: PatienceConfig =
+    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+
   val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
 
   class BoomException extends RuntimeException("BOOM~~") with NoStackTrace
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index da98c24f49..ce7d19c48f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -30,9 +30,16 @@ import pekko.stream.testkit.scaladsl.TestSink
 import pekko.stream.testkit.scaladsl.TestSource
 import pekko.testkit.EventFilter
 
+import org.scalatest.time.{ Seconds, Span }
+
 class HubSpec extends StreamSpec {
   implicit val ec: ExecutionContext = system.dispatcher
 
+  // Long-stream tests (20K elements) need extra headroom on JDK 25+
+  // where ForkJoinPool scheduling changes cause slower throughput (#2573)
+  override implicit val patience: PatienceConfig =
+    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+
   "MergeHub" must {
 
     "work in the happy case" in {
diff --git a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 765b335a78..7ccd38e28e 100644
--- a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++ b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -107,8 +107,10 @@ class MapAsyncPartitionedSpec
 
   import MapAsyncPartitionedSpec.TestData._
 
+  // Property-based tests with blocking operations need extra headroom,
+  // especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
   override implicit def patienceConfig: PatienceConfig = PatienceConfig(
-    timeout = 5.seconds,
+    timeout = 15.seconds,
     interval = 100.millis)
 
   private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "test-system")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to