wForget opened a new pull request, #4599:
URL: https://github.com/apache/datafusion-comet/pull/4599

   ## Which issue does this PR close?
   
   
   Closes #.
   
   ## Rationale for this change
   
   Two improvements to the native shuffle writer:
   
   1. Specify the buffer size for the `output_data` buffer writer: `shuffle_write` 
currently uses the default 8 KB buffer for the output data buffer writer, which can 
lead to excessive system calls. This change explicitly passes `write_buffer_size` 
as the buffer capacity to improve write efficiency.
   2. Add an `interleave_time` metric: `interleave_record_batch` accounts for a 
significant portion of the shuffle execution time, so this PR adds a metric that 
tracks it.
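
The effect of the buffer capacity on the number of underlying writes can be
sketched in isolation. The example below is only an illustration, not Comet
code: `CountingSink` and `count_underlying_writes` are hypothetical names, and
the sink stands in for a file where each `write` call would be one system call.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical sink that counts how often `write` is called on it.
/// With a real file, each of these calls would be a system call.
struct CountingSink {
    calls: usize,
    bytes: usize,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        self.bytes += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes `chunks` chunks of `chunk_len` bytes through a `BufWriter` with the
/// given capacity and returns (underlying write calls, bytes written).
fn count_underlying_writes(
    capacity: usize,
    chunk_len: usize,
    chunks: usize,
) -> io::Result<(usize, usize)> {
    let mut sink = CountingSink { calls: 0, bytes: 0 };
    {
        // `&mut CountingSink` also implements `Write`, so the sink can be
        // inspected again once the writer goes out of scope.
        let mut writer = BufWriter::with_capacity(capacity, &mut sink);
        let chunk = vec![0u8; chunk_len];
        for _ in 0..chunks {
            writer.write_all(&chunk)?;
        }
        writer.flush()?;
    }
    Ok((sink.calls, sink.bytes))
}
```

Calling `count_underlying_writes(8 * 1024, 1024, 4096)` and
`count_underlying_writes(1024 * 1024, 1024, 4096)` writes the same 4 MiB in
both cases, but the larger capacity reaches the sink in far fewer calls. The
exact counts depend on the standard library's buffering strategy.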
   
   ## What changes are included in this PR?
   
   This PR includes the following changes:
   
   - Adds a new `interleave_time` metric to `ShufflePartitionerMetrics`.
   - Records `interleave_time` during each `PartitionedBatchIterator.next` 
call.
   - Changes the final shuffle data file writer to use 
`BufWriter::with_capacity(self.write_buffer_size, ...)` instead of the default 
buffer size.
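
As a rough sketch of the second item, timing one step inside an iterator's
`next` could look like the following. This is not the code in this PR:
`SketchMetrics` and `TimedBatches` are hypothetical stand-ins for
`ShufflePartitionerMetrics` and `PartitionedBatchIterator`, and the `build`
closure stands in for the `interleave_record_batch` call.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a metrics struct; it only holds the one timer.
#[derive(Default)]
struct SketchMetrics {
    interleave_time: Duration,
}

/// Iterator that builds each output item with `build` and times only that
/// step, not the time spent pulling input from `inner`.
struct TimedBatches<'a, I, F> {
    inner: I,
    build: F,
    metrics: &'a mut SketchMetrics,
}

impl<'a, B, I, F> Iterator for TimedBatches<'a, I, F>
where
    I: Iterator,
    F: FnMut(I::Item) -> B,
{
    type Item = B;

    fn next(&mut self) -> Option<B> {
        // Fetching the input is deliberately outside the timed region.
        let input = self.inner.next()?;
        let start = Instant::now();
        let out = (self.build)(input);
        // Accumulate the elapsed time across all calls to `next`.
        self.metrics.interleave_time += start.elapsed();
        Some(out)
    }
}
```

Keeping the timer inside `next` means the metric accumulates per produced
batch and covers only the build step, so it can be reported next to the
encode and write times without overlapping them.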
   
   
   ## How are these changes tested?
   
   Ran the shuffle benchmark locally:
   
   ```
   cargo flamegraph --root --release --features shuffle-bench --bin shuffle_bench -- \
     --input benchmark_data/lineitem.parquet \
     --partitions 200 \
     --codec lz4 \
     --hash-columns 0,3 \
     --memory-limit 2147483648 \
     --output-dir comet_shuffle_bench
   ```
   
   **Test: explicit buffer size for the `output_data` buffer writer**
   Before:
   ```
   === Results ===
   Write:
     avg time:         42.037s
     throughput:       1,426,973 rows/s (total across 1 tasks)
     output size:      3.80 GiB
   
   Input Metrics (last iteration):
     time_elapsed_scanning_total: 239.870s
     time_elapsed_opening: 0.003s
     time_elapsed_processing: 7.355s
     metadata_load_time: 0.000s
     output_bytes: 33.88 GiB
     bloom_filter_eval_time: 0.000s
     output_rows: 119,972,104
     bytes_scanned: 2.37 GiB
     page_index_eval_time: 0.000s
     time_elapsed_scanning_until_data: 0.349s
     output_batches: 14,956
     statistics_eval_time: 0.000s
     elapsed_compute: 0.000s
     row_pushdown_eval_time: 0.000s
   
   Shuffle Metrics (last iteration):
     input batches:    7,478
     repart time:      0.668s (1.6%)
     encode time:      13.311s (31.7%)
     write time:       16.923s (40.3%)
     spill count:      8
     spilled bytes:    3.32 GiB
     data size:        16.95 GiB
   dtrace: pid 27204 has exited
   writing flamegraph to "flamegraph.svg"
   ```
   
   After:
   ```
   === Results ===
   Write:
     avg time:         35.921s
     throughput:       1,669,937 rows/s (total across 1 tasks)
     output size:      3.80 GiB
   
   Input Metrics (last iteration):
     output_bytes: 33.88 GiB
     time_elapsed_scanning_until_data: 0.265s
     metadata_load_time: 0.001s
     bytes_scanned: 2.37 GiB
     time_elapsed_scanning_total: 234.757s
     bloom_filter_eval_time: 0.000s
     time_elapsed_opening: 0.003s
     statistics_eval_time: 0.000s
     time_elapsed_processing: 5.214s
     row_pushdown_eval_time: 0.000s
     elapsed_compute: 0.000s
     output_rows: 119,972,104
     page_index_eval_time: 0.000s
     output_batches: 14,956
   
   Shuffle Metrics (last iteration):
     input batches:    7,478
     repart time:      0.637s (1.8%)
     encode time:      13.048s (36.3%)
     write time:       11.051s (30.8%)
     spill count:      8
     spilled bytes:    3.32 GiB
     data size:        16.95 GiB
   dtrace: pid 37062 has exited
   writing flamegraph to "flamegraph.svg"
   ```
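
For reference, the relative change between the before and after runs above can
be worked out from the reported figures. The helper names below are
hypothetical, and the inputs are single-run numbers copied from the two
outputs, so the result is indicative only.

```rust
/// Fractional reduction from `before` to `after` (0.25 means 25% lower).
fn relative_reduction(before: f64, after: f64) -> f64 {
    (before - after) / before
}

/// Reductions in (avg time, write time), using the seconds reported in the
/// before and after benchmark outputs.
fn reported_reductions() -> (f64, f64) {
    let avg_time = relative_reduction(42.037, 35.921);
    let write_time = relative_reduction(16.923, 11.051);
    (avg_time, write_time)
}
```

With these inputs the average time drops by roughly 14.5% and the write time
by roughly 34.7%.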
   
   **Test: new `interleave_time` metric**
   After this change:
   ```
   === Results ===
   Write:
     avg time:         32.851s
     throughput:       1,826,014 rows/s (total across 1 tasks)
     output size:      3.80 GiB
   
   Input Metrics (last iteration):
     page_index_eval_time: 0.000s
     elapsed_compute: 0.000s
     time_elapsed_opening: 0.002s
     output_bytes: 33.88 GiB
     row_pushdown_eval_time: 0.000s
     output_batches: 14,956
     statistics_eval_time: 0.000s
     bytes_scanned: 2.37 GiB
     bloom_filter_eval_time: 0.000s
     time_elapsed_processing: 5.287s
     metadata_load_time: 0.000s
     output_rows: 119,972,104
     time_elapsed_scanning_until_data: 0.261s
     time_elapsed_scanning_total: 226.113s
   
   Shuffle Metrics (last iteration):
     input batches:    7,478
     repart time:      0.665s (2.0%)
     interleave time:  6.796s (20.7%)
     encode time:      12.600s (38.4%)
     write time:       8.844s (26.9%)
     spill count:      8
     spilled bytes:    3.32 GiB
     data size:        16.95 GiB
   dtrace: pid 39163 has exited
   writing flamegraph to "flamegraph.svg"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

