This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cb2f47bf7ec1 [SPARK-46485][SQL] V1Write should not add Sort when not
needed
cb2f47bf7ec1 is described below
commit cb2f47bf7ec14edccee0d7a1ad2b12a47f1fa7d2
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Dec 23 00:08:33 2023 +0800
[SPARK-46485][SQL] V1Write should not add Sort when not needed
### What changes were proposed in this pull request?
In `V1Writes`, we try to avoid adding Sort if the output ordering always
satisfies. However, the code is completely broken with two issues:
- we put `SortOrder` as the child of another `SortOrder` and compare, which
always returns false.
- once we add a project to do `empty2null`, we change the query output
attribute id and the sort order never matches.
It's not a big issue as we still have QO rules to eliminate useless sorts,
but https://github.com/apache/spark/pull/44429 exposes this problem because the
way we optimize sort is a bit different. For `V1Writes`, we should always avoid
adding sort even if the number of ordering key is less, to not change the user
query.
### Why are the changes needed?
fix code mistakes.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
updated test
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44458 from cloud-fan/sort.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/datasources/V1Writes.scala | 4 +-
.../datasources/V1WriteCommandSuite.scala | 54 ++++++++--------------
2 files changed, 20 insertions(+), 38 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index b1d2588ede62..d7a8d7aec0b7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -102,8 +102,8 @@ object V1Writes extends Rule[LogicalPlan] with
SQLConfHelper {
val requiredOrdering = write.requiredOrdering.map(_.transform {
case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
- val outputOrdering = query.outputOrdering
- val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
+ val outputOrdering = empty2NullPlan.outputOrdering
+ val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child),
outputOrdering)
if (orderingMatched) {
empty2NullPlan
} else {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 3ca516463d36..ce43edb79c12 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -223,24 +223,15 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
}
// assert the outer most sort in the executed plan
- val sort = plan.collectFirst { case s: SortExec => s }
- if (enabled) {
- // With planned write, optimizer is more efficient and can
eliminate the `SORT BY`.
- assert(sort.exists {
- case SortExec(Seq(
- SortOrder(AttributeReference("key", IntegerType, _, _),
Ascending, NullsFirst, _)
- ), false, _, _) => true
- case _ => false
- }, plan)
- } else {
- assert(sort.exists {
- case SortExec(Seq(
- SortOrder(AttributeReference("key", IntegerType, _, _),
Ascending, NullsFirst, _),
- SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _)
- ), false, _, _) => true
- case _ => false
- }, plan)
- }
+ assert(plan.collectFirst {
+ case s: SortExec => s
+ }.exists {
+ case SortExec(Seq(
+ SortOrder(AttributeReference("key", IntegerType, _, _),
Ascending, NullsFirst, _),
+ SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _)
+ ), false, _, _) => true
+ case _ => false
+ }, plan)
}
}
}
@@ -279,24 +270,15 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
}
// assert the outer most sort in the executed plan
- val sort = plan.collectFirst { case s: SortExec => s }
- if (enabled) {
- // With planned write, optimizer is more efficient and can eliminate
the `SORT BY`.
- assert(sort.exists {
- case SortExec(Seq(
- SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _)
- ), false, _, _) => true
- case _ => false
- }, plan)
- } else {
- assert(sort.exists {
- case SortExec(Seq(
- SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _),
- SortOrder(AttributeReference("key", IntegerType, _, _),
Ascending, NullsFirst, _)
- ), false, _, _) => true
- case _ => false
- }, plan)
- }
+ assert(plan.collectFirst {
+ case s: SortExec => s
+ }.exists {
+ case SortExec(Seq(
+ SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _),
+ SortOrder(AttributeReference("key", IntegerType, _, _), Ascending,
NullsFirst, _)
+ ), false, _, _) => true
+ case _ => false
+ }, plan)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]