avantgardnerio opened a new issue, #23194: URL: https://github.com/apache/datafusion/issues/23194
**Is your feature request related to a problem or challenge?**

DataFusion makes all execution-plan choices at planning time from static statistics, which are frequently `Inexact` or `Absent` (e.g. after a `GROUP BY`, a selective filter, or anything involving derived columns). A wide set of optimizations is blocked by this: the planner has to guess at runtime cardinality, and a bad guess costs orders of magnitude.

Concrete decisions that need runtime stats:

- **Build-side swap for hash joins**: `JoinSelection` picks build/probe from estimates; when the estimates are wrong, the build side is too big.
- **Dynamic range repartitioning**: parallel sort and parallel window need range boundaries chosen from the actual data distribution, not estimates.
- **Parallel window functions**: partitioning a window operator across threads only pays when partitions are roughly balanced, which is a runtime property.
- **Parallel sort**: pick the right degree of parallelism once we know how much data there is.
- **Post-shuffle partition coalescing**: collapse many tiny partitions (over-estimated repartition) into a smaller number of right-sized ones.
- **Skew handling**: detect a hot partition and split it across workers.
- **Adaptive aggregation strategy**: pick between hash and sort aggregation, or between `Partial` and `Single`, based on observed cardinality.
- **Dynamic filter pushdown from build to probe**: generate a filter from the materialized build side and push it into the probe-side scan.

Each of these needs the same primitive: read runtime stats from a completed sub-plan, then re-run a planning decision against the up-to-date plan.

**Describe the solution you'd like**

PoC in #23167. The shape of the solution:

- **`StageBoundaryBuffer`**: an `ExecutionPlan` operator that materializes its input, counts rows as the drain task ferries batches through, and exposes the count via `runtime_row_count` (long-term, fold into `partition_statistics` returning `Precision::Exact`).
  It is a pipeline breaker by construction, so there is no need to find a natural one inside the operator being adapted.
- **Per-decision insertion rule**: `InsertHashJoinBoundaries` (the first, in the PoC) wraps each HashJoin input. Future adaptive optimizations each get their own targeted insertion rule (`InsertWindowBoundaries`, `InsertRangePartitioningBoundaries`, etc.); they don't need to know about each other.
- **`RuntimeOptimizerExec`**: a single always-on wrapper at the plan root. On each stage-completion event, it fires the registered runtime rules against the current plan, replaces the plan with each rule's output, then releases the stage's boundaries so downstream consumers can drain.
- **`RuntimeRule` trait with identical signature to `PhysicalOptimizerRule::optimize`.** They live in separate traits today only because `physical-plan` cannot depend on `physical-optimizer` (the dependency runs the other way). End-state: a single trait. Existing rules become runtime-aware by reading boundary state from the plan tree they receive. `JoinSelection` itself can run against a post-breaker plan with no logic change, only fresher numbers.
- **Stages execute sequentially; boundaries within a stage execute in parallel** via spawned drain tasks (matching `RepartitionExec`'s spawn model).
- **Memory bounded by the materialized side, with a spill-or-stream fallback**: under `MemoryPool` pressure, spill to disk if available; otherwise release the boundary early and let the rule see "overflowed". The query still runs; we just lose the adaptive decision (no worse than the pre-AQE baseline).

The PoC demonstrates the build-side-swap case end-to-end. Every other item in the problem list is a ~30-line targeted insertion rule on the same infrastructure.

**Describe alternatives you've considered**

- **Status quo (static stats only).** Decisions are made at plan time from `Inexact` estimates. Bad estimates produce slow queries. No path to win on the optimizations listed above.
- **Spark-style full re-planning at shuffle boundaries.** Run the full optimizer on the unexecuted portion of the plan after each shuffle finishes. Works in Spark because shuffle is a heavyweight stage barrier. In a streaming/in-process engine like DataFusion, full re-planning at every potential decision point is too coarse. Inserting a targeted pipeline breaker exactly where stats are needed is finer-grained and matches the streaming model.
- **Stat updates without explicit boundaries.** Have every operator update its `partition_statistics` as data flows through; rules poll and re-decide. Possible, but requires contract changes across every operator and a polling-based fire mechanism. The boundary approach gives the same epistemic guarantee with a single new operator and event-driven firing.

**Additional context**

PoC PR: #23167. The runtime swap on a real query is observable via `RUST_LOG=info`:

```
RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.
RTO: stage 0 released; downstream consumers can now drain
...
```

Spark AQE for context: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
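
For readers who want a concrete picture of the primitive (materialize a sub-plan, count rows while draining, then re-run a decision against exact numbers), here is a minimal std-only sketch. It is not the PoC code and uses no DataFusion APIs: `RowCount`, `Batch`, `Boundary`, `HashJoin`, and `swap_build_side_if_inverted` are hypothetical stand-ins named after the concepts in this issue, and a batch is modeled as nothing more than a row count.

```rust
// Hypothetical model of the proposal; none of these types are DataFusion APIs.

/// Row count as seen by a rule, loosely mirroring the `Exact` / `Inexact` /
/// `Absent` distinction mentioned in the issue.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// A record batch, reduced to its row count for illustration.
struct Batch {
    num_rows: usize,
}

/// Stand-in for the proposed `StageBoundaryBuffer`: it buffers its input and
/// counts rows as batches pass through.
struct Boundary {
    estimate: RowCount,          // plan-time statistic
    buffered: Vec<Batch>,        // materialized input
    runtime_rows: Option<usize>, // set once the input is fully drained
}

impl Boundary {
    fn new(estimate: RowCount) -> Self {
        Boundary { estimate, buffered: Vec::new(), runtime_rows: None }
    }

    /// Drain the whole input, recording the observed row count.
    fn drain(&mut self, input: impl IntoIterator<Item = Batch>) {
        let mut rows = 0;
        for batch in input {
            rows += batch.num_rows;
            self.buffered.push(batch);
        }
        self.runtime_rows = Some(rows);
    }

    /// Exact once drained; otherwise whatever the planner estimated.
    fn row_count(&self) -> RowCount {
        match self.runtime_rows {
            Some(n) => RowCount::Exact(n),
            None => self.estimate,
        }
    }

    /// Hand the buffered batches to the downstream consumer.
    fn release(self) -> Vec<Batch> {
        self.buffered
    }
}

/// A hash join whose two inputs are each wrapped in a boundary.
struct HashJoin {
    build: Boundary,
    probe: Boundary,
}

/// Runtime rule: flip the join only when both sides have exact counts and
/// the build side turned out larger than the probe side.
fn swap_build_side_if_inverted(join: HashJoin) -> (HashJoin, bool) {
    match (join.build.row_count(), join.probe.row_count()) {
        (RowCount::Exact(build), RowCount::Exact(probe)) if build > probe => {
            (HashJoin { build: join.probe, probe: join.build }, true)
        }
        _ => (join, false),
    }
}
```

Draining a 100-row build side and a 5-row probe side and then applying the rule flips the join, which is the situation shown in the log excerpt. A boundary that never finishes draining (for example one released early under memory pressure) keeps reporting its plan-time estimate, so the rule leaves the plan unchanged.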
