avantgardnerio opened a new issue, #23194:
URL: https://github.com/apache/datafusion/issues/23194

   **Is your feature request related to a problem or challenge?**
   
   DataFusion makes all execution-plan choices at planning time from static statistics, which are frequently `Inexact` or `Absent` (e.g. after a `GROUP BY`, a selective filter, or anything involving derived columns). This blocks a wide set of optimizations: the planner has to guess at runtime cardinality, and a bad guess can cost orders of magnitude in performance.
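
To make the failure mode concrete, here is a minimal sketch assuming a simplified mirror of DataFusion's `Precision` statistics wrapper, specialized to row counts. The variant names `Exact`, `Inexact`, and `Absent` match the real type; `estimate_filter_rows`, `misestimate_factor`, and the fixed `guessed_selectivity` parameter are hypothetical. It shows how an exact input count degrades to an `Inexact` guess as soon as an opaque filter sits on top of it.

```rust
/// Simplified stand-in for DataFusion's `Precision<T>` wrapper, specialized
/// to row counts. Only the variant names are taken from the real type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical estimate for the row count of `FILTER(input)` when the planner
/// knows nothing about the predicate and falls back to a guessed selectivity.
/// Whatever the input precision, the result is at best `Inexact`.
pub fn estimate_filter_rows(input: Precision, guessed_selectivity: f64) -> Precision {
    match input {
        Precision::Exact(n) | Precision::Inexact(n) => {
            Precision::Inexact((n as f64 * guessed_selectivity).round() as usize)
        }
        Precision::Absent => Precision::Absent,
    }
}

/// How far off an estimate was, as a ratio >= 1.0 between the larger and the
/// smaller of (estimate, actual). Returns `None` when there was no estimate.
pub fn misestimate_factor(estimate: Precision, actual_rows: usize) -> Option<f64> {
    let est = match estimate {
        Precision::Exact(n) | Precision::Inexact(n) => n,
        Precision::Absent => return None,
    };
    let (hi, lo) = if est >= actual_rows { (est, actual_rows) } else { (actual_rows, est) };
    // `max(1)` avoids dividing by zero when either side is empty.
    Some(hi.max(1) as f64 / lo.max(1) as f64)
}
```

For example, with an exact input of 1,000,000 rows and an assumed selectivity of 0.2, the estimate is `Inexact(200_000)`; if the filter actually keeps 20 rows, the estimate is off by a factor of 10,000, which is the kind of gap that can flip a build/probe or parallelism decision.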
   
   Concrete decisions that need runtime stats:
   
   - **Build-side swap for hash joins**: `JoinSelection` picks the build and probe sides from estimates; when the estimates are wrong, the larger input ends up as the build side.
   - **Dynamic range repartitioning** — parallel sort and parallel window need 
range boundaries chosen from the actual data distribution, not estimates.
   - **Parallel window functions**: partitioning a window operator across threads only pays off when the partitions are roughly balanced, which is a runtime property.
   - **Parallel sort** — pick the right degree of parallelism once we know how 
much data there is.
   - **Post-shuffle partition coalescing**: collapse many tiny partitions (the result of an over-estimated repartition) into a smaller number of right-sized ones.
   - **Skew handling** — detect a hot partition and split it across workers.
   - **Adaptive aggregation strategy** — pick between hash and sort 
aggregation, or between `Partial` and `Single`, based on observed cardinality.
   - **Dynamic filter pushdown from build to probe** — generate a filter from 
the materialized build side and push it into the probe-side scan.
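
Two of these decisions, post-shuffle coalescing and skew handling, become simple arithmetic once exact per-partition sizes are known. The sketch below is illustrative only: the function names and parameters are hypothetical, the greedy merge of adjacent partitions up to a target size and the "larger than both `factor` times the median and an absolute floor" skew test are modeled loosely on Spark AQE, and none of it is an existing DataFusion API.

```rust
/// Hypothetical post-shuffle coalescing: greedily merge adjacent partitions,
/// using their observed byte sizes, so that each group stays within
/// `target_bytes` (a single oversized partition forms its own group).
/// Returns groups of original partition indices.
pub fn coalesce_partitions(sizes: &[u64], target_bytes: u64) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0u64;
    for (i, &size) in sizes.iter().enumerate() {
        // Close the current group if adding this partition would overshoot.
        if !current.is_empty() && current_bytes + size > target_bytes {
            groups.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(i);
        current_bytes += size;
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

/// Hypothetical skew check: a partition is "hot" if it is larger than both
/// `factor` times the (upper) median partition size and `min_bytes`.
pub fn skewed_partitions(sizes: &[u64], factor: u64, min_bytes: u64) -> Vec<usize> {
    if sizes.is_empty() {
        return Vec::new();
    }
    let mut sorted = sizes.to_vec();
    sorted.sort_unstable();
    let median = sorted[sorted.len() / 2];
    sizes
        .iter()
        .enumerate()
        .filter(|&(_, &s)| s > median.saturating_mul(factor) && s > min_bytes)
        .map(|(i, _)| i)
        .collect()
}
```

With partition sizes `[10, 10, 10, 50, 5, 5]` and a target of 30, the greedy merge yields the groups `[[0, 1, 2], [3], [4, 5]]`; with sizes `[10, 12, 11, 200, 9]`, a factor of 5, and a floor of 50, only partition 3 is flagged as skewed.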
   
   Each of these needs the same primitive: read runtime stats from a completed 
sub-plan, then re-run a planning decision against the up-to-date plan.
   
   **Describe the solution you'd like**
   
   PoC in #23167.
   
   The shape of the solution:
   
   - **`StageBoundaryBuffer`** — an `ExecutionPlan` operator that materializes 
its input, counts rows as the drain task ferries batches through, and exposes 
the count via `runtime_row_count` (long-term, fold into `partition_statistics` 
returning `Precision::Exact`). It IS a pipeline breaker by construction — no 
need to find a natural one inside the operator being adapted.
   - **Per-decision insertion rule** — `InsertHashJoinBoundaries` (the first, 
in the PoC) wraps each HashJoin input. Future adaptive optimizations each get 
their own targeted insertion rule (`InsertWindowBoundaries`, 
`InsertRangePartitioningBoundaries`, etc.) — they don't need to know about each 
other.
   - **`RuntimeOptimizerExec`** — single always-on wrapper at the plan root. On 
each stage-completion event, fires registered runtime rules against the current 
plan, replaces the plan with each rule's output, then releases the stage's 
boundaries so downstream consumers can drain.
   - **`RuntimeRule` trait with identical signature to 
`PhysicalOptimizerRule::optimize`.** They live in separate traits today only 
because `physical-plan` cannot depend on `physical-optimizer` (the dependency 
runs the other way). End-state: a single trait. Existing rules become 
runtime-aware by reading boundary state from the plan tree they receive — 
`JoinSelection` itself can run against a post-breaker plan with no logic 
change, only fresher numbers.
   - **Stages execute sequentially; boundaries within a stage execute in 
parallel** via spawned drain tasks (matching `RepartitionExec`'s spawn model).
   - **Memory bounded by the materialized side, with a spill-or-stream 
fallback** — under `MemoryPool` pressure, spill to disk if available; otherwise 
release the boundary early and let the rule see "overflowed" — the query still 
runs, we just lose the adaptive decision (no worse than the pre-AQE baseline).
   
   The PoC demonstrates the build-side-swap case end-to-end. Every other item 
in the problem list is a ~30-line targeted insertion rule on the same 
infrastructure.
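
An insertion rule of this kind is a plain bottom-up tree rewrite. Below is a minimal sketch over a toy plan enum, with hypothetical names throughout (`Plan`, `insert_hash_join_boundaries`). The stage-numbering policy, where both inputs of a join share one stage that comes after every stage nested beneath them, is an assumption of this sketch rather than something the issue specifies.

```rust
/// Toy physical plan: just enough node kinds to show the rewrite.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(String),
    Filter(Box<Plan>),
    HashJoin { build: Box<Plan>, probe: Box<Plan> },
    Boundary { stage: usize, input: Box<Plan> },
}

/// Hypothetical `InsertHashJoinBoundaries`-style rule: wrap each hash-join
/// input in a boundary. Children are rewritten first (bottom-up), so nested
/// joins get earlier stages than the joins above them. Returns the rewritten
/// plan and the highest stage it contains (`None` if it has no boundaries).
/// Assumes it runs once, on a plan without boundaries.
pub fn insert_hash_join_boundaries(plan: Plan) -> (Plan, Option<usize>) {
    match plan {
        Plan::Scan(name) => (Plan::Scan(name), None),
        Plan::Filter(input) => {
            let (input, stage) = insert_hash_join_boundaries(*input);
            (Plan::Filter(Box::new(input)), stage)
        }
        // Pre-existing boundaries are left untouched.
        Plan::Boundary { stage, input } => (Plan::Boundary { stage, input }, Some(stage)),
        Plan::HashJoin { build, probe } => {
            let (build, build_stage) = insert_hash_join_boundaries(*build);
            let (probe, probe_stage) = insert_hash_join_boundaries(*probe);
            // Both inputs share one stage, one past anything nested below
            // (`Option` orders `None` below every `Some`).
            let stage = build_stage.max(probe_stage).map_or(0, |s| s + 1);
            let wrap = |p: Plan| Box::new(Plan::Boundary { stage, input: Box::new(p) });
            (Plan::HashJoin { build: wrap(build), probe: wrap(probe) }, Some(stage))
        }
    }
}
```

For a join whose probe side is itself a join, the inner join's two inputs land in stage 0 and the outer join's two inputs in stage 1, so the inner boundaries complete (and can trigger a swap of the inner join) before the outer stage starts draining.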
   
   **Describe alternatives you've considered**
   
   - **Status quo (static stats only).** Decisions are made at plan time from `Inexact` estimates, so bad estimates produce slow queries, and there is no path to any of the optimizations listed above.
   - **Spark-style full re-planning at shuffle boundaries.** Run the full 
optimizer on the unexecuted portion of the plan after each shuffle finishes. 
Works in Spark because shuffle is a heavyweight stage barrier. In a 
streaming/in-process engine like DataFusion, full re-planning at every 
potential decision point is too coarse. Inserting a targeted pipeline breaker 
exactly where stats are needed is finer-grained and matches the streaming model.
   - **Stat updates without explicit boundaries.** Have every operator update 
its `partition_statistics` as data flows through; rules poll and re-decide. 
Possible but requires contract changes across every operator and a 
polling-based fire mechanism. The boundary approach gives the same epistemic 
guarantee with a single new operator and event-driven firing.
   
   **Additional context**
   
   PoC PR: #23167. The runtime swap on a real query is observable via 
`RUST_LOG=info`:
   
   ```
   RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
   SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.
   RTO: stage 0 released; downstream consumers can now drain ...
   ```
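
The log corresponds to one stage-completion event: stage 0 reports exact counts for its two boundaries, one rule fires, and the boundaries are released. A toy sketch of that sequence follows. It reuses the names `RuntimeRule` and `SwapBuildSideIfInverted` from the issue, but every body here is hypothetical, the plan is a simplified enum rather than `Arc<dyn ExecutionPlan>`, and the trait drops the `&ConfigOptions` argument and `Result` return type of DataFusion's actual `PhysicalOptimizerRule::optimize`.

```rust
/// Toy physical plan; `rows` on a boundary is filled in once its stage completes.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(String),
    Boundary { stage: usize, rows: Option<usize>, released: bool, input: Box<Plan> },
    HashJoin { build: Box<Plan>, probe: Box<Plan> },
}

/// Runtime rule with the same plan-in, plan-out shape as a plan-time rule.
pub trait RuntimeRule {
    fn optimize(&self, plan: Plan) -> Plan;
}

/// Exact row count if `p` is a boundary whose stage has completed.
fn boundary_rows(p: &Plan) -> Option<usize> {
    match p {
        Plan::Boundary { rows, .. } => *rows,
        _ => None,
    }
}

/// Hypothetical body for the rule named in the log: put the smaller input on the build side.
pub struct SwapBuildSideIfInverted;

impl RuntimeRule for SwapBuildSideIfInverted {
    fn optimize(&self, plan: Plan) -> Plan {
        match plan {
            Plan::HashJoin { build, probe } => {
                let build = Box::new(self.optimize(*build));
                let probe = Box::new(self.optimize(*probe));
                match (boundary_rows(&build), boundary_rows(&probe)) {
                    // Act only on exact counts; otherwise keep the plan-time choice.
                    (Some(b), Some(p)) if b > p => Plan::HashJoin { build: probe, probe: build },
                    _ => Plan::HashJoin { build, probe },
                }
            }
            Plan::Boundary { stage, rows, released, input } => Plan::Boundary {
                stage,
                rows,
                released,
                input: Box::new(self.optimize(*input)),
            },
            other => other,
        }
    }
}

/// Apply `f` to every boundary of `stage` (inputs first, build side before probe side).
fn for_stage(plan: Plan, stage: usize, f: &mut dyn FnMut(&mut Option<usize>, &mut bool)) -> Plan {
    match plan {
        Plan::Boundary { stage: s, mut rows, mut released, input } => {
            let input = Box::new(for_stage(*input, stage, f));
            if s == stage {
                f(&mut rows, &mut released);
            }
            Plan::Boundary { stage: s, rows, released, input }
        }
        Plan::HashJoin { build, probe } => {
            let build = Box::new(for_stage(*build, stage, f));
            let probe = Box::new(for_stage(*probe, stage, f));
            Plan::HashJoin { build, probe }
        }
        other => other,
    }
}

/// Hypothetical `RuntimeOptimizerExec` reaction to one stage-completion event.
pub fn on_stage_complete(plan: Plan, stage: usize, counts: &[usize], rules: &[Box<dyn RuntimeRule>]) -> Plan {
    // 1. Record the exact counts reported by this stage's drain tasks.
    let mut next = counts.iter().copied();
    let mut plan = for_stage(plan, stage, &mut |rows: &mut Option<usize>, _: &mut bool| *rows = next.next());
    // 2. Fire every registered rule, replacing the plan with each rule's output.
    for rule in rules {
        plan = rule.optimize(plan);
    }
    // 3. Release the stage's boundaries so downstream consumers can drain.
    for_stage(plan, stage, &mut |_: &mut Option<usize>, released: &mut bool| *released = true)
}
```

Feeding counts `[100, 5]` for stage 0 puts the 5-row input on the build side, matching the log. A real swap also has to flip the join type (for example, a left join becomes a right join) and restore the original output column order; the sketch leaves both out.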
   
   Spark AQE for context: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
   

