2010YOUY01 opened a new issue, #23174:
URL: https://github.com/apache/datafusion/issues/23174

   ### Is your feature request related to a problem or challenge?
   
   Original discussion: 
https://github.com/apache/datafusion/pull/23026#issuecomment-4793157335
   
   Summary: DataFusion mostly uses repartition-based parallelism today, but at 
some point we need to introduce intra-operator parallelism, and we have to do 
that carefully for performance.
   
   The existing `SortExec` has already exposed some internal worker 
parallelism: it is possible to have a large number of concurrent workers for 
local sorting.
   - 
https://github.com/apache/datafusion/blob/a00f7499e139c29e13b8b94baba262435bef417b/datafusion/physical-plan/src/sorts/sort.rs#L626-L634
   
   This issue explains the background and proposes some ideas for improving 
this support.
   
   ### What is internal worker parallelism
   DataFusion mostly uses repartition-based parallelism: each partition holds 
independent data, and one CPU core processes one partition. Here is a 
parallel aggregation query example:
   
   <img width="1213" height="760" alt="Image" 
src="https://github.com/user-attachments/assets/4484b7fc-002c-4d17-adf0-5046e546fe15"
 />
   
   For certain workloads, the assumptions behind repartitioning are not ideal. 
Here are 3 motivating examples.
   
   #### Motivating Example 1: memory pressure case
   Let's say we're doing a large sort (data size >> memory) on a machine with 
32 cores and 64GB memory. The default setting executes it with 32 global 
partitions, and each partition runs a classic external sort algorithm 
(local sort, spill to disk, and finally read back and sort-preserving merge).
   The issue is that the per-partition memory budget is low, so spilling may 
create smaller sorted runs, and the end-to-end execution requires extra spills, 
reading back, and merging of smaller files.
   A more ideal plan shape is:
   - the scanner keeps 32 partitions, since they're not memory intensive and 
scale with the number of CPU cores
   - SortExec shrinks to 8 partitions, with 4 internal workers per partition. 
This gives each partition a larger memory budget, so it can proceed more 
easily; there are also efficient algorithms to parallelize sort and 
sort-preserving merge within a partition.
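   The memory argument above can be made concrete with a back-of-envelope model. This is only a sketch, not DataFusion code: `SortLayout` and its methods are hypothetical names, the 1 TiB input size used in the note below is an assumed figure, and the model assumes memory is split evenly across partitions and that each spilled run is about as large as one partition's budget.

```rust
/// Back-of-envelope model of a sort stage. Hypothetical type, not a DataFusion API.
#[derive(Debug, Clone, Copy)]
struct SortLayout {
    partitions: u64,
    workers_per_partition: u64,
}

impl SortLayout {
    /// Total concurrency of the stage: partitions times internal workers.
    fn total_workers(&self) -> u64 {
        self.partitions * self.workers_per_partition
    }

    /// Memory available to each partition, assuming an even split.
    fn budget_per_partition(&self, total_memory_bytes: u64) -> u64 {
        total_memory_bytes / self.partitions
    }

    /// Approximate number of spilled sorted runs across all partitions,
    /// assuming each run fills one partition's budget before it is spilled.
    fn approx_spilled_runs(&self, total_memory_bytes: u64, data_bytes: u64) -> u64 {
        let budget = self.budget_per_partition(total_memory_bytes);
        data_bytes.div_ceil(budget)
    }
}
```

   Under these assumptions, with 64 GiB of memory and 1 TiB of input, a 32 x 1 layout gets 2 GiB per partition and roughly 512 runs, while an 8 x 4 layout gets 8 GiB per partition and roughly 128 runs. Both layouts keep 32 workers busy.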
   
   <img width="1289" height="770" alt="Image" 
src="https://github.com/user-attachments/assets/213cf7fc-64a6-4a8b-85f1-b2757c22d2e9"
 />
   
   #### Motivating Example 2: segment-tree based parallelism in window functions
   The window query in the figure is impossible to parallelize with 
repartition, because repartition assumes data independence among partitions, 
whereas the query has one global partition and the window frame changes on 
every row.
   At the same time, there is a highly parallel algorithm if we allow shared 
memory among partitions:
   - https://www.vldb.org/pvldb/vol8/p1058-leis.pdf
   
   Then the ideal query shape becomes:
   ```
   (any downstream exec)
   -- RepartitionExec(round-robin on batch, input_partitions=1, output_partitions=32)
   ---- WindowExec(partition=1, internal_parallelism=32)
   ------ CoalescePartitionsExec(input_partitions=32, output_partitions=1)
   -------- CsvExec(partition=32, internal_parallelism=1)
   ```
   
   <img width="1322" height="745" alt="Image" 
src="https://github.com/user-attachments/assets/105395d1-f7a1-41d5-b4c8-9be4753f9f04"
 />
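   To illustrate why shared memory helps here, below is a simplified sketch of the idea, not the paper's implementation and not DataFusion code. A sum segment tree is built once over the single global partition, then several workers evaluate per-row frames over disjoint row ranges while reading the same tree. `SumSegmentTree`, `parallel_window_sums`, and the `(lo, hi)` half-open frame representation are all hypothetical choices made for this example.

```rust
use std::thread;

/// Iterative segment tree over sums; leaves live at indices n..2n.
struct SumSegmentTree {
    n: usize,
    tree: Vec<i64>,
}

impl SumSegmentTree {
    fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![0; 2 * n];
        tree[n..].copy_from_slice(values);
        // Fill internal nodes bottom-up: each node is the sum of its children.
        for i in (1..n).rev() {
            tree[i] = tree[2 * i] + tree[2 * i + 1];
        }
        Self { n, tree }
    }

    /// Sum of values[lo..hi] (half-open range) in O(log n).
    fn range_sum(&self, lo: usize, hi: usize) -> i64 {
        let (mut l, mut r) = (lo + self.n, hi + self.n);
        let mut sum = 0;
        while l < r {
            if l & 1 == 1 {
                sum += self.tree[l];
                l += 1;
            }
            if r & 1 == 1 {
                r -= 1;
                sum += self.tree[r];
            }
            l /= 2;
            r /= 2;
        }
        sum
    }
}

/// Evaluates one frame per row. Rows are split into contiguous chunks, and
/// each chunk is handled by its own thread reading the shared tree.
fn parallel_window_sums(values: &[i64], frames: &[(usize, usize)], workers: usize) -> Vec<i64> {
    let tree = SumSegmentTree::new(values);
    let mut out = vec![0i64; frames.len()];
    // Chunk size is at least 1 so that `chunks` never receives 0.
    let chunk = frames.len().div_ceil(workers.max(1)).max(1);
    thread::scope(|s| {
        for (frame_chunk, out_chunk) in frames.chunks(chunk).zip(out.chunks_mut(chunk)) {
            let tree = &tree;
            s.spawn(move || {
                for (slot, &(lo, hi)) in out_chunk.iter_mut().zip(frame_chunk) {
                    *slot = tree.range_sum(lo, hi);
                }
            });
        }
    });
    out
}
```

   Because every frame lookup only reads the tree, workers need no coordination after the build step. This is the property that repartition-based parallelism cannot express when there is a single global partition.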
   
   #### Motivating Example 3
   I haven't checked this PR carefully yet, but it also seems to try to 
introduce intra-operator parallelism:
   https://github.com/apache/datafusion/pull/23124
   
   ### Describe the solution you'd like
   
   - Establish conventions for intra-operator parallelism. For example, each 
execution plan stage may want to maintain a similar total concurrency level: 
`partition_count * internal_workers_per_partition`. See the CsvExec + 
WindowExec example above.
   - Improve `Explain` output for internal parallelism. The existing 
single-partition `SortExec` can still use internal parallelism, but the plan 
currently looks serial, which makes it hard to inspect potential performance 
issues.
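   As a rough illustration of both points, the sketch below models the concurrency convention and one possible explain-style rendering. Everything here is hypothetical: `Stage`, `describe`, and `is_balanced` are not DataFusion types or functions, and the output format is only one option among many.

```rust
/// Hypothetical description of one plan stage; not a DataFusion type.
struct Stage {
    name: &'static str,
    partitions: usize,
    internal_workers: usize,
}

impl Stage {
    /// Total concurrency: partition_count * internal_workers_per_partition.
    fn concurrency(&self) -> usize {
        self.partitions * self.internal_workers
    }

    /// One possible explain-style line that makes internal workers visible.
    fn describe(&self) -> String {
        format!(
            "{}(partitions={}, internal_parallelism={}, total={})",
            self.name,
            self.partitions,
            self.internal_workers,
            self.concurrency()
        )
    }
}

/// Returns true when every stage in the plan has the same total concurrency.
fn is_balanced(stages: &[Stage]) -> bool {
    stages
        .windows(2)
        .all(|pair| pair[0].concurrency() == pair[1].concurrency())
}
```

   With the CsvExec + WindowExec example, a 32 x 1 scan and a 1 x 32 window stage both have a total concurrency of 32, so the plan counts as balanced under this convention even though the partition counts differ.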
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

