avantgardnerio opened a new pull request, #23026:
URL: https://github.com/apache/datafusion/pull/23026

   ## Which issue does this PR close?
   
   - N/A — opening as a draft to share the shape with reviewers and gather
     feedback before filing the tracking issue. Related context: #22356
     (PARTITION BY parallelization), #22395 (`Partitioning::Range`
     groundwork by @gabotechs / Datadog).
   
   ## Rationale for this change
   
   Bounded RANGE-frame window functions with `ORDER BY` but no
   `PARTITION BY` currently collapse to a single execution partition.
   `BoundedWindowAggExec::required_input_distribution()` returns
   `Distribution::SinglePartition` in that case, which causes
   `EnsureRequirements` to insert a `SortPreservingMergeExec` and serialize
   a step that is otherwise embarrassingly parallel along the sort key.
   
   The natural unit of parallelism here is a *range bucket* on the order
   key. If the sort key range can be split into `N` disjoint buckets, the
   window operator can run `N` ways in parallel — almost. RANGE frames
   need to see a small window of "halo" rows from the neighbouring bucket
   to compute frame values at boundary seams, but those halo rows must
   not flow through into the final output or be visible to slide
   accumulator math after the boundary row.
   
   ## What changes are included in this PR?
   
   Four pieces, intentionally small and decoupled:
   
   1. **`PartitionExtremes` + `ExecutionPlan::runtime_partition_extremes`**
      (`physical-plan/src/execution_plan.rs:97-127, 540-560`). A new trait
      method that returns per-partition lex-min / lex-max tuples of the
      declared output ordering, populated at runtime rather than from
      plan-time `Statistics`. Default `Ok(None)`.
   
      The type doc calls out a *dual interpretation* the implementer can
      choose: *observed* (the default, what `SortExec` does) or *intended*
      (what `RangeRepartitionExec` does — its bucket's primary range,
      which is narrower than the data it actually carries). The
      distinction matters because `HaloDropExec` consumes the "intended"
      interpretation to strip halo rows without a side-channel.
   
   2. **`SortExec` runtime observer** (`physical-plan/src/sorts/sort.rs`).
      Folds first/last rows of each `sort_batch_chunked` invocation into
      a per-partition `Mutex<Option<PartitionExtremes>>` slot. Cheap, no
      per-row cost — uses the already-sorted chunks the operator
      produces. The slot is the consumer's contract: drive the upstream
      to first batch, then read.
   
   3. **`RangeRepartitionExec`** (`physical-plan/src/range_repartition.rs`).
      K-way fan-in / fan-out coordinator. On first `execute()` it opens
      every input partition, drives each to its first batch (which makes
      the pipeline-breaking sort below populate its extremes slot),
      lex-reduces per-input extremes into a global, derives N equal-width
      Int64 boundaries, and routes every subsequent batch into per-bucket
      mpsc channels. Halo rows land in *two* buckets (their primary and
      the neighbour that needs them for frame context). Also implements
      `runtime_partition_extremes` returning each output partition's
      *intended* primary range — the "useful lie" mentioned above.
   
   4. **`HaloDropExec`** (`physical-plan/src/halo_drop.rs`). Per-partition
      filter that reads its input's `runtime_partition_extremes(partition)`
      on the first batch and drops rows whose leading sort key falls
      outside that range. Sits above `BoundedWindowAggExec`, below the
      final `SortPreservingMergeExec`. The lazy first-batch read works
      because the upstream `RangeRepartitionExec` has populated the
      primary ranges by the time any batch reaches us.
   
   5. **`ParallelWindow` optimizer rule**
      (`physical-optimizer/src/parallel_window.rs`). Runs *before*
      `EnsureRequirements` so it owns the distribution decision instead
      of fighting an inserted SPM. For an eligible window
      (RANGE frame, no PARTITION BY, single Column sort key, Int64
      halo distances) the rule rebuilds the operator as
      `HaloDropExec(BWAG_parallel_aware(RangeRepartitionExec(input)))`.
      `BoundedWindowAggExec` grew a `parallel_aware: bool` flag that
      flips its `required_input_distribution()` to
      `UnspecifiedDistribution`.
   
   Target physical plan shape (from the included SLT):
   
   ```
   SortPreservingMergeExec  (N → 1)
     HaloDropExec  (N partitions; per-partition primary filter)
       BoundedWindowAggExec  (N partitions, parallel_aware)
         RangeRepartitionExec  (N, halo duplicates intended)
           SortExec  (preserve_partitioning, N)
             DataSource
   ```
   
   ## Are these changes tested?
   
   `sqllogictest/test_files/parallel_window.slt` is the fixture:
   - 4 parquet files with overlapping `seq` ranges (so routing actually
     has to move rows around),
   - EXPLAIN assertion on the new plan shape,
   - value assertion on a 5-row truncation,
   - row-count assertion that catches halo leakage
     (`count(rolling_sum) = 100`, not 115).
   
   I have *not* yet added a feature flag + equality SLT comparing parallel
   vs. serial outputs row-for-row. That's the next baby step and is what
   will catch slide-accumulator boundary bugs that the current count
   assertion can't see. Posting now to drive the design conversation
   before investing in that loop.
   
   ## Are there any user-facing changes?
   
   - New nodes in plan trees: `RangeRepartitionExec`, `HaloDropExec`.
   - New trait method `ExecutionPlan::runtime_partition_extremes` with a
     default `Ok(None)` — existing implementations are unaffected.
   - New `PartitionExtremes` re-export from `physical-plan`.
   - No SQL surface changes.
   
   ## Known scope cuts (PoC)
   
   - **Int64-only** leading sort key with Int64-bounded RANGE distances.
     The `runtime_partition_extremes` *type* is generic
     (`Vec<ScalarValue>`); the coordinator's bucketing arithmetic is
     what's Int64-only. `DataType`-dispatch is the permanent architecture
     here, not sample-based bucketing.
   - **Single sort key** (`LexOrdering` len 1). Multi-key extremes are
     in the type from day one; coordinator + halo math are not.
   - **No feature flag** yet. The rule fires unconditionally on eligible
     windows. A session config `optimizer.enable_parallel_window` (default
     off) is the next commit.
   - **No equality SLT** comparing parallel-on vs. parallel-off outputs.
     Next commit after the flag.
   - **No Ballista distribution** yet. The coordinator is the AQE
     reducer; lifting it scheduler-side is a follow-up (~300-500 LoC).
   - **No benchmark numbers** yet. Coming after the equality SLT lands.
   
   ## Coexistence with #22395
   
   `Partitioning::Range` + `RangePartitioning { ordering, split_points }`
   have already merged from @gabotechs. The execution slot in
   `RepartitionExec` for `Partitioning::Range` is still
   `not_impl_err!`. This PR keeps `RangeRepartitionExec` as a separate
   operator for now because it does something `RepartitionExec` is
   unlikely to absorb (halo duplication). Once the design is settled here
   I expect to thread the existing `RangePartitioning` shape through
   where the boundaries live, rather than carry a parallel
   `Vec<ScalarValue>` representation forever.
   
   ## Draft status
   
   This is up for *architecture review*, not approval. CI will be red —
   clippy / fmt / unrelated tests are not yet tended. I will not push
   forward to "green" until reviewers agree the shape is roughly right.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to