SubhamSinghal opened a new pull request, #23096:
URL: https://github.com/apache/datafusion/pull/23096
## Which issue does this PR close?
Follow-up to #21479 (`PartitionedTopKExec` for `ROW_NUMBER ... PARTITION
BY ... LIMIT N`) toward closing #6899.
## Rationale for this change
`PartitionedTopKExec` currently maintains a `HashMap<OwnedRow, TopK>`, i.e. one
full `TopK` per distinct partition key. Each `TopK` carries its own
`RowConverter`, a `MemoryReservation` registered with the runtime pool,
`TopKMetrics`, and a scratch `Rows` buffer. With high partition cardinality,
every partition seen for the first time pays for:
- `RowConverter::new` (parses the `SortField` list and allocates per-encoder
  state)
- `MemoryConsumer::register` with the pool (involves a global lock)
- per-counter `TopKMetrics` setup
- a scratch `Rows` allocation via `RowConverter::empty_rows`
On the h2o window-TopN sweep over a 10M-row CSV (partition cardinality set by
`id3 % N`), this shows up as a regression at ≥10K partitions:
`PartitionedTopKExec` becomes slower than the unpartitioned `SortExec` baseline
it is meant to replace.
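To make the scaling concrete, here is a small std-only Rust model of the pre-PR layout. It is a sketch under stated assumptions: `FullTopK` and `PerTopKSetup` are hypothetical names, and the constructor only stands in for the per-`TopK` setup listed above (it does not call the real `RowConverter` or memory-pool APIs). It counts how often that setup runs for a stream of `(partition key, value)` rows.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for what each `TopK` owns today (row converter,
/// registered reservation, metrics, scratch rows). Building it is the cost.
struct PerTopKSetup {
    _scratch: Vec<u8>,
}

/// Hypothetical model of one full `TopK` per partition key (pre-PR layout).
struct FullTopK {
    _setup: PerTopKSetup,
    k: usize,
    values: Vec<i64>, // sorted ascending, at most `k` entries
}

impl FullTopK {
    fn new(k: usize, setups: &mut usize) -> Self {
        // Every first-seen partition key pays for this setup.
        *setups += 1;
        FullTopK {
            _setup: PerTopKSetup { _scratch: Vec::with_capacity(1024) },
            k,
            values: Vec::new(),
        }
    }

    /// Keeps the `k` smallest values seen so far.
    fn insert(&mut self, v: i64) {
        let pos = self.values.partition_point(|&x| x <= v);
        if pos < self.k {
            self.values.insert(pos, v);
            self.values.truncate(self.k);
        }
    }
}

/// Feeds `(partition key, value)` rows through a `HashMap<key, FullTopK>`
/// and returns how many per-partition setups were performed.
fn count_setups(rows: &[(u64, i64)], k: usize) -> usize {
    let mut setups = 0;
    let mut map: HashMap<u64, FullTopK> = HashMap::new();
    for &(key, v) in rows {
        map.entry(key)
            .or_insert_with(|| FullTopK::new(k, &mut setups))
            .insert(v);
    }
    setups
}

fn main() {
    // 10,000 rows spread over `i % n` partitions, loosely mirroring `id3 % N`.
    for n in [100u64, 10_000] {
        let rows: Vec<(u64, i64)> = (0..10_000u64).map(|i| (i % n, i as i64)).collect();
        println!("{n} partitions -> {} setups", count_setups(&rows, 2));
    }
}
```

It reports one setup per distinct key (100 and 10,000 setups for the same 10,000 rows). The fixed per-partition cost therefore grows with cardinality rather than with row count.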
## What changes are included in this PR?
Adds a `PartitionedTopK` type alongside `TopK` in `topk/mod.rs`. It holds the
shared encoder, reservation, and metrics state once at the operator level, plus
a `HashMap<OwnedRow, TopKHeap>` of cheap per-partition heap state.
`PartitionedTopKExec` switches from `HashMap<OwnedRow, TopK>` to a single
`PartitionedTopK`.
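A minimal std-only Rust sketch of the new shape follows. It uses hypothetical names (`SharedState`, `Heap`, `PartitionedTopKSketch`) and plain `i64` sort keys in place of row-encoded `OwnedRow`/`Rows`, so it is not the PR's actual `TopKHeap` code. The shared state is built once per operator. Each new partition costs only an empty `BinaryHeap`, used as a max-heap that holds the k smallest values and evicts its top when a smaller value arrives.

```rust
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical stand-in for the state `PartitionedTopK` keeps once per
/// operator (encoder, reservation, metrics); here just a shared counter.
struct SharedState {
    rows_seen: usize,
}

/// Hypothetical cheap per-partition state: a max-heap holding the current
/// `k` smallest values, so creating one is just an empty `BinaryHeap`.
struct Heap {
    inner: BinaryHeap<i64>,
}

/// Sketch of the post-PR shape: shared state once, heaps per partition key.
struct PartitionedTopKSketch {
    k: usize,
    shared: SharedState,
    heaps: HashMap<u64, Heap>,
}

impl PartitionedTopKSketch {
    fn new(k: usize) -> Self {
        // Shared state is built exactly once, not once per partition.
        Self { k, shared: SharedState { rows_seen: 0 }, heaps: HashMap::new() }
    }

    fn insert(&mut self, key: u64, v: i64) {
        self.shared.rows_seen += 1;
        let heap = &mut self
            .heaps
            .entry(key)
            .or_insert_with(|| Heap { inner: BinaryHeap::new() })
            .inner;
        if heap.len() < self.k {
            heap.push(v);
        } else if let Some(&max) = heap.peek() {
            // Evict the current largest if the new value beats it.
            if v < max {
                heap.pop();
                heap.push(v);
            }
        }
    }

    /// Returns each partition's top-k values in ascending order.
    fn finish(self) -> HashMap<u64, Vec<i64>> {
        self.heaps
            .into_iter()
            .map(|(key, h)| (key, h.inner.into_sorted_vec()))
            .collect()
    }
}

fn main() {
    let mut op = PartitionedTopKSketch::new(2);
    for (key, v) in [(1, 5), (1, 3), (2, 9), (1, 4), (2, 1), (1, 1)] {
        op.insert(key, v);
    }
    let mut out: Vec<_> = op.finish().into_iter().collect();
    out.sort();
    println!("{out:?}");
}
```

This prints `[(1, [1, 3]), (2, [1, 9])]`. The design choice is that everything independent of the partition key lives in one place and is paid for once. A first-seen partition then costs only a map entry and an empty heap.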
### Bench results
#### Today's default (`main`, `enable_window_topn=false`) vs this PR (`enable_window_topn=true`)
| Partitions | `main` (flag off) | this PR (flag on) | Delta |
|-----------:|------------------:|------------------:|-------|
| 100 | 282 ms | 105 ms | **2.7× faster** |
| 1,000 | 247 ms | 110 ms | **2.2× faster** |
| 10,000 | 250 ms | 137 ms | **1.8× faster** |
| 100,000 | 222 ms | 320 ms | 1.4× slower |
#### `main` vs this PR, both flag-on
h2o `id3 % N` sweep, 10M-row CSV, 3 iterations per query, release build,
`enable_window_topn=true` on both sides:
| Partitions | `main` | this PR | Speedup |
|-----------:|-------:|--------:|--------:|
| 100 | 110 ms | 105 ms | ~1.0× |
| 1,000 | 117 ms | 110 ms | ~1.0× |
| 10,000 | 640 ms | 137 ms | **4.7×** |
| 100,000 | 4,327 ms | 320 ms | **13.5×** |
10K is the inflection point. On `main` it is a regression against the sort
baseline (640 ms vs 234 ms). After this PR it is a win (137 ms vs 234 ms, 1.7×
faster than sort). At 100K the gap to the sort baseline narrows (320 ms vs 238
ms) but does not close.
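The ratios in the paragraph above follow directly from the raw timings. A tiny Rust helper (hypothetical, for checking only) reproduces them, rounding to one decimal place as the tables do.

```rust
/// Ratio of a slower timing to a faster one, rounded to one decimal place.
fn ratio(slower_ms: f64, faster_ms: f64) -> f64 {
    (slower_ms / faster_ms * 10.0).round() / 10.0
}

fn main() {
    // (comparison, slower ms, faster ms), using the timings quoted above.
    let cases = [
        ("10K: main vs sort baseline", 640.0, 234.0),
        ("10K: sort baseline vs this PR", 234.0, 137.0),
        ("10K: main vs this PR", 640.0, 137.0),
        ("100K: main vs this PR", 4_327.0, 320.0),
        ("100K: this PR vs sort baseline", 320.0, 238.0),
    ];
    for (label, slower, faster) in cases {
        println!("{label}: {:.1}x", ratio(slower, faster));
    }
}
```

The last line shows that at 100K this PR is still about 1.3× slower than the sort baseline.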
The `enable_window_topn` default stays `false` per the #21479 discussion. At
100K+ partitions the operator is still slower than sort on average, so this PR
does not justify flipping the default. Instead, it is a prerequisite for further
optimizations targeting the remaining 100K+ cliff.
## Are these changes tested?
Yes
## Are there any user-facing changes?
No public API changes.
--
This is an automated message from the Apache Git Service.