This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c796ebb9ca72 [SPARK-51109][SQL] CTE in subquery expression as grouping
column
c796ebb9ca72 is described below
commit c796ebb9ca722761f0f26a056fd4f80237d4c232
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Feb 10 11:00:47 2025 +0800
[SPARK-51109][SQL] CTE in subquery expression as grouping column
### What changes were proposed in this pull request?
This is a long-standing problem. With the GROUP BY ordinal feature, it's
quite easy for users to write a complicated expression as the GROUP BY
expression and also put it in the SELECT list. It's usually OK as the
complicated expressions in the GROUP BY expression and SELECT list remain the
same, but problems may occur with subquery expressions, duplicated relations,
and CTE inline. Let's look at this example:
```
CREATE VIEW v AS
WITH r AS (SELECT c1 + c2 AS c FROM t)
SELECT * FROM r;
SELECT (SELECT max(c) FROM v WHERE c > id) FROM range(1) GROUP BY 1;
```
A scalar subquery appears in both the GROUP BY expression and SELECT list.
The scalar subquery scans table `t`, and because this scalar subquery appears
twice, `DeduplicateRelations` will trigger. This makes the output attributes of
CTE def and ref out of sync in the second scalar subquery and `InlineCTE` will
add a cosmetic `Project` to adjust the output attr ids. `CheckAnalysis` will
inline CTE in the beginning, and this extra cosmetic `Project` in the second
scalar subquery makes S [...]
The proposal here is to remove cosmetic Projects during plan
canonicalization.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
Yes, some queries fail before and work now.
### How was this patch tested?
new test
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #49829 from cloud-fan/cte.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c7edcaed290ee5182f089fce8dc1cc46b72ea76f)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../plans/logical/basicLogicalOperators.scala | 15 ++++++++++++++
.../optimizer/MergeScalarSubqueriesSuite.scala | 12 ++---------
.../org/apache/spark/sql/CTEInlineSuite.scala | 24 +++++++++++++++++++---
.../command/AlterTableDropPartitionSuiteBase.scala | 13 ++++++------
.../AlterTableRenamePartitionSuiteBase.scala | 13 ++++++------
.../execution/command/TruncateTableSuiteBase.scala | 13 ++++++------
.../command/v1/AlterTableAddPartitionSuite.scala | 13 ++++++------
.../command/v2/AlterTableAddPartitionSuite.scala | 13 ++++++------
8 files changed, 73 insertions(+), 43 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5e43e6603278..fb1999148d60 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -89,6 +89,21 @@ case class Project(projectList: Seq[NamedExpression], child:
LogicalPlan)
expressions.forall(_.resolved) && childrenResolved &&
!hasSpecialExpressions
}
+ override protected def doCanonicalize(): LogicalPlan = {
+ // During canonicalization, the name and exprId of Alias and Attributes
will be
+ // erased and normalized. If the Project only changes name and exprId,
then it
+ // can be striped as it doesn't change the semantic.
+ val noSemanticChange = projectList.length == child.output.length &&
+ projectList.zip(child.output).forall {
+ case (alias: Alias, attr) =>
+ alias.child.semanticEquals(attr) && alias.explicitMetadata.isEmpty &&
+ alias.qualifier.isEmpty && alias.nonInheritableMetadataKeys.isEmpty
+ case (attr1: Attribute, attr2) => attr1.semanticEquals(attr2)
+ case _ => false
+ }
+ if (noSemanticChange) child.canonicalized else super.doCanonicalize()
+ }
+
override lazy val validConstraints: ExpressionSet =
getAllValidConstraints(projectList)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
index b3444b0b4307..636a280bce00 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala
@@ -346,11 +346,7 @@ class MergeScalarSubqueriesSuite extends PlanTest {
subquery2)
val mergedSubquery = testRelation.as("t1")
- .select(Symbol("a"), Symbol("b"), Symbol("c"))
- .join(
- testRelation.as("t2").select(Symbol("a"), Symbol("b"), Symbol("c")),
- Inner,
- Some($"t1.b" === $"t2.b"))
+ .join(testRelation.as("t2"), Inner, Some($"t1.b" === $"t2.b"))
.select($"t1.a", $"t2.c")
.select(CreateNamedStruct(Seq(
Literal("a"), Symbol("a"),
@@ -387,11 +383,7 @@ class MergeScalarSubqueriesSuite extends PlanTest {
subquery2)
val mergedSubquery = testRelation.as("t1")
- .select(Symbol("a"), Symbol("b"), Symbol("c"))
- .join(
- testRelation.as("t2").select(Symbol("a"), Symbol("b"), Symbol("c")),
- Inner,
- Some($"t1.b" < $"t2.b" && $"t1.a" === $"t2.c"))
+ .join(testRelation.as("t2"), Inner, Some($"t1.b" < $"t2.b" && $"t1.a"
=== $"t2.c"))
.select($"t1.a", $"t2.c")
.select(CreateNamedStruct(Seq(
Literal("a"), Symbol("a"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index e8b9ffe28494..9bddaf3baab0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -333,14 +333,14 @@ abstract class CTEInlineSuiteBase
test("CTE Predicate push-down and column pruning - combined predicate") {
withTempView("t") {
- Seq((0, 1, 2), (1, 2, 3)).toDF("c1", "c2",
"c3").createOrReplaceTempView("t")
+ Seq((0, 1, 2, 3), (1, 2, 3, 4)).toDF("c1", "c2", "c3",
"c4").createOrReplaceTempView("t")
val df = sql(
s"""with
|v as (
- | select c1, c2, c3, rand() c4 from t
+ | select c1, c2, c3, c4, rand() c5 from t
|),
|vv as (
- | select v1.c1, v1.c2, rand() c5 from v v1, v v2
+ | select v1.c1, v1.c2, rand() c6 from v v1, v v2
| where v1.c1 > 0 and v2.c3 < 5 and v1.c2 = v2.c2
|)
|select vv1.c1, vv1.c2, vv2.c1, vv2.c2 from vv vv1, vv vv2
@@ -816,6 +816,24 @@ abstract class CTEInlineSuiteBase
val inlined = InlineCTE().apply(query)
assert(!inlined.exists(_.isInstanceOf[WithCTE]))
}
+
+ test("SPARK-51109: CTE in subquery expression as grouping column") {
+ withTable("t") {
+ Seq(1 -> 1).toDF("c1", "c2").write.saveAsTable("t")
+ withView("v") {
+ sql(
+ """
+ |CREATE VIEW v AS
+ |WITH r AS (SELECT c1 + c2 AS c FROM t)
+ |SELECT * FROM r
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT (SELECT max(c) FROM v WHERE c > id) FROM range(1) GROUP
BY 1"),
+ Row(2)
+ )
+ }
+ }
+ }
}
class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with
DisableAdaptiveExecutionSuite
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index a49a94174195..fb6c93e9d32b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -234,25 +234,26 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest
with DDLCommandTestUtil
checkCachedRelation(t, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3)))
withView("v0") {
- sql(s"CREATE VIEW v0 AS SELECT * FROM $t")
+ // Add a dummy column so that this view is semantically different from
raw table scan.
+ sql(s"CREATE VIEW v0 AS SELECT *, 'a' FROM $t")
cacheRelation("v0")
sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
- checkCachedRelation("v0", Seq(Row(0, 0), Row(2, 2), Row(3, 3)))
+ checkCachedRelation("v0", Seq(Row(0, 0, "a"), Row(2, 2, "a"), Row(3,
3, "a")))
}
withTempView("v1") {
- sql(s"CREATE TEMP VIEW v1 AS SELECT * FROM $t")
+ sql(s"CREATE TEMP VIEW v1 AS SELECT *, 'a' FROM $t")
cacheRelation("v1")
sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
- checkCachedRelation("v1", Seq(Row(0, 0), Row(3, 3)))
+ checkCachedRelation("v1", Seq(Row(0, 0, "a"), Row(3, 3, "a")))
}
val v2 = s"${spark.sharedState.globalTempDB}.v2"
withGlobalTempView("v2") {
- sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT * FROM $t")
+ sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT *, 'a' FROM $t")
cacheRelation(v2)
sql(s"ALTER TABLE $t DROP PARTITION (part=3)")
- checkCachedRelation(v2, Seq(Row(0, 0)))
+ checkCachedRelation(v2, Seq(Row(0, 0, "a")))
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala
index 186f2b293ea8..353253537c4b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala
@@ -220,25 +220,26 @@ trait AlterTableRenamePartitionSuiteBase extends
QueryTest with DDLCommandTestUt
checkCachedRelation(t, Seq(Row(0, 0), Row(1, 1)))
withView("v0") {
- sql(s"CREATE VIEW v0 AS SELECT * FROM $t")
+ // Add a dummy column so that this view is semantically different from
raw table scan.
+ sql(s"CREATE VIEW v0 AS SELECT *, 'a' FROM $t")
cacheRelation("v0")
sql(s"ALTER TABLE $t PARTITION (part=0) RENAME TO PARTITION (part=2)")
- checkCachedRelation("v0", Seq(Row(0, 2), Row(1, 1)))
+ checkCachedRelation("v0", Seq(Row(0, 2, "a"), Row(1, 1, "a")))
}
withTempView("v1") {
- sql(s"CREATE TEMP VIEW v1 AS SELECT * FROM $t")
+ sql(s"CREATE TEMP VIEW v1 AS SELECT *, 'a' FROM $t")
cacheRelation("v1")
sql(s"ALTER TABLE $t PARTITION (part=1) RENAME TO PARTITION (part=3)")
- checkCachedRelation("v1", Seq(Row(0, 2), Row(1, 3)))
+ checkCachedRelation("v1", Seq(Row(0, 2, "a"), Row(1, 3, "a")))
}
val v2 = s"${spark.sharedState.globalTempDB}.v2"
withGlobalTempView("v2") {
- sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT * FROM $t")
+ sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT *, 'a' FROM $t")
cacheRelation(v2)
sql(s"ALTER TABLE $t PARTITION (part=2) RENAME TO PARTITION (part=4)")
- checkCachedRelation(v2, Seq(Row(0, 4), Row(1, 3)))
+ checkCachedRelation(v2, Seq(Row(0, 4, "a"), Row(1, 3, "a")))
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala
index b61065f41c5e..e9d5fe1e3fb1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableSuiteBase.scala
@@ -271,26 +271,27 @@ trait TruncateTableSuiteBase extends QueryTest with
DDLCommandTestUtils {
Seq(Row(0, 0, 0), Row(1, 1, 1), Row(1, 2, 3)))
withView("v0") {
- sql(s"CREATE VIEW v0 AS SELECT * FROM $t")
+ // Add a dummy column so that this view is semantically different from
raw table scan.
+ sql(s"CREATE VIEW v0 AS SELECT *, 'a' FROM $t")
cacheRelation("v0")
sql(s"TRUNCATE TABLE $t PARTITION (width = 1, length = 2)")
- checkCachedRelation("v0", Seq(Row(0, 0, 0), Row(1, 1, 1)))
+ checkCachedRelation("v0", Seq(Row(0, 0, 0, "a"), Row(1, 1, 1, "a")))
}
withTempView("v1") {
- sql(s"CREATE TEMP VIEW v1 AS SELECT * FROM $t")
+ sql(s"CREATE TEMP VIEW v1 AS SELECT *, 'a' FROM $t")
cacheRelation("v1")
sql(s"TRUNCATE TABLE $t PARTITION (width = 1, length = 1)")
- checkCachedRelation("v1", Seq(Row(0, 0, 0)))
+ checkCachedRelation("v1", Seq(Row(0, 0, 0, "a")))
}
val v2 = s"${spark.sharedState.globalTempDB}.v2"
withGlobalTempView("v2") {
sql(s"INSERT INTO $t PARTITION (width = 10, length = 10) SELECT 10")
- sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT * FROM $t")
+ sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT *, 'a' FROM $t")
cacheRelation(v2)
sql(s"TRUNCATE TABLE $t PARTITION (width = 10, length = 10)")
- checkCachedRelation(v2, Seq(Row(0, 0, 0)))
+ checkCachedRelation(v2, Seq(Row(0, 0, 0, "a")))
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index fea0d07278c1..47ec09245130 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -118,28 +118,29 @@ trait AlterTableAddPartitionSuiteBase extends
command.AlterTableAddPartitionSuit
checkCachedRelation(t, Seq(Row(0, 0)))
withView("v0") {
- sql(s"CREATE VIEW v0 AS SELECT * FROM $t")
+ // Add a dummy column so that this view is semantically different from
raw table scan.
+ sql(s"CREATE VIEW v0 AS SELECT *, 'a' FROM $t")
cacheRelation("v0")
val part1Loc = copyPartition(t, "part=0", "part=1")
sql(s"ALTER TABLE $t ADD PARTITION (part=1) LOCATION '$part1Loc'")
- checkCachedRelation("v0", Seq(Row(0, 0), Row(0, 1)))
+ checkCachedRelation("v0", Seq(Row(0, 0, "a"), Row(0, 1, "a")))
}
withTempView("v1") {
- sql(s"CREATE TEMP VIEW v1 AS SELECT * FROM $t")
+ sql(s"CREATE TEMP VIEW v1 AS SELECT *, 'a' FROM $t")
cacheRelation("v1")
val part2Loc = copyPartition(t, "part=0", "part=2")
sql(s"ALTER TABLE $t ADD PARTITION (part=2) LOCATION '$part2Loc'")
- checkCachedRelation("v1", Seq(Row(0, 0), Row(0, 1), Row(0, 2)))
+ checkCachedRelation("v1", Seq(Row(0, 0, "a"), Row(0, 1, "a"), Row(0,
2, "a")))
}
val v2 = s"${spark.sharedState.globalTempDB}.v2"
withGlobalTempView("v2") {
- sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT * FROM $t")
+ sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT *, 'a' FROM $t")
cacheRelation(v2)
val part3Loc = copyPartition(t, "part=0", "part=3")
sql(s"ALTER TABLE $t ADD PARTITION (part=3) LOCATION '$part3Loc'")
- checkCachedRelation(v2, Seq(Row(0, 0), Row(0, 1), Row(0, 2), Row(0,
3)))
+ checkCachedRelation(v2, Seq(Row(0, 0, "a"), Row(0, 1, "a"), Row(0, 2,
"a"), Row(0, 3, "a")))
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index e3b6a9b5e610..e7a2b194fa0b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -92,25 +92,26 @@ class AlterTableAddPartitionSuite
checkCachedRelation(t, Seq(Row(0, 0)))
withView("v0") {
- sql(s"CREATE VIEW v0 AS SELECT * FROM $t")
+ // Add a dummy column so that this view is semantically different from
raw table scan.
+ sql(s"CREATE VIEW v0 AS SELECT *, 'a' FROM $t")
cacheRelation("v0")
sql(s"ALTER TABLE $t ADD PARTITION (id=0, part=1)")
- checkCachedRelation("v0", Seq(Row(0, 0), Row(0, 1)))
+ checkCachedRelation("v0", Seq(Row(0, 0, "a"), Row(0, 1, "a")))
}
withTempView("v1") {
- sql(s"CREATE TEMP VIEW v1 AS SELECT * FROM $t")
+ sql(s"CREATE TEMP VIEW v1 AS SELECT *, 'a' FROM $t")
cacheRelation("v1")
sql(s"ALTER TABLE $t ADD PARTITION (id=1, part=2)")
- checkCachedRelation("v1", Seq(Row(0, 0), Row(0, 1), Row(1, 2)))
+ checkCachedRelation("v1", Seq(Row(0, 0, "a"), Row(0, 1, "a"), Row(1,
2, "a")))
}
val v2 = s"${spark.sharedState.globalTempDB}.v2"
withGlobalTempView(v2) {
- sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT * FROM $t")
+ sql(s"CREATE GLOBAL TEMP VIEW v2 AS SELECT *, 'a' FROM $t")
cacheRelation(v2)
sql(s"ALTER TABLE $t ADD PARTITION (id=2, part=3)")
- checkCachedRelation(v2, Seq(Row(0, 0), Row(0, 1), Row(1, 2), Row(2,
3)))
+ checkCachedRelation(v2, Seq(Row(0, 0, "a"), Row(0, 1, "a"), Row(1, 2,
"a"), Row(2, 3, "a")))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]