This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 45865a6c3dc1 [SPARK-52723][SQL][CONNECT] Server side column name validation
45865a6c3dc1 is described below

commit 45865a6c3dc1dcfd1ba170f9947cb5c62a09f906
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Jul 10 13:31:24 2025 +0800

    [SPARK-52723][SQL][CONNECT] Server side column name validation
    
    ### What changes were proposed in this pull request?
    Add column name validation on the Connect server side.
    
    ### Why are the changes needed?
    For `df.col('bad_column')` or `df['bad_column']`, the reference is currently validated in the following ways:
    
    1. classic (both Python and Scala): eager validation;
    2. connect (Python client): eager validation against the cached schema, which may trigger an RPC;
    3. connect (Scala client): no eager validation; it should fail in the following analysis or execution, but may hit an edge case such as:
    
    ```
    val df1 = sql("select * from values(1, 'y') as t1(a, y)")
    val df2 = sql("select * from values(1, 'x') as t2(a, x)")
    val df3 = df1.join(df2, df1("a") === df2("a"))
    val df4 = df3.select(df1("x"))
    ```
    
    `df1` doesn't contain `x` at all, so the query should fail, but it actually succeeds and resolves `col("x")` instead.
    That is due to the connect-specific column resolution approach: in some cases, if a column fails to resolve with its plan id (`df1("x")`), it is resolved again without the plan id (as a plain `col("x")`).
    
    ### Does this PR introduce _any_ user-facing change?
    Yes: the problematic query above now fails during analysis with `CANNOT_RESOLVE_DATAFRAME_COLUMN`.
    
    ### How was this patch tested?
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51409 from zhengruifeng/connect_server_side_validation.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../catalyst/analysis/ColumnResolutionHelper.scala | 49 +++++++++++++---------
 .../apache/spark/sql/connect/DataFrameSuite.scala  | 13 ++++++
 2 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index d3e52d11b465..27dafeaacaf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -557,10 +557,15 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   // expression are from Spark Connect, and need to be resolved in this way:
   //    1. extract the attached plan id from UnresolvedAttribute;
   //    2. top-down traverse the query plan to find the plan node that matches the plan id;
-  //    3. if can not find the matching node, fail the analysis due to illegal references;
-  //    4. if more than one matching nodes are found, fail due to ambiguous column reference;
-  //    5. resolve the expression with the matching node, if any error occurs here, return the
-  //       original expression as it is.
+  //    3. if the matching node cannot be found, fail with 'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+  //    4. if the matching node is found but the column cannot be resolved, also fail with
+  //       'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+  //    5. resolve the expression against the target node; the resolved attribute will be
+  //       filtered by the output attributes of the nodes on the path (from matching node to root);
+  //    6. if more than one resolved attribute is found in the above recursive process,
+  //       fail with 'AMBIGUOUS_COLUMN_REFERENCE';
+  //    7. if all the resolved attributes are filtered out, return the original expression
+  //       as it is.
   private def tryResolveDataFrameColumns(
       e: Expression,
       q: Seq[LogicalPlan]): Expression = e match {
@@ -622,18 +627,16 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       p: LogicalPlan,
       currentDepth: Int): (Option[(NamedExpression, Int)], Boolean) = {
     val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
-      val resolved = try {
-        if (!isMetadataAccess) {
-          p.resolve(u.nameParts, conf.resolver)
-        } else if (u.nameParts.size == 1) {
-          p.getMetadataAttributeByNameOpt(u.nameParts.head)
-        } else {
-          None
-        }
-      } catch {
-        case e: AnalysisException =>
-          logDebug(s"Fail to resolve $u with $p due to $e")
-          None
+      val resolved = if (!isMetadataAccess) {
+        p.resolve(u.nameParts, conf.resolver)
+      } else if (u.nameParts.size == 1) {
+        p.getMetadataAttributeByNameOpt(u.nameParts.head)
+      } else {
+        None
+      }
+      if (resolved.isEmpty) {
+        // The target plan node is found, but the column cannot be resolved.
+        throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
       }
       (resolved.map(r => (r, currentDepth)), true)
     } else {
@@ -662,14 +665,20 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
     // When resolving the column reference df1.a, the target node with plan_id=1
     // can be found in both sides of the Join node.
     // To correctly resolve df1.a, the analyzer discards the resolved attribute
-    // in the right side, by filtering out the result by the output attributes of
+    // on the right side, by filtering out the result by the output attributes of
     // Project plan_id=2.
     //
     // However, there are analyzer rules (e.g. ResolveReferencesInSort)
     // supporting missing column resolution. Then a valid resolved attribute
-    // maybe filtered out here. In this case, resolveDataFrameColumnByPlanId
-    // returns None, the dataframe column will remain unresolved, and the analyzer
-    // will try to resolve it without plan id later.
+    // may be filtered out here. For example:
+    //
+    // from pyspark.sql import functions as sf
+    // df = spark.range(10).withColumn("v", sf.col("id") + 1)
+    // df.select(df.v).sort(df.id)
+    //
+    // In this case, resolveDataFrameColumnByPlanId returns None,
+    // the dataframe column 'df.id' will remain unresolved, and the analyzer
+    // will try to resolve 'id' without plan id later.
     val filtered = resolved.filter { r =>
       if (isMetadataAccess) {
         r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
index 2993f44efceb..03793c8bbb72 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
 import org.apache.spark.sql.functions.{concat, lit, when}
 
@@ -42,4 +43,16 @@ class DataFrameSuite extends QueryTest with RemoteSparkSession {
     assert(df4.columns === Array("colA", "colB", "colC", "colC", "colD", "colE"))
     assert(df4.count() === 1)
   }
+
+  test("lazy column validation") {
+    val session = spark
+    import session.implicits._
+
+    val df1 = Seq(1 -> "y").toDF("a", "y")
+    val df2 = Seq(1 -> "x").toDF("a", "x")
+    val df3 = df1.join(df2, df1("a") === df2("a"))
+    val df4 = df3.select(df1("x")) // <- No exception here
+
+    intercept[AnalysisException] { df4.schema }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
