This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1cb7a730b18 [SPARK-49318][SQL] Preempt low priority error on LCA until end of check analysis to improve error experience
f1cb7a730b18 is described below

commit f1cb7a730b18c729d27a10cb51ca1d738a584f8a
Author: Xinyi Yu <xinyi...@databricks.com>
AuthorDate: Wed Aug 21 13:06:13 2024 -0700

    [SPARK-49318][SQL] Preempt low priority error on LCA until end of check analysis to improve error experience
    
    ### What changes were proposed in this pull request?
    This PR allows high-priority analysis errors to be thrown first when a low-priority error on LCA (lateral column alias) occurs in the same query.
    It preempts and stores the LCA error during check analysis. If another error happens, that error is thrown first; if not, the LCA error is thrown at the end of the analysis.
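    As an illustration, here is a minimal sketch of that control flow (simplified from checkAnalysis in the diff below; checkWithPreemption and runChecks are hypothetical stand-ins, not actual Spark APIs):

        // Sketch: clear stale state, run the checks, and surface the preempted
        // error only if the checks themselves did not throw anything.
        def checkWithPreemption(preemptedError: PreemptedError)(runChecks: => Unit): Unit = {
          preemptedError.clear()                          // drop state from prior runs
          try {
            runChecks                                     // may throw a higher-priority error first
            preemptedError.getErrorOpt().foreach(throw _) // otherwise throw the stored error
          } finally {
            preemptedError.clear()                        // never leak state across queries
          }
        }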
    
    ### Why are the changes needed?
    This helps improve the user experience on errors by surfacing higher-priority information and the root cause to users.
    The test in this PR covers one such case: UNRESOLVED_COLUMN should be thrown instead of the LCA internal error. Without this change, the internal error was thrown because the offending segment sits in the CTE definition, which CheckAnalysis traverses first.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    A new test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47813 from anchovYu/lca-error-priority.
    
    Authored-by: Xinyi Yu <xinyi...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 52 +++++++++++++++++++---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 23 ++++++++++
 .../apache/spark/sql/LateralColumnAliasSuite.scala | 34 ++++++++++++++
 3 files changed, 104 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index aa5a2cd95074..a9fbe548ba39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
@@ -57,6 +57,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
   val DATA_TYPE_MISMATCH_ERROR = TreeNodeTag[Unit]("dataTypeMismatchError")
   val INVALID_FORMAT_ERROR = TreeNodeTag[Unit]("invalidFormatError")
 
+  // Error that is not supposed to throw immediately on triggering, e.g. certain internal errors.
+  // The error will be thrown at the end of the whole check analysis process, if no other error
+  // occurs.
+  val preemptedError = new PreemptedError()
+
   /**
   * Fails the analysis at the point where a specific tree node was parsed using a provided
    * error class and message parameters.
@@ -114,10 +119,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
   private def checkNotContainingLCA(exprs: Seq[Expression], plan: LogicalPlan): Unit = {
     exprs.foreach(_.transformDownWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
       case lcaRef: LateralColumnAliasReference =>
-        throw SparkException.internalError("Resolved plan should not contain any " +
-          s"LateralColumnAliasReference.\nDebugging information: plan:\n$plan",
-          context = lcaRef.origin.getQueryContext,
-          summary = lcaRef.origin.context.summary)
+        // this should be a low priority internal error to be preempted
+        preemptedError.set(
+          SparkException.internalError(
+            "Resolved plan should not contain any " +
+            s"LateralColumnAliasReference.\nDebugging information: 
plan:\n$plan",
+            context = lcaRef.origin.getQueryContext,
+            summary = lcaRef.origin.context.summary)
+        )
+        lcaRef
     })
   }
 
@@ -174,11 +184,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case e: AnalysisException =>
         throw new ExtendedAnalysisException(e, plan)
     }
+    preemptedError.clear()
     try {
       checkAnalysis0(inlinedPlan)
+      preemptedError.getErrorOpt().foreach(throw _) // throw preempted error if any
     } catch {
       case e: AnalysisException =>
         throw new ExtendedAnalysisException(e, inlinedPlan)
+    } finally {
+      preemptedError.clear()
     }
     plan.setAnalyzed()
   }
@@ -1546,3 +1560,31 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     }
   }
 }
+
+// a heap of the preempted error that only keeps the top priority element, representing the sole
+// error to be thrown at the end of the whole check analysis process, if no other error occurs.
+class PreemptedError() {
+  case class ErrorWithPriority(error: Exception with SparkThrowable, priority: Int) {}
+
+  private var errorOpt: Option[ErrorWithPriority] = None
+
+  // Set/overwrite the given error as the preempted error, if no other errors are preempted, or it
+  // has a higher priority than the existing one.
+  // If the priority is not provided, it will be calculated based on error class. Currently internal
+  // errors have the lowest priority.
+  def set(error: Exception with SparkThrowable, priority: Option[Int] = None): Unit = {
+    val calculatedPriority = priority.getOrElse {
+      error.getErrorClass match {
+        case c if c.startsWith("INTERNAL_ERROR") => 1
+        case _ => 2
+      }
+    }
+    if (errorOpt.isEmpty || calculatedPriority > errorOpt.get.priority) {
+      errorOpt = Some(ErrorWithPriority(error, calculatedPriority))
+    }
+  }
+
+  def getErrorOpt(): Option[Exception with SparkThrowable] = errorOpt.map(_.error)
+
+  def clear(): Unit = errorOpt = None
+}
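
As a quick illustration of the priority semantics implemented by the PreemptedError class above, here is a self-contained sketch (it uses plain Exception instead of SparkThrowable, and PreemptedErrorSketch/Demo are hypothetical names, not part of this patch):

    // A strictly higher priority overwrites the stored error;
    // an equal or lower priority is a no-op, mirroring PreemptedError.set.
    class PreemptedErrorSketch {
      private var errorOpt: Option[(Exception, Int)] = None

      def set(error: Exception, priority: Int): Unit = {
        if (errorOpt.forall(_._2 < priority)) {
          errorOpt = Some((error, priority))
        }
      }

      def getErrorOpt(): Option[Exception] = errorOpt.map(_._1)
      def clear(): Unit = errorOpt = None
    }

    object Demo extends App {
      val pe = new PreemptedErrorSketch
      pe.set(new Exception("LCA internal error"), priority = 1)     // lowest priority
      pe.set(new Exception("UNRESOLVED_COLUMN"), priority = 2)      // higher, overwrites
      pe.set(new Exception("another internal error"), priority = 1) // lower, no-op
      assert(pe.getErrorOpt().get.getMessage == "UNRESOLVED_COLUMN")
    }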
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 1bb1412434c6..1816c620414c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.InMemoryTable
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1807,4 +1808,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     val plan = testRelation.select(udf.as("u")).select($"u").analyze
     assert(plan.output.head.nullable)
   }
+
+  test("test methods of PreemptedError") {
+    val preemptedError = new PreemptedError()
+    assert(preemptedError.getErrorOpt().isEmpty)
+
+    val internalError = SparkException.internalError("some internal error to be preempted")
+    preemptedError.set(internalError)
+    assert(preemptedError.getErrorOpt().contains(internalError))
+
+    // set error with higher priority will overwrite
+    val regularError = QueryCompilationErrors.unresolvedColumnError("name", Seq("a"))
+      .asInstanceOf[AnalysisException]
+    preemptedError.set(regularError)
+    assert(preemptedError.getErrorOpt().contains(regularError))
+
+    // set error with lower priority is noop
+    preemptedError.set(internalError)
+    assert(preemptedError.getErrorOpt().contains(regularError))
+
+    preemptedError.clear()
+    assert(preemptedError.getErrorOpt().isEmpty)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
index 14895c2f8692..336cf12ae57c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
@@ -1319,4 +1319,38 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
       Row(2) :: Nil
     )
   }
+
+  test("LCA internal error should have lower priority") {
+    // in this query, the 'order by Freq DESC' error should be the top error surfaced to users
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(
+          """
+            |WITH group_counts AS (
+            |  SELECT id, count(*) as Freq, CASE WHEN Freq <= 10 THEN "1" ELSE "2" END AS Group
+            |  FROM values (123) as data(id)
+            |  GROUP BY id
+            |)
+            |SELECT Group, count(*) * 100.0 / (select count(*) from group_counts) AS Percentage
+            |FROM group_counts
+            |Group BY Group
+            |ORDER BY Freq DESC;
+            |""".stripMargin
+        )
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      sqlState = "42703",
+      parameters = Map(
+        "objectName" -> "`Freq`",
+        "proposal" -> "`Percentage`, `group_counts`.`Group`"
+      ),
+      context = ExpectedContext(
+        fragment = "Freq",
+        start = 280,
+        stop = 283)
+    )
+
+    // the states are cleared - a subsequent correct query should succeed
+    sql("select 1 as a, a").queryExecution.assertAnalyzed()
+  }
 }

