zhuqi-lucas commented on issue #21973:
URL: https://github.com/apache/datafusion/issues/21973#issuecomment-4359773821

   ## Initial Design Thoughts
   
   After analyzing both rules in depth (~4000 lines total), here's my proposed 
architecture for the merged `EnsureRequirements` rule.
   
   ### Why merging is the right approach
   
   The current separation has a fundamental design flaw: **sorting and 
distribution are coupled through `SortExec.preserve_partitioning`**, but the 
two rules make decisions independently. This coupling means:
   
   1. `EnforceDistribution` already partially handles ordering 
(`replace_order_preserving_variants`, `add_sort_above_with_check`)
   2. `EnforceSorting` already partially handles distribution 
(`parallelize_sorts` removes `CoalescePartitionsExec`, 
`analyze_immediate_sort_removal` adds `SortPreservingMergeExec`)
   3. The responsibility boundary is blurred — both rules touch both concerns
   
   Every major query engine (Spark's `EnsureRequirements`, Presto/Trino's 
`AddExchanges`) handles distribution and sorting in a **single rule** for 
exactly this reason.
   
   ### The `pushdown_sorts` problem
   
   The root cause of non-idempotency is `pushdown_sorts` — the **only top-down 
pass** in `EnforceSorting`. It:
   
   1. Removes existing `SortExec` (which `ensure_sorting` just placed correctly)
   2. Walks down the tree carrying ordering requirements
   3. Reinserts `SortExec` at the deepest pushable position via `add_sort_above`
   4. `add_sort_above` has no distribution context → sets 
`preserve_partitioning=true` on multi-partition input without inserting 
`SortPreservingMergeExec`
   
   **Spark doesn't have a separate `pushdown_sorts` pass.** Sort pushdown is 
implicit in the bottom-up traversal: if a child already satisfies the ordering 
requirement, no `SortExec` is added — equivalent to "the sort was pushed to the 
deepest satisfying position."
   
   ### Proposed architecture
   
   ```
   EnsureRequirements::optimize(plan)
   │
   ├─ Phase 1 (optional): reorder_join_keys (top-down)
   │   └─ Keep existing adjust_input_keys_ordering logic unchanged
   │
   ├─ Phase 2: ensure_requirements (single bottom-up pass)
   │   └─ For each node (bottom-up):
   │       For each child:
   │         Step 1: Ensure distribution requirement
   │           └─ Add RepartitionExec / CoalescePartitionsExec / 
SortPreservingMergeExec
   │         Step 2: Ensure ordering requirement (distribution-aware)
   │           └─ Add SortExec with correct preserve_partitioning + SPM if 
needed
   │         Step 3: Local optimizations
   │           ├─ parallelize_sorts (inline): Coalesce+Sort → Sort+SPM
   │           ├─ order-preserving variants (inline): Repartition → 
preserve_order
   │           └─ partial sort (inline): SortExec → PartialSortExec
   │
   └─ No separate pushdown_sorts pass!
       └─ Sort pushdown is implicit: bottom-up pass only adds sort
          where child doesn't satisfy ordering → sort is naturally
          at the deepest satisfying position
   ```
   
   ### Unified data structure
   
   ```rust
   /// Combined requirement for a single child
   struct ChildRequirement {
       distribution: Distribution,
       ordering: Option<OrderingRequirements>,
       fetch: Option<usize>,
       // Auxiliary decision flags
       hash_necessary: bool,
       roundrobin_beneficial: bool,
   }
   ```
   
   ### Key design properties
   
   1. **Single pass**: One bottom-up traversal handles both distribution and 
sorting
   2. **Distribution before sorting**: For each operator's children, 
distribution is settled first, then sorting decisions are made with full 
distribution context
   3. **Naturally idempotent**: Running the rule twice produces the same plan — 
each child is already correct after the first pass
   4. **No `OutputRequirementExec` needed**: The single pass carries 
requirements directly, eliminating the add/remove ceremony and the 
`require_top_ordering_helper` distribution extraction bug
   5. **`pushdown_sorts` eliminated**: Sort pushdown is implicit in the 
bottom-up traversal
   
   ### Migration strategy
   
   1. Implement `EnsureRequirements` alongside existing rules
   2. Feature flag to switch between old and new behavior
   3. Validate against full DataFusion test suite + sqllogictest
   4. Deprecate `EnforceDistribution` + `EnforceSorting` after stabilization
   
   ### Scope & complexity
   
   This is a large change (~4000 lines to rewrite), but the existing code 
already has significant overlap between the two rules. The merged version 
should be significantly simpler because:
   - No need for `OutputRequirementExec` add/remove ceremony
   - No `pushdown_sorts` top-down pass
   - No duplicated `replace_order_preserving_variants` logic
   - Single traversal instead of 5+ traversals
   
   I'll start with Phase 1 (the `pushdown_sorts` distribution fix) as a 
standalone upstream PR, then iterate on the full merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to