This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 218d2926e227 [SPARK-52809][SQL] Don't hold reader and iterator references for all partitions in task completion listeners for metric update
218d2926e227 is described below

commit 218d2926e227b295f1e5682772f6cf4839a0481c
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Wed Jul 16 00:09:26 2025 -0700

    [SPARK-52809][SQL] Don't hold reader and iterator references for all partitions in task completion listeners for metric update

    ### What changes were proposed in this pull request?

    This patch adds a single task completion listener for metric updates in `DataSourceRDD`, instead of a separate one for each partition iterator.

    ### Why are the changes needed?

    Currently we add one task completion listener per partition iterator, used to update the final metrics if the task is stopped early. The listener holds references to the reader and the iterator, so even when a partition is exhausted normally, those references cannot be released before the task ends. This is a problem especially when the references are heavy, as reported in https://github.com/apache/iceberg/issues/13297.

    Since the purpose of the callback is to update the final metrics when the task is stopped early, it only needs to act on the last partition iterator. So instead of setting up a listener for each partition iterator, we can set up a single listener for all of them; whenever we advance to the next partition iterator, we update the callback's target (the reader and the partition iterator).

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51503 from viirya/fix_metric_callback.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit cea0051f71e8b50e8b9e19fb0ff4fb100e82e3d0)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
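To make the pattern concrete before the diff, here is a minimal, self-contained Scala sketch of the same idea. TaskCtx, Reader, and Callback are hypothetical stand-ins, not Spark's TaskContext, PartitionReader, or the PartitionMetricCallback added below; the point is only that a single task-level listener plus a swappable target lets each reader be released as soon as its partition is exhausted, instead of being pinned by a per-partition listener closure until the task ends.

    import scala.collection.mutable.ArrayBuffer

    // Toy stand-in for a task context that runs listeners at task end.
    final class TaskCtx {
      private val listeners = ArrayBuffer.empty[() => Unit]
      def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
      def markTaskCompleted(): Unit = listeners.foreach(_.apply())
    }

    // Toy stand-in for a heavy per-partition reader.
    final class Reader(p: Int) {
      private var closed = false
      def close(): Unit = if (!closed) { closed = true; println(s"closed reader $p") }
    }

    // Single callback with a mutable target: advancing finalizes and drops
    // the previous reader, so at most one reader is reachable at a time.
    final class Callback {
      private var current: Reader = null
      def advance(next: Reader): Unit = { execute(); current = next }
      def execute(): Unit = if (current != null) { current.close(); current = null }
    }

    object Sketch extends App {
      val ctx = new TaskCtx
      val cb = new Callback
      // Registered once per task, not once per partition: the only
      // reference the listener captures is the callback itself.
      ctx.addTaskCompletionListener(() => cb.execute())
      for (p <- 0 until 3) cb.advance(new Reader(p)) // previous reader released here
      ctx.markTaskCompleted() // finalizes the last reader if it is still open
    }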
---
 .../execution/datasources/v2/DataSourceRDD.scala   | 40 ++++++++++++++++++----
 1 file changed, 33 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 67e77a97865d..36872f232e7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -60,6 +60,14 @@ class DataSourceRDD(
       private var currentIter: Option[Iterator[Object]] = None
       private var currentIndex: Int = 0
 
+      private val partitionMetricCallback = new PartitionMetricCallback(customMetrics)
+
+      // In case of early stopping before consuming the entire iterator,
+      // we need to do one more metric update at the end of the task.
+      context.addTaskCompletionListener[Unit] { _ =>
+        partitionMetricCallback.execute()
+      }
+
       override def hasNext: Boolean = currentIter.exists(_.hasNext) || advanceToNextIter()
 
       override def next(): Object = {
@@ -86,13 +94,10 @@ class DataSourceRDD(
               new PartitionIterator[InternalRow](rowReader, customMetrics))
             (iter, rowReader)
           }
-          context.addTaskCompletionListener[Unit] { _ =>
-            // In case of early stopping before consuming the entire iterator,
-            // we need to do one more metric update at the end of the task.
-            CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics)
-            iter.forceUpdateMetrics()
-            reader.close()
-          }
+
+          // Once we advance to the next partition, update the metric callback for early finish
+          partitionMetricCallback.advancePartition(iter, reader)
+
           currentIter = Some(iter)
           hasNext
         }
@@ -107,6 +112,27 @@ class DataSourceRDD(
   }
 }
 
+private class PartitionMetricCallback
+    (customMetrics: Map[String, SQLMetric]) {
+  private var iter: MetricsIterator[_] = null
+  private var reader: PartitionReader[_] = null
+
+  def advancePartition(iter: MetricsIterator[_], reader: PartitionReader[_]): Unit = {
+    execute()
+
+    this.iter = iter
+    this.reader = reader
+  }
+
+  def execute(): Unit = {
+    if (iter != null && reader != null) {
+      CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics)
+      iter.forceUpdateMetrics()
+      reader.close()
+    }
+  }
+}
+
 private class PartitionIterator[T](
     reader: PartitionReader[T],
     customMetrics: Map[String, SQLMetric]) extends Iterator[T] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org