This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e8045fb [SPARK-28344][SQL][FOLLOW-UP] Check the ambiguous self-join
only if there is a join in the plan
e8045fb is described below
commit e8045fb99cc463889aebcda1c8c1daa2042f4319
Author: HyukjinKwon <[email protected]>
AuthorDate: Mon Jun 1 16:31:39 2020 -0700
[SPARK-28344][SQL][FOLLOW-UP] Check the ambiguous self-join only if there
is a join in the plan
### What changes were proposed in this pull request?
This PR proposes to run the `DetectAmbiguousSelfJoin` check only if there is a
`Join` in the plan. Currently, the check is too strict and is applied even to
non-join queries. For example, the code below does not contain a join at all,
but it fails as an ambiguous self-join:
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val df = Seq(1, 1, 2, 2).toDF("A")
val w = Window.partitionBy(df("A"))
df.select(df("A").alias("X"), sum(df("A")).over(w)).explain(true)
```
This is because `ExtractWindowExpressions` can create an `AttributeReference`
with the same metadata but a different expression ID, see:
https://github.com/apache/spark/blob/0fd98abd859049dc3b200492487041eeeaa8f737/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2679
https://github.com/apache/spark/blob/71c73d58f6e88d2558ed2e696897767d93bac60f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L63
https://github.com/apache/spark/blob/5945d46c11a86fd85f9e65f24c2e88f368eee01f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L180
Before:
```
'Project [A#19 AS X#21, sum(A#19) windowspecdefinition(A#19,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L]
+- Relation[A#19] parquet
```
After:
```
Project [X#21, sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L]
+- Project [X#21, A#19, sum(A) OVER (PARTITION BY A
unspecifiedframe$())#23L, sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L]
+- Window [sum(A#19) windowspecdefinition(A#19,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L], [A#19]
+- Project [A#19 AS X#21, A#19]
+- Relation[A#19] parquet
```
`X#21` holds the same DataFrame ID and column position metadata as `A#19`,
but it has a different expression ID, which causes the check to fail.
### Why are the changes needed?
To loosen the check so that users are not surprised by an ambiguous self-join failure on a query that has no join.
### Does this PR introduce _any_ user-facing change?
The changes apply to unreleased branches only.
### How was this patch tested?
Manually tested, and a unit test was added.
Closes #28695 from HyukjinKwon/SPARK-28344-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ea45fc51921e64302b9220b264156bb4f757fe01)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/analysis/DetectAmbiguousSelfJoin.scala | 6 ++++++
.../scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala | 12 ++++++++++++
2 files changed, 18 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index 614d6c2..136f7c4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -76,6 +76,8 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends
Rule[LogicalPlan] {
// We always remove the special metadata from `AttributeReference` at the
end of this rule, so
// Dataset column reference only exists in the root node via Dataset
transformations like
// `Dataset#select`.
+ if (plan.find(_.isInstanceOf[Join]).isEmpty) return
stripColumnReferenceMetadataInPlan(plan)
+
val colRefAttrs = plan.expressions.flatMap(_.collect {
case a: AttributeReference if isColumnReference(a) => a
})
@@ -153,6 +155,10 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends
Rule[LogicalPlan] {
}
}
+ stripColumnReferenceMetadataInPlan(plan)
+ }
+
+ private def stripColumnReferenceMetadataInPlan(plan: LogicalPlan):
LogicalPlan = {
plan.transformExpressions {
case a: AttributeReference if isColumnReference(a) =>
// Remove the special metadata from this `AttributeReference`, as the
detection is done.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
index 250ec7d..fb58c98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -202,4 +203,15 @@ class DataFrameSelfJoinSuite extends QueryTest with
SharedSparkSession {
assertAmbiguousSelfJoin(df1.join(df4).join(df2).select(df2("id")))
}
}
+
+ test("SPARK-28344: don't fail as ambiguous self join when there is no join")
{
+ withSQLConf(
+ SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true") {
+ val df = Seq(1, 1, 2, 2).toDF("a")
+ val w = Window.partitionBy(df("a"))
+ checkAnswer(
+ df.select(df("a").alias("x"), sum(df("a")).over(w)),
+ Seq((1, 2), (1, 2), (2, 4), (2, 4)).map(Row.fromTuple))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]