SubhamSinghal opened a new pull request, #21479:
URL: https://github.com/apache/datafusion/pull/21479
## Which issue does this PR close?
- Related to https://github.com/apache/datafusion/issues/6899.
## Rationale for this change
Queries that compute `ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) AS rn`
and then filter on `rn <= K` are extremely common in analytics ("top N per
group"). The current plan sorts the **entire** dataset in O(N log N) time,
computes ROW_NUMBER for all rows, and only then filters. With 10M rows, 1K
partitions, and K=3, all 10M rows are sorted but only 3,000 are kept.
This PR introduces a `PartitionedTopKExec` operator that replaces the
`SortExec` and maintains a per-partition `TopK` heap (reusing DataFusion's
existing `TopK` implementation). Cost drops to O(N log K) time and O(K × P ×
row_size) memory, where P is the number of partitions.
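The idea behind the cost reduction can be sketched with the standard library alone. This is a minimal illustration, not the code in this PR: it uses `std::collections::BinaryHeap` as a stand-in for DataFusion's `TopK`, assumes a single integer partition key and an ascending integer order key, and the name `top_k_per_partition` is hypothetical.

```rust
use std::collections::{BTreeMap, BinaryHeap};

/// Keep the `k` smallest values per partition key (ORDER BY val ASC).
/// Input rows are (partition_key, val) pairs in arbitrary order.
fn top_k_per_partition(rows: &[(u32, i64)], k: usize) -> Vec<(u32, i64)> {
    if k == 0 {
        return Vec::new();
    }
    // One bounded max-heap per partition. A BTreeMap keeps the partitions
    // ordered by key, so the output can be emitted in sorted order.
    let mut heaps: BTreeMap<u32, BinaryHeap<i64>> = BTreeMap::new();
    for &(pk, val) in rows {
        let heap = heaps.entry(pk).or_default();
        if heap.len() < k {
            heap.push(val);
        } else if let Some(mut worst) = heap.peek_mut() {
            // The heap root is the largest retained value. Replace it when
            // the incoming value is smaller; the heap re-sifts on drop.
            if val < *worst {
                *worst = val;
            }
        }
    }
    // Emit in (partition_key, val) order.
    let mut out = Vec::new();
    for (pk, heap) in heaps {
        for val in heap.into_sorted_vec() {
            out.push((pk, val));
        }
    }
    out
}
```

Each row costs at most one O(log K) heap update, and at most K values are retained per partition, which is where the O(N log K) time and O(K × P) row bounds come from.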
## What changes are included in this PR?
**New physical operator: `PartitionedTopKExec`**
(`physical-plan/src/sorts/partitioned_topk.rs`)
- Reads unsorted input, groups rows by partition key using `RowConverter`,
feeds sub-batches to a per-partition `TopK` heap
- Emits only the top-K rows per partition in sorted `(partition_keys,
order_keys)` order
- Reuses the existing `TopK` implementation for heap management, sort key
comparison, eviction, and batch compaction
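The sub-batch step in the first bullet can be pictured as grouping row indices by encoded partition key. The sketch below is an assumption-laden illustration rather than the operator's code: plain byte vectors stand in for the row format produced by arrow's `RowConverter`, and `group_row_indices` is a hypothetical helper name.

```rust
use std::collections::BTreeMap;

/// Split one batch into per-partition row-index lists.
/// `keys[i]` is the encoded partition key of row `i`; byte vectors are a
/// stand-in for a row-encoded key, not the actual arrow `Rows` type.
fn group_row_indices(keys: &[Vec<u8>]) -> BTreeMap<&[u8], Vec<usize>> {
    let mut groups: BTreeMap<&[u8], Vec<usize>> = BTreeMap::new();
    for (row, key) in keys.iter().enumerate() {
        // Rows with the same key end up in the same index list, in input order.
        groups.entry(key.as_slice()).or_default().push(row);
    }
    groups
}
```

Each index list identifies the rows of one sub-batch, which would then be handed to that partition's heap.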
**New optimizer rule: `WindowTopN`**
(`physical-optimizer/src/window_topn.rs`)
Detects the pattern:
```text
FilterExec(rn <= K)
  [optional ProjectionExec]
    BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
      SortExec(partition_keys, order_keys)
```
And replaces it with:
```text
[optional ProjectionExec]
  BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
    PartitionedTopKExec(fetch=K)
```
Both `FilterExec` and `SortExec` are removed.
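Why the filter becomes redundant can be seen from a small sketch. The window operator stays in the plan and still computes ROW_NUMBER, but over input that already holds at most K rows per partition in sorted order, so no row number can exceed K. The function below is a hypothetical illustration with integer keys, not `BoundedWindowAggExec` itself.

```rust
/// Assign ROW_NUMBER to rows already sorted by (partition_key, order_key).
/// Returns (partition_key, order_key, rn) triples.
fn row_numbers(sorted: &[(u32, i64)]) -> Vec<(u32, i64, u64)> {
    let mut out = Vec::with_capacity(sorted.len());
    let mut prev: Option<u32> = None;
    let mut rn = 0u64;
    for &(pk, val) in sorted {
        // Restart numbering whenever the partition key changes.
        if prev != Some(pk) {
            rn = 0;
            prev = Some(pk);
        }
        rn += 1;
        out.push((pk, val, rn));
    }
    out
}
```

Given at most two rows per partition as input, every `rn` in the result is 1 or 2, which is exactly what `rn <= 2` would have kept.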
Supported predicates: `rn <= K`, `rn < K`, `K >= rn`, `K > rn`.
The rule only fires for `ROW_NUMBER` with a `PARTITION BY` clause. Global
top-K (no `PARTITION BY`) is already handled by
`SortExec` with `fetch`.
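The four supported predicate forms reduce to a single per-partition fetch value. The sketch below shows one way that mapping could look. It assumes the strict forms (`rn < K`, `K > rn`) translate to a fetch of K - 1 because ROW_NUMBER starts at 1, and that a limit below 1 is left to the ordinary plan; the PR description states neither, and the types and function name are hypothetical.

```rust
use std::convert::TryFrom;

/// Comparison operators appearing in the supported predicates.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Cmp { Lt, LtEq, Gt, GtEq }

/// Which side of the comparison holds the ROW_NUMBER column.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RnSide { Left, Right }

/// Map a comparison between `rn` and the literal `k` to a per-partition fetch.
/// Returns None for shapes outside the four supported forms.
fn fetch_for_predicate(side: RnSide, op: Cmp, k: i64) -> Option<usize> {
    let limit = match (side, op) {
        // rn <= K and K >= rn keep K rows per partition.
        (RnSide::Left, Cmp::LtEq) | (RnSide::Right, Cmp::GtEq) => k,
        // rn < K and K > rn keep K - 1 rows (assumption: rn starts at 1).
        (RnSide::Left, Cmp::Lt) | (RnSide::Right, Cmp::Gt) => k.checked_sub(1)?,
        // Anything else (e.g. rn >= K) is a lower bound, not a top-K.
        _ => return None,
    };
    // Assumption: a non-positive limit is not rewritten.
    if limit < 1 {
        None
    } else {
        usize::try_from(limit).ok()
    }
}
```

For example, `fetch_for_predicate(RnSide::Right, Cmp::Gt, 3)` models `3 > rn` and yields `Some(2)`.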
**Config flag:** `datafusion.optimizer.enable_window_topn` (default:
`true`)
**Benchmark results** (H2O groupby Q8, 10M rows, top-2 per partition), run
with `cargo run --release --example h2o_window_topn_bench`:
| Scenario | Enabled (ms) | Disabled (ms) | Speedup |
|----------|-------------|--------------|---------|
| 100 partitions (100K rows/part) | 43 | 174 | 4.0x |
| 1K partitions (10K rows/part) | 71 | 146 | 2.1x |
| 10K partitions (1K rows/part) | 619 | 128 | 0.2x (regression) |
| 100K partitions (100 rows/part) | 4368 | 135 | 0.03x (regression) |
The regressions at 10K and 100K partitions are expected: per-partition `TopK`
overhead (a `RowConverter` and a `MemoryReservation` per instance) dominates
when there are many partitions with few rows each. For the common case of
moderate partition cardinality (100 to 1K partitions in this benchmark), the
optimization provides a 2-4x speedup.
## Are these changes tested?
Yes:
- **7 unit tests** (`core/tests/physical_optimizer/window_topn.rs`): basic
ROW_NUMBER, `rn < K`, flipped predicates, non-window column filter, config
disabled, no partition by, projection between filter and window
- **5 SLT tests** (`sqllogictest/test_files/window_topn.slt`): correctness
verification, EXPLAIN plan validation, `rn < K`, no-partition-by case, config
disabled fallback
## Are there any user-facing changes?
No breaking API changes. The optimization is enabled by default and
transparent to users. It can be disabled via:
```sql
SET datafusion.optimizer.enable_window_topn = false;
```