This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 308d3e899fd9 [SPARK-52686][SQL] `Union` should be resolved only if there are no duplicates
308d3e899fd9 is described below

commit 308d3e899fd9271c98c7f815829c58a0ba35cd7b
Author: Mihailo Timotic <mihailo.timo...@databricks.com>
AuthorDate: Wed Jul 9 10:30:18 2025 +0800

[SPARK-52686][SQL] `Union` should be resolved only if there are no duplicates

### What changes were proposed in this pull request?

`Union` should be `resolved` only if there are no duplicate attributes within any of its children and no conflicting attributes between branches.

### Why are the changes needed?

This is necessary to prevent rules like `ResolveReferences` or `WidenSetOperationTypes` from resolving upper nodes while `Union` still has duplicate ExprIds. Consider the following query pattern:

```
sql("CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)")
sql("CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)")
sql("CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)")
sql("""SELECT
      | *
      |FROM (
      |  SELECT col1, col2, NULL AS a, col1 FROM t1
      |  UNION
      |  SELECT col1, col2, NULL AS a, col3 FROM t2
      |  UNION
      |  SELECT * FROM t3
      |)""".stripMargin)
```

Because `Union` can currently be resolved even while a branch contains duplicates, the plan is transformed as follows:

```
Union
+- Union
   :- Project col1#5, col2#6, null AS a#3, col1#5
   +- Project col1#8, col2#9, null AS a#4, col3#10
```

becomes

```
Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5
   +- Union
      :- Project col1#5, col2#6, null AS a#3, col1#5
      +- Project col1#8, col2#9, null AS a#4, col3#10
```

and we end up with a duplicate `col1#5` in both the outer `Project` and the inner one. Once `ResolveReferences` triggers, both the inner and the outer `Project` get deduplicated, leaving an unnecessary `Project` in the plan. Instead, by first deduplicating ExprIds in the inner `Project` before continuing resolution, the `Project` inserted between the two `Union`s contains no duplicate ExprIds and no additional unnecessary `Project` is needed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51376 from mihailotim-db/mihailotim-db/fix_union_resolved.
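The two invariants described above can be sketched in isolation. Below is a minimal, self-contained Scala sketch in which each `Union` branch's output is modelled as a list of `Long` ExprIds; the two method names mirror the patch, but the `Long` modelling and `branchOutputs` are illustrative assumptions, not Spark APIs:

```scala
object UnionInvariantsSketch extends App {
  // No branch may output the same ExprId twice (the new `duplicatesResolvedPerBranch`).
  def duplicatesResolvedPerBranch(branchOutputs: Seq[Seq[Long]]): Boolean =
    branchOutputs.forall(out => out.distinct.size == out.size)

  // No ExprId may be shared between branches (the renamed `duplicatesResolvedBetweenBranches`).
  def duplicatesResolvedBetweenBranches(branchOutputs: Seq[Seq[Long]]): Boolean =
    branchOutputs.map(_.distinct.size).sum == branchOutputs.flatten.distinct.size

  // The failing pattern above: col1#5 selected twice in the first branch.
  assert(!duplicatesResolvedPerBranch(Seq(Seq(5L, 6L, 3L, 5L), Seq(8L, 9L, 4L, 10L))))
  // Distinct within each branch and disjoint between branches: both invariants hold.
  assert(duplicatesResolvedPerBranch(Seq(Seq(5L, 6L, 3L), Seq(8L, 9L, 4L))))
  assert(duplicatesResolvedBetweenBranches(Seq(Seq(5L, 6L, 3L), Seq(8L, 9L, 4L))))
}
```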
Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/analysis/DeduplicateRelations.scala   |  2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 29 ++++++++++++++--
 .../plans/logical/basicLogicalOperators.scala      | 11 ++++--
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 +++++++
 .../query-tests/explain-results/union.explain      |  2 +-
 .../query-tests/explain-results/unionAll.explain   |  2 +-
 .../explain-results/unionByName.explain            |  2 +-
 .../unionByName_allowMissingColumns.explain        |  2 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 39 ++++++++++++++++++++++
 9 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index da940c9b8ead..b6181a2d54fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -59,7 +59,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) =>
       e.copy(right = dedupRight(left, right))
     // Only after we finish by-name resolution for Union
-    case u: Union if !u.byName && !u.duplicateResolved =>
+    case u: Union if !u.byName && !u.duplicatesResolvedBetweenBranches =>
       val unionWithChildOutputsDeduplicated =
         DeduplicateUnionChildOutput.deduplicateOutputPerChild(u)
       // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2701a79077ce..2ae507c831f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.optimizer

+import java.util.HashSet
+
 import scala.collection.mutable

 import org.apache.spark.SparkException
@@ -895,6 +897,24 @@ object LimitPushDown extends Rule[LogicalPlan] {
  */
 object PushProjectionThroughUnion extends Rule[LogicalPlan] {

+  /**
+   * When pushing a [[Project]] through [[Union]] we need to maintain the invariant that [[Union]]
+   * children must have unique [[ExprId]]s per branch. We can safely deduplicate [[ExprId]]s
+   * without updating any references because those [[ExprId]]s will simply remain unused.
+   * For example, in a `Project(col1#1, col1#1)` we will alias the second `col1` and get
+   * `Project(col1#1, col1 as col1#2)`. We don't need to update any references to the `col1#1` we
+   * aliased because `col1#1` still exists in the [[Project]] output.
+   */
+  private def deduplicateProjectList(projectList: Seq[NamedExpression]) = {
+    val existingExprIds = new HashSet[ExprId]
+    projectList.map(attr => if (existingExprIds.contains(attr.exprId)) {
+      Alias(attr, attr.name)()
+    } else {
+      existingExprIds.add(attr.exprId)
+      attr
+    })
+  }
+
   /**
    * Maps Attributes from the left side to the corresponding Attribute on the right side.
    */
@@ -923,10 +943,15 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] {
   }

   def pushProjectionThroughUnion(projectList: Seq[NamedExpression], u: Union): Seq[LogicalPlan] = {
-    val newFirstChild = Project(projectList, u.children.head)
+    val deduplicatedProjectList = if (conf.unionIsResolvedWhenDuplicatesPerChildResolved) {
+      deduplicateProjectList(projectList)
+    } else {
+      projectList
+    }
+    val newFirstChild = Project(deduplicatedProjectList, u.children.head)
     val newOtherChildren = u.children.tail.map { child =>
       val rewrites = buildRewrites(u.children.head, child)
-      Project(projectList.map(pushToRight(_, rewrites)), child)
+      Project(deduplicatedProjectList.map(pushToRight(_, rewrites)), child)
     }
     newFirstChild +: newOtherChildren
   }
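For readers outside the Spark codebase, the aliasing strategy of `deduplicateProjectList` above can be sketched with plain case classes standing in for Catalyst's `NamedExpression`/`Attribute`/`Alias`. All names below (`NamedExpr`, `Attr`, `AliasOf`, `freshId`) are illustrative assumptions, not Spark APIs:

```scala
import java.util.HashSet

// Toy stand-ins for Catalyst expressions (illustrative, not Spark classes).
sealed trait NamedExpr { def name: String; def exprId: Long }
final case class Attr(name: String, exprId: Long) extends NamedExpr
final case class AliasOf(child: NamedExpr, name: String, exprId: Long) extends NamedExpr

object DedupSketch extends App {
  private var nextId = 100L
  private def freshId(): Long = { nextId += 1; nextId }

  // Alias the second and later occurrences of an ExprId; existing references stay
  // valid because the first occurrence keeps its original ExprId.
  def deduplicateProjectList(projectList: Seq[NamedExpr]): Seq[NamedExpr] = {
    val existingExprIds = new HashSet[Long]
    projectList.map { e =>
      if (existingExprIds.contains(e.exprId)) AliasOf(e, e.name, freshId())
      else { existingExprIds.add(e.exprId); e }
    }
  }

  // Project(col1#5, col2#6, col1#5) becomes Project(col1#5, col2#6, col1 AS col1#101).
  println(deduplicateProjectList(Seq(Attr("col1", 5), Attr("col2", 6), Attr("col1", 5))))
}
```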
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5215e8a9568b..01e49a3eda81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -614,13 +614,20 @@ case class Union(
     Some(sum.toLong)
   }

-  def duplicateResolved: Boolean = {
+  private def duplicatesResolvedPerBranch: Boolean =
+    children.forall(child => child.outputSet.size == child.output.size)
+
+  def duplicatesResolvedBetweenBranches: Boolean = {
     children.map(_.outputSet.size).sum ==
       AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
   }

   override lazy val resolved: Boolean = {
-    children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
+    children.length > 1 &&
+      !(byName || allowMissingCol) &&
+      childrenResolved &&
+      allChildrenCompatible &&
+      (!conf.unionIsResolvedWhenDuplicatesPerChildResolved || duplicatesResolvedPerBranch)
   }

   override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3f648a91d7d0..1d54d814a358 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -241,6 +241,15 @@ object SQLConf {
     }
   }

+  val UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED =
+    buildConf("spark.sql.analyzer.unionIsResolvedWhenDuplicatesPerChildResolved")
+      .internal()
+      .doc(
+        "When true, union should only be resolved once there are no duplicate attributes in " +
+        "each branch.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
     buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
       .internal()
@@ -6893,6 +6902,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def useNullsForMissingDefaultColumnValues: Boolean =
     getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)

+  def unionIsResolvedWhenDuplicatesPerChildResolved: Boolean =
+    getConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED)
+
   override def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)

   override def doubleQuotedIdentifiers: Boolean = ansiEnabled && getConf(DOUBLE_QUOTED_IDENTIFIERS)
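Although the new conf is marked `internal()`, it can be toggled like any other SQL conf, e.g. to compare against the legacy behaviour while debugging. A sketch, assuming a running `SparkSession` named `spark`:

```scala
// Revert to the legacy behaviour (Union resolvable despite per-branch duplicates):
spark.conf.set("spark.sql.analyzer.unionIsResolvedWhenDuplicatesPerChildResolved", "false")
// or, equivalently, in SQL:
spark.sql("SET spark.sql.analyzer.unionIsResolvedWhenDuplicatesPerChildResolved=false")
```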
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/union.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/union.explain
index 4d5d1f53b841..252774510896 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/union.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/union.explain
@@ -1,3 +1,3 @@
-Union false, false
+'Union false, false
 :- LocalRelation <empty>, [id#0L, a#0, b#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0]

diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
index 4d5d1f53b841..252774510896 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
@@ -1,3 +1,3 @@
-Union false, false
+'Union false, false
 :- LocalRelation <empty>, [id#0L, a#0, b#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0]

diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
index 6ec8eb37f50e..2877c7cef0fd 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
@@ -1,4 +1,4 @@
-Union false, false
+'Union false, false
 :- Project [id#0L, a#0]
 :  +- LocalRelation <empty>, [id#0L, a#0, b#0]
 +- Project [id#0L, a#0]

diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
index 96bd9f281c15..dc0d1d94f85c 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
@@ -1,4 +1,4 @@
-Union false, false
+'Union false, false
 :- Project [id#0L, a#0, b#0, null AS payload#0]
 :  +- LocalRelation <empty>, [id#0L, a#0, b#0]
 +- Project [id#0L, a#0, null AS b#0, payload#0]
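The test added below counts `Project` nodes in the analyzed plan. The same check can be run interactively with a small helper; a sketch assuming a live `SparkSession` (`projectCount` is an illustrative name):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Project

// Count Project nodes in the analyzed logical plan of a query.
def projectCount(spark: SparkSession, query: String): Int =
  spark.sql(query).queryExecution.analyzed.collect { case p: Project => p }.size
```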
resolve + // outer Union before deduplicating ExprIds in inner union. Because of this we get an + // additional unnecessary Project (see SPARK-52686). + if (confValue) { + assert(projectCount == 7) + } else { + assert(projectCount == 8) + } + } + } + } + } + Seq(true, false).foreach { codegenEnabled => test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org