hcrosse opened a new issue, #21311:
URL: https://github.com/apache/datafusion/issues/21311

   ### Is your feature request related to a problem or challenge?
   
   `BatchPartitioner::partition` takes a sync `FnMut` closure, which means 
consumers that need to do I/O with the partitioned batches have to do it 
inline. In Ballista's shuffle writer, this blocks tokio worker threads because 
file I/O happens inside the closure.
   
   The workaround is to move the entire `partition` call into `spawn_blocking`, 
but that also moves the CPU-bound partitioning work off the tokio workers and 
onto the blocking pool, which is wasteful.
   
   `partition_iter` already exists as a private method and returns an iterator 
over `(partition_index, RecordBatch)` pairs. DataFusion's own `RepartitionExec` 
uses it directly at `repartition/mod.rs:1391` to iterate results and send them 
through async channels. The doc comment on the method says this separation was 
intentional:
   
   > "we need to have a variant of `partition` that works w/ sync functions, 
and one that works w/ async. Using an iterator as an intermediate 
representation was the best way to achieve this"
   
   But since `partition_iter` is private, downstream crates can't use this 
pattern.
   
   ### Describe the solution you'd like
   
   Make `partition_iter` public (or add a public equivalent). The signature is 
already suitable:
   
   ```rust
   pub fn partition_iter(
       &mut self,
       batch: RecordBatch,
   ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_>
   ```
   
   This lets consumers partition on the async side and only push the I/O into 
`spawn_blocking`:
   
   ```rust
   // async side
   for result in partitioner.partition_iter(input_batch)? {
       // each item is itself a `Result`, so unwrap it before sending
       let (partition, batch) = result?;
       tx.send((partition, batch)).await?;
   }
   
   // blocking side
   while let Some((partition, batch)) = rx.blocking_recv() {
       writers[partition].write(&batch)?;
   }
   ```
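   
   For anyone who wants to try the shape of this without pulling in DataFusion 
or tokio, here is a minimal std-only sketch of the same split. Everything in it 
is a hypothetical stand-in: `Batch` replaces `RecordBatch`, `ToyPartitioner` 
replaces `BatchPartitioner` (it just partitions by `key % n`), a plain thread 
with `std::sync::mpsc` replaces `spawn_blocking` and the tokio channel, and 
`run_pipeline` is an illustrative helper, not anything in Ballista. Only the 
return shape of `partition_iter` is meant to mirror the signature above.
   
   ```rust
   use std::sync::mpsc;
   use std::thread;
   
   /// Hypothetical stand-in for `RecordBatch`: a list of row keys.
   type Batch = Vec<u64>;
   
   /// Hypothetical stand-in for `BatchPartitioner` (assumption: key % n).
   struct ToyPartitioner {
       num_partitions: usize,
   }
   
   impl ToyPartitioner {
       /// Same shape as the proposed method: an outer `Result` for setup
       /// errors and an inner `Result` for each yielded item.
       fn partition_iter(
           &mut self,
           batch: Batch,
       ) -> Result<impl Iterator<Item = Result<(usize, Batch), String>> + Send + '_, String> {
           if self.num_partitions == 0 {
               return Err("num_partitions must be > 0".to_string());
           }
           let n = self.num_partitions;
           let mut parts: Vec<Batch> = vec![Vec::new(); n];
           for key in batch {
               parts[(key % n as u64) as usize].push(key);
           }
           // Skip empty partitions and wrap each pair in `Ok`.
           Ok(parts
               .into_iter()
               .enumerate()
               .filter(|(_, b)| !b.is_empty())
               .map(Ok))
       }
   }
   
   /// Partitions on the calling thread and does the simulated "I/O" on a
   /// separate thread. Returns the rows each partition's writer received.
   fn run_pipeline(batches: Vec<Batch>, num_partitions: usize) -> Result<Vec<Batch>, String> {
       let (tx, rx) = mpsc::channel::<(usize, Batch)>();
   
       // "Blocking side": owns the writers; appending to a Vec stands in
       // for file I/O.
       let writer = thread::spawn(move || {
           let mut files: Vec<Batch> = vec![Vec::new(); num_partitions];
           for (partition, batch) in rx {
               files[partition].extend(batch);
           }
           files
       });
   
       // "Async side": the CPU-bound partitioning stays on this thread.
       let mut partitioner = ToyPartitioner { num_partitions };
       for batch in batches {
           for item in partitioner.partition_iter(batch)? {
               let (partition, part) = item?;
               tx.send((partition, part)).map_err(|e| e.to_string())?;
           }
       }
       drop(tx); // close the channel so the writer loop terminates
   
       writer
           .join()
           .map_err(|_| "writer thread panicked".to_string())
   }
   
   fn main() {
       let files = run_pipeline(vec![vec![0, 1, 2, 3], vec![4, 5]], 2).unwrap();
       println!("{files:?}"); // prints [[0, 2, 4], [1, 3, 5]]
   }
   ```
   
   The point of the sketch is only that the producer loop never touches the 
writers, so in the real version the partitioning can stay on a tokio worker 
while the writes run inside `spawn_blocking`.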
   
   ### Describe alternatives you've considered
   
   In apache/datafusion-ballista#1537 we moved both partitioning and I/O into 
`spawn_blocking` together. It works, but it's leaving performance on the table 
by running CPU work on the blocking pool.
   
   ### Additional context
   
   Ballista PR: https://github.com/apache/datafusion-ballista/pull/1537


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

