hcrosse commented on code in PR #1537:
URL:
https://github.com/apache/datafusion-ballista/pull/1537#discussion_r3028756603
##########
ballista/core/src/execution_plans/shuffle_writer.rs:
##########
@@ -255,96 +252,114 @@ impl ShuffleWriterExec {
}
Some(Partitioning::Hash(exprs, num_output_partitions)) => {
- // we won't necessary produce output for every possible
partition, so we
- // create writers on demand
- let mut writers: Vec<Option<WriteTracker>> = vec![];
- for _ in 0..num_output_partitions {
- writers.push(None);
- }
-
- let mut partitioner =
BatchPartitioner::new_hash_partitioner(
- exprs,
- num_output_partitions,
- write_metrics.repart_time.clone(),
- );
-
- while let Some(result) = stream.next().await {
- let input_batch = result?;
-
- write_metrics.input_rows.add(input_batch.num_rows());
-
- partitioner.partition(
- input_batch,
- |output_partition, output_batch| {
- // partition func in datafusion make sure not
write empty output_batch.
- let timer = write_metrics.write_time.timer();
- match &mut writers[output_partition] {
- Some(w) => {
- w.num_batches += 1;
- w.num_rows += output_batch.num_rows();
- w.writer.write(&output_batch)?;
- }
- None => {
- let mut path = path.clone();
-
path.push(format!("{output_partition}"));
- std::fs::create_dir_all(&path)?;
-
- path.push(format!(
- "data-{input_partition}.arrow"
- ));
- debug!("Writing results to {path:?}");
-
- let options =
IpcWriteOptions::default()
- .try_with_compression(Some(
- CompressionType::LZ4_FRAME,
- ))?;
-
- let file =
-
BufWriter::new(File::create(path.clone())?);
- let mut writer =
- StreamWriter::try_new_with_options(
- file,
- stream.schema().as_ref(),
- options,
- )?;
-
- writer.write(&output_batch)?;
- writers[output_partition] =
Some(WriteTracker {
- num_batches: 1,
- num_rows: output_batch.num_rows(),
- writer,
- path,
- });
+ let schema = stream.schema();
+ let (tx, mut rx) =
tokio::sync::mpsc::channel::<RecordBatch>(2);
+ let write_time = write_metrics.write_time.clone();
+ let repart_time = write_metrics.repart_time.clone();
+ let output_rows = write_metrics.output_rows.clone();
+
+ let handle = tokio::task::spawn_blocking(move || {
+ let mut writers: Vec<Option<WriteTracker>> =
+ (0..num_output_partitions).map(|_| None).collect();
Review Comment:
Applied this, but had to revert it when I actually compiled locally: `vec![None;
N]` requires `Clone` on the element type, and `WriteTracker` holds a
`StreamWriter<BufWriter<File>>`, which isn't cloneable. I kept the map/collect
form instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]