This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d0c8d0b302 Add FlowMapAsyncPartitionedSpec tests ported from akka/akka-core#31582 and akka/akka-core#31882 (#2747)
d0c8d0b302 is described below

commit d0c8d0b3023c41ab44748ebb7d0cd5beb0412ef7
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 17 21:03:26 2026 +0100

    Add FlowMapAsyncPartitionedSpec tests ported from akka/akka-core#31582 and akka/akka-core#31882 (#2747)
    
    * Initial plan
    
    * Add FlowMapAsyncPartitionedSpec tests ported from akka/akka-core#31582 and #31882
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Update FlowMapAsyncPartitionedSpec.scala
    
    * Fix flaky 'resume after multiple failures' test - use deterministic Future.failed only
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../scaladsl/FlowMapAsyncPartitionedSpec.scala     | 539 +++++++++++++++++++++
 1 file changed, 539 insertions(+)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
new file mode 100644
index 0000000000..ece1d02847
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.stream.ActorAttributes
+import pekko.stream.Supervision
+import pekko.stream.testkit._
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.testkit.scaladsl.TestSource
+import pekko.testkit.TestLatch
+import pekko.testkit.TestProbe
+import pekko.testkit.WithLogCapturing
+
+import org.scalatest.compatible.Assertion
+
+// Tests ported from akka/akka-core#31582 and akka/akka-core#31882,
+// adapted for Pekko's mapAsyncPartitioned API (no perPartition parameter).
+// Pekko's implementation always processes at most one element per partition at a time.
+class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
+  import Utils.TE
+
+  "A Flow with mapAsyncPartitioned" must {
+    "produce future elements" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val c = TestSubscriber.manualProbe[Int]()
+      Source(1 to 3)
+        .mapAsyncPartitioned(4)(_ % 2) { (elem, _) =>
+          Future(elem)
+        }
+        .runWith(Sink.fromSubscriber(c))
+
+      val sub = c.expectSubscription()
+      sub.request(2)
+      c.expectNext(1)
+      c.expectNext(2)
+      c.expectNoMessage(200.millis)
+      sub.request(2)
+      c.expectNext(3)
+      c.expectComplete()
+    }
+
+    "produce elements in order of ingestion regardless of future completion order" in {
+      val c = TestSubscriber.manualProbe[Int]()
+      val promises = (0 until 50).map { _ =>
+        Promise[Int]()
+      }.toArray
+
+      Source(0 until 50)
+        .mapAsyncPartitioned(25)(_ % 7) { (elem, _) =>
+          promises(elem).future
+        }
+        .to(Sink.fromSubscriber(c))
+        .run()
+
+      val sub = c.expectSubscription()
+      sub.request(1000)
+
+      // completes all the promises in random order
+      @annotation.tailrec
+      def iter(completed: Int, i: Int): Unit =
+        if (completed != promises.size) {
+          if (promises(i).isCompleted) iter(completed, (i + 1) % promises.size)
+          else {
+            promises(i).success(i)
+            iter(completed + 1, scala.util.Random.nextInt(promises.size))
+          }
+        }
+
+      iter(0, scala.util.Random.nextInt(promises.size))
+
+      (0 until 50).foreach { i =>
+        c.expectNext(i)
+      }
+      c.expectComplete()
+    }
+
+    "not run more futures than overall parallelism" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val probe = TestProbe()
+      val c = TestSubscriber.manualProbe[Int]()
+
+      // parallelism and partitioner chosen to maximize number of futures
+      //  in-flight while leaving an opportunity for the stream etc. to still run
+      val maxParallelism = (Runtime.getRuntime.availableProcessors - 1).max(3).min(8)
+      Source(1 to 22)
+        .mapAsyncPartitioned(maxParallelism)(_ % 22) { (elem, _) =>
+          Future {
+            probe.ref ! elem
+            elem
+          }
+        }
+        .to(Sink.fromSubscriber(c))
+        .run()
+
+      val sub = c.expectSubscription()
+      probe.expectNoMessage(100.millis)
+      sub.request(1)
+
+      // theSameElementsAs (viz. ordering insensitive) because the order in which messages are received by the probe
+      //  is not deterministic, but the bunching (caused by downstream demand) should be deterministic
+      probe.receiveN(maxParallelism + 1) should contain theSameElementsAs (1 to (maxParallelism + 1))
+      probe.expectNoMessage(100.millis)
+      sub.request(2)
+      probe.receiveN(2) should contain theSameElementsAs (2 to 3).map(_ + maxParallelism)
+      probe.expectNoMessage(100.millis)
+      sub.request(10)
+      probe.receiveN(10) should contain theSameElementsAs (4 to 13).map(_ + maxParallelism)
+      probe.expectNoMessage(200.millis)
+    }
+
+    // Pekko's mapAsyncPartitioned always uses perPartition=1 (at most one in-flight future per partition),
+    // meaning elements within the same partition are processed sequentially.
+    // This test verifies that elements from different partitions CAN run concurrently
+    // and that the stream buffers elements without backpressuring upstream.
+    "not backpressure when partition slots are busy" in {
+      val processingProbe = TestProbe()
+      case class Elem(n: Int, promise: Promise[Done])
+      val (sourceProbe, result) =
+        TestSource[Int]()
+          .viaMat(Flow[Int].mapAsyncPartitioned(10)(_ < 9) {
+            case (n, _) =>
+              val promise = Promise[Done]()
+              processingProbe.ref ! Elem(n, promise)
+              promise.future.map(_ => n)(ExecutionContext.parasitic)
+          })(Keep.left)
+          .toMat(Sink.seq[Int])(Keep.both)
+          .run()
+
+      // we get to send all right away (goes into buffers)
+      (0 to 10).foreach(n => sourceProbe.sendNext(n))
+
+      // only these two in flight, one per partition (Pekko always uses perPartition=1)
+      // partition true (n < 9)
+      val elem0 = processingProbe.expectMsgType[Elem]
+      elem0.n should ===(0)
+
+      // partition false (n >= 9)
+      val elem9 = processingProbe.expectMsgType[Elem]
+      elem9.n should ===(9)
+
+      processingProbe.expectNoMessage(10.millis) // both partitions busy
+
+      // unlock partition true, should let us work through all elements in that partition
+      elem0.promise.success(Done)
+      (1 to 8).foreach { n =>
+        val elemN = processingProbe.expectMsgType[Elem]
+        elemN.n should ===(n)
+        elemN.promise.success(Done)
+      }
+
+      // unlock partition false
+      elem9.promise.success(Done)
+      val elem10 = processingProbe.expectMsgType[Elem]
+      elem10.n should ===(10)
+      elem10.promise.success(Done)
+
+      sourceProbe.sendComplete()
+
+      // results are in order
+      result.futureValue should ===((0 to 10).toVector)
+    }
+
+    "signal future already failed" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val latch = TestLatch(1)
+      val c = TestSubscriber.manualProbe[Int]()
+      Source(1 to 5)
+        .mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
+          if (elem == 3) Future.failed[Int](new TE("BOOM TRES!"))
+          else
+            Future {
+              Await.ready(latch, 10.seconds)
+              elem
+            }
+        }
+        .to(Sink.fromSubscriber(c))
+        .run()
+
+      val sub = c.expectSubscription()
+      sub.request(10)
+      c.expectError().getMessage shouldBe "BOOM TRES!"
+      latch.countDown()
+    }
+
+    "signal future failure" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val latch = TestLatch(1)
+      val c = TestSubscriber.manualProbe[Int]()
+
+      Source(1 to 5)
+        .mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
+          Future {
+            if (elem == 3) throw new TE("BOOM TROIS!")
+            else {
+              Await.ready(latch, 10.seconds)
+              elem
+            }
+          }
+        }
+        .to(Sink.fromSubscriber(c))
+        .run()
+
+      val sub = c.expectSubscription()
+      sub.request(10)
+      c.expectError().getMessage shouldBe "BOOM TROIS!"
+      latch.countDown()
+    }
+
+    "signal future failure asap" in {
+      val latch = TestLatch(1)
+      val done =
+        Source(1 to 5)
+          .map { elem =>
+            if (elem == 1) elem
+            else {
+              // Slow the upstream after the first
+              Await.ready(latch, 10.seconds)
+              elem
+            }
+          }
+          .mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
+            if (elem == 1) Future.failed(new TE("BOOM EIN!"))
+            else Future.successful(elem)
+          }
+          .runWith(Sink.ignore)
+
+      intercept[TE] {
+        Await.result(done, remainingOrDefault)
+      }.getMessage shouldBe "BOOM EIN!"
+      latch.countDown()
+    }
+
+    "fail ASAP midstream" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val promises = (0 until 6).map(_ => Promise[Int]()).toArray
+      val probe =
+        Source(0 until 6)
+          .mapAsyncPartitioned(5)(_ % 7) { (elem, _) =>
+            promises(elem).future.map(n => ('A' + n).toChar)
+          }
+          .runWith(TestSink())
+
+      probe.request(100)
+      val failure = new Exception("BOOM tvÄ")
+      scala.util.Random.shuffle((0 until 6): immutable.Seq[Int]).foreach { n =>
+        if (n == 2) promises(n).failure(failure)
+        else promises(n).success(n)
+      }
+
+      // we don't know when the third promise will be failed
+      probe.expectNextOrError() match {
+        case Left(ex)   => ex.getMessage shouldBe failure.getMessage // fine, error can overtake elements
+        case Right('A') =>
+          probe.expectNextOrError() match {
+            case Left(ex)   => ex.getMessage shouldBe failure.getMessage // fine, error can overtake elements
+            case Right('B') =>
+              probe.expectNextOrError() match {
+                case Left(ex) => ex.getMessage shouldBe failure.getMessage // fine, error can overtake elements
+                case Right(n) => fail(s"stage should have failed rather than emit $n")
+              }
+
+            case unexpected => fail(s"unexpected $unexpected")
+          }
+
+        case unexpected => fail(s"unexpected $unexpected")
+      }
+    }
+
+    "drop failed elements midstream when resume supervision is in place" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val promises = (0 until 6).map(_ => Promise[Int]()).toArray
+      val elements =
+        Source(0 until 6)
+          .mapAsyncPartitioned(5)(_ % 7) { (elem, _) =>
+            promises(elem).future.map(n => ('A' + n).toChar)
+          }
+          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+          .runWith(Sink.seq)
+
+      val failure = new Exception("BOOM TWEE!")
+      scala.util.Random.shuffle((0 until 6): immutable.Seq[Int]).foreach { n =>
+        if (n == 2) promises(n).failure(failure)
+        else promises(n).success(n)
+      }
+
+      elements.futureValue should contain theSameElementsInOrderAs "ABDEF"
+    }
+
+    "signal error when constructing future" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val latch = TestLatch(1)
+      val c = TestSubscriber.manualProbe[Int]()
+
+      // Use identity partitioner so each element has a unique partition, allowing elem 3 to start
+      // concurrently with other elements (Pekko processes at most one element per partition at a time)
+      Source(1 to 5)
+        .mapAsyncPartitioned(4)(identity) { (elem, _) =>
+          if (elem == 3) throw new TE("BOOM TRE!")
+          else
+            Future {
+              Await.ready(latch, 10.seconds)
+              elem
+            }
+        }
+        .to(Sink.fromSubscriber(c))
+        .run()
+
+      val sub = c.expectSubscription()
+      sub.request(10)
+      c.expectError().getMessage shouldBe "BOOM TRE!"
+      latch.countDown()
+    }
+
+    "resume after failed future if resume supervision is in place" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      val elements =
+        Source(1 to 5)
+          .mapAsyncPartitioned(4)(_ % 2) { (elem, _) =>
+            Future {
+              if (elem == 3) throw new TE("BOOM TRZY!")
+              else elem
+            }
+          }
+          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+          .runWith(Sink.seq)
+
+      elements.futureValue should contain theSameElementsInOrderAs Seq(1, 2, 4, 5)
+    }
+
+    "resume after already failed future if resume supervision is in place" in {
+      val expected =
+        Source(1 to 5)
+          .mapAsyncPartitioned(4)(_ % 2) { (elem, _) =>
+            if (elem == 3) Future.failed(new TE("BOOM TRI!"))
+            else Future.successful(elem)
+          }
+          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+          .runWith(Sink.seq)
+
+      expected.futureValue should contain theSameElementsInOrderAs Seq(1, 2, 4, 5)
+    }
+
+    "resume after multiple failures if resume supervision is in place" in {
+      val expected =
+        Source(1 to 10)
+          .mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
+            if (elem % 4 < 3) Future.failed(new TE("BOOM!"))
+            else Future.successful(elem)
+          }
+          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+          .runWith(Sink.seq)
+
+      expected.futureValue should contain theSameElementsInOrderAs (1 to 10).filter(_ % 4 == 3)
+    }
+
+    "ignore null-completed futures" in {
+      // Pick the null-completed elements from 1 to 10, the same range the source emits. Drawing
+      // from `0 until 10` could select only 0, which never occurs in the stream, so no future
+      // would complete with null and the test would silently check nothing.
+      val shouldBeNull = {
+        val n = scala.util.Random.nextInt(10) + 1
+        (1 to n).foldLeft(Set.empty[Int]) { (set, _) =>
+          set + (scala.util.Random.nextInt(10) + 1)
+        }
+      }
+      if (shouldBeNull.isEmpty) fail("should be at least one null")
+
+      val f: (Int, Int) => Future[String] = { (elem, _) =>
+        if (shouldBeNull(elem)) Future.successful(null)
+        else Future.successful(elem.toString)
+      }
+
+      val result =
+        Source(1 to 10).mapAsyncPartitioned(5)(_ % 2)(f).runWith(Sink.seq)
+
+      result.futureValue should contain theSameElementsInOrderAs (1 to 10).filterNot(shouldBeNull).map(_.toString)
+    }
+
+    "handle cancel properly" in {
+      val pub = TestPublisher.manualProbe[Int]()
+      val sub = TestSubscriber.manualProbe[Int]()
+
+      Source
+        .fromPublisher(pub)
+        .mapAsyncPartitioned(4)(_ % 2) { (elem, _) =>
+          Future.successful(elem)
+        }
+        .runWith(Sink.fromSubscriber(sub))
+
+      val upstream = pub.expectSubscription()
+      upstream.expectRequest()
+
+      sub.expectSubscription().cancel()
+
+      upstream.expectCancellation()
+    }
+
+    val maxParallelism = Runtime.getRuntime.availableProcessors.max(8)
+    // Pekko always uses perPartition=1; numPartitions > maxParallelism ensures all partitions can be active
+    val numPartitions = maxParallelism + 1
+
+    s"not run more futures than allowed (maxParallelism: $maxParallelism)" in {
+      val partitioner: Int => Int = _ % numPartitions
+
+      val globalCounter = new AtomicInteger
+      val partitionCounters = (0 until numPartitions).map(_ -> new AtomicInteger).toMap: Map[Int, AtomicInteger]
+      val queue = new LinkedBlockingQueue[(Int, Promise[Int], Long)]
+
+      // Takes queued promises in FIFO order and completes each one after a short random busy-wait.
+      // It decrements the in-flight counters first, then completes the promise with a running
+      // count (0, 1, 2, ...). The thread stops once it is interrupted.
+      val timer = new Thread {
+        val maxDelay = 100000 // nanos
+
+        def delay(): Int =
+          scala.util.Random.nextInt(maxDelay) + 1
+
+        var count = 0
+
+        @annotation.tailrec
+        final override def run(): Unit = {
+          val cont =
+            try {
+              val (partition, promise, enqueued) = queue.take()
+              val wakeup = enqueued + delay()
+              while (System.nanoTime() < wakeup) {}
+              globalCounter.decrementAndGet()
+              partitionCounters(partition).decrementAndGet()
+              promise.success(count)
+              count += 1
+              true
+            } catch {
+              case _: InterruptedException => false
+            }
+
+          if (cont) run()
+        }
+      }
+      timer.start()
+
+      // Fails immediately if admitting another future would exceed the global limit or the
+      // one-per-partition limit; otherwise hands a fresh promise to the timer thread.
+      def deferred(partition: Int): Future[Int] =
+        if (globalCounter.incrementAndGet() > maxParallelism) {
+          Future.failed(new AssertionError("global parallelism exceeded"))
+        } else if (partitionCounters(partition).incrementAndGet() > 1) {
+          // Pekko always has perPartition=1: at most one future per partition at a time
+          Future.failed(new AssertionError(s"partition parallelism for partition $partition exceeded"))
+        } else {
+          val p = Promise[Int]()
+          queue.offer((partition, p, System.nanoTime()))
+          p.future
+        }
+
+      try {
+        @volatile var lastElem = 0
+        val successes =
+          Source(1 to 10000)
+            .mapAsyncPartitioned(maxParallelism)(partitioner) { (_, partition) =>
+              deferred(partition)
+            }
+            // Values come from the timer's running count, which starts at 0. The `lastElem + 1`
+            // check drops that initial 0 and keeps only a strictly consecutive run. Any
+            // out-of-order value therefore makes the sum fall short of the expected total.
+            .filter { elem =>
+              if (elem == lastElem + 1) {
+                lastElem = elem
+                true
+              } else false
+            }
+            .runFold(0) { (c, e) =>
+              c + e
+            }
+
+        successes.futureValue shouldBe (5000 * 9999) // sum of 1 to 9999
+      } finally {
+        timer.interrupt()
+      }
+    }
+
+    val thisWillNotStand = TE("this will not stand")
+
+    // Streams `true` then `false` through `f`, using the element itself as its partition key.
+    // The decider resumes on `thisWillNotStand` (counting each invocation) and stops on
+    // anything else. The caller's `f` is expected to fail only for `true`, so only `false`
+    // must survive and the decider must have been consulted exactly once.
+    def deciderTest(f: (Boolean, Boolean) => Future[Boolean]): Assertion = {
+      val failCount = new AtomicInteger
+      val result =
+        Source(List(true, false))
+          .mapAsyncPartitioned(2)(identity)(f)
+          .addAttributes(ActorAttributes.supervisionStrategy {
+            case x if x == thisWillNotStand =>
+              failCount.incrementAndGet()
+              Supervision.resume
+
+            case _ => Supervision.stop
+          })
+          .runWith(Sink.seq)
+
+      result.futureValue should contain only false
+      failCount.get() shouldBe 1
+    }
+
+    "not invoke the decider twice for the same failed future" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      deciderTest { (elem, _) =>
+        Future {
+          if (elem) throw thisWillNotStand
+          else elem
+        }
+      }
+    }
+
+    "not invoke the decider twice for the same pre-failed future" in {
+      deciderTest { (elem, _) =>
+        if (elem) Future.failed(thisWillNotStand)
+        else Future.successful(elem)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to