gabotechs commented on code in PR #21825:
URL: https://github.com/apache/datafusion/pull/21825#discussion_r3153140020
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1175,6 +1181,70 @@ impl DisplayAs for ParquetSink {
}
}
+/// Wraps any [`Future`] and accumulates the time spent actively executing
+/// inside `poll()` calls into `elapsed_compute`. Time between polls (when
+/// the future is suspended waiting for I/O or an upstream channel) is not
+/// measured, so only CPU time is captured.
+///
+/// Used by the sequential write path so that `elapsed_compute` reflects
+/// pure Arrow→Parquet encoding time, automatically excluding object-store
+/// I/O latency without requiring a separate subtraction step.
+///
+/// Note: uses `pin-project` rather than `pin-project-lite` in order to
+/// support `PinnedDrop`, which ensures accumulated time is flushed even
+/// if the future is cancelled (dropped before completion).
+#[pin_project(PinnedDrop)]
+struct ElapsedComputeFuture<T> {
Review Comment:
Nice! As this is pretty general, I think a good place to put it would be
`datafusion/datafusion/physical-expr-common/src/metrics/elapsed_compute.rs`,
and we could import it here.
I see other places in the codebase doing something similar:
- https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/memory.rs#L404-L404
I'd do a couple of things to the doc comment:
1) State that this measures all the time the future spends polling, i.e.
everything that happens under the `Future::poll` scope is measured.
2) As this will be exposed as part of the public API, some doc comments on
`ElapsedComputeFutureExt::with_elapsed_compute` would be useful.
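To illustrate what documenting `with_elapsed_compute` could look like, here is a hedged, self-contained sketch of the extension-trait surface. The `Time` type here is a stand-in (a shared `AtomicU64` nanosecond counter) for DataFusion's `metrics::Time`, the wrapper is simplified to update the metric on every poll rather than deferring the flush as the PR does, a `T: Unpin` bound replaces `pin-project`, and `noop_waker` is an illustrative helper for polling without an executor:

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll, Waker},
    time::{Duration, Instant},
};

/// Stand-in for DataFusion's `metrics::Time` (a shared nanosecond counter),
/// so this sketch compiles without the datafusion crates.
#[derive(Clone, Default)]
struct Time(Arc<AtomicU64>);

impl Time {
    fn add_duration(&self, d: Duration) {
        self.0.fetch_add(d.as_nanos() as u64, Ordering::Relaxed);
    }
}

/// Simplified wrapper: updates the metric on every poll, because this
/// sketch only illustrates the extension-trait surface and doc comments.
struct ElapsedComputeFuture<T> {
    inner: T,
    elapsed_compute: Time,
}

impl<T: Future + Unpin> Future for ElapsedComputeFuture<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let result = Pin::new(&mut self.inner).poll(cx);
        self.elapsed_compute.add_duration(start.elapsed());
        result
    }
}

/// Extension trait with the kind of doc comment the review asks for.
trait ElapsedComputeFutureExt: Future + Sized {
    /// Wraps `self` so that *all* time spent under the `Future::poll` scope
    /// of the returned future is added to `elapsed_compute`: not only "pure"
    /// CPU work, but anything the wrapped poll runs synchronously. Time
    /// spent suspended between polls (e.g. awaiting I/O) is not counted.
    fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self> {
        ElapsedComputeFuture { inner: self, elapsed_compute }
    }
}

impl<F: Future + Sized> ElapsedComputeFutureExt for F {}

/// Minimal no-op waker so the example can poll without an executor.
fn noop_waker() -> Waker {
    use std::task::{RawWaker, RawWakerVTable};
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let metric = Time::default();
    let mut fut = std::future::ready(7).with_elapsed_compute(metric.clone());
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // The wrapped future behaves exactly like the inner one.
    assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(7)));
}
```

The blanket impl means any future gains `with_elapsed_compute`, which keeps call sites to a single combinator-style line.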
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1175,6 +1181,70 @@ impl DisplayAs for ParquetSink {
}
}
+/// Wraps any [`Future`] and accumulates the time spent actively executing
+/// inside `poll()` calls into `elapsed_compute`. Time between polls (when
+/// the future is suspended waiting for I/O or an upstream channel) is not
+/// measured, so only CPU time is captured.
+///
+/// Used by the sequential write path so that `elapsed_compute` reflects
+/// pure Arrow→Parquet encoding time, automatically excluding object-store
+/// I/O latency without requiring a separate subtraction step.
+///
+/// Note: uses `pin-project` rather than `pin-project-lite` in order to
+/// support `PinnedDrop`, which ensures accumulated time is flushed even
+/// if the future is cancelled (dropped before completion).
+#[pin_project(PinnedDrop)]
+struct ElapsedComputeFuture<T> {
+ #[pin]
+ inner: T,
+ curr: Duration,
+ elapsed_compute: Time,
+}
+
+#[pinned_drop]
+impl<T> PinnedDrop for ElapsedComputeFuture<T> {
+ fn drop(self: Pin<&mut Self>) {
+ if self.curr > Duration::default() {
+ let self_projected = self.project();
+ self_projected
+ .elapsed_compute
+ .add_duration(*self_projected.curr);
+ }
+ }
+}
+
+impl<O, F: Future<Output = O>> Future for ElapsedComputeFuture<F> {
+ type Output = O;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let self_projected = self.project();
+ let start = Instant::now();
+ let result = self_projected.inner.poll(cx);
+ *self_projected.curr += start.elapsed();
+ if result.is_ready() {
Review Comment:
Tracking the consumed time is kept separate from the `elapsed_compute:
Time` metric; `curr: Duration` and `elapsed_compute: Time` are only synced
once the future is ready, or on `Drop`. This avoids atomic operations
inside a `Future::poll` implementation, which has exclusive mutable access
and can therefore use plain, non-atomic mutations.
- The good thing about this is better performance, as no synchronization
primitives sit in between future polls.
- The bad thing is that the reported `elapsed_compute` only becomes
available at the end, once the future has fully finished.
I'd document this decision so that, if someone thinks there's a better
approach, they have all the cards on the table.
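The deferred-sync trade-off described above can be shown independently of DataFusion's types. In this hedged sketch, `Time` is a stand-in (a shared `AtomicU64` nanosecond counter) for DataFusion's `metrics::Time`, a `T: Unpin` bound and a plain `Drop` impl replace `pin-project`/`PinnedDrop`, and `Busy`/`noop_waker` are illustrative helpers; the point is that `poll` mutates only the local `Duration`, and the single atomic update happens on completion or drop:

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll, Waker},
    time::{Duration, Instant},
};

/// Stand-in for DataFusion's `metrics::Time`: a shared nanosecond counter.
#[derive(Clone, Default)]
struct Time(Arc<AtomicU64>);

impl Time {
    fn add_duration(&self, d: Duration) {
        self.0.fetch_add(d.as_nanos() as u64, Ordering::Relaxed);
    }
    fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Accumulates poll time in a plain `Duration` (no atomics on the hot
/// path) and flushes it to the shared metric exactly once: when the
/// future completes, or in `Drop` if it is cancelled first.
struct ElapsedComputeFuture<T> {
    inner: T,
    curr: Duration,        // local, non-atomic accumulator
    elapsed_compute: Time, // shared metric, touched only on flush
}

impl<T> Drop for ElapsedComputeFuture<T> {
    fn drop(&mut self) {
        // Flush any unreported time even if the future was cancelled.
        if self.curr > Duration::default() {
            self.elapsed_compute.add_duration(self.curr);
        }
    }
}

impl<T: Future + Unpin> Future for ElapsedComputeFuture<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let result = Pin::new(&mut self.inner).poll(cx);
        self.curr += start.elapsed();
        if result.is_ready() {
            // Single atomic update for the whole lifetime of the future;
            // reset `curr` so `Drop` does not double-count it.
            self.elapsed_compute.add_duration(self.curr);
            self.curr = Duration::default();
        }
        result
    }
}

/// Minimal no-op waker so the example can poll without an executor.
fn noop_waker() -> Waker {
    use std::task::{RawWaker, RawWakerVTable};
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// A future that spends ~1 ms inside `poll` and never completes.
struct Busy;
impl Future for Busy {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
        std::thread::sleep(Duration::from_millis(1));
        Poll::Pending
    }
}

fn main() {
    let metric = Time::default();
    {
        let mut fut = ElapsedComputeFuture {
            inner: Busy,
            curr: Duration::default(),
            elapsed_compute: metric.clone(),
        };
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
        // The metric has not been touched yet: syncing is deferred.
        assert_eq!(metric.value(), 0);
    } // dropping the pending future flushes the accumulated ~1 ms
    assert!(metric.value() >= 1_000_000);
}
```

The `main` demonstrates both halves of the trade-off: while the future is in flight the metric reads zero, and the cancellation path (`Drop`) still delivers the accumulated time.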
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]