zhuqi-lucas opened a new pull request, #21976:
URL: https://github.com/apache/datafusion/pull/21976

   ## Summary
   
   This PR adds a new `EnsureRequirements` optimizer rule that merges 
`EnforceDistribution` and `EnforceSorting` into a single coordinated rule, and 
fixes `pushdown_sorts` to be distribution-aware, making the composition 
idempotent.
   
   **Epic**: #21973
   
   ## Problem
   
   `EnforceDistribution` and `EnforceSorting` run as separate rules, but 
sorting and distribution are coupled through `SortExec.preserve_partitioning`. 
`pushdown_sorts` (inside `EnforceSorting`) removes sorts and reinserts them via 
`add_sort_above()`, which has no distribution context. The reinserted node can 
be a `SortExec(preserve_partitioning=true)` with no `SortPreservingMergeExec` 
above it, which violates the `SinglePartition` requirement of operators like 
`GlobalLimitExec`.
   
   This caused production 502 errors for specific API users whose entitlements 
triggered 32-partition `StorageExec` with `in_list_partitioning`.
   
   DataFusion separates distribution and sorting enforcement, whereas Spark 
(`EnsureRequirements`) and Presto/Trino (`AddExchanges`) each handle both in 
a single rule.
   
   ## Changes
   
   ### 1. `EnsureRequirements` rule (`ensure_requirements/mod.rs` + 
`new_tests.rs`)
   - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` 
in a single rule
   - 41 comprehensive tests including:
     - Multi-partition sort + limit (2, 4, 8, 16, 32, 64 partitions)
     - Union with mixed partition counts
     - Projection, Filter, Repartition, Coalesce over multi-partition
     - Hash joins, aggregates, window functions
     - Fetch/skip preservation (#14150 regression)
     - PR #53/#54 regression scenarios
     - **Idempotency tests**: 1-64 partition sweep, triple-optimize 
convergence, all bug-triggering topologies
   
   ### 2. Distribution-aware `pushdown_sorts` (`sort_pushdown.rs`)
   - Add `distribution_requirement: Distribution` field to `ParentRequirements`
   - Initialize from `plan.required_input_distribution()` in 
`assign_initial_requirements`
   - Propagate through recursion with `stronger_distribution()` helper
   - Reset to `UnspecifiedDistribution` below partition-merging nodes (SPM, 
single-partition outputs)
   - Switch both `add_sort_above` call sites to 
`add_sort_above_with_distribution`
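
   The propagation and reset steps above can be sketched with a toy model. This is not the PR's code: the `Distribution` enum below is a simplified stand-in for DataFusion's type (hash keys are plain strings), the `strength` ordering is an assumption about what "stronger" means, and `requirement_for_child` is a hypothetical name for the per-node update.

```rust
// Toy stand-in for DataFusion's `Distribution`; not the real type.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    HashPartitioned(Vec<String>),
    SinglePartition,
}

// Assumed ordering: Unspecified < HashPartitioned < SinglePartition.
fn strength(d: &Distribution) -> u8 {
    match d {
        Distribution::UnspecifiedDistribution => 0,
        Distribution::HashPartitioned(_) => 1,
        Distribution::SinglePartition => 2,
    }
}

// Keep whichever requirement is stronger; ties keep the inherited one.
fn stronger_distribution(inherited: Distribution, own: Distribution) -> Distribution {
    if strength(&own) > strength(&inherited) {
        own
    } else {
        inherited
    }
}

// Hypothetical per-node update: a node that merges partitions already
// satisfies any requirement from above, so its child starts from scratch.
fn requirement_for_child(
    inherited: Distribution,
    own: Distribution,
    merges_partitions: bool,
) -> Distribution {
    if merges_partitions {
        Distribution::UnspecifiedDistribution
    } else {
        stronger_distribution(inherited, own)
    }
}
```

   Under these assumptions, a `SinglePartition` requirement coming from something like `GlobalLimitExec` survives a pass-through node that itself requires nothing, and is dropped below a partition-merging node.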
   
   ### 3. `add_sort_above_with_distribution` (`utils.rs`)
   - New utility: inserts `SortPreservingMergeExec` when parent requires 
`SinglePartition` and input has multiple partitions
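
   A minimal sketch of the behaviour described above, using a toy plan tree rather than DataFusion's `ExecutionPlan` API. The `Plan` and `Distribution` enums and `output_partitions` are hypothetical stand-ins; only the function name and the decision rule come from this PR description, and the real signature may differ.

```rust
// Toy stand-ins; not DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    Sort { preserve_partitioning: bool, input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
}

// Number of output partitions a toy node produces.
fn output_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        // A partition-preserving sort keeps its input's partition count.
        Plan::Sort { preserve_partitioning: true, input } => output_partitions(input),
        Plan::Sort { preserve_partitioning: false, .. } => 1,
        Plan::SortPreservingMerge { .. } => 1,
    }
}

// Add a sort above `input`; if the parent needs a single partition and the
// input has several, also add a merge above the sort.
fn add_sort_above_with_distribution(input: Plan, required: &Distribution) -> Plan {
    let multi = output_partitions(&input) > 1;
    let sort = Plan::Sort {
        preserve_partitioning: multi,
        input: Box::new(input),
    };
    if multi && *required == Distribution::SinglePartition {
        Plan::SortPreservingMerge { input: Box::new(sort) }
    } else {
        sort
    }
}
```

   In this model, a 32-partition scan under a `SinglePartition` requirement ends up as merge over sort over scan, while the same scan with no requirement gets only the partition-preserving sort.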
   
   ## Testing
   
   | Test Suite | Result |
   |-----------|--------|
   | EnsureRequirements (new) | **41 passed** |
   | enforce_sorting (existing) | **124 passed, 0 failed** |
   | enforce_distribution (existing) | **66 passed, 0 failed** |
   | SLT (464 files) | **1 pre-existing failure only** |
   | **Total** | **231 tests + 464 SLT files, 0 regressions** |
   
   Key idempotency verifications:
   - All partition counts 1-64 ✓
   - PR #53 scenario (OutputRequirementExec + multi-partition) ✓
   - PR #54 scenario (ProjectionExec + multi-partition) ✓
   - SPM → Sort → multi-partition ✓
   - Sort → Aggregate pattern (#18989) ✓
   - Triple optimization convergence ✓
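
   The triple-optimization check can be illustrated with a small generic helper. This is a sketch of the idea only, not the test code in this PR: plans are modelled as a top-down list of operator names, and both toy rules are hypothetical.

```rust
// Toy plan: operator names listed top-down. Hypothetical, for illustration.
type Plan = Vec<&'static str>;

// Apply a rule three times and check that the plan stops changing after
// the first application.
fn is_idempotent<P: Clone + PartialEq>(rule: impl Fn(P) -> P, plan: P) -> bool {
    let once = rule(plan);
    let twice = rule(once.clone());
    let thrice = rule(twice.clone());
    once == twice && twice == thrice
}

// Non-idempotent toy rule: always inserts a merge under the first Limit,
// even when one is already there.
fn add_merge_naive(mut plan: Plan) -> Plan {
    if let Some(i) = plan.iter().position(|op| *op == "Limit") {
        plan.insert(i + 1, "Merge");
    }
    plan
}

// Idempotent toy rule: inserts the merge only if it is missing.
fn add_merge_checked(mut plan: Plan) -> Plan {
    if let Some(i) = plan.iter().position(|op| *op == "Limit") {
        if plan.get(i + 1) != Some(&"Merge") {
            plan.insert(i + 1, "Merge");
        }
    }
    plan
}
```

   With the starting plan `["Limit", "Sort", "Scan"]`, the naive rule keeps stacking merges on every pass, whereas the checked rule converges after one application.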
   
   ## Next Steps (future PRs)
   
   - [ ] Replace default optimizer chain with `EnsureRequirements` (currently 
opt-in)
   - [ ] Gradually eliminate `pushdown_sorts` by implementing its optimizations 
in the bottom-up `ensure_sorting` pass
   - [ ] Single-pass architecture (like Spark's `EnsureRequirements`)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

