This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9445fd0f9cc [SPARK-43143][SS][CONNECT] Scala StreamingQuery awaitTermination()
9445fd0f9cc is described below

commit 9445fd0f9cc589ea6baec7f03912f73c5f222d87
Author: Wei Liu <[email protected]>
AuthorDate: Mon May 1 14:04:13 2023 -0400

    [SPARK-43143][SS][CONNECT] Scala StreamingQuery awaitTermination()
    
    ### What changes were proposed in this pull request?
    
    Add the awaitTermination() API to the Scala client StreamingQuery.
    Please note that it currently does not throw the exception that the
    original method would, because JVM client-side error handling is not
    ready yet. Details in SPARK-43299.
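    
    A minimal usage sketch from the Scala client (assuming a Spark Connect
    session is already available as `spark`; the rate source, memory sink,
    and query name below are illustrative only):
    
    ```scala
    // Start a trivial streaming query against the Connect server.
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("memory")
      .queryName("demo")
      .start()
    
    // Block for up to 2 seconds; returns false if the query is still running.
    val terminated = query.awaitTermination(2000)
    
    query.stop()
    // Block until the query terminates. Note that exceptions are not
    // rethrown yet on the JVM client (see SPARK-43299).
    query.awaitTermination()
    ```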
    
    ### Why are the changes needed?
    
    SS Connect development
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users can now call `awaitTermination()` and `awaitTermination(timeoutMs)` on the Scala client's StreamingQuery.
    
    ### How was this patch tested?
    
    ```
    Spark session available as 'spark'.
       _____                  __      ______                            __
      / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
      \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
     ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
    /____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
        /_/
    
     val q = spark.readStream.format("rate").load().writeStream.format("memory").queryName("test").start()
    q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@34eaf9c1
    
     q.awaitTermination(500)
    res1: Boolean = false
    
    ```
    
    Closes #40968 from WweiL/SPARK-43143-await-termination-scala.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../spark/sql/streaming/StreamingQuery.scala       | 35 ++++++++++++++++++++++
 .../CheckConnectJvmClientCompatibility.scala       |  3 --
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 26 ++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index a1bd8e264cc..7f6e3721841 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -107,6 +107,32 @@ trait StreamingQuery {
    */
   def lastProgress: StreamingQueryProgress
 
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
+   *
+   * If the query has terminated, then all subsequent calls to this method will return
+   * immediately (if the query was terminated by `stop()`).
+   *
+   * @since 3.5.0
+   */
+  // TODO(SPARK-43299): verify the behavior of this method after the JVM client-side error-handling
+  // framework is supported and modify the doc accordingly.
+  def awaitTermination(): Unit
+
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
+   * the query has terminated with an exception, then the exception will be thrown. Otherwise, it
+   * returns whether the query has terminated or not within the `timeoutMs` milliseconds.
+   *
+   * If the query has terminated, then all subsequent calls to this method will return `true`
+   * immediately.
+   *
+   * @since 3.5.0
+   */
+  // TODO(SPARK-43299): verify the behavior of this method after the JVM client-side error-handling
+  // framework is supported and modify the doc accordingly.
+  def awaitTermination(timeoutMs: Long): Boolean
+
   /**
    * Blocks until all available data in the source has been processed and committed to the sink.
    * This method is intended for testing. Note that in the case of continually arriving data, this
@@ -159,6 +185,15 @@ class RemoteStreamingQuery(
     executeQueryCmd(_.setStatus(true)).getStatus.getIsActive
   }
 
+  override def awaitTermination(): Unit = {
+    executeQueryCmd(_.getAwaitTerminationBuilder.build())
+  }
+
+  override def awaitTermination(timeoutMs: Long): Boolean = {
+    executeQueryCmd(
+      _.getAwaitTerminationBuilder.setTimeoutMs(timeoutMs)).getAwaitTermination.getTerminated
+  }
+
   override def status: StreamingQueryStatus = {
     val statusResp = executeQueryCmd(_.setStatus(true)).getStatus
     new StreamingQueryStatus(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index c71017bb271..28a28994a76 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -244,9 +244,6 @@ object CheckConnectJvmClientCompatibility {
       ),
 
       // StreamingQuery
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" // 
TODO(SPARK-43143)
-      ),
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // 
TODO(SPARK-43128)
       ),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f0c12d212c2..24f5e345b60 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -118,4 +118,30 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       }
     }
   }
+
+  test("awaitTermination") {
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val q = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("test")
+        .start()
+
+      val start = System.nanoTime
+      val terminated = q.awaitTermination(500)
+      val end = System.nanoTime
+      assert((end - start) / 1e6 >= 500)
+      assert(!terminated)
+
+      q.stop()
+      // TODO (SPARK-43032): uncomment below
+      // eventually(timeout(1.minute)) {
+      // q.awaitTermination()
+      // }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
