andygrove opened a new pull request, #4207:
URL: https://github.com/apache/datafusion-comet/pull/4207
## Which issue does this PR close?
Closes #4122.
## Rationale for this change
On Spark 4.1, SPARK-52921 added `UNION_OUTPUT_PARTITIONING`: when all
children of a `UnionExec` share the same hash/single partitioning, the union
itself reports that same partitioning. Downstream operators (e.g. a final hash
aggregate) then skip an otherwise-required shuffle, and Spark's row-based
`UnionExec.doExecute` keeps the partitioning invariant by routing through
`SQLPartitioningAwareUnionRDD` (each output partition unions partition *i* from
every child).
`CometUnionExec` silently broke both halves of that contract:
- `doExecuteColumnar` used `sparkContext.union(...)`, which concatenates the
children's partitions end to end, so each output partition holds a single
partition of a single child instead of partition *i* from every child.
- `outputPartitioning` delegated to the frozen `originalPlan` snapshot
captured at `CometExecRule` time, so AQE's post-stage coalescing was invisible.
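To make the concat-versus-index-wise difference concrete, here is a toy model in plain Scala with no Spark involved. It assumes each child is a `Vector` of partitions, and that a key `k` belongs to partition `k mod n`. `concatUnion`, `unionByIndex` and `isHashPartitioned` are hypothetical helpers written for this illustration; they only approximate what `sparkContext.union` and `SQLPartitioningAwareUnionRDD` do with real RDDs.

```scala
object UnionPartitioningInvariant {
  // Toy model: a child is a Vector of partitions, each partition a Vector of rows.
  type Partitions[T] = Vector[Vector[T]]

  // Toy hash partitioner (an assumption): key k belongs to partition floorMod(k, n).
  def owner(key: Int, numPartitions: Int): Int = Math.floorMod(key, numPartitions)

  // sparkContext.union-style concatenation: child partitions are appended end to end.
  def concatUnion[T](children: Seq[Partitions[T]]): Partitions[T] =
    children.toVector.flatten

  // Partition-aware union: output partition i combines partition i of every child.
  def unionByIndex[T](children: Seq[Partitions[T]]): Partitions[T] = {
    require(children.nonEmpty, "need at least one child")
    val n = children.head.size
    require(children.forall(_.size == n), "all children must have the same partition count")
    Vector.tabulate(n)(i => children.toVector.flatMap(child => child(i)))
  }

  // True when the output still satisfies "hash partitioned into numPartitions":
  // exactly numPartitions partitions, and every key sits in the partition that owns it.
  def isHashPartitioned(parts: Partitions[Int], numPartitions: Int): Boolean =
    parts.size == numPartitions &&
      parts.zipWithIndex.forall { case (part, i) =>
        part.forall(k => owner(k, numPartitions) == i)
      }
}
```

With two children hash-partitioned into two partitions, for example `Vector(Vector(2), Vector(1, 3))` and `Vector(Vector(2, 4), Vector(3))`, `unionByIndex` keeps every key in its owning partition, while `concatUnion` yields four partitions and no longer matches the two-way hash partitioning the union would declare.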
The result: `EXCEPT ALL` / `INTERSECT ALL` queries whose sides are themselves
`GROUP BY` aggregates silently returned wrong results (e.g. `EXCEPT ALL`
returning `{2, 3}` instead of `{3}`). Two Spark 4.1.1 `SQLQueryTestSuite` files
(`except-all.sql`, `intersect-all.sql`) were disabled for Comet because of this.
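A simplified model of how the broken union turns into wrong `EXCEPT ALL` output. It assumes the query is evaluated by tagging left rows with +1 and right rows with -1, unioning them, summing the tags per key inside each partition (the shuffle having been skipped because the union claims hash partitioning), and emitting each key as many times as its positive sum. The data is made up and is not the failing query from the Spark test files; `exceptAll`, `concatUnion` and `unionByIndex` are hypothetical names.

```scala
object ExceptAllToy {
  type Partitions[T] = Vector[Vector[T]]

  // sparkContext.union-style concatenation: child partitions are appended end to end.
  def concatUnion[T](children: Seq[Partitions[T]]): Partitions[T] =
    children.toVector.flatten

  // Partition-aware union: output partition i combines partition i of every child.
  def unionByIndex[T](children: Seq[Partitions[T]]): Partitions[T] = {
    require(children.nonEmpty, "need at least one child")
    val n = children.head.size
    require(children.forall(_.size == n), "partition counts differ")
    Vector.tabulate(n)(i => children.toVector.flatMap(child => child(i)))
  }

  // Simplified EXCEPT ALL: tag left rows +1 and right rows -1, union them, then sum
  // the tags per key *within each partition* (no shuffle, as when the planner trusts
  // the union's declared hash partitioning) and replicate keys with a positive sum.
  def exceptAll(
      left: Partitions[Int],
      right: Partitions[Int],
      union: Seq[Partitions[(Int, Int)]] => Partitions[(Int, Int)]): Vector[Int] = {
    val tagged = union(Seq(left.map(_.map(k => (k, 1))), right.map(_.map(k => (k, -1)))))
    tagged.flatMap { part =>
      part
        .groupMapReduce(_._1)(_._2)(_ + _)
        .toVector
        .sortBy(_._1)
        .flatMap { case (k, n) => Vector.fill(math.max(n, 0))(k) }
    }.sorted
  }
}
```

With left keys `{1, 2, 3}` and right keys `{1, 2}`, both partitioned by `k mod 2`, the concatenating union never brings a left key together with its right-side counterpart, so nothing is subtracted and the result is `Vector(1, 2, 3)`; the index-wise union lets the tags cancel and returns `Vector(3)`.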
## What changes are included in this PR?
- Override `CometUnionExec.outputPartitioning` to recompute from the live
`children` rather than `originalPlan`.
- Route `doExecuteColumnar` through a new `ShimCometUnionExec.unionRDDs`
helper. On Spark 4.1+, when a known partitioning is declared, it uses
`SQLPartitioningAwareUnionRDD` (with a partition-count sanity check and a safe
fallback to plain concatenation); on pre-4.1 Spark, where
`UnionExec.outputPartitioning` is always `UnknownPartitioning`, it keeps the
`sparkContext.union` behavior.
- Add `CometSetOpWithGroupBySuite` covering the two queries from the Spark
SQL tests.
- Remove the `spark.comet.enabled = false` guards at the top of
`except-all.sql` and `intersect-all.sql` in `dev/diffs/4.1.1.diff`.
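The helper's decision can be sketched the same way. This is a hypothetical model, not the code in the PR: `DeclaredPartitioning`, `KnownPartitioning`, `Unknown` and the `partitionAwareSupported` flag (standing in for "running on Spark 4.1+") are invented for the illustration. The index-wise branch is taken only when every child has exactly the declared number of partitions; anything else falls back to plain concatenation.

```scala
object UnionShimSketch {
  type Partitions[T] = Vector[Vector[T]]

  // Hypothetical stand-in for the partitioning a union reports.
  sealed trait DeclaredPartitioning
  final case class KnownPartitioning(numPartitions: Int) extends DeclaredPartitioning
  case object Unknown extends DeclaredPartitioning

  def unionRDDs[T](
      children: Seq[Partitions[T]],
      declared: DeclaredPartitioning,
      partitionAwareSupported: Boolean): Partitions[T] = declared match {
    // Sanity check: every child must have exactly the declared partition count.
    case KnownPartitioning(n)
        if partitionAwareSupported && children.nonEmpty && children.forall(_.size == n) =>
      // Index-wise union: output partition i combines partition i of every child.
      Vector.tabulate(n)(i => children.toVector.flatMap(child => child(i)))
    case _ =>
      // Fallback: plain concatenation of all child partitions.
      children.toVector.flatten
  }
}
```

Falling back to concatenation, rather than failing, keeps results correct whenever the index-wise layout cannot be guaranteed.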
## How are these changes tested?
- New `CometSetOpWithGroupBySuite` passes on Spark 3.5 and Spark 4.1.1
profiles.
- Existing `CometExecSuite` (246 tests) passes on Spark 3.5.
--
This is an automated message from the Apache Git Service.