SubhamSinghal opened a new pull request, #21479:
URL: https://github.com/apache/datafusion/pull/21479

     ## Which issue does this PR close?
   
     - Related to https://github.com/apache/datafusion/issues/6899.
   
     ## Rationale for this change
   
     Queries like `SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY pk 
ORDER BY val) AS rn FROM t) WHERE rn <= K` are extremely common in analytics 
("top N per group"). The current plan sorts the **entire** dataset in 
O(N log N) time, computes ROW_NUMBER for all rows, then filters. With 10M 
rows, 1K partitions, and K=3, we sort all 10M rows but keep only 3K.
   
     This PR introduces a `PartitionedTopKExec` operator that replaces the 
`SortExec` and maintains a per-partition `TopK` heap (reusing DataFusion's 
existing `TopK` implementation). Cost drops to O(N log K) time and O(K × P × 
row_size) memory, where P is the number of partitions.
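
To illustrate the per-partition heap idea, here is a minimal, self-contained sketch. It is not the code in this PR: it uses `std::collections::BinaryHeap` instead of DataFusion's `TopK`, and it assumes rows are simple `(pk, val)` pairs with ascending order on `val`. The function name `partitioned_top_k` is hypothetical.

```rust
use std::collections::{BinaryHeap, HashMap};

/// Keep the `k` smallest `val`s for every partition key `pk`.
/// Hypothetical helper for illustration only; rows are `(pk, val)` pairs.
/// The result is ordered by `(pk, val)`, mirroring the
/// `(partition_keys, order_keys)` output order described above.
pub fn partitioned_top_k(rows: &[(u32, i64)], k: usize) -> Vec<(u32, i64)> {
    if k == 0 {
        return Vec::new();
    }
    // One bounded max-heap per partition: the root is the largest value
    // currently retained, i.e. the first candidate for eviction.
    let mut heaps: HashMap<u32, BinaryHeap<i64>> = HashMap::new();
    for &(pk, val) in rows {
        let heap = heaps.entry(pk).or_default();
        if heap.len() < k {
            heap.push(val);
        } else if let Some(mut top) = heap.peek_mut() {
            if val < *top {
                // Overwrite the root; the heap is re-sifted when `top` drops.
                *top = val;
            }
        }
    }
    // Flatten all heaps and sort the (at most k * P) surviving rows.
    let mut out: Vec<(u32, i64)> = Vec::new();
    for (pk, heap) in heaps {
        for val in heap.into_vec() {
            out.push((pk, val));
        }
    }
    out.sort_unstable();
    out
}
```

Each input row costs at most O(log K) heap work, and only K values per partition are held at any time, which is where the O(N log K) time and O(K × P) row bounds come from.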
   
     ## What changes are included in this PR?
   
     **New physical operator: `PartitionedTopKExec`** 
(`physical-plan/src/sorts/partitioned_topk.rs`)
     - Reads unsorted input, groups rows by partition key using `RowConverter`, 
feeds sub-batches to a per-partition `TopK` heap
     - Emits only the top-K rows per partition in sorted `(partition_keys, 
order_keys)` order
     - Reuses the existing `TopK` implementation for heap management, sort key 
comparison, eviction, and batch compaction
   
     **New optimizer rule: `WindowTopN`** 
(`physical-optimizer/src/window_topn.rs`)
   
     Detects the pattern:
     ```text
     FilterExec(rn <= K)
       [optional ProjectionExec]
         BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
           SortExec(partition_keys, order_keys)
     ```
   
     And replaces it with:
     ```text
     [optional ProjectionExec]
       BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
         PartitionedTopKExec(fetch=K)
     ```
   
     Both `FilterExec` and `SortExec` are removed.
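
The shape of this rewrite can be sketched on a toy plan tree. Everything below is a simplified assumption for illustration: the `Plan` enum, its fields, and the `window_topn` function are hypothetical stand-ins, not DataFusion's `ExecutionPlan` API or the rule's actual code.

```rust
/// Toy stand-in for a physical plan (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan,
    /// Filter of the form `rn <= rn_le`.
    Filter { rn_le: usize, input: Box<Plan> },
    Projection { input: Box<Plan> },
    /// ROW_NUMBER window; `partitioned` is true when PARTITION BY is present.
    RowNumberWindow { partitioned: bool, input: Box<Plan> },
    Sort { input: Box<Plan> },
    PartitionedTopK { fetch: usize, input: Box<Plan> },
}

/// If `node` is a partitioned ROW_NUMBER window directly over a Sort,
/// return the window with the Sort swapped for PartitionedTopK.
fn rewrite_window(node: &Plan, fetch: usize) -> Option<Plan> {
    match node {
        Plan::RowNumberWindow { partitioned: true, input } => match &**input {
            Plan::Sort { input: sort_input } => Some(Plan::RowNumberWindow {
                partitioned: true,
                input: Box::new(Plan::PartitionedTopK {
                    fetch,
                    input: sort_input.clone(),
                }),
            }),
            _ => None,
        },
        _ => None,
    }
}

/// Apply the rewrite at the root; return the plan unchanged if the
/// Filter -> [Projection] -> Window -> Sort pattern does not match.
pub fn window_topn(plan: &Plan) -> Plan {
    if let Plan::Filter { rn_le, input } = plan {
        let rewritten = match &**input {
            // Optional projection between the filter and the window.
            Plan::Projection { input: inner } => rewrite_window(inner, *rn_le)
                .map(|w| Plan::Projection { input: Box::new(w) }),
            other => rewrite_window(other, *rn_le),
        };
        if let Some(new_plan) = rewritten {
            return new_plan;
        }
    }
    plan.clone()
}
```

In this sketch the filter is dropped only when the whole pattern matches, so a plan without `PARTITION BY` passes through untouched.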
   
     Supported predicates: `rn <= K`, `rn < K`, `K >= rn`, `K > rn`.
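
One plausible way to normalize these four predicate forms into a single fetch limit is sketched below. The `Op` and `Operand` types and the `fetch_from_predicate` function are hypothetical, and the mapping of strict inequalities to `K - 1` is an assumption that follows from ROW_NUMBER starting at 1; the actual rule may be implemented differently.

```rust
/// Comparison operators relevant to the sketch (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op {
    Lt,
    LtEq,
    Gt,
    GtEq,
}

/// Either the ROW_NUMBER column or an integer literal (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Operand {
    RowNumber,
    Literal(u64),
}

/// Return the number of rows to keep per partition, or `None` when the
/// predicate is not an upper bound on the row number.
pub fn fetch_from_predicate(left: Operand, op: Op, right: Operand) -> Option<u64> {
    use Operand::*;
    match (left, op, right) {
        // rn <= K  and  K >= rn  keep rows 1..=K.
        (RowNumber, Op::LtEq, Literal(k)) | (Literal(k), Op::GtEq, RowNumber) => Some(k),
        // rn < K  and  K > rn  keep rows 1..=K-1 (assumed mapping).
        (RowNumber, Op::Lt, Literal(k)) | (Literal(k), Op::Gt, RowNumber) => {
            Some(k.saturating_sub(1))
        }
        // Lower bounds such as rn >= K cannot be served by a top-K heap.
        _ => None,
    }
}
```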
   
     The rule only fires for `ROW_NUMBER` with a `PARTITION BY` clause. Global 
top-K (no `PARTITION BY`) is already handled by
     `SortExec` with `fetch`.
   
     **Config flag:** `datafusion.optimizer.enable_window_topn` (default: 
`true`)
   
    **Benchmark results** (H2O groupby Q8, 10M rows, top-2 per partition):
   
     cargo run --release --example h2o_window_topn_bench
   
     | Scenario | Enabled (ms) | Disabled (ms) | Speedup |
     |----------|-------------|--------------|---------|
     | 100 partitions (100K rows/part) | 43 | 174 | 4.0x |
     | 1K partitions (10K rows/part) | 71 | 146 | 2.1x |
     | 10K partitions (1K rows/part) | 619 | 128 | 0.2x (regression) |
     | 100K partitions (100 rows/part) | 4368 | 135 | 0.03x (regression) |
   
     The regressions at 10K and 100K partitions are expected: per-partition 
`TopK` overhead (RowConverter, MemoryReservation per instance) dominates 
when partitions are very numerous with few rows each. For the common case 
(moderate partition cardinality, here 100 to 1K partitions), the optimization 
provides a 2-4x speedup.
   
     ## Are these changes tested?
   
     Yes:
     - **7 unit tests** (`core/tests/physical_optimizer/window_topn.rs`): basic 
ROW_NUMBER, `rn < K`, flipped predicates, non-window column filter, config 
disabled, no partition by, projection between filter and window
     - **5 SLT tests** (`sqllogictest/test_files/window_topn.slt`): correctness 
verification, EXPLAIN plan validation, `rn < K`, no-partition-by case, config 
disabled fallback
   
     ## Are there any user-facing changes?
   
     No breaking API changes. The optimization is enabled by default and 
transparent to users. It can be disabled via:
     ```sql
     SET datafusion.optimizer.enable_window_topn = false;
     ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

