andygrove opened a new issue, #1910:
URL: https://github.com/apache/datafusion-ballista/issues/1910

   ## Is your feature request related to a problem or challenge?
   
   #1909 made uncorrelated scalar subqueries work in distributed execution by 
setting `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = 
false`, which makes the optimizer rewrite them to joins (the previous physical 
`ScalarSubqueryExec` cannot be serialized across Ballista's stage boundaries).
   
   That fix is **correct but not efficient**. An uncorrelated scalar subquery 
produces exactly one value, but the rewrite turns it into a (cross / left) join 
against the subquery's single-row result. Compared to using the value as a 
constant, the join adds extra shuffle/build/probe work. TPC-H q11, q15, and q22 
are examples: in each, the subquery is a single aggregate whose result is 
compared (`=`, `>`) against the outer rows.
   
   ## Describe the solution you'd like
   
   Execute the uncorrelated scalar subquery **first**, then substitute its 
value into the original plan:
   
   1. Detect uncorrelated scalar subqueries during distributed planning.
   2. Run each subquery's plan as its own job/stage(s) and collect its single 
scalar result (erroring if it returns more than one row, matching 
scalar-subquery semantics).
   3. Replace the `ScalarSubquery` / `ScalarSubqueryExpr` in the main plan with 
the materialized value as a literal.
   4. Plan and execute the (now subquery-free) main plan.
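   
   The steps above can be sketched on a toy expression tree. This is only an 
illustration: `Value`, `Expr`, `to_scalar`, and `substitute` are hypothetical 
names, not DataFusion or Ballista APIs, and returning NULL for an empty result 
is an assumption borrowed from standard SQL semantics rather than something 
this issue specifies.

```rust
use std::collections::HashMap;

/// Toy scalar value; stands in for a real engine's scalar type.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

/// Toy expression tree (hypothetical, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(Value),
    /// Uncorrelated scalar subquery, identified by an opaque id.
    ScalarSubquery(u32),
    Gt(Box<Expr>, Box<Expr>),
}

/// Step 2: collapse a subquery's collected rows into a single scalar.
fn to_scalar(rows: &[Value]) -> Result<Value, String> {
    match rows {
        // Assumption: an empty result yields NULL, as in standard SQL.
        [] => Ok(Value::Null),
        [v] => Ok(v.clone()),
        _ => Err(format!("scalar subquery returned {} rows", rows.len())),
    }
}

/// Step 3: replace every subquery node with its materialized literal.
fn substitute(expr: &Expr, results: &HashMap<u32, Value>) -> Result<Expr, String> {
    Ok(match expr {
        Expr::ScalarSubquery(id) => {
            let v = results
                .get(id)
                .ok_or_else(|| format!("subquery {id} was not executed"))?;
            Expr::Literal(v.clone())
        }
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(substitute(l, results)?),
            Box::new(substitute(r, results)?),
        ),
        // Columns and literals contain no subqueries; copy them unchanged.
        other => other.clone(),
    })
}

fn main() -> Result<(), String> {
    // Main-plan predicate: `value > (SELECT ...)`.
    let predicate = Expr::Gt(
        Box::new(Expr::Column("value".into())),
        Box::new(Expr::ScalarSubquery(0)),
    );

    // Pretend the subquery ran as its own job and returned one row.
    let mut results = HashMap::new();
    results.insert(0, to_scalar(&[Value::Int(42)])?);

    // Step 4 would plan and execute the rewritten, subquery-free predicate.
    let rewritten = substitute(&predicate, &results)?;
    println!("{rewritten:?}");
    Ok(())
}
```

   In a real implementation the substitution would presumably walk the actual 
plan representation, and the literal would carry the subquery's output type.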
   
   This matches how the value is logically used — a constant — and avoids the 
join overhead entirely. It also removes the need to disable 
`enable_physical_uncorrelated_scalar_subquery`, since there would no longer be 
a `ScalarSubqueryExec` to serialize.
   
   This is essentially sequential/dependent subquery execution in the 
scheduler: the main query depends on the subquery's result, so the subquery's 
stages run to completion first and feed a constant into the dependent plan.
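   
   A minimal sketch of that dependency ordering, with closures standing in for 
distributed jobs. `SubqueryJob` and `run_dependent` are hypothetical names 
chosen for illustration; nothing here reflects the actual Ballista scheduler 
API, and real subquery results would not be limited to `i64`.

```rust
/// Hypothetical stand-in for a distributed job that runs to completion
/// and returns one scalar.
type SubqueryJob = Box<dyn Fn() -> Result<i64, String>>;

/// Runs every subquery job to completion, then hands the resulting
/// constants to the main job. The main job is never started if any
/// subquery fails.
fn run_dependent<T>(
    subqueries: Vec<SubqueryJob>,
    main_job: impl FnOnce(&[i64]) -> Result<T, String>,
) -> Result<T, String> {
    let mut constants = Vec::with_capacity(subqueries.len());
    for job in &subqueries {
        // Each call returns only after that subquery has finished.
        constants.push(job()?);
    }
    main_job(&constants)
}

fn main() {
    // The subquery yields a threshold; the main job filters against it.
    let threshold: SubqueryJob = Box::new(|| Ok::<i64, String>(10));
    let rows = vec![5_i64, 12, 30];

    let kept = run_dependent(vec![threshold], |constants| {
        Ok(rows
            .iter()
            .copied()
            .filter(|v| *v > constants[0])
            .collect::<Vec<_>>())
    });
    println!("{kept:?}");
}
```

   This sketch runs the subqueries one after another for simplicity. Since 
uncorrelated subqueries do not depend on each other, a scheduler could 
plausibly run them concurrently and only gate the main plan on all of them 
finishing.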
   
   ## Describe alternatives you've considered
   
   - **Rewrite to a join (current behavior, #1909):** correct but adds join 
work for what is logically a constant.
   - **Keep `ScalarSubqueryExec` and serialize it across stages:** would 
require teaching `datafusion-proto` / the scheduler to carry the shared 
in-process results container across processes. An in-process container cannot 
be shared that way, so the value has to be materialized and transmitted 
regardless.
   
   ## Additional context
   
   Follow-on to #1909. Once this lands, the 
`enable_physical_uncorrelated_scalar_subquery = false` default set in #1909 
could be revisited.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

