This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new af1615d026e [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
af1615d026e is described below
commit af1615d026eaf4aeec27ccfe3c58011ebbcb3de1
Author: Chao Sun <[email protected]>
AuthorDate: Thu Sep 7 18:19:54 2023 +0800
[SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
### What changes were proposed in this pull request?
This PR makes sure the grouped partitions returned by
`DataSourceV2ScanExecBase#groupPartitions` are sorted according to their partition
values. Previously, in #42757, we assumed that Scala's `groupBy` would preserve the
input ordering, but that is not the case: `groupBy` returns a `Map`, whose iteration
order is not guaranteed.
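The pitfall can be reproduced outside Spark. The following is a minimal Scala sketch, not Spark's code: a generic key type `K` with an explicit `Ordering[K]` stands in for the `InternalRow` partition values and `RowOrdering`, plain `String` splits stand in for `InputPartition`, and `GroupedSplits` and `GroupPartitionsSketch` are hypothetical names. Because `groupBy` returns a `Map`, the groups can come back in any order even when the input was sorted, so the groups are sorted again with an ordering derived from the same key ordering via `Ordering#on`.

```scala
// Hypothetical stand-in for KeyGroupedPartition: one key plus the splits that share it.
final case class GroupedSplits[K](key: K, splits: Seq[String])

object GroupPartitionsSketch {
  // Sketch of the sort -> groupBy -> sort pattern; K and its Ordering replace
  // Spark's InternalRow partition values and RowOrdering (an assumption).
  def groupPartitions[K](
      keyed: Seq[(K, String)],
      keyOrdering: Ordering[K]): Seq[GroupedSplits[K]] = {
    // Stable sort of the (key, split) pairs by key; groupBy keeps traversal
    // order within each group, so the splits of a group stay in this order.
    val sortedKeyToSplits = keyed.sorted(keyOrdering.on((t: (K, String)) => t._1))
    sortedKeyToSplits
      .groupBy(_._1) // a Map: the iteration order of the groups is not guaranteed
      .toSeq
      .map { case (key, s) => GroupedSplits(key, s.map(_._2)) }
      // Re-sort the groups by key, reusing the same base key ordering.
      .sorted(keyOrdering.on((g: GroupedSplits[K]) => g.key))
  }
}
```

For example, `GroupPartitionsSketch.groupPartitions(Seq("b" -> "x", "a" -> "y", "b" -> "z"), Ordering.String)` yields the group for `"a"` followed by the group for `"b"` with splits `x`, `z`. Spark additionally wraps each row in `InternalRowComparableWrapper` before `groupBy` so that partition values are grouped by value; the sketch relies on `K`'s own `equals` and `hashCode` instead.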
### Why are the changes needed?
See https://github.com/apache/spark/pull/42757#discussion_r1316926504 for the
diagnosis. Partition ordering is a fundamental property of storage-partitioned
join (SPJ), so it must be guaranteed.
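To see why the ordering matters, consider a deliberately simplified model of the join (an assumption, not Spark's implementation): each side reports its grouped partitions in key order, and the join pairs the i-th partition of one side with the i-th partition of the other. The hypothetical `ZipPartitions.zipByPosition` below pairs the two sides by position and returns `None` as soon as the keys at some position disagree, which is what happens when one side's groups come back in a different order.

```scala
// Hypothetical model of positional pairing in a storage-partitioned join.
// Each side is a list of (partition key, splits); both are assumed to report
// the same set of keys.
object ZipPartitions {
  // Pairs partitions by position. Returns None if the sides differ in length
  // or if the keys at any position disagree (i.e. the orderings are inconsistent).
  def zipByPosition[A, B](
      left: Seq[(Int, Seq[A])],
      right: Seq[(Int, Seq[B])]): Option[Seq[(Int, Seq[A], Seq[B])]] = {
    if (left.length != right.length) None
    else {
      val paired = left.zip(right)
      if (paired.forall { case ((lk, _), (rk, _)) => lk == rk })
        Some(paired.map { case ((k, l), (_, r)) => (k, l, r) })
      else None
    }
  }
}
```

With both sides sorted by key the pairs line up; reversing one side makes the pairing fail even though exactly the same keys are present.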
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
We have tests in `KeyGroupedPartitioningSuite` to cover this.
### Was this patch authored or co-authored using generative AI tooling?
Closes #42839 from sunchao/SPARK-45036-followup.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../execution/datasources/v2/DataSourceV2ScanExecBase.scala | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 94667fbd00c..b2f94cae2df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         // also sort the input partitions according to their partition key order. This ensures
         // a canonical order from both sides of a bucketed join, for example.
         val partitionDataTypes = expressions.map(_.dataType)
-        val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
-          RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
-        }
-        val sortedKeyToPartitions = results.sorted(partitionOrdering)
-        val groupedPartitions = sortedKeyToPartitions
+        val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+        val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+        val sortedGroupedPartitions = sortedKeyToPartitions
             .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
             .groupBy(_._1)
             .toSeq
             .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
+            .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))
 
-        Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2)))
+        Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2)))
       }
     }
   }