This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 9040932f641c [SPARK-54730][SQL][CONNECT] Delay failure of dataframe column resolution
9040932f641c is described below
commit 9040932f641cf040e8d03657e51a2bef3a096d50
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Dec 18 08:31:02 2025 +0800
[SPARK-54730][SQL][CONNECT] Delay failure of dataframe column resolution
### What changes were proposed in this pull request?
Delay the failure of dataframe column resolution: when the plan node matching the attribute's plan id is found but the column cannot be resolved there, keep the attribute unresolved instead of failing immediately, so a later analyzer iteration can still resolve it.
### Why are the changes needed?
The eager failure conflicts with Delta rules that add hidden columns to the plan during analysis.
### Does this PR introduce _any_ user-facing change?
Yes. Before this fix, the following Delta query fails:
```py
df = spark.read.option("readChangeFeed", True).option("startingVersion", 0).table("sample_table")
df.select(df._commit_version).show()
# fails with [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "_commit_version".
```
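
At the analyzer level, the same failure can be reproduced without Delta by injecting a resolution rule that appends a hidden column. The sketch below is illustrative, not part of this commit: it mirrors the `MyHiddenColumn` test added in this patch, assumes a Spark 4.1 classic (non-Connect) session, and leans on the internal `PLAN_ID_TAG`, `injectResolutionRule`, and `Dataset.ofRows` APIs.
```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.{Dataset, SparkSession}
import org.apache.spark.sql.types.IntegerType

// Resolution rule that idempotently appends a hidden column "x",
// mirroring MyHiddenColumn from the test added by this commit.
val spark = SparkSession.builder()
  .master("local[1]")
  .withExtensions(_.injectResolutionRule(_ => new Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
      case rel: LocalRelation if rel.output.size == 2 =>
        val newRel = rel.copy(
          output = rel.output :+ AttributeReference("x", IntegerType)())
        newRel.setTagValue(LogicalPlan.PLAN_ID_TAG, 0L)
        newRel
    }
  }))
  .getOrCreate()

// "x" does not exist when the plan is built; the relation and the attribute
// carry the same plan id, so "x" is treated as a dataframe column.
val rel = LocalRelation(
  AttributeReference("a", IntegerType)(),
  AttributeReference("b", IntegerType)())
rel.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)

val u = UnresolvedAttribute("x")
u.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)

// Before this fix the first analyzer pass threw CANNOT_RESOLVE_DATAFRAME_COLUMN;
// now the failure is delayed and "x" resolves once the rule has run.
val df = Dataset.ofRows(spark, Project(Seq(u), rel))
assert(df.schema.fieldNames.sameElements(Array("x")))
```
Previously `resolveDataFrameColumn` threw as soon as the node matching the plan id was found without the column; with this change it returns None, so the attribute survives to the next fixed-point iteration, where the hidden column is visible.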
### How was this patch tested?
Added a test to SparkSessionExtensionSuite.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53503 from zhengruifeng/df_col_delay_fail_v2.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 36f8f1e1003c676969e5108d58fed57bfbba4ddc)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../catalyst/analysis/ColumnResolutionHelper.scala | 14 ++++-----
.../spark/sql/SparkSessionExtensionSuite.scala | 34 ++++++++++++++++++++--
2 files changed, 39 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 34541a8840cb..870e03364225 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -140,7 +140,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
}
matched(ordinal)
- case u @ UnresolvedAttribute(nameParts) =>
+ case u @ UnresolvedAttribute(nameParts)
+ if u.getTagValue(LogicalPlan.PLAN_ID_TAG).isEmpty =>
+ // UnresolvedAttribute with PLAN_ID_TAG should be resolved in resolveDataFrameColumn
val result = withPosition(u) {
resolveColumnByName(nameParts)
.orElse(LiteralFunctionResolution.resolve(nameParts))
@@ -495,8 +497,7 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// 1. extract the attached plan id from UnresolvedAttribute;
// 2. top-down traverse the query plan to find the plan node that matches the plan id;
// 3. if can not find the matching node, fails with 'CANNOT_RESOLVE_DATAFRAME_COLUMN';
- // 4, if the matching node is found, but can not resolve the column, also fails with
- // 'CANNOT_RESOLVE_DATAFRAME_COLUMN';
+ // 4, if the matching node is found, but can not resolve the column, return the original one;
// 5, resolve the expression against the target node, the resolved attribute will be
// filtered by the output attributes of nodes in the path (from matching to root node);
// 6. if more than one resolved attributes are found in the above recursive process,
@@ -571,10 +572,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
} else {
None
}
- if (resolved.isEmpty) {
- // The targe plan node is found, but the column cannot be resolved.
- throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
- }
+ // The target plan node is found, but might still fail to resolve.
+ // In this case, return None to delay the failure, so it is possible to be
+ // resolved in the next iteration.
(resolved.map(r => (r, currentDepth)), true)
} else {
val children = p match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 6ee0029b6839..66826a9ca762 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,18 +26,19 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, Max, Partial}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.{PlanTest, SQLHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AggregateHint, ColumnStat, Limit, LocalRelation, LogicalPlan, Sort, SortHint, Statistics, UnresolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AggregateHint, ColumnStat, Limit, LocalRelation, LogicalPlan, Project, Sort, SortHint, Statistics, UnresolvedHint}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.classic.ClassicConversions._
+import org.apache.spark.sql.classic.Dataset
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
@@ -91,6 +92,22 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
}
}
+ test("inject analyzer rule - hidden column") {
+ withSession(Seq(_.injectResolutionRule(MyHiddenColumn))) { session: SparkSession =>
+ val rel = LocalRelation(
+ AttributeReference("a", IntegerType)(),
+ AttributeReference("b", IntegerType)())
+ rel.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)
+
+ val u = UnresolvedAttribute("x")
+ u.setTagValue[Long](LogicalPlan.PLAN_ID_TAG, 0L)
+ val proj = Project(Seq(u), rel)
+
+ val df = Dataset.ofRows(session, proj)
+ assert(df.schema.fieldNames === Array("x"))
+ }
+ }
+
test("inject post hoc resolution analyzer rule") {
withSession(Seq(_.injectPostHocResolutionRule(MyRule))) { session =>
assert(session.sessionState.analyzer.postHocResolutionRules.contains(MyRule(session)))
@@ -608,6 +625,19 @@ case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
+case class MyHiddenColumn(spark: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case rel: LocalRelation if rel.output.size == 2 =>
+ // rel.output.size == 2 for idempotence
+ val newRel = rel.copy(
+ output = rel.output :+ AttributeReference("x", IntegerType)()
+ )
+ assert(rel.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(0L))
+ newRel.setTagValue(LogicalPlan.PLAN_ID_TAG, 0L)
+ newRel
+ }
+}
+
case class MyCheckRule(spark: SparkSession) extends (LogicalPlan => Unit) {
override def apply(plan: LogicalPlan): Unit = { }
}
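
For context on the Delta scenario in the description: an extension's resolution rules reach the analyzer either programmatically, as the new test does via `injectResolutionRule`, or declaratively through the `spark.sql.extensions` configuration. A brief sketch, assuming delta-spark is on the classpath (the class names below are Delta's published extension and catalog, not part of this commit):
```scala
import org.apache.spark.sql.SparkSession

// Declarative registration: Delta's extension injects the analysis rules,
// including those that add CDF metadata columns such as _commit_version.
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```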
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]