This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 68305acc01fc [SPARK-50600][CONNECT][SQL] Set analyzed on analysis 
failure
68305acc01fc is described below

commit 68305acc01fc0614a554f76316b35065f56f6e0f
Author: jdesjean <[email protected]>
AuthorDate: Fri Jan 10 12:19:11 2025 +0800

    [SPARK-50600][CONNECT][SQL] Set analyzed on analysis failure
    
    ### What changes were proposed in this pull request?
    As part of 
[SPARK-44145](https://issues.apache.org/jira/browse/SPARK-44145), a callback 
was added to track completion of the analysis and optimization phases of a query. 
While the analyzed plan is sent when analysis completes successfully, it is 
not sent when analysis fails. In that case, we should fall back to the parsed plan.
    
    ### Why are the changes needed?
    The purpose of the analyze event is to track when analysis completes; as 
such, it should be sent on both success and failure.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49383 from jdesjean/jdesjean/SPARK-50600.
    
    Authored-by: jdesjean <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/QueryPlanningTracker.scala  | 21 +++++++++++++++
 .../sql/catalyst/QueryPlanningTrackerSuite.scala   |  6 +++++
 .../sql/connect/service/ExecuteEventsManager.scala | 24 ++++++++++++++---
 .../spark/sql/execution/QueryExecution.scala       | 16 +++++++----
 .../spark/sql/execution/QueryExecutionSuite.scala  | 31 +++++++++++++++++++---
 5 files changed, 87 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
index 2e14c09bc819..d1007404158f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -94,6 +94,16 @@ object QueryPlanningTracker {
  * Callbacks after planning phase completion.
  */
 abstract class QueryPlanningTrackerCallback {
+  /**
+   * Called when query fails analysis
+   *
+   * @param tracker      tracker that triggered the callback.
+   * @param parsedPlan   The plan prior to analysis
+   *                     see @org.apache.spark.sql.catalyst.analysis.Analyzer
+   */
+  def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): 
Unit = {
+    // Noop by default for backward compatibility
+  }
   /**
    * Called when query has been analyzed.
    *
@@ -147,6 +157,17 @@ class QueryPlanningTracker(
     ret
   }
 
+  /**
+   * Set when the query has been parsed but failed to be analyzed.
+   * Can be called multiple times upon plan change.
+   *
+   * @param parsedPlan The plan prior to analysis
+   *                   see @org.apache.spark.sql.catalyst.analysis.Analyzer
+   */
+  private[sql] def setAnalysisFailed(parsedPlan: LogicalPlan): Unit = {
+    trackerCallback.foreach(_.analysisFailed(this, parsedPlan))
+  }
+
   /**
    * Set when the query has been analysed.
    * Can be called multiple times upon plan change.
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
index 972b98780bcc..500bbef3c89b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
@@ -95,7 +95,13 @@ class QueryPlanningTrackerSuite extends SparkFunSuite {
     val mockCallback = mock[QueryPlanningTrackerCallback]
     val mockPlan1 = mock[LogicalPlan]
     val mockPlan2 = mock[LogicalPlan]
+    val mockPlan3 = mock[LogicalPlan]
+    val mockPlan4 = mock[LogicalPlan]
     val t = new QueryPlanningTracker(Some(mockCallback))
+    t.setAnalysisFailed(mockPlan3)
+    verify(mockCallback, times(1)).analysisFailed(t, mockPlan3)
+    t.setAnalysisFailed(mockPlan4)
+    verify(mockCallback, times(1)).analysisFailed(t, mockPlan4)
     t.setAnalyzed(mockPlan1)
     verify(mockCallback, times(1)).analyzed(t, mockPlan1)
     t.setAnalyzed(mockPlan2)
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index faa7582d169f..61cd95621d15 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -145,13 +145,19 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
    *
    * @param analyzedPlan
    *   The analyzed plan generated by the Connect request plan. None when the 
request does not
-   *   generate a plan.
+   *   generate a Spark plan or analysis fails.
+   * @param parsedPlan
+   *   The parsed plan generated by the Connect request plan. None when the 
request does not
+   *   generate a plan or analysis does not fail.
    */
-  def postAnalyzed(analyzedPlan: Option[LogicalPlan] = None): Unit = {
+  def postAnalyzed(
+      analyzedPlan: Option[LogicalPlan] = None,
+      parsedPlan: Option[LogicalPlan] = None): Unit = {
     assertStatus(List(ExecuteStatus.Started, ExecuteStatus.Analyzed), 
ExecuteStatus.Analyzed)
     val event =
       SparkListenerConnectOperationAnalyzed(jobTag, operationId, 
clock.getTimeMillis())
     event.analyzedPlan = analyzedPlan
+    event.parsedPlan = parsedPlan
     listenerBus.post(event)
   }
 
@@ -251,6 +257,12 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
         postAnalyzed(Some(analyzedPlan))
       }
 
+      override def analysisFailed(
+          tracker: QueryPlanningTracker,
+          parsedPlan: LogicalPlan): Unit = {
+        postAnalyzed(parsedPlan = Some(parsedPlan))
+      }
+
       def readyForExecution(tracker: QueryPlanningTracker): Unit = 
postReadyForExecution()
     }))
   }
