avantgardnerio opened a new issue, #23093:
URL: https://github.com/apache/datafusion/issues/23093

   ## What is the problem the feature is trying to solve?
   
   `Partitioning::Range(RangePartitioning { ordering, split_points })` landed 
in #22207 (@gene-bordegaray, Datadog) as the *declarative* form of range 
partitioning — split points are known at plan time, either declared by a 
`TableProvider` or computed by a planner from statistics.
   
   What is missing is the symmetric *runtime-discovered* form: a sibling 
variant where the boundary set is only known once an upstream operator has 
observed its actual data range. Two concrete needs:
   
   1. **Parallelizing single-partition window functions (RANGE frames, no 
PARTITION BY).** A planner can declare "I want N output partitions of this 
sorted stream, split by sort-key range" without knowing the actual range yet. 
The implementing operator discovers the range at execute time (typically from 
its input's runtime extrema, see #23089) and computes interior split points 
before routing rows. This is the motivating use case from the spike in #23026.
   2. **Future range-aware operators where boundaries are data-dependent.** 
Anything that wants to bucket a single stream into N value-ranges without 
forcing statistics-based estimates to be precise. The existing declarative form 
is the right answer when split points are known a priori; the dynamic form is 
the right answer when they're not.
   
   These two cases share the partitioning semantics (lexicographic ordering, 
half-open intervals) but differ on *when* the boundaries are known. Keeping 
them as two variants of `Partitioning` lets downstream distribution 
requirements have a stable answer at plan time (partition count is fixed) while 
leaving the boundary discovery to the implementing operator.
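
The runtime step described above can be sketched in a few lines. This is a minimal illustration, not the code on the implementation branch: it assumes a single `i64` sort key instead of a full `LexOrdering`, evenly spaced split points, and lower-inclusive half-open intervals. The helper names `interior_split_points` and `partition_for` are hypothetical.

```rust
/// Hypothetical helper: derive `partition_count - 1` evenly spaced interior
/// split points from extrema observed at execute time.
fn interior_split_points(min: i64, max: i64, partition_count: usize) -> Vec<i64> {
    assert!(partition_count > 0, "partition_count must be non-zero");
    assert!(min <= max, "extrema must satisfy min <= max");
    // Widen to i128 so `max - min` cannot overflow for extreme i64 inputs.
    let span = max as i128 - min as i128;
    let n = partition_count as i128;
    (1..n).map(|i| (min as i128 + span * i / n) as i64).collect()
}

/// Hypothetical helper: route a key to its partition. Partition `i` covers
/// the half-open interval [split_points[i - 1], split_points[i]); the first
/// and last partitions are unbounded below and above respectively.
fn partition_for(key: i64, split_points: &[i64]) -> usize {
    // `split_points` is sorted ascending, so the predicate holds for a prefix.
    split_points.partition_point(|s| *s <= key)
}
```

With observed extrema of 0 and 100 and a plan-time `partition_count` of 4, the split points come out as 25, 50, and 75, and a key equal to a split point lands in the partition to its right. The partition count never changes at run time; only the split values do.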
   
   ## Describe the solution you'd like
   
   A small addition to `Partitioning`:
   
   ```rust
   pub enum Partitioning {
       RoundRobinBatch(usize),
       Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
       Range(RangePartitioning),
       DynamicRange(DynamicRangePartitioning),  // <- new
       UnknownPartitioning(usize),
   }
   
   pub struct DynamicRangePartitioning {
       ordering: LexOrdering,
       partition_count: usize,
   }
   ```
   
   `partition_count` is fixed at plan time so downstream distribution 
requirements have a stable answer. Only the split point values are 
runtime-discovered.
   
   Variant introduction only — no execution slot in this PR. `RepartitionExec` 
returns `not_impl_err!` for `DynamicRange` at every site it does for `Range`. 
Proto serialization returns `not_impl_err!` (proto plumbing follows the same 
incremental cadence as #22207 → #22787). FFI bridges to 
`UnknownPartitioning(n)` (same path `Range` takes per #22394).
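
To make the "mirror `Range` at every match site" rule concrete, here is a self-contained sketch using simplified stand-in types rather than the real DataFusion definitions. The functions `check_repartition_supported` and `bridge_for_ffi` are hypothetical names, errors are plain `String`s instead of `not_impl_err!`, and the treatment of `UnknownPartitioning` in the repartition check is an assumption of the sketch.

```rust
// Simplified stand-in for the real enum; payloads are reduced to the
// partition count, which is all this sketch needs.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Range { partition_count: usize },
    DynamicRange { partition_count: usize },
    UnknownPartitioning(usize),
}

impl Partitioning {
    /// The partition count is available at plan time for every variant.
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n) | Partitioning::UnknownPartitioning(n) => *n,
            Partitioning::Range { partition_count }
            | Partitioning::DynamicRange { partition_count } => *partition_count,
        }
    }
}

/// Hypothetical repartition check: `DynamicRange` is rejected exactly where
/// `Range` is rejected.
fn check_repartition_supported(p: &Partitioning) -> Result<(), String> {
    match p {
        Partitioning::RoundRobinBatch(_) => Ok(()),
        Partitioning::Range { .. } => Err("Range repartitioning is not implemented".into()),
        Partitioning::DynamicRange { .. } => {
            Err("DynamicRange repartitioning is not implemented".into())
        }
        // Assumption of this sketch: unknown partitioning is not routable either.
        Partitioning::UnknownPartitioning(_) => Err("unsupported partitioning scheme".into()),
    }
}

/// Hypothetical FFI bridge: both range variants degrade to
/// `UnknownPartitioning(n)`, preserving only the partition count.
fn bridge_for_ffi(p: &Partitioning) -> Partitioning {
    match p {
        Partitioning::Range { .. } | Partitioning::DynamicRange { .. } => {
            Partitioning::UnknownPartitioning(p.partition_count())
        }
        other => other.clone(),
    }
}
```

Because the matches are exhaustive, adding the variant forces every site to make an explicit decision, which is what keeps the change a pure addition.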
   
   Implementation branch: `coralogix/arrow-datafusion@brent/dynamic-range` 
([compare 
view](https://github.com/coralogix/arrow-datafusion/compare/apache:main...coralogix:arrow-datafusion:brent/dynamic-range)).
 +285 / -3 LoC. 3 unit tests covering construction, `compatible_with`, and 
`project` semantics. PR to follow once this discussion lands.
   
   ## Design points worth debating
   
   1. **Is `Partitioning::DynamicRange` the right shape, or should this be a 
flag on the existing `RangePartitioning`?** An `Option<Vec<SplitPoint>>` field 
on `RangePartitioning` (`None` = "discover at runtime") is the alternative. Two 
variants are preferred here because the operator contract is materially 
different — declared partitioning is a static property the planner can reason 
about; dynamic partitioning is a runtime contract whose split points only 
become defined once an upstream operator publishes them. Keeping them separate 
lets the type system enforce the distinction at every match site. Open to being 
talked out of this.
   2. **`partition_count` at plan time vs. dynamic.** Pinning the partition 
count at plan time keeps the rest of the optimizer/planner simple — every 
distribution decision still has a stable answer. The alternative (dynamic 
partition count too) would require deeper planner changes and is not motivated 
by any current use case.
   3. **How does the execution slot discover boundaries?** That's the 
follow-up: the implementing operator reads its input's runtime extrema via the 
API proposed in #23089 / implemented in #23090. This issue scopes only the 
partitioning variant itself; the execution slot will be filed separately once 
both this and #23090 settle.
   4. **Naming.** `DynamicRange` is straightforward but other options exist 
(`DiscoveredRange`, `RuntimeRange`). Open to the reviewer's preference.
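
The trade-off in design point 1 can be sketched with toy types. Everything here is illustrative: split points are plain `i64` values, and `RangeWithOptionalSplits`, `RangeKind`, and both helper functions are hypothetical names, not proposed API.

```rust
// Alternative A: one type, with `None` meaning "discover at runtime".
struct RangeWithOptionalSplits {
    split_points: Option<Vec<i64>>,
    partition_count: usize,
}

/// Every consumer has to remember the `None` case and fall back to a
/// separately stored count.
fn partition_count_a(p: &RangeWithOptionalSplits) -> usize {
    match &p.split_points {
        Some(points) => points.len() + 1,
        None => p.partition_count,
    }
}

// Alternative B: two variants, each carrying only what is defined for it.
enum RangeKind {
    Declared { split_points: Vec<i64> },
    Dynamic { partition_count: usize },
}

/// The compiler forces each match site to handle both contracts explicitly.
fn partition_count_b(kind: &RangeKind) -> usize {
    match kind {
        RangeKind::Declared { split_points } => split_points.len() + 1,
        RangeKind::Dynamic { partition_count } => *partition_count,
    }
}
```

In alternative A nothing stops a value from holding both split points and a conflicting `partition_count`; in alternative B that state cannot be represented, which is the enforcement argument made in point 1.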
   
   ## Describe alternatives you've considered
   
   - **A flag on `RangePartitioning`.** Discussed above (point 1).
   - **A new dedicated operator** (not a `Partitioning` variant) that routes by 
runtime-discovered boundaries. Rejected because that would duplicate 
`RepartitionExec`'s mechanics and bypass the unified `EnforceDistribution` 
story the existing `Partitioning` enum gives.
   - **Inferring "dynamic" from absence of split points in 
`RangePartitioning`.** Same problem as design point 1: it collapses two contracts into one 
type at the cost of every consumer having to guard against the absent case.
   
   ## Coexistence with existing work
   
   - **#22207, #22607, #22777** — Gene Bordegaray's `Partitioning::Range` work. 
`DynamicRange` is the runtime-discovered sibling; the declarative path is 
unchanged.
   - **#23089** — `PartitionExtrema` API proposal. `DynamicRange`'s execution 
slot is the canonical consumer.
   - **#23090** — `PartitionExtrema` implementation PR. Lands the primitive the 
dynamic execution slot will read.
   - **#23026** — original parallel-window draft PR, the spike that surfaced 
both this and `PartitionExtrema`. Kept open for context; the rewrite plan 
splits it into small focused PRs of which this is one.
   - **#22395 (epic), #22397 (RepartitionExec execution slot for Range)** — the 
umbrella issues for `Partitioning::Range` end-to-end landing. A `DynamicRange` 
execution slot will share infrastructure with the eventual `Range` row-routing 
implementation.
   
   ## Additional context
   
   This issue is the discussion home for the variant. Once shape is agreed, the 
branch can be cleaned into a single PR (+285 / -3 LoC, pure addition, behavior 
mirrors `Range` at every match site — every `not_impl_err!` site for `Range` 
gets the same treatment for `DynamicRange`). Execution-slot, proto, logical 
representation, and substrait round-trip land as separate follow-ups on top, 
mirroring the cadence Gene used for `Range`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

