zhuqi-lucas opened a new issue, #21973:
URL: https://github.com/apache/datafusion/issues/21973

   
   ## Summary
   
   `EnforceDistribution` and `EnforceSorting` are currently implemented as two 
separate physical optimizer rules that run independently. This design is unique 
among major query engines -- Spark (`EnsureRequirements`), Presto/Trino 
(`AddExchanges`), and others all handle distribution and sorting in a **single 
combined rule**. The separation in DataFusion leads to a class of correctness 
bugs where one rule undoes the invariants established by the other, because 
sorting and distribution are fundamentally coupled (`preserve_partitioning` on 
`SortExec` directly affects output partition count).
   
   This epic tracks the work to merge these two rules into a single 
`EnsureRequirements`-style rule that handles both distribution and sorting in 
one pass, eliminating the non-idempotent composition and the recurring bugs it 
causes.
   
   ## Motivation
   
   ### The fundamental coupling
   
   `SortExec` has a `preserve_partitioning` flag that determines whether it 
outputs one partition (merging all inputs) or N partitions (sorting each 
independently). This means **every sorting decision is also a distribution 
decision**, and vice versa. Handling them in separate rules creates a semantic 
gap where each rule makes locally correct decisions that are globally incorrect.
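   As a minimal illustration (a toy model, not the DataFusion API), the flag's 
effect on partition count can be written as:
   
   ```rust
   // Toy model of `SortExec` output partitioning: the same flag that
   // controls how the sort executes also determines how many partitions
   // downstream operators see.
   fn sort_output_partitions(input_partitions: usize, preserve_partitioning: bool) -> usize {
       if preserve_partitioning {
           input_partitions // sort each input partition independently
       } else {
           1 // merge all inputs into a single globally sorted partition
       }
   }
   ```
   
   A rule that flips `preserve_partitioning` without consulting downstream 
distribution requirements silently changes the partition count that operators 
like `GlobalLimitExec` depend on.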
   
   ### Non-idempotent composition
   
   Running `EnforceDistribution` followed by `EnforceSorting` does not produce 
a stable plan. Running the pair again can produce a different (and sometimes 
invalid) plan:
   
   ```
   Round 1: EnforceDistribution -> fixes distribution
            EnforceSorting      -> pushdown_sorts breaks distribution
            
   Round 2: EnforceDistribution -> fixes the NEW distribution violation
            EnforceSorting      -> pushdown_sorts breaks it AGAIN (different 
location)
   ```
   
   This is particularly problematic for downstream projects that run custom 
optimizer rules between or after these passes (e.g., for remote execution or 
materialized view selection), since those rules can invalidate ordering again 
and necessitate additional rounds of `EnforceSorting`.
   
   ### Real-world impact
   
   We maintain a production system (Polygon.io Atlas) serving financial market 
data APIs built on DataFusion. The separation of these rules has caused 
**multiple production incidents** over the past months:
   
   1. **`SanityCheckPlan` failures on multi-partition `StorageExec` + 
`GlobalLimitExec`** (April 2026): `EnforceSorting`'s `pushdown_sorts` pushed a 
`SortExec` through an intermediate node onto a 32-partition input, setting 
`preserve_partitioning=true` without inserting `SortPreservingMergeExec`. 
`GlobalLimitExec` requires `SinglePartition` -> 502 errors for specific API 
users. Root cause: `pushdown_sorts` has no knowledge of distribution 
requirements.
   
   2. **`InterleaveExec::with_new_children` panics** (#21826, April 2026): 
Running `EnforceDistribution` twice (which our custom optimizer chain does) 
causes `InterleaveExec` created in the first pass to panic when the second pass 
changes children's partitioning.
   
   3. **Planning time explosion with materialized views** (April 2026): 
`EnforceDistribution`'s `adjust_input_keys_ordering` returns `Transformed::yes` 
unconditionally (#21946), causing unnecessary plan tree rebuilds that trigger 
expensive cost re-evaluation in `OneOfExec` (materialized view candidate 
selection). Each `EnforceSorting` + `EnforceDistribution` round compounds this 
cost.
   
   ### Existing upstream issues (same root cause)
   
   - #14150: "Bug: applying multiple times `EnforceDistribution` generates 
invalid plan" -- running `EnforceDistribution` twice loses limit `fetch` 
values, producing wrong results.
   - #18989: "Sanity check failed when sort and aggregate on a 
multi-partitioned table" -- `EnforceDistribution` fails to inject necessary 
`RepartitionExec` between aggregate operations.
   - #16888: "Window aggregates output order broken due to hash repartitioning" 
-- `EnforceDistribution` breaks ordering established by `EnforceSorting`.
   - #21826: "`InterleaveExec::with_new_children` panics when optimizer 
rewrites change children's partitioning" -- second `EnforceDistribution` pass 
panics on plan created by first pass.
   
   All of these share the same root cause: **two separate rules making 
independent decisions about coupled concerns**.
   
   ## How other engines solve this
   
   ### Apache Spark: `EnsureRequirements`
   
   Spark handles both in a [single 
rule](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala)
 using `transformUp`:
   
   ```scala
   // For each operator, in a single pass:
   // 1. Check requiredChildDistribution -> add ShuffleExchangeExec if needed
   // 2. Check requiredChildOrdering -> add SortExec if needed
   // Distribution is always resolved before sorting for the same operator.
   ```
   
   **Key design**: sorting decisions are made AFTER distribution is finalized 
for each operator. `SortExec(global=false)` preserves partition boundaries. No 
separate "sort pushdown" pass exists.
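   The same decision order can be sketched as a toy model in Rust (all names 
here are invented for illustration; this is neither Spark's nor DataFusion's 
API):
   
   ```rust
   #[derive(PartialEq)]
   enum Dist {
       Single,
       Hash,
   }
   
   struct Child {
       dist: Dist,
       sorted: bool,
   }
   
   // Returns the operators that would be inserted above `child`:
   // distribution first, then ordering.
   fn ensure(mut child: Child, need_dist: Dist, need_sort: bool) -> Vec<&'static str> {
       let mut inserted = Vec::new();
       // Step 1: satisfy the required distribution.
       if child.dist != need_dist {
           inserted.push("Exchange");
           child.dist = need_dist;
       }
       // Step 2: satisfy the required ordering. Distribution is already
       // settled, so the sort knows whether it produces one partition or N.
       if need_sort && !child.sorted {
           inserted.push(if child.dist == Dist::Single {
               "Sort(global)"
           } else {
               "Sort(per-partition)"
           });
       }
       inserted
   }
   ```
   
   Because each child is fully fixed up before its parent is visited, running 
the pass again finds nothing to change, which is the idempotency property the 
current two-rule composition lacks.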
   
   ### Presto/Trino: `AddExchanges`
   
   Presto's `AddExchanges` rule similarly handles both distribution and sorting 
properties in a single rule, using a `PreferredProperties` structure that 
carries both distribution and ordering preferences through the plan tree.
   
   ## Proposed approach
   
   ### Phase 1: Make `pushdown_sorts` distribution-aware (short-term fix)
   
   Add a `distribution_requirement` field to `ParentRequirements` in 
`pushdown_sorts`, so that `add_sort_above` knows when to insert 
`SortPreservingMergeExec`. This is a targeted fix for the most critical bug.
   
   **Status**: Implemented in our fork ([PR 
#54](https://github.com/massive-com/arrow-datafusion/pull/54)), ready to port 
upstream.
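   A self-contained sketch of the shape of that fix (the type name mirrors 
`sort_pushdown.rs`, but the fields and helper below are stubs and assumptions 
for illustration, not the fork's actual code):
   
   ```rust
   #[derive(PartialEq)]
   enum Distribution {
       SinglePartition,
       Unspecified,
   }
   
   #[derive(Default)]
   struct ParentRequirements {
       // ...existing ordering/fetch fields elided...
       // NEW: the distribution the nearest distribution-sensitive ancestor
       // (e.g. `GlobalLimitExec`) requires of this subtree.
       distribution_requirement: Option<Distribution>,
   }
   
   // Hypothetical helper: when a sort is pushed onto a multi-partition
   // child with `preserve_partitioning=true`, a `SortPreservingMergeExec`
   // must be re-inserted if an ancestor still requires a single partition.
   fn needs_merge_after_sort(parent: &ParentRequirements, child_partitions: usize) -> bool {
       child_partitions > 1
           && parent.distribution_requirement == Some(Distribution::SinglePartition)
   }
   ```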
   
   ### Phase 2: Add idempotency tests (validation)
   
   Add tests that verify `EnforceDistribution -> EnforceSorting` produces a 
stable plan:
   
   ```rust
   fn assert_idempotent(plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions) -> Result<()> {
       let p1 = EnforceDistribution::new().optimize(plan, config)?;
       let p1 = EnforceSorting::new().optimize(p1, config)?;
       let p2 = EnforceDistribution::new().optimize(p1.clone(), config)?;
       let p2 = EnforceSorting::new().optimize(p2, config)?;
       assert_eq!(
           displayable(p1.as_ref()).indent(true).to_string(),
           displayable(p2.as_ref()).indent(true).to_string(),
       );
       Ok(())
   }
   ```
   
   Test against various plan topologies: multi-partition sorts with limits, 
unions with mixed partition counts, projections over multi-partition sources, 
window functions, etc.
   
   ### Phase 3: Merge into `EnsureRequirements` (architectural fix)
   
   Create a new `EnsureRequirements` rule that replaces both 
`EnforceDistribution` and `EnforceSorting`:
   
   ```rust
   pub struct EnsureRequirements;
   
   impl PhysicalOptimizerRule for EnsureRequirements {
       fn optimize(
           &self,
           plan: Arc<dyn ExecutionPlan>,
           config: &ConfigOptions,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           // Single bottom-up pass: for each operator, ensure children
           // satisfy both required_input_distribution AND
           // required_input_ordering. Distribution is resolved before
           // ordering for each operator.
           plan.transform_up(|node| ensure_requirements(node, config)).data()
       }
   }
   
   struct Requirements {
       distribution: Distribution,
       ordering: Option<OrderingRequirements>,
       fetch: Option<usize>,
   }
   
   // Sketch (pseudocode): fix each child's distribution first, then its
   // ordering, so that `add_sort` always sees the final partitioning.
   fn ensure_requirements(
       node: PlanContext<Requirements>,
       config: &ConfigOptions,
   ) -> Result<Transformed<PlanContext<Requirements>>> {
       for (child, required_dist, required_ordering) in
           zip(children, distributions, orderings)
       {
           // Step 1: Ensure distribution
           if !child.output_partitioning().satisfies(&required_dist) {
               child = add_exchange(child, required_dist);
           }
           // Step 2: Ensure ordering (distribution is already correct)
           if !child.output_ordering().satisfies(&required_ordering) {
               child = add_sort(child, required_ordering, required_dist); // dist-aware!
           }
       }
       // ...
   }
   ```
   
   **Key properties**:
   - **Single pass**: No separate `pushdown_sorts` that can undo distribution 
work
   - **Distribution before sorting**: For each operator, distribution is 
settled before sorting decisions, like Spark
   - **Naturally idempotent**: Running it twice produces the same plan because 
each operator's children are already correct after the first pass
   - **Sort pushdown integrated**: Instead of a separate top-down pass, sort 
pushdown is handled by the bottom-up pass recognizing when a child already 
satisfies ordering (no sort needed)
   
   ### Migration path
   
   1. `EnsureRequirements` can coexist with the old rules during development
   2. Add a feature flag to switch between old and new behavior
   3. Validate with the full DataFusion test suite + sqllogictest
   4. Deprecate `EnforceDistribution` + `EnforceSorting` after stabilization
   
   ## Sub-tasks
   
   - [ ] Phase 1: Port `pushdown_sorts` distribution fix upstream (from our 
fork PR #54)
   - [ ] Phase 1: Port `ensure_sorting` distribution fix upstream (from our 
fork PR #53)
   - [ ] Phase 2: Add idempotency test framework for physical optimizer rules
   - [ ] Phase 2: Add idempotency tests for `EnforceDistribution` + 
`EnforceSorting` composition
   - [ ] Phase 3: Design `EnsureRequirements` API and `Requirements` structure
   - [ ] Phase 3: Implement combined distribution + ordering enforcement in 
single bottom-up pass
   - [ ] Phase 3: Integrate sort pushdown into the bottom-up pass
   - [ ] Phase 3: Handle `parallelize_sorts` and 
`replace_with_order_preserving_variants` optimizations
   - [ ] Phase 3: Migrate `OutputRequirements` (add/remove) into the new rule
   - [ ] Phase 3: Feature flag and migration path
   - [ ] Phase 3: Validate against full test suite
   
   ## References
   
   - Spark `EnsureRequirements`: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
   - Presto `AddExchanges`: 
https://github.com/prestodb/presto/wiki/New-Optimizer
   - #14150: Multiple `EnforceDistribution` generates invalid plan
   - #18989: Sanity check failed with sort + aggregate on multi-partitioned 
table  
   - #16888: Window aggregates output order broken due to hash repartitioning
   - #21826: `InterleaveExec::with_new_children` panics from optimizer rewrites
   - #21946: `adjust_input_keys_ordering` returns `Transformed::yes` 
unconditionally

