This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf99e6c00c3 [SPARK-44474][CONNECT] Reenable "Test observe response" at 
SparkConnectServiceSuite
cf99e6c00c3 is described below

commit cf99e6c00c3c167b1a799db65bca85d7a627ea08
Author: jdesjean <jf.gauth...@databricks.com>
AuthorDate: Wed Jul 19 11:25:24 2023 +0900

    [SPARK-44474][CONNECT] Reenable "Test observe response" at 
SparkConnectServiceSuite
    
    ### What changes were proposed in this pull request?
    Finished is emitted in SparkConnectPlanExecution after the arrow conversion 
job is completed. However, since we don't await the completion of the job, it's 
possible for SparkConnectPlanExecution to complete before sending Finished.
    Closed is emitted in SparkConnectExecutePlanHandler in a separate thread.
    
    Add an await in order to guarantee the order of events between Finished &
Closed.
    
    ### Why are the changes needed?
    `Test observe response` at SparkConnectServiceSuite was disabled as flaky 
after [introduction of events](https://github.com/apache/spark/pull/41443). 
The failure surfaced a race condition in emitting the Finished & Closed events for
a Connect request of type plan. The correct order of events is Finished < Closed.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit
    
    Closes #42063 from jdesjean/SPARK-44474.
    
    Authored-by: jdesjean <jf.gauth...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../execution/SparkConnectPlanExecution.scala      | 38 ++++++++++++----------
 .../connect/planner/SparkConnectServiceSuite.scala |  3 +-
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index d2124a38c9d..334dcdbcb42 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connect.execution
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success}
 
 import com.google.protobuf.ByteString
@@ -153,23 +154,23 @@ private[execution] class 
SparkConnectPlanExecution(executeHolder: ExecuteHolder)
               ()
             }
 
-            val future = spark.sparkContext.submitJob(
-              rdd = batches,
-              processPartition = (iter: Iterator[Batch]) => iter.toArray,
-              partitions = Seq.range(0, numPartitions),
-              resultHandler = resultHandler,
-              resultFunc = () => ())
-
-            // Collect errors and propagate them to the main thread.
-            future.onComplete {
-              case Success(_) =>
-                executePlan.eventsManager.postFinished()
-              case Failure(throwable) =>
-                signal.synchronized {
-                  error = Some(throwable)
-                  signal.notify()
-                }
-            }(ThreadUtils.sameThread)
+            val future = spark.sparkContext
+              .submitJob(
+                rdd = batches,
+                processPartition = (iter: Iterator[Batch]) => iter.toArray,
+                partitions = Seq.range(0, numPartitions),
+                resultHandler = resultHandler,
+                resultFunc = () => ())
+              // Collect errors and propagate them to the main thread.
+              .andThen {
+                case Success(_) =>
+                  executePlan.eventsManager.postFinished()
+                case Failure(throwable) =>
+                  signal.synchronized {
+                    error = Some(throwable)
+                    signal.notify()
+                  }
+              }(ThreadUtils.sameThread)
 
             // The main thread will wait until 0-th partition is available,
             // then send it to client and wait for the next partition.
@@ -199,6 +200,9 @@ private[execution] class 
SparkConnectPlanExecution(executeHolder: ExecuteHolder)
 
               currentPartitionId += 1
             }
+            ThreadUtils.awaitReady(future, Duration.Inf)
+          } else {
+            executePlan.eventsManager.postFinished()
           }
         }
     }
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index cfa37b86cd4..498084efb8f 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -564,8 +564,7 @@ class SparkConnectServiceSuite extends SharedSparkSession 
with MockitoSugar with
     }
   }
 
-  // TODO(SPARK-44474): Reenable Test observe response at 
SparkConnectServiceSuite
-  ignore("Test observe response") {
+  test("Test observe response") {
     // TODO(SPARK-44121) Renable Arrow-based connect tests in Java 21
     assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
     withTable("test") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to