avantgardnerio commented on PR #23167:
URL: https://github.com/apache/datafusion/pull/23167#issuecomment-4803250969

   Thanks again for the feedback! I think it has helped me shape this into a 
much more AQE-looking PR:
   
   0. The PR is now functional. It actually does flip the join sides, based on 
`runtime_row_count()`. The SLT passes just like it did without AQE.
   1. `PipelineBreakerBuffer` is now `StageBoundaryBuffer`, and it actually 
buffers. This allows us to insert it even on the left (streaming side) of the 
example SLT HashJoin.
   2. Each stage boundary gets assigned a stage number. Within a stage, all 
leaves execute in parallel; stages execute sequentially.
   3. RuntimeOptimizer calls `prime()` on each `StageBoundaryBuffer` to kick it 
off in its own tokio task, much like how `RepartitionExec` polls its children 
today. This skips the normal DataFusion poll-based flow to execute subtrees 
incrementally and bottom-up.
   4. `RuntimeRule` now has the exact same signature as 
`PhysicalOptimizerRule`, hopefully showing how they can be the same (just 
requires refactoring into a common module). Rules would have to be made 
"runtime aware" one at a time, but this gives us a migration path to AQE
   5. `StageBoundaryBuffer` is its own pipeline breaker: it stores everything 
in an unbounded channel in this PR. I think in the future we could `prime()` it 
until it hits a memory threshold, then give up and release it to start 
streaming. If this happens we'd no longer be able to get runtime stats and 
modify the plan, but at least it would be "no worse" than today.
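
To make the memory-threshold idea in item 5 concrete, here is a rough sketch of what a bounded `prime()` could look like. This is not the code in the PR: `Batch`, `BoundedPrimer`, and `PrimeOutcome` are hypothetical names, batches are reduced to a row count and a byte size, and a plain iterator stands in for the async stream.

```rust
/// Hypothetical stand-in for a record batch: only the sizes matter here.
struct Batch {
    rows: usize,
    bytes: usize,
}

/// What a bounded prime attempt could report back to the optimizer.
#[derive(Debug, PartialEq)]
enum PrimeOutcome {
    /// The input was exhausted within budget, so the row count is exact
    /// and runtime rules can still rewrite the plan.
    Complete { rows: usize },
    /// The budget was exceeded: keep what is buffered, stop collecting
    /// stats, and let the rest of the input stream as it does today.
    GaveUp { buffered_rows: usize },
}

/// Hypothetical buffer that primes its input up to a byte budget.
struct BoundedPrimer {
    budget_bytes: usize,
    used_bytes: usize,
    buffered: Vec<Batch>,
}

impl BoundedPrimer {
    fn new(budget_bytes: usize) -> Self {
        Self { budget_bytes, used_bytes: 0, buffered: Vec::new() }
    }

    /// Pull batches until the input ends or the budget is exceeded.
    /// Unconsumed batches stay in `input` for the streaming fallback.
    fn prime<I: Iterator<Item = Batch>>(&mut self, input: &mut I) -> PrimeOutcome {
        while let Some(batch) = input.next() {
            self.used_bytes += batch.bytes;
            self.buffered.push(batch);
            if self.used_bytes > self.budget_bytes {
                let buffered_rows = self.buffered.iter().map(|b| b.rows).sum();
                return PrimeOutcome::GaveUp { buffered_rows };
            }
        }
        let rows = self.buffered.iter().map(|b| b.rows).sum();
        PrimeOutcome::Complete { rows }
    }
}
```

In this sketch only `Complete` would allow a runtime rule to fire; `GaveUp` would leave the plan untouched, which is the "no worse than today" case.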
   
   ```
     RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
     SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 
100 rows, probe (right) = 5 rows.
     RTO: stage 0 released; downstream consumers can now drain ...
   ```
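
The decision behind that log line can be sketched roughly as follows. This is an illustration only, not the rule as implemented in the PR: `SideStats`, `BuildSide`, and `choose_build_side` are hypothetical names, and the only input is the row count observed at each stage boundary.

```rust
/// Hypothetical runtime stats reported by a fully buffered stage boundary.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SideStats {
    rows: usize,
}

/// Which join input should be collected into the hash table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BuildSide {
    Left,
    Right,
}

/// Pick the smaller input as the build side. The planned build side is
/// the left input, so it is kept on ties and only swapped when the
/// right input is strictly smaller.
fn choose_build_side(left: SideStats, right: SideStats) -> BuildSide {
    if right.rows < left.rows {
        BuildSide::Right
    } else {
        BuildSide::Left
    }
}
```

With the counts from the log (left = 100 rows, right = 5 rows), `choose_build_side` returns `BuildSide::Right`, i.e. the join sides are flipped.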
   
   ```
   01)RuntimeOptimizerExec
   02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, 
group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
   03)----StageBoundaryBuffer: stage=0
   04)------CoalescePartitionsExec
   05)--------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
   06)----StageBoundaryBuffer: stage=0
   07)------ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 
as sum_payload]
   08)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as 
group_key], aggr=[sum(big.payload)]
   09)----------RepartitionExec: partitioning=Hash([group_key@0], 4), 
input_partitions=4
   10)------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], 
aggr=[sum(big.payload)]
   11)--------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]
   ```
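
Both boundaries in the plan above carry `stage=0`, so they are primed together. A rough sketch of that ordering (boundaries within a stage in parallel, stages one after another) is below. It is an assumption-laden illustration, not the PR's scheduler: `Boundary` and `run_stages` are hypothetical, and scoped OS threads from the standard library stand in for the tokio tasks the PR uses.

```rust
use std::collections::BTreeMap;
use std::thread;

/// Hypothetical stage boundary: a stage number plus the number of rows
/// its subtree would produce when drained.
struct Boundary {
    stage: usize,
    rows_to_produce: usize,
}

impl Boundary {
    /// Stand-in for draining the subtree; returns the observed row count.
    fn prime(&self) -> usize {
        self.rows_to_produce
    }
}

/// Run stages in ascending order. Within a stage every boundary is primed
/// on its own thread, and the stage finishes only when all have joined.
fn run_stages(boundaries: &[Boundary]) -> Vec<(usize, Vec<usize>)> {
    // BTreeMap keeps stages sorted by number.
    let mut by_stage: BTreeMap<usize, Vec<&Boundary>> = BTreeMap::new();
    for b in boundaries {
        by_stage.entry(b.stage).or_default().push(b);
    }

    let mut results = Vec::new();
    for (stage, members) in by_stage {
        let counts = thread::scope(|s| {
            // Spawn everything first so the boundaries really overlap.
            let handles: Vec<_> = members
                .iter()
                .map(|b| s.spawn(move || b.prime()))
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().expect("boundary task panicked"))
                .collect::<Vec<usize>>()
        });
        // A real optimizer would fire its runtime rules here, before
        // releasing the stage to downstream consumers.
        results.push((stage, counts));
    }
    results
}
```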
   
   Hopefully this makes clear that the PR is a migration path and a 
generalizable solution. So what's missing? If we keep going this route, would 
we be able to (1) land it today and (2) eventually get to "full" AQE?

