This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 68305acc01fc [SPARK-50600][CONNECT][SQL] Set analyzed on analysis failure
68305acc01fc is described below
commit 68305acc01fc0614a554f76316b35065f56f6e0f
Author: jdesjean <[email protected]>
AuthorDate: Fri Jan 10 12:19:11 2025 +0800
[SPARK-50600][CONNECT][SQL] Set analyzed on analysis failure
### What changes were proposed in this pull request?
As part of [SPARK-44145](https://issues.apache.org/jira/browse/SPARK-44145), a callback was added to track completion of the analysis and optimization phases of a query. While the analyzed plan is sent when analysis completes successfully, it is not sent when analysis fails. In that case, we should fall back to the parsed plan.
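For illustration only (not part of this commit), here is a minimal sketch of a callback that observes both outcomes; the `LoggingCallback` name is hypothetical, and the signatures follow the `QueryPlanningTrackerCallback` declarations in the diff below:

```scala
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical callback, shown only to illustrate the new hook.
class LoggingCallback extends QueryPlanningTrackerCallback {
  override def analyzed(tracker: QueryPlanningTracker, analyzedPlan: LogicalPlan): Unit =
    println(s"analysis succeeded: ${analyzedPlan.nodeName}")

  // New in this change: invoked with the parsed plan when analysis fails.
  override def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit =
    println(s"analysis failed; falling back to parsed plan: ${parsedPlan.nodeName}")

  override def readyForExecution(tracker: QueryPlanningTracker): Unit = ()
}

// Wire the callback into a tracker, as the tests below do.
val tracker = new QueryPlanningTracker(Some(new LoggingCallback))
```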
### Why are the changes needed?
The purpose of the analyze event is to track when analysis completes; as such, it should be sent on both success and failure.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49383 from jdesjean/jdesjean/SPARK-50600.
Authored-by: jdesjean <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/QueryPlanningTracker.scala | 21 +++++++++++++++
.../sql/catalyst/QueryPlanningTrackerSuite.scala | 6 +++++
.../sql/connect/service/ExecuteEventsManager.scala | 24 ++++++++++++++---
.../spark/sql/execution/QueryExecution.scala | 16 +++++++----
.../spark/sql/execution/QueryExecutionSuite.scala | 31 +++++++++++++++++++---
5 files changed, 87 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
index 2e14c09bc819..d1007404158f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -94,6 +94,16 @@ object QueryPlanningTracker {
* Callbacks after planning phase completion.
*/
abstract class QueryPlanningTrackerCallback {
+ /**
+ * Called when a query fails analysis.
+ *
+ * @param tracker tracker that triggered the callback.
+ * @param parsedPlan The plan prior to analysis
+ * see @org.apache.spark.sql.catalyst.analysis.Analyzer
+ */
+ def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit = {
+ // Noop by default for backward compatibility
+ }
/**
* Called when query has been analyzed.
*
@@ -147,6 +157,17 @@ class QueryPlanningTracker(
ret
}
+ /**
+ * Set when the query has been parsed but failed to be analyzed.
+ * Can be called multiple times upon plan change.
+ *
+ * @param parsedPlan The plan prior to analysis
+ * see @org.apache.spark.sql.catalyst.analysis.Analyzer
+ */
+ private[sql] def setAnalysisFailed(parsedPlan: LogicalPlan): Unit = {
+ trackerCallback.foreach(_.analysisFailed(this, parsedPlan))
+ }
+
/**
* Set when the query has been analysed.
* Can be called multiple times upon plan change.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
index 972b98780bcc..500bbef3c89b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
@@ -95,7 +95,13 @@ class QueryPlanningTrackerSuite extends SparkFunSuite {
val mockCallback = mock[QueryPlanningTrackerCallback]
val mockPlan1 = mock[LogicalPlan]
val mockPlan2 = mock[LogicalPlan]
+ val mockPlan3 = mock[LogicalPlan]
+ val mockPlan4 = mock[LogicalPlan]
val t = new QueryPlanningTracker(Some(mockCallback))
+ t.setAnalysisFailed(mockPlan3)
+ verify(mockCallback, times(1)).analysisFailed(t, mockPlan3)
+ t.setAnalysisFailed(mockPlan4)
+ verify(mockCallback, times(1)).analysisFailed(t, mockPlan4)
t.setAnalyzed(mockPlan1)
verify(mockCallback, times(1)).analyzed(t, mockPlan1)
t.setAnalyzed(mockPlan2)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index faa7582d169f..61cd95621d15 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -145,13 +145,19 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
*
* @param analyzedPlan
* The analyzed plan generated by the Connect request plan. None when the request does not
- * generate a plan.
+ * generate a Spark plan or analysis fails.
+ * @param parsedPlan
+ * The parsed plan generated by the Connect request plan. None when the request does not
+ * generate a plan or analysis succeeds.
*/
- def postAnalyzed(analyzedPlan: Option[LogicalPlan] = None): Unit = {
+ def postAnalyzed(
+ analyzedPlan: Option[LogicalPlan] = None,
+ parsedPlan: Option[LogicalPlan] = None): Unit = {
assertStatus(List(ExecuteStatus.Started, ExecuteStatus.Analyzed), ExecuteStatus.Analyzed)
val event =
SparkListenerConnectOperationAnalyzed(jobTag, operationId, clock.getTimeMillis())
event.analyzedPlan = analyzedPlan
+ event.parsedPlan = parsedPlan
listenerBus.post(event)
}
@@ -251,6 +257,12 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
postAnalyzed(Some(analyzedPlan))
}
+ override def analysisFailed(
+ tracker: QueryPlanningTracker,
+ parsedPlan: LogicalPlan): Unit = {
+ postAnalyzed(parsedPlan = Some(parsedPlan))
+ }
+
def readyForExecution(tracker: QueryPlanningTracker): Unit = postReadyForExecution()
}))
}
@@ -341,9 +353,15 @@ case class SparkListenerConnectOperationAnalyzed(
extraTags: Map[String, String] = Map.empty)
extends SparkListenerEvent {
+ /**
+ * Parsed Spark plan generated by the Connect request. None when the Connect request does not
+ * generate a Spark plan or analysis succeeds.
+ */
+ @JsonIgnore var parsedPlan: Option[LogicalPlan] = None
+
/**
* Analyzed Spark plan generated by the Connect request. None when the Connect request does not
- * generate a Spark plan.
+ * generate a Spark plan or analysis fails.
*/
@JsonIgnore var analyzedPlan: Option[LogicalPlan] = None
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5695ea57e7fb..d9b1a2136a5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -92,12 +92,18 @@ class QueryExecution(
}
private val lazyAnalyzed = LazyTry {
- val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
- // We can't clone `logical` here, which will reset the `_analyzed` flag.
- sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+ try {
+ val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
+ // We can't clone `logical` here, which will reset the `_analyzed` flag.
+ sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+ }
+ tracker.setAnalyzed(plan)
+ plan
+ } catch {
+ case NonFatal(e) =>
+ tracker.setAnalysisFailed(logical)
+ throw e
}
- tracker.setAnalyzed(plan)
- plan
}
def analyzed: LogicalPlan = lazyAnalyzed.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 974be2f62799..d670b3d8c77d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.sql.execution
import scala.collection.mutable
import scala.io.Source
+import scala.util.Try
import org.apache.spark.sql.{AnalysisException, Dataset, ExtendedExplainGenerator, FastOperator}
-import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback}
-import org.apache.spark.sql.catalyst.analysis.CurrentNamespace
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -405,6 +406,21 @@ class QueryExecutionSuite extends SharedSparkSession {
}
}
+ test("SPARK-50600: Failed analysis should send analyzed event") {
+ val mockCallback = MockCallback()
+
+ def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
+
+ val unresolvedUndefinedFunc = UnresolvedFunction("unknown", Seq.empty, isDistinct = false)
+ val plan = Project(Seq(Alias(unresolvedUndefinedFunc, "call1")()), table("table"))
+ val dataset = Try {
+ val df = Dataset.ofRows(spark, plan, new QueryPlanningTracker(Some(mockCallback)))
+ df.queryExecution.assertAnalyzed()
+ }
+ assert(dataset.failed.get.isInstanceOf[AnalysisException])
+ mockCallback.assertAnalyzed()
+ }
+
case class MockCallbackEagerCommand(
var trackerAnalyzed: QueryPlanningTracker = null,
var trackerReadyForExecution: QueryPlanningTracker = null)
@@ -447,6 +463,15 @@ class QueryExecutionSuite extends SharedSparkSession {
var trackerAnalyzed: QueryPlanningTracker = null,
var trackerReadyForExecution: QueryPlanningTracker = null)
extends QueryPlanningTrackerCallback {
+ override def analysisFailed(
+ trackerFromCallback: QueryPlanningTracker,
+ analyzedPlan: LogicalPlan): Unit = {
+ trackerAnalyzed = trackerFromCallback
+ assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))
+ assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.OPTIMIZATION))
+ assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.PLANNING))
+ assert(analyzedPlan != null)
+ }
def analyzed(trackerFromCallback: QueryPlanningTracker, plan: LogicalPlan): Unit = {
trackerAnalyzed = trackerFromCallback
assert(trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))