This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cf99e6c00c3 [SPARK-44474][CONNECT] Reenable "Test observe response" at SparkConnectServiceSuite cf99e6c00c3 is described below commit cf99e6c00c3c167b1a799db65bca85d7a627ea08 Author: jdesjean <jf.gauth...@databricks.com> AuthorDate: Wed Jul 19 11:25:24 2023 +0900 [SPARK-44474][CONNECT] Reenable "Test observe response" at SparkConnectServiceSuite ### What changes were proposed in this pull request? The Finished event is emitted in SparkConnectPlanExecution after the arrow conversion job is completed. However, since we don't await the completion of the job, it's possible for SparkConnectPlanExecution to complete before sending Finished. The Closed event is emitted in SparkConnectExecutePlanHandler in a separate thread. This change awaits the job's future in order to guarantee the order of events between Finished & Closed. ### Why are the changes needed? `Test observe response` at SparkConnectServiceSuite was disabled as flaky after the [introduction of events](https://github.com/apache/spark/pull/41443). The failure surfaced a race condition in emitting the Finished & Closed events for Connect requests of type plan. The correct order of events is Finished before Closed (Finished < Closed). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests. Closes #42063 from jdesjean/SPARK-44474. 
Authored-by: jdesjean <jf.gauth...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../execution/SparkConnectPlanExecution.scala | 38 ++++++++++++---------- .../connect/planner/SparkConnectServiceSuite.scala | 3 +- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala index d2124a38c9d..334dcdbcb42 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.execution import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration import scala.util.{Failure, Success} import com.google.protobuf.ByteString @@ -153,23 +154,23 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) () } - val future = spark.sparkContext.submitJob( - rdd = batches, - processPartition = (iter: Iterator[Batch]) => iter.toArray, - partitions = Seq.range(0, numPartitions), - resultHandler = resultHandler, - resultFunc = () => ()) - - // Collect errors and propagate them to the main thread. - future.onComplete { - case Success(_) => - executePlan.eventsManager.postFinished() - case Failure(throwable) => - signal.synchronized { - error = Some(throwable) - signal.notify() - } - }(ThreadUtils.sameThread) + val future = spark.sparkContext + .submitJob( + rdd = batches, + processPartition = (iter: Iterator[Batch]) => iter.toArray, + partitions = Seq.range(0, numPartitions), + resultHandler = resultHandler, + resultFunc = () => ()) + // Collect errors and propagate them to the main thread. 
+ .andThen { + case Success(_) => + executePlan.eventsManager.postFinished() + case Failure(throwable) => + signal.synchronized { + error = Some(throwable) + signal.notify() + } + }(ThreadUtils.sameThread) // The main thread will wait until 0-th partition is available, // then send it to client and wait for the next partition. @@ -199,6 +200,9 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) currentPartitionId += 1 } + ThreadUtils.awaitReady(future, Duration.Inf) + } else { + executePlan.eventsManager.postFinished() } } } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index cfa37b86cd4..498084efb8f 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -564,8 +564,7 @@ class SparkConnectServiceSuite extends SharedSparkSession with MockitoSugar with } } - // TODO(SPARK-44474): Reenable Test observe response at SparkConnectServiceSuite - ignore("Test observe response") { + test("Test observe response") { // TODO(SPARK-44121) Renable Arrow-based connect tests in Java 21 assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) withTable("test") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org