gabotechs commented on code in PR #21825:
URL: https://github.com/apache/datafusion/pull/21825#discussion_r3136345982
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1599,6 +1650,7 @@ fn spawn_rg_join_and_finalize_task(
/// on the next row group in parallel. So, parquet serialization is parallelized
/// across both columns and row_groups, with a theoretical max number of parallel tasks
/// given by n_columns * num_row_groups.
+#[expect(clippy::too_many_arguments)]
fn spawn_parquet_parallel_serialization_task(
Review Comment:
oh no... clippy kicked in.
I see this file was a bit messy before.
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1584,6 +1634,7 @@ fn spawn_rg_join_and_finalize_task(
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
+ let _timer = encoding_time.timer();
finalized_rg.push(writer.close()?);
Review Comment:
Isn't this just measuring the `writer.close()` call here? Is there nothing
to be measured in the tasks above?
EDIT: I see now that `encoding_time` is also passed while instantiating
the tasks. It's a bit messy, but I think it should work.
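For context, the scoped-guard pattern behind `encoding_time.timer()` can be sketched as follows. This is a minimal re-implementation purely to illustrate why `let _timer = ...` measures everything until the end of the enclosing scope; the real `Time` metric lives in DataFusion's `metrics` module and is more elaborate:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical stand-in for DataFusion's `Time` metric:
/// a shared nanosecond accumulator.
#[derive(Default, Clone)]
struct Time(Arc<AtomicU64>);

/// Guard that adds the elapsed time to the metric when dropped.
struct ScopedTimer {
    start: Instant,
    time: Time,
}

impl Time {
    fn timer(&self) -> ScopedTimer {
        ScopedTimer { start: Instant::now(), time: self.clone() }
    }

    fn nanos(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

impl Drop for ScopedTimer {
    fn drop(&mut self) {
        self.time
            .0
            .fetch_add(self.start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
}

fn run() -> u64 {
    let encoding_time = Time::default();
    {
        // The guard measures this whole scope, e.g. a `writer.close()` call.
        let _timer = encoding_time.timer();
        std::thread::sleep(std::time::Duration::from_millis(5));
    } // guard dropped here; elapsed time is recorded
    encoding_time.nanos()
}

fn main() {
    println!("{} ns recorded", run());
}
```

Because the guard records on drop, placing `let _timer = encoding_time.timer();` before `writer.close()?` also attributes everything after it in the same scope to the metric, which is part of what makes the placement above hard to read.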
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1339,8 +1375,10 @@ impl FileSink for ParquetSink {
.with_category(MetricCategory::Bytes)
.global_counter("bytes_written");
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
-
- let write_start = datafusion_common::instant::Instant::now();
+ // Sequential path only: not registered in MetricsSet — used internally to
+ // compute elapsed_compute = total_write_time - io_time.
+ let total_write_time = Time::new();
+ let io_time = Time::new();
Review Comment:
One of the things I see here is that the `elapsed_compute` metric is
completely wrong.
Measuring IO and compensating with `total_write_time` sounds like it can
work, but it also sounds like a patch compensating for the fact that
`elapsed_compute` is not being measured correctly. It would be interesting to
see if we can measure `elapsed_compute` correctly in the first place.
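To make the compensation scheme under discussion concrete, here is a tiny sketch of the arithmetic (function name and values are illustrative, not from the PR). Only wall-clock totals are measured directly, and compute is inferred by subtraction:

```rust
use std::time::Duration;

/// Illustrative: compute time inferred as total wall time minus IO time.
fn elapsed_compute(total_write_time: Duration, io_time: Duration) -> Duration {
    // saturating_sub guards against io_time slightly exceeding the total
    // due to measurement skew between the two clocks.
    total_write_time.saturating_sub(io_time)
}

fn main() {
    let total = Duration::from_millis(120);
    let io = Duration::from_millis(45);
    assert_eq!(elapsed_compute(total, io), Duration::from_millis(75));
}
```

The weakness the review points out is visible here: any time that is neither IO nor compute (e.g. waiting on a channel) still lands in the inferred "compute" bucket.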
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1339,8 +1375,10 @@ impl FileSink for ParquetSink {
.with_category(MetricCategory::Bytes)
.global_counter("bytes_written");
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
-
- let write_start = datafusion_common::instant::Instant::now();
+ // Sequential path only: not registered in MetricsSet — used internally to
+ // compute elapsed_compute = total_write_time - io_time.
+ let total_write_time = Time::new();
+ let io_time = Time::new();
Review Comment:
If you think this can help avoid the mess, it might be worth trying;
otherwise, what you have here is already an improvement.
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1599,6 +1650,7 @@ fn spawn_rg_join_and_finalize_task(
/// on the next row group in parallel. So, parquet serialization is parallelized
/// across both columns and row_groups, with a theoretical max number of parallel tasks
/// given by n_columns * num_row_groups.
+#[expect(clippy::too_many_arguments)]
fn spawn_parquet_parallel_serialization_task(
Review Comment:
Here is one idea about how to make it better:
I see a pattern where `file_write_tasks` is created only once in the
`FileSink::spawn_writer_tasks_and_join` implementation, and then tasks are
spawned on it.
Several of the arguments passed to these long functions are not per-path;
they are global to the whole `FileSink::spawn_writer_tasks_and_join` call,
for example: `schema`, `props`, `skip_arrow_metadata`, `parallel_options`,
`pool`.
One idea that comes to mind is to gather all these "contextual" variables in
a struct that is responsible for task spawning, for example:
```rust
struct FileWriteTasks {
    file_write_tasks: JoinSet<
        std::result::Result<(Path, ParquetMetaData), DataFusionError>,
    >,
    schema: SchemaRef,
    properties: WriterProperties,
    parquet_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
}

impl FileWriteTasks {
    fn spawn_task_foo(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }

    fn spawn_task_bar(&mut self) {
        self.file_write_tasks.spawn(async move {
            ...
        })
    }
}
```
That would avoid threading the contextual variables all the way down the
call stack.
Actually, your new `encoding_time`, etc... variables fall into this category
of "contextual variables" that are in scope for the whole duration of
`FileSink::spawn_writer_tasks_and_join`, so they could simply be fields of
`FileWriteTasks`.
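A runnable toy version of the idea above, using std threads and placeholder types instead of tokio's `JoinSet` and the real DataFusion types, purely to illustrate the shape: contextual state lives on the struct, so the spawn methods need no long argument lists.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Toy version of the proposed FileWriteTasks: the "contextual" variables
// are fields, not parameters threaded through every helper.
struct FileWriteTasks {
    handles: Vec<JoinHandle<Result<String, String>>>,
    schema: Arc<String>, // stands in for SchemaRef; a real version would
                         // also hold props, parallel_options, pool, ...
}

impl FileWriteTasks {
    // Compare with the original free functions, which needed schema, props,
    // skip_arrow_metadata, parallel_options, pool, ... as parameters.
    fn spawn_serialization_task(&mut self, path: String) {
        let schema = Arc::clone(&self.schema);
        self.handles.push(thread::spawn(move || {
            Ok(format!("{path} serialized with schema {schema}"))
        }));
    }

    fn join_all(self) -> Vec<Result<String, String>> {
        self.handles.into_iter().map(|h| h.join().unwrap()).collect()
    }
}

fn run() -> usize {
    let mut tasks = FileWriteTasks {
        handles: Vec::new(),
        schema: Arc::new("test_schema".to_string()),
    };
    tasks.spawn_serialization_task("part-0.parquet".into());
    tasks.spawn_serialization_task("part-1.parquet".into());
    tasks.join_all().into_iter().filter(|r| r.is_ok()).count()
}

fn main() {
    println!("{} tasks completed", run());
}
```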
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1339,8 +1375,10 @@ impl FileSink for ParquetSink {
.with_category(MetricCategory::Bytes)
.global_counter("bytes_written");
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
-
- let write_start = datafusion_common::instant::Instant::now();
+ // Sequential path only: not registered in MetricsSet — used internally to
+ // compute elapsed_compute = total_write_time - io_time.
+ let total_write_time = Time::new();
+ let io_time = Time::new();
Review Comment:
In https://github.com/datafusion-contrib/datafusion-distributed we have this:
https://github.com/datafusion-contrib/datafusion-distributed/blob/2b6c918e0636e0645fafcc109252262f94d6c452/src/worker/worker_connection_pool.rs#L455-L486
which allows wrapping any arbitrary future, something like this:
```rust
async move {
    // do some async work
}.with_elapsed_compute(elapsed_compute)
```
I could imagine something similar being used here to measure
`elapsed_compute` properly, although the change would be bigger.
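The core of such a wrapper can be sketched as a future adapter that accumulates only the time spent inside `poll`, so time spent parked waiting for IO is excluded by construction. This is a self-contained sketch with hypothetical names (`WithElapsedCompute`, `ElapsedCompute`), not the actual datafusion-distributed implementation linked above; the manual no-op waker below is only there to drive the future without an async runtime:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Instant;

/// Hypothetical nanosecond accumulator standing in for the real metric.
#[derive(Default)]
struct ElapsedCompute(AtomicU64);

/// Future adapter: measures only the synchronous time spent in `poll`.
/// Time spent suspended (e.g. awaiting IO) is never on the clock.
struct WithElapsedCompute<F> {
    inner: F,
    metric: Arc<ElapsedCompute>,
}

impl<F: Future> Future for WithElapsedCompute<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // SAFETY: we never move `inner` out; this is a structural pin projection.
        let this = unsafe { self.get_unchecked_mut() };
        let start = Instant::now();
        let out = unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx);
        this.metric
            .0
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        out
    }
}

/// No-op waker so we can poll without an executor, for demonstration only.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn run() -> u64 {
    let metric = Arc::new(ElapsedCompute::default());
    let fut = WithElapsedCompute {
        inner: async {
            // Busy "compute" inside poll; counts toward the metric.
            std::thread::sleep(std::time::Duration::from_millis(5));
            42u32
        },
        metric: Arc::clone(&metric),
    };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => assert_eq!(v, 42),
        Poll::Pending => unreachable!("no awaits, so ready on first poll"),
    }
    metric.0.load(Ordering::Relaxed)
}

fn main() {
    println!("{} ns of compute", run());
}
```

An extension trait providing `.with_elapsed_compute(metric)` would just wrap `self` in this adapter; the nice property is that it measures compute correctly for the whole async body instead of relying on the subtraction scheme discussed above.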
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]