This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.5.x by this push:
     new 20c2b18040 Source.combine single source with type-transforming fan-in 
strategies (#2746)
20c2b18040 is described below

commit 20c2b18040489faa1cf1103e80dca27dc76dbada
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 17 00:59:05 2026 +0100

    Source.combine single source with type-transforming fan-in strategies 
(#2746)
    
    * fix(stream): Source.combine single source with type-transforming fan-in 
strategies (#2723) (#2726)
    
    Motivation:
    Source.combine with a single source bypassed the fan-in strategy using an 
unsafe
    asInstanceOf cast. This worked for type-preserving strategies like Merge (T 
→ T),
    but silently produced incorrect results for type-transforming strategies 
like
    MergeLatest (T → List[T]). For example:
      Source.combine(Seq(Source.single(1)))(MergeLatest(_)) emitted 1 instead 
of List(1)
    
    Modification:
    - Introduce TypePreservingFanIn marker trait for fan-in stages where T == U 
AND
      single-input behavior is a no-op pass-through (Merge, Concat, Interleave,
      MergePrioritized, OrElse)
    - MergeSequence intentionally NOT marked: despite being T → T, it validates
      sequence ordering (not a pure pass-through)
    - Source.combine single-source case: check TypePreservingFanIn trait before
      bypassing. Strategies without the trait are routed through the fan-in 
graph.
    - Relax Concat, Interleave, MergeSequence to accept inputPorts >= 1 (was > 
1).
      This eliminates the need for a try-catch fallback in Source.combine and 
allows
      these stages to be used directly with a single input.
    - Use Source.fromGraph for non-Source Graph inputs safety
    - Add 14 regression tests (12 Scala + 1 Java + MergeSequence validation)
    
    Result:
    - MergeLatest/ZipWithN correctly apply their transformation even for single 
source
    - Merge/Concat/Interleave correctly bypass (type-preserving optimization)
    - MergeSequence correctly validates sequences even for single source
    - Unknown/third-party strategies default to routing through the fan-in graph
      (safe default for strategies that may transform types)
    - Binary compatibility maintained (verified via MiMa)
    
    References:
    - https://github.com/apache/pekko/issues/2723
    - https://github.com/apache/pekko/pull/2726
    
    * fix `@since` on new classes (#2732)
    
    * scala 2.12 compile
    
    ---------
    
    Co-authored-by: He-Pin(kerr) <[email protected]>
---
 .../apache/pekko/stream/javadsl/SourceTest.java    |  15 +++
 .../apache/pekko/stream/scaladsl/SinkSpec.scala    |  41 +++++++++
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 101 +++++++++++++++++++++
 .../apache/pekko/stream/TypePreservingFanIn.scala  |  48 ++++++++++
 .../apache/pekko/stream/TypePreservingFanOut.scala |  44 +++++++++
 .../org/apache/pekko/stream/scaladsl/Graph.scala   |  43 ++++++---
 .../org/apache/pekko/stream/scaladsl/Sink.scala    |  22 ++++-
 .../org/apache/pekko/stream/scaladsl/Source.scala  |  41 ++++++++-
 8 files changed, 340 insertions(+), 15 deletions(-)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index bc4d67db7c..f958176cf1 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1140,6 +1140,21 @@ public class SourceTest extends StreamTest {
     assertEquals(6, result.toCompletableFuture().get(3, 
TimeUnit.SECONDS).intValue());
   }
 
+  // Regression test for https://github.com/apache/pekko/issues/2723
+  // Verifies that Source.combine with a single source correctly applies
+  // type-transforming strategies (like MergeLatest), rather than bypassing
+  // them with an unsafe asInstanceOf cast.
+  @Test
+  public void mustBeAbleToCombineSingleSourceWithMergeLatest() throws 
Exception {
+    final List<Source<Integer, NotUsed>> sources = 
Collections.singletonList(Source.single(1));
+    final List<List<Integer>> result =
+        Source.<Integer, List<Integer>, NotUsed>combine(sources, 
MergeLatest::create)
+            .runWith(Sink.collect(Collectors.toList()), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(Collections.singletonList(Collections.singletonList(1)), 
result);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void mustBeAbleToZipN() throws Exception {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index 23124024f5..7cb34c8acc 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -19,12 +19,15 @@ import scala.concurrent.duration._
 
 import org.apache.pekko
 import pekko.Done
+import pekko.dispatch.ExecutionContexts
 import pekko.stream._
 import pekko.stream.ActorAttributes.supervisionStrategy
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
 import pekko.testkit.DefaultTimeout
 
+import scala.collection.immutable
+
 import org.reactivestreams.Publisher
 
 import org.scalatest.concurrent.ScalaFutures
@@ -192,6 +195,44 @@ class SinkSpec extends StreamSpec with DefaultTimeout with 
ScalaFutures {
       }
     }
 
+    // Regression tests for Sink.combine single-sink case — mirrors 
Source.combine fix (#2723).
+    // The single-sink case previously used an unsafe asInstanceOf cast.
+
+    "combine single sink with Broadcast should work (type-preserving bypass)" 
in {
+      // Broadcast has TypePreservingFanOut, so the single-sink case is safely 
bypassed.
+      implicit val ex: scala.concurrent.ExecutionContext = 
ExecutionContexts.parasitic
+      val result = Source(List(1, 2, 3))
+        .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Broadcast[Int](_)))
+      Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 
3)))
+    }
+
+    "combine single sink with Balance should work (type-preserving bypass)" in 
{
+      // Balance has TypePreservingFanOut, so the single-sink case is safely 
bypassed.
+      implicit val ex: scala.concurrent.ExecutionContext = 
ExecutionContexts.parasitic
+      val result = Source(List(1, 2, 3))
+        .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Balance[Int](_)))
+      Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 
3)))
+    }
+
+    "combine single sink with Partition should route through strategy (not 
type-preserving)" in {
+      // Partition intentionally does NOT have TypePreservingFanOut — its 
partitioner function
+      // provides user-specified routing semantics that would be lost if 
bypassed.
+      // Single-sink Partition goes through the fan-out graph, honoring 
partitioner semantics.
+      implicit val ex: scala.concurrent.ExecutionContext = 
ExecutionContexts.parasitic
+      val result = Source(List(1, 2, 3))
+        .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Partition[Int](_, 
_ => 0)))
+      Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 
3)))
+    }
+
+    "combine single sink with wrapped Broadcast (.named) should still work" in 
{
+      // Even if the fan-out strategy loses the TypePreservingFanOut trait via 
wrapping
+      // (e.g., .named()), routing through the strategy is still correct.
+      implicit val ex: scala.concurrent.ExecutionContext = 
ExecutionContexts.parasitic
+      val result = Source(List(1, 2, 3))
+        .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(n => 
Broadcast[Int](n).named("myBroadcast")))
+      Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 
3)))
+    }
+
     "suitably override attribute handling methods" in {
       import Attributes._
       val s: Sink[Int, Future[Int]] = 
Sink.head[Int].async.addAttributes(none).named("name")
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 9bc7808ee3..67bb03a85d 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -171,6 +171,107 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       sub.request(5).expectNextN(0 to 4).expectComplete()
     }
 
