This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f1cb7a730b18  [SPARK-49318][SQL] Preempt low priority error on LCA until end of check analysis to improve error experience
f1cb7a730b18 is described below

commit f1cb7a730b18c729d27a10cb51ca1d738a584f8a
Author: Xinyi Yu <xinyi...@databricks.com>
AuthorDate: Wed Aug 21 13:06:13 2024 -0700

    [SPARK-49318][SQL] Preempt low priority error on LCA until end of check analysis to improve error experience

    ### What changes were proposed in this pull request?

    This PR allows high-priority analysis errors to be thrown first when a low-priority LCA (lateral column alias) error occurs in the same query. The LCA error is preempted and stored during check analysis; if another error happens, that error is thrown first, and if not, the stored LCA error is thrown at the end of the analysis.

    ### Why are the changes needed?

    This improves the user experience on errors by surfacing the higher-priority information and the root cause to users. The test in this PR covers one such case: UNRESOLVED_COLUMN should be thrown instead of the LCA internal error, which was thrown before this change because the offending segment sits in the CTE definition, which CheckAnalysis traverses first.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    A new test case.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #47813 from anchovYu/lca-error-priority.

    Authored-by: Xinyi Yu <xinyi...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 52 +++++++++++++++++++---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 23 ++++++++++
 .../apache/spark/sql/LateralColumnAliasSuite.scala | 34 ++++++++++++++
 3 files changed, 104 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index aa5a2cd95074..a9fbe548ba39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
@@ -57,6 +57,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
   val DATA_TYPE_MISMATCH_ERROR = TreeNodeTag[Unit]("dataTypeMismatchError")
   val INVALID_FORMAT_ERROR = TreeNodeTag[Unit]("invalidFormatError")
 
+  // An error that is not supposed to be thrown immediately on triggering, e.g. certain internal
+  // errors. The error will be thrown at the end of the whole check analysis process, if no
+  // other error occurs.
+  val preemptedError = new PreemptedError()
+
   /**
    * Fails the analysis at the point where a specific tree node was parsed using a provided
    * error class and message parameters.
@@ -114,10 +119,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
   private def checkNotContainingLCA(exprs: Seq[Expression], plan: LogicalPlan): Unit = {
     exprs.foreach(_.transformDownWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
       case lcaRef: LateralColumnAliasReference =>
-        throw SparkException.internalError("Resolved plan should not contain any " +
-          s"LateralColumnAliasReference.\nDebugging information: plan:\n$plan",
-          context = lcaRef.origin.getQueryContext,
-          summary = lcaRef.origin.context.summary)
+        // this should be a low-priority internal error to be preempted
+        preemptedError.set(
+          SparkException.internalError(
+            "Resolved plan should not contain any " +
+              s"LateralColumnAliasReference.\nDebugging information: plan:\n$plan",
+            context = lcaRef.origin.getQueryContext,
+            summary = lcaRef.origin.context.summary)
+        )
+        lcaRef
     })
   }
 
@@ -174,11 +184,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case e: AnalysisException =>
         throw new ExtendedAnalysisException(e, plan)
     }
+    preemptedError.clear()
     try {
       checkAnalysis0(inlinedPlan)
+      preemptedError.getErrorOpt().foreach(throw _) // throw preempted error if any
     } catch {
       case e: AnalysisException =>
         throw new ExtendedAnalysisException(e, inlinedPlan)
+    } finally {
+      preemptedError.clear()
     }
     plan.setAnalyzed()
   }
@@ -1546,3 +1560,31 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     }
   }
 }
+
+// A "heap" of preempted errors that keeps only the top-priority element, representing the sole
+// error to be thrown at the end of the whole check analysis process, if no other error occurs.
+class PreemptedError() {
+  case class ErrorWithPriority(error: Exception with SparkThrowable, priority: Int) {}
+
+  private var errorOpt: Option[ErrorWithPriority] = None
+
+  // Set/overwrite the given error as the preempted error, if no other error is preempted, or it
+  // has a higher priority than the existing one.
+  // If the priority is not provided, it will be calculated based on the error class. Currently
+  // internal errors have the lowest priority.
+  def set(error: Exception with SparkThrowable, priority: Option[Int] = None): Unit = {
+    val calculatedPriority = priority.getOrElse {
+      error.getErrorClass match {
+        case c if c.startsWith("INTERNAL_ERROR") => 1
+        case _ => 2
+      }
+    }
+    if (errorOpt.isEmpty || calculatedPriority > errorOpt.get.priority) {
+      errorOpt = Some(ErrorWithPriority(error, calculatedPriority))
+    }
+  }
+
+  def getErrorOpt(): Option[Exception with SparkThrowable] = errorOpt.map(_.error)
+
+  def clear(): Unit = errorOpt = None
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 1bb1412434c6..1816c620414c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.InMemoryTable
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1807,4 +1808,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     val plan = testRelation.select(udf.as("u")).select($"u").analyze
     assert(plan.output.head.nullable)
   }
+
+  test("test methods of PreemptedError") {
+    val preemptedError = new PreemptedError()
+    assert(preemptedError.getErrorOpt().isEmpty)
+
+    val internalError = SparkException.internalError("some internal error to be preempted")
+    preemptedError.set(internalError)
+    assert(preemptedError.getErrorOpt().contains(internalError))
+
+    // setting an error with higher priority will overwrite
+    val regularError = QueryCompilationErrors.unresolvedColumnError("name", Seq("a"))
+      .asInstanceOf[AnalysisException]
+    preemptedError.set(regularError)
+    assert(preemptedError.getErrorOpt().contains(regularError))
+
+    // setting an error with lower priority is a no-op
+    preemptedError.set(internalError)
+    assert(preemptedError.getErrorOpt().contains(regularError))
+
+    preemptedError.clear()
+    assert(preemptedError.getErrorOpt().isEmpty)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
index 14895c2f8692..336cf12ae57c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
@@ -1319,4 +1319,38 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
       Row(2) :: Nil
     )
   }
+
+  test("LCA internal error should have lower priority") {
+    // in this query, the 'ORDER BY Freq DESC' error should be the top error surfaced to users
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(
+          """
+            |WITH group_counts AS (
+            |  SELECT id, count(*) as Freq, CASE WHEN Freq <= 10 THEN "1" ELSE "2" END AS Group
+            |  FROM values (123) as data(id)
+            |  GROUP BY id
+            |)
+            |SELECT Group, count(*) * 100.0 / (select count(*) from group_counts) AS Percentage
+            |FROM group_counts
+            |Group BY Group
+            |ORDER BY Freq DESC;
+            |""".stripMargin
+        )
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      sqlState = "42703",
+      parameters = Map(
+        "objectName" -> "`Freq`",
+        "proposal" -> "`Percentage`, `group_counts`.`Group`"
+      ),
+      context = ExpectedContext(
+        fragment = "Freq",
+        start = 280,
+        stop = 283)
+    )
+
+    // the state is cleared - a subsequent correct query should succeed
+    sql("select 1 as a, a").queryExecution.assertAnalyzed()
+  }
 }