This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 722b195e4d Fix Java unchecked warnings in stream testkit fluent API (#2625) (#2725)
722b195e4d is described below

commit 722b195e4d629936a21a79c9b088a593edcab8b2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 22:40:26 2026 +0800

    Fix Java unchecked warnings in stream testkit fluent API (#2625) (#2725)
    
    Use Scala's `this.type` return type instead of abstract type member `Self`
    to fix raw type erasure in bytecode signatures. This is a root-cause fix
    that eliminates unchecked warnings in Java without any boilerplate overrides.
    
    Root cause: `type Self <: ManualProbe[I]` erased to raw `ManualProbe` in
    bytecode (no generic Signature attribute), causing Java to see raw types.
    `this.type` makes the Scala compiler emit `ManualProbe<TI;>` (parameterized)
    in the bytecode Signature attribute, so Java sees proper generic types.
    
    Changes:
    - Remove `type Self` abstract type members from ManualProbe/Probe classes
    - Change all fluent method return types from `Self` to `this.type`
    - Replace `self` references with `this`
    - Fix ScalaDoc links: TestSubscriber → TestSubscriber.Probe, TestPublisher → TestPublisher.Probe
    - Fix typo: "JAVA PAI" → "JAVA API"
    
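    Binary compatible: erased JVM return types are unchanged.
    Only the generic Signature attribute metadata changes; MiMa reports this as
    IncompatibleSignatureProblem, which the added mima-filters excludes file filters out.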
---
 .../fix-java-unchecked-warnings.excludes           |  46 +++++++
 .../pekko/stream/testkit/StreamTestKit.scala       | 144 ++++++++++-----------
 .../pekko/stream/testkit/javadsl/TestSink.scala    |   2 +-
 .../pekko/stream/testkit/javadsl/TestSource.scala  |   2 +-
 4 files changed, 119 insertions(+), 75 deletions(-)

diff --git 
a/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
 
b/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
new file mode 100644
index 0000000000..8d97d5bb65
--- /dev/null
+++ 
b/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Fix Java unchecked warnings by using this.type return types (#2625)
+# Changed fluent API methods from abstract type member Self to this.type,
+# which adds parameterized generic Signature attributes to bytecode.
+# Only the generic Signature metadata changes — erased JVM types are unchanged.
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#ManualProbe.expectNoMessage")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#ManualProbe.expectRequest")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectError")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectEvent")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextChainingPF")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextN")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextOrComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextUnordered")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextUnorderedN")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNoMessage")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectSubscriptionAndComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectSubscriptionAndError")
+# Probe subclass methods — Scala 3 cross-build also changes these signatures
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.unsafeSendNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendError")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.expectCancellation")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.expectCancellationWithCause")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.ensureSubscription")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.request")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.requestNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.cancel")
diff --git 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
index 36abb43fc8..ebc90af57d 100644
--- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
+++ 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
@@ -117,8 +117,6 @@ object TestPublisher {
   class ManualProbe[I] private[TestPublisher] (autoOnSubscribe: Boolean = 
true)(implicit system: ActorSystem)
       extends Publisher[I] {
 
-    type Self <: ManualProbe[I]
-
     private val probe: TestProbe = TestProbe()
 
     // this is a way to pause receiving message from probe until subscription 
is done
@@ -130,7 +128,6 @@ object TestPublisher {
         this
       }
     })
-    private val self = this.asInstanceOf[Self]
 
     /**
      * Subscribes a given [[org.reactivestreams.Subscriber]] to this probe 
publisher.
@@ -168,26 +165,32 @@ object TestPublisher {
     /**
      * Expect demand from a given subscription.
      */
-    def expectRequest(subscription: Subscription, n: Int): Self = 
executeAfterSubscription {
-      probe.expectMsg(RequestMore(subscription, n))
-      self
+    def expectRequest(subscription: Subscription, n: Int): this.type = {
+      executeAfterSubscription {
+        probe.expectMsg(RequestMore(subscription, n))
+      }
+      this
     }
 
     /**
      * Expect no messages.
      * Waits for the default period configured as 
`pekko.actor.testkit.expect-no-message-default`.
      */
-    def expectNoMessage(): Self = executeAfterSubscription {
-      probe.expectNoMessage()
-      self
+    def expectNoMessage(): this.type = {
+      executeAfterSubscription {
+        probe.expectNoMessage()
+      }
+      this
     }
 
     /**
      * Expect no messages for a given duration.
      */
-    def expectNoMessage(max: FiniteDuration): Self = executeAfterSubscription {
-      probe.expectNoMessage(max)
-      self
+    def expectNoMessage(max: FiniteDuration): this.type = {
+      executeAfterSubscription {
+        probe.expectNoMessage(max)
+      }
+      this
     }
 
     /**
@@ -196,7 +199,7 @@ object TestPublisher {
      * Expect no messages for a given duration.
      * @since 1.1.0
      */
-    def expectNoMessage(max: java.time.Duration): Self = 
expectNoMessage(max.toScala)
+    def expectNoMessage(max: java.time.Duration): this.type = 
expectNoMessage(max.toScala)
 
     /**
      * Receive messages for a given duration or until one does not match a 
given partial function.
@@ -307,8 +310,6 @@ object TestPublisher {
   class Probe[T] private[TestPublisher] (initialPendingRequests: 
Long)(implicit system: ActorSystem)
       extends ManualProbe[T] {
 
-    type Self = Probe[T]
-
     private var pendingRequests = initialPendingRequests
     private lazy val subscription = expectSubscription()
 
@@ -320,24 +321,24 @@ object TestPublisher {
      */
     def pending: Long = pendingRequests
 
-    def sendNext(elem: T): Self = {
+    def sendNext(elem: T): this.type = {
       if (pendingRequests == 0) pendingRequests = subscription.expectRequest()
       pendingRequests -= 1
       subscription.sendNext(elem)
       this
     }
 
-    def unsafeSendNext(elem: T): Self = {
+    def unsafeSendNext(elem: T): this.type = {
       subscription.sendNext(elem)
       this
     }
 
-    def sendComplete(): Self = {
+    def sendComplete(): this.type = {
       subscription.sendComplete()
       this
     }
 
-    def sendError(cause: Throwable): Self = {
+    def sendError(cause: Throwable): this.type = {
       subscription.sendError(cause)
       this
     }
@@ -348,12 +349,12 @@ object TestPublisher {
       requests
     }
 
-    def expectCancellation(): Self = {
+    def expectCancellation(): this.type = {
       subscription.expectCancellation()
       this
     }
 
-    def expectCancellationWithCause(expectedCause: Throwable): Self = {
+    def expectCancellationWithCause(expectedCause: Throwable): this.type = {
       val cause = subscription.expectCancellation()
       assert(cause == expectedCause, s"Expected cancellation cause to be 
$expectedCause but was $cause")
       this
@@ -371,6 +372,7 @@ object TestPublisher {
      */
     def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E =
       expectCancellationWithCause()(ClassTag(causeClass))
+
   }
 
 }
@@ -419,14 +421,10 @@ object TestSubscriber {
   class ManualProbe[I] private[TestSubscriber] ()(implicit system: 
ActorSystem) extends Subscriber[I] {
     import pekko.testkit._
 
-    type Self <: ManualProbe[I]
-
     private val probe = TestProbe()
 
     @volatile private var _subscription: Subscription = _
 
-    private val self = this.asInstanceOf[Self]
-
     /**
      * Expect and return a [[org.reactivestreams.Subscription]].
      */
@@ -460,9 +458,9 @@ object TestSubscriber {
      *
      * Expect [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` 
or `OnComplete`).
      */
-    def expectEvent(event: SubscriberEvent): Self = {
+    def expectEvent(event: SubscriberEvent): this.type = {
       probe.expectMsg(event)
-      self
+      this
     }
 
     /**
@@ -497,9 +495,9 @@ object TestSubscriber {
      *
      * Expect a stream element.
      */
-    def expectNext(element: I): Self = {
+    def expectNext(element: I): this.type = {
       probe.expectMsg(OnNext(element))
-      self
+      this
     }
 
     /**
@@ -507,20 +505,20 @@ object TestSubscriber {
      *
      * Expect a stream element during specified time or timeout.
      */
-    def expectNext(d: FiniteDuration, element: I): Self = {
+    def expectNext(d: FiniteDuration, element: I): this.type = {
       probe.expectMsg(d, OnNext(element))
-      self
+      this
     }
 
     /**
-     * JAVA PAI
+     * JAVA API
      *
      * Fluent DSL
      *
      * Expect a stream element during specified time or timeout.
      * @since 1.1.0
      */
-    def expectNext(d: java.time.Duration, element: I): Self = 
expectNext(d.toScala, element)
+    def expectNext(d: java.time.Duration, element: I): this.type = 
expectNext(d.toScala, element)
 
     /**
      * Fluent DSL
@@ -528,7 +526,7 @@ object TestSubscriber {
      * Expect multiple stream elements.
      */
     @annotation.varargs
-    def expectNext(e1: I, e2: I, es: I*): Self =
+    def expectNext(e1: I, e2: I, es: I*): this.type =
       expectNextN((e1 +: e2 +: 
es).iterator.map(identity).to(immutable.IndexedSeq))
 
     /**
@@ -537,7 +535,7 @@ object TestSubscriber {
      * Expect multiple stream elements in arbitrary order.
      */
     @annotation.varargs
-    def expectNextUnordered(e1: I, e2: I, es: I*): Self =
+    def expectNextUnordered(e1: I, e2: I, es: I*): this.type =
       expectNextUnorderedN((e1 +: e2 +: 
es).iterator.map(identity).to(immutable.IndexedSeq))
 
     /**
@@ -559,9 +557,9 @@ object TestSubscriber {
      * Fluent DSL
      * Expect the given elements to be signalled in order.
      */
-    def expectNextN(all: immutable.Seq[I]): Self = {
+    def expectNextN(all: immutable.Seq[I]): this.type = {
       all.foreach(e => probe.expectMsg(OnNext(e)))
-      self
+      this
     }
 
     /**
@@ -569,16 +567,16 @@ object TestSubscriber {
      * Expect the given elements to be signalled in order.
      * @since 1.1.0
      */
-    def expectNextN(elems: java.util.List[I]): Self = {
+    def expectNextN(elems: java.util.List[I]): this.type = {
       elems.forEach(e => probe.expectMsg(OnNext(e)))
-      self
+      this
     }
 
     /**
      * Fluent DSL
      * Expect the given elements to be signalled in any order.
      */
-    def expectNextUnorderedN(all: immutable.Seq[I]): Self = {
+    def expectNextUnorderedN(all: immutable.Seq[I]): this.type = {
       @annotation.tailrec
       def expectOneOf(all: immutable.Seq[I]): Unit = all match {
         case Nil =>
@@ -589,7 +587,7 @@ object TestSubscriber {
       }
 
       expectOneOf(all)
-      self
+      this
     }
 
     /**
@@ -599,16 +597,16 @@ object TestSubscriber {
      * Expect the given elements to be signalled in any order.
      * @since 1.1.0
      */
-    def expectNextUnorderedN(all: java.util.List[I]): Self = 
expectNextUnorderedN(Util.immutableSeq(all))
+    def expectNextUnorderedN(all: java.util.List[I]): this.type = 
expectNextUnorderedN(Util.immutableSeq(all))
 
     /**
      * Fluent DSL
      *
      * Expect completion.
      */
-    def expectComplete(): Self = {
+    def expectComplete(): this.type = {
       probe.expectMsg(OnComplete)
-      self
+      this
     }
 
     /**
@@ -621,9 +619,9 @@ object TestSubscriber {
      *
      * Expect given [[java.lang.Throwable]].
      */
-    def expectError(cause: Throwable): Self = {
+    def expectError(cause: Throwable): this.type = {
       probe.expectMsg(OnError(cause))
-      self
+      this
     }
 
     /**
@@ -660,7 +658,7 @@ object TestSubscriber {
      *
      * See also 
[[#expectSubscriptionAndError(cause:Throwable,signalDemand:Boolean)* 
#expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean)]] if no 
demand should be signalled.
      */
-    def expectSubscriptionAndError(cause: Throwable): Self =
+    def expectSubscriptionAndError(cause: Throwable): this.type =
       expectSubscriptionAndError(cause, signalDemand = true)
 
     /**
@@ -671,11 +669,11 @@ object TestSubscriber {
      *
      * See also [[#expectSubscriptionAndError(cause:Throwable)* 
#expectSubscriptionAndError(cause: Throwable)]].
      */
-    def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): 
Self = {
+    def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): 
this.type = {
       val sub = expectSubscription()
       if (signalDemand) sub.request(1)
       expectError(cause)
-      self
+      this
     }
 
     /**
@@ -686,7 +684,7 @@ object TestSubscriber {
      *
      * See also [[#expectSubscriptionAndComplete(signalDemand:Boolean)* 
#expectSubscriptionAndComplete(signalDemand: Boolean)]] if no demand should be 
signalled.
      */
-    def expectSubscriptionAndComplete(): Self =
+    def expectSubscriptionAndComplete(): this.type =
       expectSubscriptionAndComplete(true)
 
     /**
@@ -699,11 +697,11 @@ object TestSubscriber {
      *
      * See also [[#expectSubscriptionAndComplete()* 
#expectSubscriptionAndComplete]].
      */
-    def expectSubscriptionAndComplete(signalDemand: Boolean): Self = {
+    def expectSubscriptionAndComplete(signalDemand: Boolean): this.type = {
       val sub = expectSubscription()
       if (signalDemand) sub.request(1)
       expectComplete()
-      self
+      this
     }
 
     /**
@@ -756,12 +754,12 @@ object TestSubscriber {
      *
      * Expect given next element or stream completion.
      */
-    def expectNextOrComplete(element: I): Self = {
+    def expectNextOrComplete(element: I): this.type = {
       probe.fishForMessage(hint = s"OnNext($element) or OnComplete") {
         case OnNext(`element`) => true
         case OnComplete        => true
       }
-      self
+      this
     }
 
     /**
@@ -769,9 +767,9 @@ object TestSubscriber {
      *
      * Assert that no message is received for the specified time.
      */
-    def expectNoMessage(remaining: FiniteDuration): Self = {
+    def expectNoMessage(remaining: FiniteDuration): this.type = {
       probe.expectNoMessage(remaining)
-      self
+      this
     }
 
     /**
@@ -781,17 +779,17 @@ object TestSubscriber {
      * Waits for the default period configured as 
`pekko.test.expect-no-message-default`.
      * That timeout is scaled using the configuration entry 
"pekko.test.timefactor".
      */
-    def expectNoMessage(): Self = {
+    def expectNoMessage(): this.type = {
       probe.expectNoMessage()
-      self
+      this
     }
 
     /**
      * Java API: Assert that no message is received for the specified time.
      */
-    def expectNoMessage(remaining: java.time.Duration): Self = {
+    def expectNoMessage(remaining: java.time.Duration): this.type = {
       probe.expectNoMessage(remaining.toScala)
-      self
+      this
     }
 
     /**
@@ -830,8 +828,10 @@ object TestSubscriber {
      *
      * @param max wait no more than max time, otherwise throw AssertionError
      */
-    def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): 
Self =
-      expectNextWithTimeoutPF(max, f.andThen(_ => self))
+    def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): 
this.type = {
+      expectNextWithTimeoutPF(max, f)
+      this
+    }
 
     /**
      * JAVA API
@@ -843,7 +843,7 @@ object TestSubscriber {
      * @param max wait no more than max time, otherwise throw AssertionError
      * @since 1.1.0
      */
-    def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any, 
Any]): Self =
+    def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any, 
Any]): this.type =
       expectNextChainingPF(max.toScala, f)
 
     /**
@@ -851,7 +851,7 @@ object TestSubscriber {
      *
      * Allows chaining probe methods.
      */
-    def expectNextChainingPF(f: PartialFunction[Any, Any]): Self =
+    def expectNextChainingPF(f: PartialFunction[Any, Any]): this.type =
       expectNextChainingPF(Duration.Undefined, f)
 
     def expectEventWithTimeoutPF[T](max: Duration, f: 
PartialFunction[SubscriberEvent, T]): T =
@@ -919,7 +919,7 @@ object TestSubscriber {
       val b = immutable.Seq.newBuilder[I]
 
       @tailrec def drain(): immutable.Seq[I] =
-        self.expectEvent(deadline.timeLeft) match {
+        this.expectEvent(deadline.timeLeft) match {
           case OnError(ex) =>
             throw new AssertionError(
               s"toStrict received OnError while draining stream! Accumulated 
elements: ${b.result()}",
@@ -933,7 +933,7 @@ object TestSubscriber {
         }
 
       // if no subscription was obtained yet, we expect it
-      if (_subscription eq null) self.expectSubscription()
+      if (_subscription eq null) this.expectSubscription()
       _subscription.request(Long.MaxValue)
 
       drain()
@@ -1027,17 +1027,15 @@ object TestSubscriber {
    */
   class Probe[T] private[TestSubscriber] ()(implicit system: ActorSystem) 
extends ManualProbe[T] {
 
-    override type Self = Probe[T]
-
     private lazy val subscription = expectSubscription()
 
     /** Asserts that a subscription has been received or will be received */
-    def ensureSubscription(): Self = {
+    def ensureSubscription(): this.type = {
       subscription // initializes lazy val
       this
     }
 
-    def request(n: Long): Self = {
+    def request(n: Long): this.type = {
       subscription.request(n)
       this
     }
@@ -1045,18 +1043,18 @@ object TestSubscriber {
     /**
      * Request and expect a stream element.
      */
-    def requestNext(element: T): Self = {
+    def requestNext(element: T): this.type = {
       subscription.request(1)
       expectNext(element)
       this
     }
 
-    def cancel(): Self = {
+    def cancel(): this.type = {
       subscription.cancel()
       this
     }
 
-    def cancel(cause: Throwable): Self = subscription match {
+    def cancel(cause: Throwable): this.type = subscription match {
       case s: SubscriptionWithCancelException =>
         s.cancel(cause)
         this
diff --git 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
index d6ab2db74c..9fa14e868c 100644
--- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
+++ 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
@@ -22,7 +22,7 @@ import pekko.stream.testkit._
 object TestSink {
 
   /**
-   * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber]].
+   * A Sink that materialized to a 
[[pekko.stream.testkit.TestSubscriber.Probe]].
    */
   def create[T](system: ClassicActorSystemProvider): Sink[T, 
TestSubscriber.Probe[T]] =
     new Sink(scaladsl.TestSink[T]()(system))
diff --git 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
index 968c3f3e49..74fed49578 100644
--- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
+++ 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
@@ -22,7 +22,7 @@ import pekko.stream.testkit._
 object TestSource {
 
   /**
-   * A Source that materializes to a [[pekko.stream.testkit.TestPublisher]].
+   * A Source that materializes to a 
[[pekko.stream.testkit.TestPublisher.Probe]].
    */
   def create[T](system: ClassicActorSystemProvider): Source[T, 
TestPublisher.Probe[T]] =
     new Source(scaladsl.TestSource[T]()(system))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to