avantgardnerio opened a new pull request, #23026:
URL: https://github.com/apache/datafusion/pull/23026
## Which issue does this PR close?
- N/A — opening as a draft to share the shape with reviewers and gather
feedback before filing the tracking issue. Related context: #22356
(PARTITION BY parallelization), #22395 (`Partitioning::Range`
groundwork by @gabotechs / Datadog).
## Rationale for this change
Bounded RANGE-frame window functions with `ORDER BY` but no
`PARTITION BY` currently collapse to a single execution partition.
`BoundedWindowAggExec::required_input_distribution()` returns
`Distribution::SinglePartition` in that case, which causes
`EnsureRequirements` to insert a `SortPreservingMergeExec` and serialize
a step that is otherwise embarrassingly parallel along the sort key.
The natural unit of parallelism here is a *range bucket* on the order
key. If the sort key range can be split into `N` disjoint buckets, the
window operator can run `N` ways in parallel — almost. RANGE frames
need to see a small window of "halo" rows from the neighbouring bucket
to compute frame values at boundary seams, but those halo rows must
not flow through into the final output or be visible to slide
accumulator math after the boundary row.
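To make the seam problem concrete, here is a minimal sketch. It assumes a single Int64 order key and a `RANGE BETWEEN p PRECEDING AND CURRENT ROW` frame; that frame shape and the helpers `frame_start`, `halo_range` and `range_sum` are illustrative, not from the PR. A row with key `k` sees every row with key in `[k - p, k]`. A bucket whose primary range starts at `lo` therefore needs halo rows with keys in `[lo - p, lo)` from its left neighbour. Without them, its first rows compute truncated frames.

```rust
use std::ops::Range;

/// Lowest order-key value visible to a row with key `k` under
/// `RANGE BETWEEN p PRECEDING AND CURRENT ROW` (hypothetical helper).
fn frame_start(k: i64, preceding: i64) -> i64 {
    k - preceding
}

/// Keys a bucket whose primary range starts at `lo` must borrow from its
/// left neighbour so that its first rows see a complete frame.
fn halo_range(lo: i64, preceding: i64) -> Range<i64> {
    frame_start(lo, preceding)..lo
}

/// Naive RANGE-frame SUM for the row with key `k`, over `(key, value)` rows.
/// `key <= k` includes peers with the same key, as CURRENT ROW does in RANGE mode.
fn range_sum(rows: &[(i64, i64)], k: i64, preceding: i64) -> i64 {
    rows.iter()
        .filter(|(key, _)| *key >= frame_start(k, preceding) && *key <= k)
        .map(|(_, v)| v)
        .sum()
}

fn main() {
    // Keys 5..=14, each with value 1; suppose a bucket boundary sits at key 10.
    let all: Vec<(i64, i64)> = (5..15).map(|k| (k, 1)).collect();
    let bucket: Vec<(i64, i64)> = all.iter().copied().filter(|(k, _)| *k >= 10).collect();
    // Over all rows, the frame for key 10 covers keys 7..=10.
    println!("full: {}", range_sum(&all, 10, 3));
    // From the bucket alone, keys 7..=9 are missing.
    println!("bucket only: {}", range_sum(&bucket, 10, 3));
    println!("halo keys: {:?}", halo_range(10, 3));
}
```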
## What changes are included in this PR?
Five pieces, intentionally small and decoupled:
1. **`PartitionExtremes` + `ExecutionPlan::runtime_partition_extremes`**
(`physical-plan/src/execution_plan.rs:97-127, 540-560`). A new trait
method that returns per-partition lex-min / lex-max tuples of the
declared output ordering, populated at runtime rather than from
plan-time `Statistics`. Default `Ok(None)`.
The type doc calls out a *dual interpretation* the implementer can
choose: *observed* (the default, what `SortExec` does) or *intended*
(what `RangeRepartitionExec` does — its bucket's primary range,
which is narrower than the data it actually carries). The
distinction matters because `HaloDropExec` consumes the "intended"
interpretation to strip halo rows without a side-channel.
2. **`SortExec` runtime observer** (`physical-plan/src/sorts/sort.rs`).
Folds first/last rows of each `sort_batch_chunked` invocation into
a per-partition `Mutex<Option<PartitionExtremes>>` slot. Cheap, no
per-row cost — uses the already-sorted chunks the operator
produces. The slot is the consumer's contract: drive the upstream
to its first batch, then read.
3. **`RangeRepartitionExec`** (`physical-plan/src/range_repartition.rs`).
K-way fan-in / fan-out coordinator. On first `execute()` it opens
every input partition, drives each to its first batch (which makes
the pipeline-breaking sort below populate its extremes slot),
lex-reduces per-input extremes into a global, derives N equal-width
Int64 boundaries, and routes every subsequent batch into per-bucket
mpsc channels. Halo rows land in *two* buckets (their primary and
the neighbour that needs them for frame context). Also implements
`runtime_partition_extremes` returning each output partition's
*intended* primary range (the "intended" interpretation from item 1:
a useful lie, since the partition also carries halo rows).
4. **`HaloDropExec`** (`physical-plan/src/halo_drop.rs`). Per-partition
filter that reads its input's `runtime_partition_extremes(partition)`
on the first batch and drops rows whose leading sort key falls
outside that range. Sits above `BoundedWindowAggExec`, below the
final `SortPreservingMergeExec`. The lazy first-batch read works
because the upstream `RangeRepartitionExec` has populated the
primary ranges by the time any batch reaches us.
5. **`ParallelWindow` optimizer rule**
(`physical-optimizer/src/parallel_window.rs`). Runs *before*
`EnsureRequirements` so it owns the distribution decision instead
of fighting an inserted SPM. For an eligible window
(RANGE frame, no PARTITION BY, single Column sort key, Int64
halo distances) the rule rebuilds the operator as
`HaloDropExec(BWAG_parallel_aware(RangeRepartitionExec(input)))`.
`BoundedWindowAggExec` grew a `parallel_aware: bool` flag that
flips its `required_input_distribution()` to
`UnspecifiedDistribution`.
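The coordinator arithmetic in item 3 can be sketched as follows. The sketch assumes:

- a single Int64 key,
- a `PRECEDING`-only RANGE frame, so halo rows flow from a bucket into the bucket(s) to its right,
- in-memory `Vec`s standing in for the per-bucket mpsc channels.

`global_extremes`, `bucket_width` and `route` are hypothetical names, not functions from the PR.

```rust
/// Lex-reduce per-input (min, max) extremes into one global range.
/// With a single sort key, lexicographic order is plain integer order.
/// Inputs that produced no rows report `None` and are skipped.
fn global_extremes(per_input: &[Option<(i64, i64)>]) -> Option<(i64, i64)> {
    per_input
        .iter()
        .flatten()
        .copied()
        .reduce(|(lo_a, hi_a), (lo_b, hi_b)| (lo_a.min(lo_b), hi_a.max(hi_b)))
}

/// Width of each of `n` equal-width buckets covering [min, max] inclusive.
/// Ceiling division, so trailing buckets may be narrower or empty.
/// Assumes `max - min + n` does not overflow i64.
fn bucket_width(min: i64, max: i64, n: i64) -> i64 {
    (max - min + n) / n
}

/// Routes each key (assumed to lie in [min, max]) to its primary bucket, and
/// duplicates it as a halo row into every later bucket whose first frame
/// reaches back to it.
fn route(keys: &[i64], min: i64, width: i64, n: usize, preceding: i64) -> Vec<Vec<i64>> {
    let mut buckets = vec![Vec::new(); n];
    for &k in keys {
        let primary = ((k - min) / width) as usize;
        buckets[primary].push(k);
        for j in primary + 1..n {
            let lower = min + j as i64 * width;
            if k >= lower - preceding {
                buckets[j].push(k);
            } else {
                // Lower bounds only grow with j, so no later bucket needs k.
                break;
            }
        }
    }
    buckets
}

fn main() {
    let (min, max) = global_extremes(&[Some((3, 40)), None, Some((0, 25))]).unwrap();
    let width = bucket_width(min, max, 4);
    let keys: Vec<i64> = (min..=max).collect();
    let buckets = route(&keys, min, width, 4, 2);
    for (i, b) in buckets.iter().enumerate() {
        println!("bucket {i}: {} rows, first key {:?}", b.len(), b.first());
    }
}
```

When `preceding` is no wider than a bucket, each row lands in at most two buckets, matching the description above. The loop simply keeps going if a frame reaches further back.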
Target physical plan shape (from the included SLT):
```
SortPreservingMergeExec (N → 1)
  HaloDropExec (N partitions; per-partition primary filter)
    BoundedWindowAggExec (N partitions, parallel_aware)
      RangeRepartitionExec (N, halo duplicates intended)
        SortExec (preserve_partitioning, N)
          DataSource
```
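The `HaloDropExec` layer in that plan amounts to a per-partition range filter with a lazy first-batch read. Here is a minimal sketch. It assumes the intended primary range is an inclusive `(lo, hi)` pair on a single Int64 key, and that batches are `Vec<(key, value)>` rather than Arrow `RecordBatch`es. `HaloDrop` and its closure-based `read_extremes` hook are hypothetical stand-ins for the operator and for `runtime_partition_extremes(partition)`.

```rust
/// Per-partition halo filter. `read_extremes` stands in for
/// `input.runtime_partition_extremes(partition)`; it is called at most once,
/// when the first batch arrives.
struct HaloDrop<F: FnMut() -> Option<(i64, i64)>> {
    read_extremes: F,
    /// Outer `None`: not read yet. Inner `None`: input published no range.
    range: Option<Option<(i64, i64)>>,
}

impl<F: FnMut() -> Option<(i64, i64)>> HaloDrop<F> {
    fn new(read_extremes: F) -> Self {
        Self { read_extremes, range: None }
    }

    /// Filters one batch of `(key, value)` rows, keeping keys in [lo, hi].
    fn process(&mut self, batch: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
        if self.range.is_none() {
            // Lazy read: by the time a batch arrives, upstream has published ranges.
            self.range = Some((self.read_extremes)());
        }
        match self.range {
            Some(Some((lo, hi))) => batch
                .into_iter()
                .filter(|(k, _)| (lo..=hi).contains(k))
                .collect(),
            // Assumption: with no published range there is nothing to strip.
            _ => batch,
        }
    }
}

fn main() {
    // Suppose this partition's intended primary range is [11, 21];
    // keys 9 and 10 arrived only as halo rows.
    let mut op = HaloDrop::new(|| Some((11, 21)));
    let kept = op.process(vec![(9, 1), (10, 1), (11, 1), (21, 1)]);
    println!("{kept:?}");
}
```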
## Are these changes tested?
`sqllogictest/test_files/parallel_window.slt` is the fixture:
- 4 parquet files with overlapping `seq` ranges (so routing actually
has to move rows around),
- EXPLAIN assertion on the new plan shape,
- value assertion on a 5-row truncation,
- row-count assertion that catches halo leakage
(`count(rolling_sum) = 100`, not 115).
I have *not* yet added a feature flag + equality SLT comparing parallel
vs. serial outputs row-for-row. That's the next baby step and is what
will catch slide-accumulator boundary bugs that the current count
assertion can't see. Posting now to drive the design conversation
before investing in that loop.
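Before wiring the feature flag, the parallel-vs-serial equality check can be prototyped outside SQL. Here is a minimal sketch, assuming a single Int64 key sorted ascending and `SUM` over `RANGE BETWEEN p PRECEDING AND CURRENT ROW`. It computes the window serially, then again per equal-width bucket with halo rows included and afterwards dropped, and compares the two row for row. All names are hypothetical. Because each frame is recomputed naively, this checks the routing and halo-drop contract only. It cannot see the slide-accumulator boundary bugs the planned SLT targets.

```rust
/// Serial reference: SUM(v) over RANGE BETWEEN p PRECEDING AND CURRENT ROW,
/// recomputed naively for each row. `rows` must be sorted by key.
fn serial(rows: &[(i64, i64)], p: i64) -> Vec<(i64, i64)> {
    rows.iter()
        .map(|&(k, _)| {
            let s = rows
                .iter()
                .filter(|(x, _)| *x >= k - p && *x <= k)
                .map(|(_, v)| v)
                .sum::<i64>();
            (k, s)
        })
        .collect()
}

/// Bucketed version: n equal-width buckets, each fed its primary rows plus
/// halo rows, windowed independently, then filtered back to its primary range.
fn parallel(rows: &[(i64, i64)], p: i64, n: i64) -> Vec<(i64, i64)> {
    let (Some(&(min, _)), Some(&(max, _))) = (rows.first(), rows.last()) else {
        return Vec::new();
    };
    let width = (max - min + n) / n; // ceiling of (max - min + 1) / n
    let mut out = Vec::with_capacity(rows.len());
    for b in 0..n {
        let (lo, hi) = (min + b * width, min + (b + 1) * width - 1);
        // Route: primary rows [lo, hi] plus halo rows [lo - p, lo).
        let input: Vec<(i64, i64)> = rows
            .iter()
            .copied()
            .filter(|&(k, _)| k >= lo - p && k <= hi)
            .collect();
        // Window per bucket, then drop rows outside the primary range.
        out.extend(serial(&input, p).into_iter().filter(|&(k, _)| k >= lo && k <= hi));
    }
    out
}

fn main() {
    // Keys 0..=49 with two peers each, so RANGE peers are exercised too.
    let rows: Vec<(i64, i64)> = (0..100).map(|i| (i / 2, i % 7)).collect();
    let (par, ser) = (parallel(&rows, 3, 4), serial(&rows, 3));
    println!("rows out: {} (input {}), equal: {}", par.len(), rows.len(), par == ser);
}
```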
## Are there any user-facing changes?
- New nodes in plan trees: `RangeRepartitionExec`, `HaloDropExec`.
- New trait method `ExecutionPlan::runtime_partition_extremes` with a
default `Ok(None)` — existing implementations are unaffected.
- New `PartitionExtremes` re-export from `physical-plan`.
- No SQL surface changes.
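The sketch below shows two things with a deliberately simplified stand-in: why a defaulted trait method leaves existing operators untouched, and how the `SortExec` observer's per-partition slot could back it. The stand-ins are:

- `Plan` replaces `ExecutionPlan`,
- `Extremes` replaces `PartitionExtremes` (Int64 columns only),
- `String` replaces DataFusion's error type.

Only the method name, the `Ok(None)` default and the `Mutex<Option<...>>` slot shape come from the PR. `LegacyExec`, `ObservingSortExec` and `observe_chunk` are hypothetical.

```rust
use std::sync::Mutex;

/// Simplified stand-in for `PartitionExtremes`: lex-min / lex-max tuples
/// of the output ordering, restricted here to Int64 columns.
#[derive(Debug, Clone, PartialEq)]
struct Extremes {
    min: Vec<i64>,
    max: Vec<i64>,
}

/// Simplified stand-in for `ExecutionPlan`, with `String` as the error type.
trait Plan {
    fn name(&self) -> &str;

    /// Defaulted, so implementors that predate the method compile unchanged.
    fn runtime_partition_extremes(&self, _partition: usize) -> Result<Option<Extremes>, String> {
        Ok(None)
    }
}

/// An operator written before the method existed: it never mentions it.
struct LegacyExec;

impl Plan for LegacyExec {
    fn name(&self) -> &str {
        "LegacyExec"
    }
}

/// An operator that opts in, keeping one mutex-guarded slot per partition.
struct ObservingSortExec {
    slots: Vec<Mutex<Option<Extremes>>>,
}

impl ObservingSortExec {
    /// Fold the first and last row of one sorted chunk into the partition's slot.
    fn observe_chunk(&self, partition: usize, first: Vec<i64>, last: Vec<i64>) {
        let mut slot = self.slots[partition].lock().unwrap();
        let next = match slot.take() {
            None => Extremes { min: first, max: last },
            // Vec<i64> compares lexicographically, so min/max give lex-min/lex-max.
            Some(e) => Extremes { min: e.min.min(first), max: e.max.max(last) },
        };
        *slot = Some(next);
    }
}

impl Plan for ObservingSortExec {
    fn name(&self) -> &str {
        "ObservingSortExec"
    }

    fn runtime_partition_extremes(&self, partition: usize) -> Result<Option<Extremes>, String> {
        let slot = self.slots.get(partition).ok_or_else(|| format!("no partition {partition}"))?;
        let guard = slot.lock().map_err(|e| e.to_string())?;
        Ok((*guard).clone())
    }
}

fn main() {
    let legacy = LegacyExec;
    let sort = ObservingSortExec { slots: vec![Mutex::new(None)] };
    sort.observe_chunk(0, vec![5, 1], vec![9, 0]);
    sort.observe_chunk(0, vec![2, 7], vec![7, 3]);
    for plan in [&legacy as &dyn Plan, &sort] {
        println!("{}: {:?}", plan.name(), plan.runtime_partition_extremes(0));
    }
}
```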
## Known scope cuts (PoC)
- **Int64-only** leading sort key with Int64-bounded RANGE distances.
The `runtime_partition_extremes` *type* is generic
(`Vec<ScalarValue>`); the coordinator's bucketing arithmetic is
what's Int64-only. `DataType`-dispatch is the permanent architecture
here, not sample-based bucketing.
- **Single sort key** (`LexOrdering` len 1). Multi-key extremes are
in the type from day one; coordinator + halo math are not.
- **No feature flag** yet. The rule fires unconditionally on eligible
windows. A session config `optimizer.enable_parallel_window` (default
off) is the next commit.
- **No equality SLT** comparing parallel-on vs. parallel-off outputs.
Next commit after the flag.
- **No Ballista distribution** yet. The coordinator is the AQE
reducer; lifting it scheduler-side is a follow-up (~300-500 LoC).
- **No benchmark numbers** yet. Coming after the equality SLT lands.
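Taken together, the scope cuts above amount to an eligibility predicate for the `ParallelWindow` rule, gated by the planned session flag. Here is a minimal sketch over a hypothetical `WindowShape` summary struct. The real rule inspects `BoundedWindowAggExec` and its window expressions, whose APIs are not reproduced here. `enable_parallel_window` models the proposed `optimizer.enable_parallel_window` option, which does not exist yet.

```rust
/// Frame units of a window expression (local mirror of SQL ROWS / RANGE / GROUPS).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum FrameUnits {
    Rows,
    Range,
    Groups,
}

/// Type of an ORDER BY key; only Int64 is bucketable in the PoC.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum KeyType {
    Int64,
    Float64,
    Utf8,
}

/// Hypothetical summary of the facts the rule checks on a window.
#[derive(Debug, Clone)]
struct WindowShape {
    units: FrameUnits,
    partition_by_len: usize,
    /// One entry per ORDER BY key: (type, is a bare column reference).
    sort_keys: Vec<(KeyType, bool)>,
    halo_distances_are_int64: bool,
}

/// RANGE frame, no PARTITION BY, exactly one Int64 column sort key,
/// Int64 halo distances, and the (proposed) flag switched on.
fn is_eligible(w: &WindowShape, enable_parallel_window: bool) -> bool {
    enable_parallel_window
        && w.units == FrameUnits::Range
        && w.partition_by_len == 0
        && matches!(w.sort_keys.as_slice(), [(KeyType::Int64, true)])
        && w.halo_distances_are_int64
}

fn main() {
    let w = WindowShape {
        units: FrameUnits::Range,
        partition_by_len: 0,
        sort_keys: vec![(KeyType::Int64, true)],
        halo_distances_are_int64: true,
    };
    println!("flag off: {}, flag on: {}", is_eligible(&w, false), is_eligible(&w, true));
}
```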
## Coexistence with #22395
`Partitioning::Range` + `RangePartitioning { ordering, split_points }`
have already been merged (contributed by @gabotechs). The execution slot in
`RepartitionExec` for `Partitioning::Range` is still
`not_impl_err!`. This PR keeps `RangeRepartitionExec` as a separate
operator for now because it does something `RepartitionExec` is
unlikely to absorb (halo duplication). Once the design is settled here
I expect to thread the existing `RangePartitioning` shape through
where the boundaries live, rather than carry a parallel
`Vec<ScalarValue>` representation forever.
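For comparison with the equal-width arithmetic, a range partitioning stored as `N - 1` sorted split points maps a key to its partition with a binary search. Here is a sketch, assuming a single Int64 key and the convention that partition `i` holds keys in `[split[i - 1], split[i])`. The local `RangeSplits` type only borrows the field name `split_points`. It does not reproduce the merged `RangePartitioning` type or its exact boundary semantics.

```rust
/// Hypothetical single-key stand-in: `split_points` sorted ascending, N - 1 entries.
struct RangeSplits {
    split_points: Vec<i64>,
}

impl RangeSplits {
    /// Equal-width boundaries over [min, max] expressed as split points.
    /// Assumes `max - min + n` does not overflow i64.
    fn equal_width(min: i64, max: i64, n: i64) -> Self {
        let width = (max - min + n) / n; // ceiling of (max - min + 1) / n
        Self { split_points: (1..n).map(|i| min + i * width).collect() }
    }

    fn num_partitions(&self) -> usize {
        self.split_points.len() + 1
    }

    /// Partition index = number of split points <= key (binary search).
    fn partition_for(&self, key: i64) -> usize {
        self.split_points.partition_point(|&s| s <= key)
    }
}

fn main() {
    let r = RangeSplits::equal_width(0, 40, 4);
    println!("{:?} -> {} partitions", r.split_points, r.num_partitions());
    println!("key 21 -> partition {}", r.partition_for(21));
}
```

Under these assumptions, the `Vec<ScalarValue>` boundaries and a split-point list carry the same information. That equivalence is what would make threading `RangePartitioning` through plausible.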
## Draft status
This is up for *architecture review*, not approval. CI will be red —
clippy / fmt / unrelated tests are not yet tended. I will not push
forward to "green" until reviewers agree the shape is roughly right.