fred1268 opened a new issue, #21797:
URL: https://github.com/apache/datafusion/issues/21797

   ### Describe the bug
   
   `ParquetSink` registers an `elapsed_compute` metric, but its value is a 
single wall-clock timer spanning the entire write phase — from before writer 
tasks are spawned to after they all join (`file_format.rs`, 
`spawn_writer_tasks_and_join`):
   
   ```rust
   let write_start = Instant::now();
   // spawn tasks — each calls writer.write(&batch) then writer.close()
   // join all tasks
   elapsed_compute.add_elapsed(write_start);
   ```
   
   This means `elapsed_compute` conflates three fundamentally different 
contributors:
   
   1. **Upstream backpressure** — time blocked on `rx.recv().await` waiting for 
the upstream plan to produce the next `RecordBatch`
   2. **CPU encoding** — Arrow → Parquet serialisation and column compression
   3. **Object-store I/O** — `BufWriter` flushes at row-group boundaries and 
the final `close()`
   
   `DataSinkExec::metrics()` is a transparent passthrough to 
`ParquetSink::metrics()`, so this value propagates unchanged up the plan tree. 
Any metrics walker that sums `ElapsedCompute` across operators will 
misattribute I/O latency and upstream wait time as compute time.
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   `elapsed_compute` for `ParquetSink` should reflect only CPU time (Arrow → 
Parquet encoding and compression), consistent with how `elapsed_compute` is 
used in other operators. I/O time and upstream wait should either be tracked 
separately or excluded.
   
   ### Additional context
   
   `ParquetSink` has two write paths: a **sequential path** (one 
`AsyncArrowWriter` per file, active when `allow_single_file_parallelism = 
false`) and a **parallel path** (`output_single_parquet_file_parallelized`, 
active when `allow_single_file_parallelism = true`,
   which is the default).
   
   **Sequential path.** `AsyncArrowWriter` encodes batches one at a time. 
Encoding is synchronous (`sync_writer.write(batch)?`) and I/O is async, 
occurring only at row-group flush boundaries and on `close()`. The two phases 
are clearly separated inside the crate, but `sync_writer` is a private field — 
DataFusion cannot time them independently without changes to `arrow-rs` or an 
interception point at the `AsyncFileWriter` layer.
   
   **Parallel path.** Each column in each row group is encoded in its own tokio 
task. A separate task assembles completed row groups and flushes them to the 
object store via a raw `Box<dyn AsyncWrite>`. CPU encoding is distributed 
across concurrent tasks and the I/O and CPU work are interleaved within the 
same concatenation loop — there is no clean trait boundary to intercept.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to