fred1268 opened a new issue, #21797: URL: https://github.com/apache/datafusion/issues/21797
### Describe the bug

`ParquetSink` registers an `elapsed_compute` metric, but its value is a single wall-clock timer spanning the entire write phase — from before writer tasks are spawned to after they all join (`file_format.rs`, `spawn_writer_tasks_and_join`):

```rust
let write_start = Instant::now();
// spawn tasks — each calls writer.write(&batch) then writer.close()
// join all tasks
elapsed_compute.add_elapsed(write_start);
```

This means `elapsed_compute` conflates three fundamentally different contributors:

1. **Upstream backpressure** — time blocked on `rx.recv().await` waiting for the upstream plan to produce the next `RecordBatch`
2. **CPU encoding** — Arrow → Parquet serialisation and column compression
3. **Object-store I/O** — `BufWriter` flushes at row-group boundaries and the final `close()`

`DataSinkExec::metrics()` is a transparent passthrough to `ParquetSink::metrics()`, so this value propagates unchanged up the plan tree. Any metrics walker that sums `ElapsedCompute` across operators will misattribute I/O latency and upstream wait time as compute time.

### To Reproduce

_No response_

### Expected behavior

`elapsed_compute` for `ParquetSink` should reflect only CPU time (Arrow → Parquet encoding and compression), consistent with how `elapsed_compute` is used in other operators. I/O time and upstream wait should either be tracked separately or excluded.

### Additional context

`ParquetSink` has two write paths: a **sequential path** (one `AsyncArrowWriter` per file, active when `allow_single_file_parallelism = false`) and a **parallel path** (`output_single_parquet_file_parallelized`, active when `allow_single_file_parallelism = true`, which is the default).

**Sequential path.** `AsyncArrowWriter` encodes batches one at a time. Encoding is synchronous (`sync_writer.write(batch)?`) and I/O is async, occurring only at row-group flush boundaries and on `close()`.
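In principle, the sequential path only needs the timer moved inside the write loop so that it brackets the encode step alone. The following is a toy sketch (hypothetical function names, not DataFusion's actual code) that simulates the three contributors with sleeps and shows how a per-encode timer diverges from the current whole-phase wall clock:

```rust
// Hypothetical sketch, not DataFusion code: a compute timer wrapping only the
// synchronous encode step, leaving backpressure waits and flush I/O outside it.
use std::thread::sleep;
use std::time::{Duration, Instant};

fn recv_next_batch() { sleep(Duration::from_millis(10)) } // upstream backpressure stand-in
fn encode_batch()    { sleep(Duration::from_millis(5)) }  // CPU encoding stand-in
fn flush_row_group() { sleep(Duration::from_millis(20)) } // object-store I/O stand-in

fn main() {
    let wall_start = Instant::now();
    let mut elapsed_compute = Duration::ZERO;
    for _ in 0..3 {
        recv_next_batch();
        let t = Instant::now();
        encode_batch();
        elapsed_compute += t.elapsed(); // count only the encode step
        flush_row_group();
    }
    let wall = wall_start.elapsed();
    // The current single wall-clock timer would report all of `wall` as compute;
    // the per-encode timer captures only a fraction of it.
    assert!(elapsed_compute < wall / 2);
    println!("wall={wall:?} compute={elapsed_compute:?}");
}
```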
The two phases are clearly separated inside the crate, but `sync_writer` is a private field — DataFusion cannot time them independently without changes to `arrow-rs` or an interception point at the `AsyncFileWriter` layer.

**Parallel path.** Each column in each row group is encoded in its own tokio task. A separate task assembles completed row groups and flushes them to the object store via a raw `Box<dyn AsyncWrite>`. CPU encoding is distributed across concurrent tasks, and the I/O and CPU work are interleaved within the same concatenation loop — there is no clean trait boundary to intercept.
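One possible shape for such an interception point — a hypothetical sketch, not an existing DataFusion or `arrow-rs` API — is a writer wrapper that accumulates time spent inside the underlying I/O calls, so the caller can subtract I/O time from a wall-clock timer. A synchronous `std::io::Write` version illustrates the idea (the real hook would sit at the async boundary):

```rust
// Hypothetical sketch: wrap a writer and accumulate nanoseconds spent in the
// inner write/flush calls. compute time ~= wall clock - accumulated I/O time.
use std::io::{self, Write};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

struct TimedWriter<W: Write> {
    inner: W,
    io_nanos: Arc<AtomicU64>, // shared with whoever reports metrics
}

impl<W: Write> Write for TimedWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let t = Instant::now();
        let n = self.inner.write(buf)?;
        self.io_nanos
            .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        let t = Instant::now();
        self.inner.flush()?;
        self.io_nanos
            .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let io_nanos = Arc::new(AtomicU64::new(0));
    let mut w = TimedWriter { inner: Vec::new(), io_nanos: Arc::clone(&io_nanos) };
    let wall = Instant::now();
    w.write_all(b"row group bytes")?;
    w.flush()?;
    // The I/O time tracked by the wrapper is bounded by total wall time.
    assert!(io_nanos.load(Ordering::Relaxed) as u128 <= wall.elapsed().as_nanos());
    Ok(())
}
```

In the sequential path this wrapper could sit between `AsyncArrowWriter` and the object-store `BufWriter`; the parallel path has no equivalent seam, which is the harder part of the fix.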