+    // Regression tests for https://github.com/apache/pekko/issues/2723
+    // Source.combine with a single source must apply type-transforming fan-in 
strategies
+    // (like MergeLatest) correctly, rather than bypassing them with an unsafe 
cast.
+    // The TypePreservingFanIn trait marks strategies where T == U, enabling 
safe bypass.
+    // Strategies without this trait (MergeLatest, ZipWithN) are always routed 
through
+    // the fan-in graph even for a single source.
+
+    "combine single source with MergeLatest should emit wrapped elements" in {
+      Source
+        .combine(immutable.Seq(Source.single(1)))(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1)))
+    }
+
+    "combine single source with MergeLatest should emit all wrapped elements" 
in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+    }
+
+    "combine single source with ZipWithN should apply zipper function" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => ZipWithN[Int, 
Int](_.sum)(n))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with Merge should still work (type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source.single(1)))(Merge(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1))
+    }
+
+    "combine single source with Concat should still work (type-preserving)" in 
{
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(Concat(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with Interleave should still work 
(type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(Interleave(_, 1))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with wrapped Merge (.named) should still work" in {
+      // When Merge is wrapped via .named(), the TypePreservingFanIn trait is 
lost
+      // (GenericGraphWithChangedAttributes does not extend it). The code 
correctly
+      // routes through the fan-in graph instead of bypassing — functionally 
correct,
+      // just slightly less optimal.
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
Merge[Int](n).named("my-merge"))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with wrapped MergeLatest (.named) should emit 
wrapped elements" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
MergeLatest[Int](n).named("my-merge-latest"))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+    }
+
+    "combine single source with MergeSequence should still work 
(type-preserving)" in {
+      // MergeSequence.apply wraps via withDetachedInputs, which loses the 
TypePreservingFanIn
+      // trait. This means single-source MergeSequence goes through the fan-in 
strategy (safe
+      // default). The test uses 0-based sequences to satisfy MergeSequence's 
ordering validation.
+      Source
+        .combine(immutable.Seq(Source(List(0L, 1L, 2L))))(n => 
MergeSequence[Long](n)(identity))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(0L, 1L, 2L))
+    }
+
+    "combine single source with MergePrioritized should still work 
(type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
MergePrioritized(Seq.fill(n)(1)))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source materialized value should be a singleton list" in {
+      val (mat, result) = Source
+        .combine(immutable.Seq(Source.single(1).mapMaterializedValue(_ => 
"mat-value")))(MergeLatest(_))
+        .toMat(Sink.seq)(Keep.both)
+        .run()
+      mat should ===(immutable.Seq("mat-value"))
+      result.futureValue should ===(immutable.Seq(List(1)))
+    }
+
+    "combine empty sources list should produce empty source" in {
+      val result = Source
+        .combine(immutable.Seq.empty[Source[Int, NotUsed]])(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue
+      result should ===(immutable.Seq.empty)
+    }
+
     "combine from two inputs with simplified API" in {
       val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
       val source = Source.fromPublisher(probes(0)) :: 
Source.fromPublisher(probes(1)) :: Nil
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala 
b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
new file mode 100644
index 0000000000..857f5b8d24
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+/**
+ * Marker trait for fan-in graph stages whose output element type is the same 
as
+ * their input element type (i.e., `T => T`).
+ *
+ * Built-in stages with this trait: [[scaladsl.Merge]], [[scaladsl.Concat]],
+ * [[scaladsl.Interleave]], [[scaladsl.MergePrioritized]], [[scaladsl.OrElse]],
+ * and [[scaladsl.MergeSequence]].
+ *
+ * Note: some of these stages (Concat, Interleave, MergeSequence) have factory 
methods
+ * that wrap the stage via `withDetachedInputs`, which loses this trait. In 
those cases,
+ * `Source.combine` routes through the fan-in graph instead of 
bypassing—functionally
+ * correct, just slightly less optimal. The bypass optimization fires for 
stages whose
+ * factory methods return the raw class (e.g., `Merge`, `MergePrioritized`).
+ *
+ * This trait is used by [[scaladsl.Source.combine]] (and its Java API 
counterpart)
+ * to safely optimize the single-source case. When only one source is provided,
+ * the fan-in strategy can be bypassed with a direct pass-through if and only 
if the
+ * strategy is type-preserving (output type equals input type). Without this 
marker,
+ * a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
+ * like `MergeLatest` (where `T => List[T]`) or `ZipWithN` (where `A => O`).
+ *
+ * This design uses a "safe default": strategies '''without''' this trait will 
always
+ * be routed through the fan-in graph, even for a single source. This ensures
+ * correct behavior for unknown or third-party fan-in strategies that may 
transform
+ * the element type.
+ *
+ * @since 1.5.0
+ */
+trait TypePreservingFanIn
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala 
b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala
new file mode 100644
index 0000000000..663819093a
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+/**
+ * Marker trait for fan-out graph stages whose output element type is the same 
as
+ * their input element type (i.e., `T => T`).
+ *
+ * Examples include [[scaladsl.Broadcast]] and [[scaladsl.Balance]].
+ *
+ * Note: [[scaladsl.Partition]] is intentionally NOT marked with this trait 
despite having
+ * `T => T` types, because its `partitioner` function provides user-specified 
routing
+ * semantics that would be lost if the stage were bypassed.
+ *
+ * This trait is used by [[scaladsl.Sink.combine]] (and its Java API 
counterpart)
+ * to safely optimize the single-sink case. When only one sink is provided,
+ * the fan-out strategy can be bypassed with a direct pass-through if and only 
if the
+ * strategy is type-preserving (output type equals input type). Without this 
marker,
+ * a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
+ * where `T` differs from `U`.
+ *
+ * This design uses a "safe default": strategies '''without''' this trait will 
always
+ * be routed through the fan-out graph, even for a single sink. This ensures
+ * correct behavior for unknown or third-party fan-out strategies that may 
transform
+ * the element type.
+ *
+ * @since 1.5.0
+ */
+trait TypePreservingFanOut
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
index bf28108523..6eebb3b9ae 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
@@ -96,7 +96,9 @@ object Merge {
  *
  * '''Cancels when''' downstream cancels
  */
-final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends 
GraphStage[UniformFanInShape[T, T]] {
+final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean)
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   // one input might seem counter intuitive but saves us from special handling 
in other places
   require(inputPorts >= 1, "A Merge must have one or more input ports")
 
@@ -338,7 +340,8 @@ object MergePrioritized {
  * '''Cancels when''' downstream cancels
  */
 final class MergePrioritized[T] private (val priorities: Seq[Int], val 
eagerComplete: Boolean)
-    extends GraphStage[UniformFanInShape[T, T]] {
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   require(priorities.nonEmpty, "A Merge must have one or more input ports")
   require(priorities.forall(_ > 0), "Priorities should be positive integers")
 
@@ -463,8 +466,12 @@ object Interleave {
  * '''Cancels when''' downstream cancels
  */
 final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val 
eagerClose: Boolean)
-    extends GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "input ports must be > 1")
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
+  // Relaxed from > 1 to >= 1: single-input Interleave is semantically valid 
(pass-through).
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback. See #2723.
+  require(inputPorts >= 1, "input ports must be >= 1")
   require(segmentSize > 0, "segmentSize must be > 0")
 
   val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("Interleave.in" + i))
@@ -617,7 +624,9 @@ object Broadcast {
  * '''Cancels when'''
  *   If eagerCancel is enabled: when any downstream cancels; otherwise: when 
all downstreams cancel
  */
-final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) 
extends GraphStage[UniformFanOutShape[T, T]] {
+final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean)
+    extends GraphStage[UniformFanOutShape[T, T]]
+    with pekko.stream.TypePreservingFanOut {
   // one output might seem counter intuitive but saves us from special 
handling in other places
   require(outputPorts >= 1, "A Broadcast must have one or more output ports")
   val in: Inlet[T] = Inlet[T]("Broadcast.in")
@@ -943,7 +952,8 @@ object Balance {
  * '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; 
otherwise: when all downstreams cancel
  */
 final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: 
Boolean, val eagerCancel: Boolean)
-    extends GraphStage[UniformFanOutShape[T, T]] {
+    extends GraphStage[UniformFanOutShape[T, T]]
+    with pekko.stream.TypePreservingFanOut {
   // one output might seem counter intuitive but saves us from special 
handling in other places
   require(outputPorts >= 1, "A Balance must have one or more output ports")
 
@@ -1325,8 +1335,13 @@ object Concat {
  *
  * '''Cancels when''' downstream cancels
  */
-final class Concat[T](val inputPorts: Int) extends 
GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "A Concat must have more than 1 input ports")
+final class Concat[T](val inputPorts: Int)
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
+  // Relaxed from > 1 to >= 1: single-input Concat is semantically valid 
(pass-through).
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback. See #2723.
+  require(inputPorts >= 1, "A Concat must have at least 1 input port")
   val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("Concat.in" + i))
   val out: Outlet[T] = Outlet[T]("Concat.out")
   override def initialAttributes = DefaultAttributes.concat
@@ -1398,7 +1413,9 @@ object OrElse {
  * '''Cancels when''' downstream cancels
  */
 @InternalApi
-private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, 
T]] {
+private[stream] final class OrElse[T]
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   val primary = Inlet[T]("OrElse.primary")
   val secondary = Inlet[T]("OrElse.secondary")
   val out = Outlet[T]("OrElse.out")
@@ -1497,8 +1514,12 @@ object MergeSequence {
  * '''Cancels when''' downstream cancels
  */
 final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long)
-    extends GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "A MergeSequence must have more than 1 input ports")
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
+  // Relaxed from > 1 to >= 1: single-input MergeSequence is semantically 
valid.
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback. See #2723.
+  require(inputPorts >= 1, "A MergeSequence must have at least 1 input port")
   private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("MergeSequence.in" + i))
   private val out: Outlet[T] = Outlet("MergeSequence.out")
   override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index ae45121714..32d46fb24a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -413,8 +413,26 @@ object Sink {
       fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): 
Sink[T, immutable.Seq[M]] =
     sinks match {
       case immutable.Seq()     => Sink.cancelled.mapMaterializedValue(_ => Nil)
-      case immutable.Seq(sink) => sink.asInstanceOf[Sink[T, 
M]].mapMaterializedValue(_ :: Nil)
-      case _                   =>
+      case immutable.Seq(sink) =>
+        // Single-sink optimization: bypass the fan-out strategy if and only 
if the strategy
+        // is type-preserving (T == U), marked by the TypePreservingFanOut 
trait.
+        // For type-transforming strategies, we MUST route through the 
strategy even for a
+        // single sink. Same design as Source.combine — see #2723.
+        val strategyGraph = fanOutStrategy(1)
+        strategyGraph match {
+          case _: pekko.stream.TypePreservingFanOut =>
+            // Type-preserving (T == U): safe to bypass the strategy with a 
direct pass-through.
+            Sink.fromGraph(sink).asInstanceOf[Sink[T, 
M]].mapMaterializedValue(_ :: Nil)
+          case _ =>
+            // Not type-preserving or unknown: route through the fan-out 
strategy.
+            Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
+              import GraphDSL.Implicits._
+              val c = b.add(strategyGraph)
+              c.out(0) ~> shapes.head
+              SinkShape(c.in)
+            })
+        }
+      case _ =>
         Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
           import GraphDSL.Implicits._
           val c = b.add(fanOutStrategy(sinks.size))
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index b4d9bc18e3..bdeba1fb71 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -925,8 +925,45 @@ object Source {
       fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): 
Source[U, immutable.Seq[M]] =
     sources match {
       case immutable.Seq()       => Source.empty.mapMaterializedValue(_ => Nil)
-      case immutable.Seq(source) => source.asInstanceOf[Source[U, 
M]].mapMaterializedValue(_ :: Nil)
-      case _                     =>
+      case immutable.Seq(source) =>
+        // Single-source optimization: bypass the fan-in strategy if and only 
if the strategy
+        // is type-preserving (T == U), marked by the TypePreservingFanIn 
trait.
+        //
+        // For type-transforming strategies (e.g., MergeLatest: T => List[T], 
ZipWithN: A => O),
+        // we MUST route through the strategy even for a single source to 
ensure correct output
+        // types. Without this check, the asInstanceOf cast would silently 
produce incorrect
+        // results at runtime (see #2723).
+        //
+        // Design: "safe default" — strategies WITHOUT TypePreservingFanIn 
always go through
+        // the full fan-in graph. This correctly handles unknown or 
third-party strategies.
+        // Built-in type-preserving fan-in stages (Merge, MergePrioritized) 
directly return
+        // the stage class from their factory methods, so the trait check 
fires and enables
+        // the bypass. Other stages (Concat, Interleave, MergeSequence) are 
wrapped by
+        // withDetachedInputs in their factory methods, which loses the trait 
— they always
+        // route through the graph, which is correct because their require 
constraints have
+        // been relaxed to accept inputPorts >= 1.
+        //
+        // Note: fanInStrategy(1) is always invoked here to determine the 
strategy's trait.
+        // Third-party strategies that reject n=1 will surface their exception 
immediately,
+        // which is preferable to silently returning an incorrectly-typed 
stream.
+        val strategyGraph = fanInStrategy(1)
+        strategyGraph match {
+          case _: pekko.stream.TypePreservingFanIn =>
+            // Type-preserving (T == U): safe to bypass the strategy with a 
direct pass-through.
+            // Use Source.fromGraph to handle non-Source Graph inputs safely 
(the sources parameter
+            // accepts Graph[SourceShape[T], M], not just Source[T, M]).
+            Source.fromGraph(source).asInstanceOf[Source[U, 
M]].mapMaterializedValue(_ :: Nil)
+          case _ =>
+            // Not type-preserving or unknown: route through the fan-in 
strategy.
+            // This ensures type-transforming strategies correctly transform 
the output.
+            Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
+              import GraphDSL.Implicits._
+              val c = b.add(strategyGraph)
+              shapes.head ~> c.in(0)
+              SourceShape(c.out)
+            })
+        }
+      case _ =>
         Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
           import GraphDSL.Implicits._
           val c = b.add(fanInStrategy(sources.size))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to