fred1268 opened a new pull request, #21825:
URL: https://github.com/apache/datafusion/pull/21825

   ## Which issue does this PR close?
   
   - Closes #21797.
   
   ## Rationale for this change
   
   `ParquetSink` registered an `elapsed_compute` metric using a single wall-clock timer that spanned the entire write operation: upstream batch wait, CPU Arrow→Parquet encoding, and object-store I/O were all rolled into one number. This made the metric misleading, because it inflated `elapsed_compute` with I/O latency, which is inconsistent with how every other operator in DataFusion reports this metric (CPU time only).
   
   ## What changes are included in this PR?
   
   Two write paths are fixed independently:
   
   **Sequential path** (`allow_single_file_parallelism = false` or CDC enabled):
   - A new `TimingWriter<W>` wrapper implements `AsyncFileWriter` and records 
wall-clock time spent in I/O calls (`write` / `complete`).
   - The total time inside `writer.write()` and `writer.close()` is accumulated 
in `total_write_time`. After all tasks join, `elapsed_compute` is set to 
`total_write_time − io_time`, isolating pure Arrow→Parquet encoding time.
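
The subtraction used on the sequential path can be illustrated with a minimal, synchronous sketch. It is not the PR's code: the real `TimingWriter<W>` implements the async `AsyncFileWriter` trait, whereas this stand-in wraps `std::io::Write`. The helpers `encode_and_write` and `timed_write`, and the use of `saturating_sub`, are assumptions made for illustration.

```rust
use std::io::{self, Write};
use std::time::{Duration, Instant};

/// Hypothetical synchronous stand-in for the PR's `TimingWriter<W>`.
/// Forwards every call to `inner` and accumulates the wall-clock time
/// spent inside those calls in `io_time`.
pub struct TimingWriter<W: Write> {
    inner: W,
    io_time: Duration,
}

impl<W: Write> TimingWriter<W> {
    pub fn new(inner: W) -> Self {
        Self { inner, io_time: Duration::ZERO }
    }

    pub fn io_time(&self) -> Duration {
        self.io_time
    }

    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W: Write> Write for TimingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let start = Instant::now();
        let result = self.inner.write(buf);
        self.io_time += start.elapsed();
        result
    }

    fn flush(&mut self) -> io::Result<()> {
        let start = Instant::now();
        let result = self.inner.flush();
        self.io_time += start.elapsed();
        result
    }
}

/// Toy encoder standing in for Arrow→Parquet encoding: CPU work
/// (serializing to little-endian bytes) followed by a write to the sink.
pub fn encode_and_write<W: Write>(sink: &mut W, values: &[u32]) -> io::Result<()> {
    let mut buf = Vec::with_capacity(values.len() * 4);
    for v in values {
        buf.extend_from_slice(&v.to_le_bytes());
    }
    sink.write_all(&buf)
}

/// Returns `(total_write_time, io_time, elapsed_compute, bytes_written)`,
/// where `elapsed_compute = total_write_time - io_time`.
pub fn timed_write(values: &[u32]) -> io::Result<(Duration, Duration, Duration, Vec<u8>)> {
    let mut writer = TimingWriter::new(Vec::new());
    let start = Instant::now();
    encode_and_write(&mut writer, values)?;
    writer.flush()?;
    let total = start.elapsed();
    let io = writer.io_time();
    // Saturating subtraction is a defensive choice in this sketch only.
    let compute = total.saturating_sub(io);
    Ok((total, io, compute, writer.into_inner()))
}
```

Because every timed I/O call is nested inside the outer timer, `io_time` cannot exceed `total_write_time`, so the difference is the time spent outside the sink, which in this toy is the encoding loop.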
   
   **Parallel path** (`allow_single_file_parallelism = true`, default):
   - `encoding_time: Time` (a clone of the registered `elapsed_compute` metric) is threaded through the five-function call chain down to the two leaf sites: `writer.write()` in `column_serializer_task` and `writer.close()` in `spawn_rg_join_and_finalize_task`. Since `Time` wraps an `Arc<AtomicUsize>`, every clone shares the same counter, so all concurrent column tasks accumulate directly into the registered metric.
   - Note: on the parallel path, `append_to_row_group()` in 
`concatenate_parallel_row_groups` is interleaved with I/O and cannot be cleanly 
isolated. It is excluded from `elapsed_compute`. This is acceptable since it 
operates on already-encoded data and represents a small fraction of total 
encoding CPU time.
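
The shared-counter accumulation on the parallel path can be sketched as follows. This is an illustration under assumptions, not the PR's code: `SharedTime` is a hypothetical stand-in for DataFusion's `Time`, `column_task` and `encode_columns` are toy substitutes for the real column serializer tasks, and plain threads are used in place of async tasks.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for DataFusion's `Time` metric: a cheap-to-clone
/// handle to one shared nanosecond counter.
#[derive(Clone, Default)]
pub struct SharedTime {
    nanos: Arc<AtomicUsize>,
}

impl SharedTime {
    /// Adds `d` to the shared counter; safe to call from many threads.
    pub fn add_duration(&self, d: Duration) {
        self.nanos.fetch_add(d.as_nanos() as usize, Ordering::Relaxed);
    }

    /// Current accumulated value in nanoseconds.
    pub fn value(&self) -> usize {
        self.nanos.load(Ordering::Relaxed)
    }
}

/// Toy column task: times only the CPU "encoding" step (here a checksum)
/// and adds the elapsed time to the shared metric.
fn column_task(column: Vec<u64>, encoding_time: SharedTime) -> u64 {
    let start = Instant::now();
    let checksum = column
        .iter()
        .fold(0u64, |acc, v| acc.wrapping_mul(31).wrapping_add(*v));
    encoding_time.add_duration(start.elapsed());
    checksum
}

/// Spawns one task per column; each task receives its own clone of the
/// metric handle, and all clones update the same underlying counter.
pub fn encode_columns(columns: Vec<Vec<u64>>, encoding_time: &SharedTime) -> Vec<u64> {
    let handles: Vec<_> = columns
        .into_iter()
        .map(|col| {
            let metric = encoding_time.clone();
            thread::spawn(move || column_task(col, metric))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("column task panicked"))
        .collect()
}
```

Since the tasks run concurrently, the accumulated value is the sum of per-task encoding time and can exceed the wall-clock duration of the write.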
   
   ## Are these changes tested?
   
   Yes. Two tests are added/extended in 
`datafusion/core/src/dataframe/parquet.rs`:
   - `test_parquet_sink_metrics_sequential` (new): verifies `elapsed_compute > 
0` with `allow_single_file_parallelism = false`.
   - `test_parquet_sink_metrics_parallel`: extended with an `elapsed_compute > 0` assertion, which this test previously lacked.
   
   The existing `test_parquet_sink_metrics` test (parallel path, default 
config) already asserted `elapsed_compute > 0` and continues to pass.
   
   ## Are there any user-facing changes?
   
   No API changes. The `elapsed_compute` metric was already surfaced — this PR 
makes its value accurate rather than introducing a new metric.

