This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 58ec60d26974 [SPARK-38498][DSTREAM] Support customized 
StreamingListener by configuration
58ec60d26974 is described below

commit 58ec60d26974242245ec22de5e70ccf09942c42e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Nov 4 11:04:43 2025 -0800

    [SPARK-38498][DSTREAM] Support customized StreamingListener by configuration
    
    ### What changes were proposed in this pull request?
    Currently, if a user wants to add a customized StreamingListener to a
    StreamingContext, the listener has to be added explicitly in application code:
    ```
    streamingContext.addStreamingListener()
    ```
    
    With this PR, customized StreamingListeners can be registered by setting the configuration
    ```
    spark.streaming.extraListeners
    ```
    
    A listener class may provide either of two constructors:
    
    1. A constructor with no parameters
    2. A constructor with a single parameter of type `SparkConf`
    
    ### Why are the changes needed?
    Sometimes we want to attach a common StreamingListener to many jobs to do
    customized analysis. Adding it in the code of every job is tedious; with this
    configuration, it can be enabled for all of them at once through a shared setting.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users can add StreamingListeners to a StreamingContext by setting the configuration
    ```
    spark.streaming.extraListeners
    ```
    
    ### How was this patch tested?
    Added UT
    
    Closes #35799 from AngersZhuuuu/SPARK-38498.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 8100636ab8707d068b02b9e7cd3bfe1b850ff86e)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/streaming/StreamingConf.scala |  7 ++++++
 .../apache/spark/streaming/StreamingContext.scala  | 28 ++++++++++++++++++----
 .../spark/streaming/StreamingListenerSuite.scala   | 27 ++++++++++++++++++++-
 3 files changed, 57 insertions(+), 5 deletions(-)

diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
index bb80bd7072e8..39f3d495bdbf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
@@ -185,4 +185,11 @@ object StreamingConf {
       .longConf
       .createWithDefault(0)
 
+  private[streaming] val STREAMING_EXTRA_LISTENERS =
+    ConfigBuilder("spark.streaming.extraListeners")
+      .doc("Class names of streaming listeners to add to StreamingContext 
during initialization.")
+      .version("4.1.0")
+      .stringConf
+      .toSequence
+      .createOptional
 }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 79bc38318f91..139b83ba0d07 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -41,12 +41,11 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.SerializationDebugger
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN
+import org.apache.spark.streaming.StreamingConf.{STOP_GRACEFULLY_ON_SHUTDOWN, 
STREAMING_EXTRA_LISTENERS}
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.
-    {ExecutorAllocationManager, JobScheduler, StreamingListener, 
StreamingListenerStreamingStarted}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, 
JobScheduler, StreamingListener, StreamingListenerStreamingStarted}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
 import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, 
Utils}
 
@@ -584,7 +583,7 @@ class StreamingContext private[streaming] (
             validate()
 
             registerProgressListener()
-
+            registerExtraStreamingListener()
             // Start the streaming scheduler in a new thread, so that thread 
local properties
             // like call sites and job groups can be reset without affecting 
those of the
             // current thread.
@@ -622,6 +621,27 @@ class StreamingContext private[streaming] (
     }
   }
 
+  /**
+   * Adds listeners named in spark.streaming.extraListeners; on failure calls stop() and throws.
+   */
+  private def registerExtraStreamingListener(): Unit = {
+    try {
+      conf.get(STREAMING_EXTRA_LISTENERS).foreach { classNames =>
+        val listeners = Utils.loadExtensions(classOf[StreamingListener], 
classNames, conf)
+        listeners.foreach { listener =>
+          addStreamingListener(listener)
+          logInfo(s"Registered streaming listener 
${listener.getClass().getName()}")
+        }
+      }
+    } catch {
+      case e: Exception =>
+        try {
+          stop()
+        } finally {
+          throw new SparkException(s"Exception when registering 
StreamingListener", e)
+        }
+    }
+  }
 
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the 
execution
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 63899f961e7b..fa560406ac3f 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -32,8 +32,9 @@ import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
@@ -235,6 +236,19 @@ class StreamingListenerSuite extends TestSuiteBase with 
LocalStreamingContext wi
     verifyNoMoreInteractions(streamingListener)
   }
 
+  test("SPARK-38498: Support extra streaming listener") {
+    val conf = new SparkConf().setMaster("local").setAppName("customized 
streaming listener")
+      .set(UI.UI_ENABLED, false)
+      .set(StreamingConf.STREAMING_EXTRA_LISTENERS.key,
+        classOf[ExtraStreamingListener].getName)
+    val sc = new SparkContext(conf)
+    ssc = new StreamingContext(sc, Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count())
+    startStreamingContextAndCallStop(ssc)
+    assert(ExtraStreamingListenerBatchCounter.COMPLETED_BATCH >= 1)
+  }
+
   private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = 
{
     val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
     _ssc.addStreamingListener(contextStoppingCollector)
@@ -386,3 +400,14 @@ class StreamingContextStoppingCollector(val ssc: 
StreamingContext) extends Strea
     }
   }
 }
+
+class ExtraStreamingListener extends StreamingListener {
+  import ExtraStreamingListenerBatchCounter._
+  override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
+    COMPLETED_BATCH += 1
+  }
+}
+
+object ExtraStreamingListenerBatchCounter {
+  var COMPLETED_BATCH = 0
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to