zhuqi-lucas opened a new issue, #21973:
URL: https://github.com/apache/datafusion/issues/21973
## Summary
`EnforceDistribution` and `EnforceSorting` are currently implemented as two
separate physical optimizer rules that run independently. This design is unique
among major query engines -- Spark (`EnsureRequirements`), Presto/Trino
(`AddExchanges`), and others all handle distribution and sorting in a **single
combined rule**. The separation in DataFusion leads to a class of correctness
bugs where one rule undoes the invariants established by the other, because
sorting and distribution are fundamentally coupled (`preserve_partitioning` on
`SortExec` directly affects output partition count).
This epic tracks the work to merge these two rules into a single
`EnsureRequirements`-style rule that handles both distribution and sorting in
one pass, eliminating the non-idempotent composition and the recurring bugs it
causes.
## Motivation
### The fundamental coupling
`SortExec` has a `preserve_partitioning` flag that determines whether it
outputs one partition (merging all inputs) or N partitions (sorting each
independently). This means **every sorting decision is also a distribution
decision**, and vice versa. Handling them in separate rules creates a semantic
gap where each rule makes locally correct decisions that are globally incorrect.
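As a toy model of this coupling (not DataFusion's actual code), note how the single `preserve_partitioning` flag simultaneously decides the scope of the sort and the output partition count:

```rust
// Toy model of the coupling: one flag on a SortExec-like node determines
// both sorting semantics and distribution (output partition count).
fn sort_output_partitions(input_partitions: usize, preserve_partitioning: bool) -> usize {
    if preserve_partitioning {
        // Sort each input partition independently; partition count unchanged.
        input_partitions
    } else {
        // Merge all inputs into one globally sorted partition.
        1
    }
}

fn main() {
    // A parent requiring SinglePartition is satisfied only in the second case.
    assert_eq!(sort_output_partitions(32, true), 32);
    assert_eq!(sort_output_partitions(32, false), 1);
}
```

Any rule that flips this flag without consulting distribution requirements (or inserts an exchange without consulting ordering) can silently break the other concern.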
### Non-idempotent composition
Running `EnforceDistribution` followed by `EnforceSorting` does not produce
a stable plan. Running the pair again can produce a different (and sometimes
invalid) plan:
```
Round 1: EnforceDistribution -> fixes distribution
         EnforceSorting      -> pushdown_sorts breaks distribution
Round 2: EnforceDistribution -> fixes the NEW distribution violation
         EnforceSorting      -> pushdown_sorts breaks it AGAIN (different location)
```
This is particularly problematic for downstream projects that run custom
optimizer rules between or after these passes (e.g., remote execution,
materialized view selection), which are then forced to schedule additional
rounds of `EnforceSorting`.
### Real-world impact
We maintain a production system (Polygon.io Atlas) serving financial market
data APIs built on DataFusion. The separation of these rules has caused
**multiple production incidents** over the past months:
1. **`SanityCheckPlan` failures on multi-partition `StorageExec` +
`GlobalLimitExec`** (April 2026): `EnforceSorting`'s `pushdown_sorts` pushed a
`SortExec` through an intermediate node onto a 32-partition input, setting
`preserve_partitioning=true` without inserting `SortPreservingMergeExec`.
`GlobalLimitExec` requires `SinglePartition` -> 502 errors for specific API
users. Root cause: `pushdown_sorts` has no knowledge of distribution
requirements.
2. **`InterleaveExec::with_new_children` panics** (#21826, April 2026):
Running `EnforceDistribution` twice (which our custom optimizer chain does)
causes `InterleaveExec` created in the first pass to panic when the second pass
changes children's partitioning.
3. **Planning time explosion with materialized views** (April 2026):
`EnforceDistribution`'s `adjust_input_keys_ordering` returns `Transformed::yes`
unconditionally (#21946), causing unnecessary plan tree rebuilds that trigger
expensive cost re-evaluation in `OneOfExec` (materialized view candidate
selection). Each `EnforceSorting` + `EnforceDistribution` round compounds this
cost.
### Existing upstream issues (same root cause)
- #14150: "Bug: applying multiple times `EnforceDistribution` generates
invalid plan" -- running `EnforceDistribution` twice loses limit `fetch`
values, producing wrong results.
- #18989: "Sanity check failed when sort and aggregate on a
multi-partitioned table" -- `EnforceDistribution` fails to inject necessary
`RepartitionExec` between aggregate operations.
- #16888: "Window aggregates output order broken due to hash repartitioning"
-- `EnforceDistribution` breaks ordering established by `EnforceSorting`.
- #21826: "`InterleaveExec::with_new_children` panics when optimizer
rewrites change children's partitioning" -- second `EnforceDistribution` pass
panics on plan created by first pass.
All of these share the same root cause: **two separate rules making
independent decisions about coupled concerns**.
## How other engines solve this
### Apache Spark: `EnsureRequirements`
Spark handles both in a [single
rule](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala)
using `transformUp`:
```scala
// For each operator, in a single pass:
// 1. Check requiredChildDistribution -> add ShuffleExchangeExec if needed
// 2. Check requiredChildOrdering -> add SortExec if needed
// Distribution is always resolved before sorting for the same operator.
```
**Key design**: sorting decisions are made AFTER distribution is finalized
for each operator. `SortExec(global=false)` preserves partition boundaries. No
separate "sort pushdown" pass exists.
### Presto/Trino: `AddExchanges`
Presto's `AddExchanges` rule similarly handles both distribution and sorting
properties in a single rule, using a `PreferredProperties` structure that
carries both distribution and ordering preferences through the plan tree.
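A rough Rust analog of that idea (all names here are illustrative for exposition, not Presto's actual types): one structure threads both preferences through the tree, so neither concern is ever decided blind to the other.

```rust
// Illustrative analog of Presto's PreferredProperties: a single structure,
// passed down the plan tree, carrying BOTH the distribution and the
// ordering preference. Names are assumptions for exposition only.
#[derive(Debug, Clone, PartialEq)]
enum DistributionPreference {
    SinglePartition,
    HashPartitioned(Vec<String>), // partitioning columns
    Any,
}

#[derive(Debug, Clone)]
struct PreferredProperties {
    distribution: DistributionPreference,
    ordering: Vec<String>, // desired sort columns, outermost first
}

impl PreferredProperties {
    fn any() -> Self {
        PreferredProperties {
            distribution: DistributionPreference::Any,
            ordering: vec![],
        }
    }

    // A parent derives the preferences it hands each child. Because both
    // fields travel together, an ordering preference can never be
    // propagated without the matching distribution preference.
    fn with_ordering(mut self, cols: &[&str]) -> Self {
        self.ordering = cols.iter().map(|c| c.to_string()).collect();
        self
    }
}

fn main() {
    let prefs = PreferredProperties::any().with_ordering(&["ts", "ticker"]);
    assert_eq!(prefs.distribution, DistributionPreference::Any);
    assert_eq!(prefs.ordering, vec!["ts".to_string(), "ticker".to_string()]);
}
```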
## Proposed approach
### Phase 1: Make `pushdown_sorts` distribution-aware (short-term fix)
Add a `distribution_requirement` field to `ParentRequirements` in
`pushdown_sorts`, so that `add_sort_above` knows when to insert
`SortPreservingMergeExec`. This is a targeted fix for the most critical bug.
**Status**: Implemented in our fork ([PR
#54](https://github.com/massive-com/arrow-datafusion/pull/54)), ready to port
upstream.
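The shape of the fix, sketched with illustrative names (the actual field and helper names in the fork may differ): the parent's distribution requirement rides along in `ParentRequirements`, and the pushdown consults it to decide whether a `SortPreservingMergeExec` must follow a partition-preserving sort.

```rust
// Sketch only; names are assumptions, not the fork's actual code.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DistributionRequirement {
    SinglePartition,
    Unspecified,
}

struct ParentRequirements {
    // ...existing ordering / fetch fields elided...
    distribution: DistributionRequirement,
}

/// After pushing a sort onto a multi-partition child with
/// `preserve_partitioning = true`, must a SortPreservingMergeExec be
/// inserted to restore a single partition for the parent?
fn needs_sort_preserving_merge(parent: &ParentRequirements, child_partitions: usize) -> bool {
    parent.distribution == DistributionRequirement::SinglePartition && child_partitions > 1
}

fn main() {
    // The incident shape: a 32-partition input under a parent (e.g. a
    // GlobalLimitExec) that requires SinglePartition.
    let limit_parent = ParentRequirements {
        distribution: DistributionRequirement::SinglePartition,
    };
    assert!(needs_sort_preserving_merge(&limit_parent, 32));

    let free_parent = ParentRequirements {
        distribution: DistributionRequirement::Unspecified,
    };
    assert!(!needs_sort_preserving_merge(&free_parent, 32));
}
```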
### Phase 2: Add idempotency tests (validation)
Add tests that verify `EnforceDistribution -> EnforceSorting` produces a
stable plan:
```rust
fn assert_idempotent(plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions) -> Result<()> {
    let p1 = EnforceDistribution::new().optimize(plan, config)?;
    let p1 = EnforceSorting::new().optimize(p1, config)?;
    let p2 = EnforceDistribution::new().optimize(Arc::clone(&p1), config)?;
    let p2 = EnforceSorting::new().optimize(p2, config)?;
    assert_eq!(
        displayable(p1.as_ref()).indent(true).to_string(),
        displayable(p2.as_ref()).indent(true).to_string(),
    );
    Ok(())
}
```
Test against various plan topologies: multi-partition sorts with limits,
unions with mixed partition counts, projections over multi-partition sources,
window functions, etc.
### Phase 3: Merge into `EnsureRequirements` (architectural fix)
Create a new `EnsureRequirements` rule that replaces both
`EnforceDistribution` and `EnforceSorting`:
```rust
pub struct EnsureRequirements;

impl PhysicalOptimizerRule for EnsureRequirements {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Single bottom-up pass: for each operator, ensure its children
        // satisfy both the required child distribution AND the required
        // child ordering. Distribution is resolved before ordering for
        // each operator.
        plan.transform_up(|node| ensure_requirements(node, config)).data()
    }
}

struct Requirements {
    distribution: Distribution,
    ordering: Option<OrderingRequirements>,
    fetch: Option<usize>,
}

fn ensure_requirements(
    node: PlanContext<Requirements>,
    config: &ConfigOptions,
) -> Result<Transformed<PlanContext<Requirements>>> {
    for (child, required_dist, required_ordering) in
        zip(children, distributions, orderings)
    {
        // Step 1: ensure distribution.
        if !child.output_partitioning().satisfies(&required_dist) {
            child = add_exchange(child, required_dist);
        }
        // Step 2: ensure ordering. Distribution is already settled, so
        // add_sort can be distribution-aware (preserve_partitioning,
        // SortPreservingMergeExec insertion).
        if !child.output_ordering().satisfies(&required_ordering) {
            child = add_sort(child, required_ordering, required_dist);
        }
    }
    // ...
}
```
**Key properties**:
- **Single pass**: No separate `pushdown_sorts` that can undo distribution
work
- **Distribution before sorting**: For each operator, distribution is
settled before sorting decisions, like Spark
- **Naturally idempotent**: Running it twice produces the same plan because
each operator's children are already correct after the first pass
- **Sort pushdown integrated**: Instead of a separate top-down pass, sort
pushdown is handled by the bottom-up pass recognizing when a child already
satisfies ordering (no sort needed)
### Migration path
1. `EnsureRequirements` can coexist with the old rules during development
2. Add a feature flag to switch between old and new behavior
3. Validate with the full DataFusion test suite + sqllogictest
4. Deprecate `EnforceDistribution` + `EnforceSorting` after stabilization
## Sub-tasks
- [ ] Phase 1: Port `pushdown_sorts` distribution fix upstream (from our
fork PR #54)
- [ ] Phase 1: Port `ensure_sorting` distribution fix upstream (from our
fork PR #53)
- [ ] Phase 2: Add idempotency test framework for physical optimizer rules
- [ ] Phase 2: Add idempotency tests for `EnforceDistribution` +
`EnforceSorting` composition
- [ ] Phase 3: Design `EnsureRequirements` API and `Requirements` structure
- [ ] Phase 3: Implement combined distribution + ordering enforcement in
single bottom-up pass
- [ ] Phase 3: Integrate sort pushdown into the bottom-up pass
- [ ] Phase 3: Handle `parallelize_sorts` and
`replace_with_order_preserving_variants` optimizations
- [ ] Phase 3: Migrate `OutputRequirements` (add/remove) into the new rule
- [ ] Phase 3: Feature flag and migration path
- [ ] Phase 3: Validate against full test suite
## References
- Spark `EnsureRequirements`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
- Presto `AddExchanges`:
https://github.com/prestodb/presto/wiki/New-Optimizer
- #14150: Multiple `EnforceDistribution` generates invalid plan
- #18989: Sanity check failed with sort + aggregate on multi-partitioned
table
- #16888: Window aggregates output order broken due to hash repartitioning
- #21826: `InterleaveExec::with_new_children` panics from optimizer rewrites
- #21946: `adjust_input_keys_ordering` returns `Transformed::yes`
unconditionally