This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 45865a6c3dc1 [SPARK-52723][SQL][CONNECT] Server side column name validation

45865a6c3dc1 is described below

commit 45865a6c3dc1dcfd1ba170f9947cb5c62a09f906
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Jul 10 13:31:24 2025 +0800

[SPARK-52723][SQL][CONNECT] Server side column name validation

### What changes were proposed in this pull request?

Add column name validation on the Connect server side.

### Why are the changes needed?

For `df.col('bad_column')` or `df['bad_column']`, the column name is currently validated as follows:
1. classic (both Python and Scala): eager validation;
2. Connect (Python client): eager validation against the cached schema, which may trigger an RPC;
3. Connect (Scala client): no eager validation; the reference should fail in a later analysis or execution, but may hit an edge case such as:

```
val df1 = sql("select * from values(1, 'y') as t1(a, y)")
val df2 = sql("select * from values(1, 'x') as t2(a, x)")
val df3 = df1.join(df2, df1("a") === df2("a"))
val df4 = df3.select(df1("x"))
```

`df1` does not contain `x` at all, so this should fail, but the query actually succeeds by resolving `col("x")`. That is due to the Connect-specific column resolution approach: in some cases, if the column cannot be resolved with the plan id (`df1("x")`), it is resolved without the plan id (`col("x")`).

### Does this PR introduce _any_ user-facing change?

Yes, the problematic query above now fails.

### How was this patch tested?

Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51409 from zhengruifeng/connect_server_side_validation.

Authored-by: Ruifeng Zheng <ruife...@apache.org>
Signed-off-by: Ruifeng Zheng <ruife...@apache.org>

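Note: as a rough illustration (not part of the commit), the edge case described above can be exercised end to end with the Spark Connect Scala client roughly as follows. This is a minimal sketch, assuming `spark` is a Connect `SparkSession`; it mirrors the test added in the diff below, where the lazily built plan only fails once the server analyzes it.

```
import org.apache.spark.sql.AnalysisException

// Rebuild the edge case from the description above.
val df1 = spark.sql("select * from values(1, 'y') as t1(a, y)")
val df2 = spark.sql("select * from values(1, 'x') as t2(a, x)")
val df3 = df1.join(df2, df1("a") === df2("a"))

// The Connect Scala client builds the plan lazily, so this line alone does not throw.
val df4 = df3.select(df1("x"))

// Triggering analysis on the server should now fail with CANNOT_RESOLVE_DATAFRAME_COLUMN
// instead of silently resolving `x` from df2.
try {
  df4.schema
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```
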
---
 .../catalyst/analysis/ColumnResolutionHelper.scala | 49 +++++++++++++---------
 .../apache/spark/sql/connect/DataFrameSuite.scala  | 13 ++++++
 2 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index d3e52d11b465..27dafeaacaf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -557,10 +557,15 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   // expression are from Spark Connect, and need to be resolved in this way:
   //    1. extract the attached plan id from UnresolvedAttribute;
   //    2. top-down traverse the query plan to find the plan node that matches the plan id;
-  //    3. if can not find the matching node, fail the analysis due to illegal references;
-  //    4. if more than one matching nodes are found, fail due to ambiguous column reference;
-  //    5. resolve the expression with the matching node, if any error occurs here, return the
-  //       original expression as it is.
+  //    3. if the matching node cannot be found, fail with 'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+  //    4. if the matching node is found but the column cannot be resolved, also fail with
+  //       'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+  //    5. resolve the expression against the target node; the resolved attribute will be
+  //       filtered by the output attributes of the nodes on the path (from the matching node to the root);
+  //    6. if more than one resolved attribute is found in the above recursive process,
+  //       fail with 'AMBIGUOUS_COLUMN_REFERENCE';
+  //    7. if all the resolved attributes are filtered out, return the original expression
+  //       as it is.
   private def tryResolveDataFrameColumns(
       e: Expression,
       q: Seq[LogicalPlan]): Expression = e match {
@@ -622,18 +627,16 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       p: LogicalPlan,
       currentDepth: Int): (Option[(NamedExpression, Int)], Boolean) = {
     val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
-      val resolved = try {
-        if (!isMetadataAccess) {
-          p.resolve(u.nameParts, conf.resolver)
-        } else if (u.nameParts.size == 1) {
-          p.getMetadataAttributeByNameOpt(u.nameParts.head)
-        } else {
-          None
-        }
-      } catch {
-        case e: AnalysisException =>
-          logDebug(s"Fail to resolve $u with $p due to $e")
-          None
+      val resolved = if (!isMetadataAccess) {
+        p.resolve(u.nameParts, conf.resolver)
+      } else if (u.nameParts.size == 1) {
+        p.getMetadataAttributeByNameOpt(u.nameParts.head)
+      } else {
+        None
+      }
+      if (resolved.isEmpty) {
+        // The target plan node is found, but the column cannot be resolved.
+        throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
       }
       (resolved.map(r => (r, currentDepth)), true)
     } else {
@@ -662,14 +665,20 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
     // When resolving the column reference df1.a, the target node with plan_id=1
     // can be found in both sides of the Join node.
     // To correctly resolve df1.a, the analyzer discards the resolved attribute
-    // in the right side, by filtering out the result by the output attributes of
+    // on the right side, by filtering out the result by the output attributes of
     // Project plan_id=2.
     //
     // However, there are analyzer rules (e.g. ResolveReferencesInSort)
    // supporting missing column resolution. Then a valid resolved attribute
-    // maybe filtered out here. In this case, resolveDataFrameColumnByPlanId
-    // returns None, the dataframe column will remain unresolved, and the analyzer
-    // will try to resolve it without plan id later.
+    // may be filtered out here. For example:
+    //
+    //   from pyspark.sql import functions as sf
+    //   df = spark.range(10).withColumn("v", sf.col("id") + 1)
+    //   df.select(df.v).sort(df.id)
+    //
+    // In this case, resolveDataFrameColumnByPlanId returns None,
+    // the dataframe column 'df.id' will remain unresolved, and the analyzer
+    // will try to resolve 'id' without the plan id later.
     val filtered = resolved.filter { r =>
       if (isMetadataAccess) {
         r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
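Note: outside of the diff, the PySpark snippet quoted in the comment above (the ResolveReferencesInSort case, covered by step 7) can be sketched in Scala roughly as follows. This is illustrative only and assumes `spark` is a Connect `SparkSession`.

```
import org.apache.spark.sql.functions.col

// `id` is resolvable in the plan node that df refers to, but the intermediate
// select only outputs `v`, so the plan-id-based resolution of df("id") is filtered out.
val df = spark.range(10).withColumn("v", col("id") + 1)

// The column then falls back to name-based resolution, and ResolveReferencesInSort
// resolves the missing sort column, so the query succeeds.
df.select(df("v")).sort(df("id")).show()
```
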
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
index 2993f44efceb..03793c8bbb72 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataFrameSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
 import org.apache.spark.sql.functions.{concat, lit, when}
 
@@ -42,4 +43,16 @@ class DataFrameSuite extends QueryTest with RemoteSparkSession {
     assert(df4.columns === Array("colA", "colB", "colC", "colC", "colD", "colE"))
     assert(df4.count() === 1)
   }
+
+  test("lazy column validation") {
+    val session = spark
+    import session.implicits._
+
+    val df1 = Seq(1 -> "y").toDF("a", "y")
+    val df2 = Seq(1 -> "x").toDF("a", "x")
+    val df3 = df1.join(df2, df1("a") === df2("a"))
+    val df4 = df3.select(df1("x")) // <- No exception here
+
+    intercept[AnalysisException] { df4.schema }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org