@@ -341,9 +353,15 @@ case class SparkListenerConnectOperationAnalyzed(
     extraTags: Map[String, String] = Map.empty)
     extends SparkListenerEvent {
 
+  /**
+   * Parsed Spark plan generated by the Connect request. None when the Connect 
request does not
+   * generate a Spark plan or does not fail analysis.
+   */
+  @JsonIgnore var parsedPlan: Option[LogicalPlan] = None
+
   /**
    * Analyzed Spark plan generated by the Connect request. None when the 
Connect request does not
-   * generate a Spark plan.
+   * generate a Spark plan or analysis fails.
    */
   @JsonIgnore var analyzedPlan: Option[LogicalPlan] = None
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5695ea57e7fb..d9b1a2136a5d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -92,12 +92,18 @@ class QueryExecution(
   }
 
   private val lazyAnalyzed = LazyTry {
-    val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
-      // We can't clone `logical` here, which will reset the `_analyzed` flag.
-      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+    try {
+      val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
+        // We can't clone `logical` here, which will reset the `_analyzed` 
flag.
+        sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+      }
+      tracker.setAnalyzed(plan)
+      plan
+    } catch {
+      case NonFatal(e) =>
+        tracker.setAnalysisFailed(logical)
+        throw e
     }
-    tracker.setAnalyzed(plan)
-    plan
   }
 
   def analyzed: LogicalPlan = lazyAnalyzed.get
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 974be2f62799..d670b3d8c77d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable
 import scala.io.Source
+import scala.util.Try
 
 import org.apache.spark.sql.{AnalysisException, Dataset, 
ExtendedExplainGenerator, FastOperator}
-import org.apache.spark.sql.catalyst.{QueryPlanningTracker, 
QueryPlanningTrackerCallback}
-import org.apache.spark.sql.catalyst.analysis.CurrentNamespace
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, 
QueryPlanningTrackerCallback, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, 
UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, 
LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -405,6 +406,21 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-50600: Failed analysis should send analyzed event") {
+    val mockCallback = MockCallback()
+
+    def table(ref: String): LogicalPlan = 
UnresolvedRelation(TableIdentifier(ref))
+
+    val unresolvedUndefinedFunc = UnresolvedFunction("unknown", Seq.empty, 
isDistinct = false)
+    val plan = Project(Seq(Alias(unresolvedUndefinedFunc, "call1")()), 
table("table"))
+    val dataset = Try {
+      val df = Dataset.ofRows(spark, plan, new 
QueryPlanningTracker(Some(mockCallback)))
+      df.queryExecution.assertAnalyzed()
+    }
+    assert(dataset.failed.get.isInstanceOf[AnalysisException])
+    mockCallback.assertAnalyzed()
+  }
+
   case class MockCallbackEagerCommand(
       var trackerAnalyzed: QueryPlanningTracker = null,
       var trackerReadyForExecution: QueryPlanningTracker = null)
@@ -447,6 +463,15 @@ class QueryExecutionSuite extends SharedSparkSession {
       var trackerAnalyzed: QueryPlanningTracker = null,
       var trackerReadyForExecution: QueryPlanningTracker = null)
       extends QueryPlanningTrackerCallback {
+    override def analysisFailed(
+        trackerFromCallback: QueryPlanningTracker,
+        analyzedPlan: LogicalPlan): Unit = {
+      trackerAnalyzed = trackerFromCallback
+      
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))
+      
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.OPTIMIZATION))
+      
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.PLANNING))
+      assert(analyzedPlan != null)
+    }
     def analyzed(trackerFromCallback: QueryPlanningTracker, plan: 
LogicalPlan): Unit = {
       trackerAnalyzed = trackerFromCallback
       
assert(trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to