This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 97adac6fc479 [SPARK-52516][SQL] Don't hold previous iterator reference after advancing to next file in ParquetPartitionReaderFactory
97adac6fc479 is described below

commit 97adac6fc479e5862999c3e424b9834edbec4b3e
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Jul 18 01:01:54 2025 -0700

    [SPARK-52516][SQL] Don't hold previous iterator reference after advancing to next file in ParquetPartitionReaderFactory

    ### What changes were proposed in this pull request?

    This patch adds only one task completion listener for closing iterators in
    `ParquetPartitionReaderFactory`, instead of adding a separate one for each file
    iterator.

    ### Why are the changes needed?

    Currently we add one task completion listener per file iterator, used to close the
    iterator when the task finishes. The listener holds a reference to the iterator, so
    even when a file is exhausted normally, the reference cannot be released early. This
    is a problem especially when the references are heavy, as reported in
    https://github.com/apache/iceberg/issues/13297.

    Similar to #51503, we don't need to set up a listener for each file iterator. We can
    set up just one listener for all iterators; once we advance to the next file, we
    update the listener's target to the new iterator.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51528 from viirya/fix_iter_callback.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit 197c9d6051efcc57f984a9497975d5723e1f2dac)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 38 ++++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)
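To illustrate the single-listener pattern described above before reading the diff, here is
a minimal, self-contained Scala sketch. All names in it are hypothetical stand-ins, not
the committed code: `CloseableFileIterator` stands in for Spark's `RecordReaderIterator`,
and the `addListener` parameter stands in for `TaskContext.addTaskCompletionListener`.

    // Sketch only: one callback per task, retargeted per file, so an exhausted
    // reader is closed and dropped instead of being pinned by a per-file
    // listener closure until the task ends.
    trait CloseableFileIterator { def close(): Unit }

    class ReaderCallbackSketch {
      private var registered = false
      private var current: CloseableFileIterator = null

      // Register the task completion listener once per task, not once per file.
      def initIfNotAlready(addListener: (() => Unit) => Unit): Unit = {
        if (!registered) {
          addListener(() => closeCurrent())
          registered = true
        }
      }

      // On moving to the next file, close the previous iterator eagerly and
      // repoint the callback at the new one.
      def advanceFile(next: CloseableFileIterator): Unit = {
        closeCurrent()
        current = next
      }

      def closeCurrent(): Unit = {
        if (current != null) {
          current.close()
          current = null // sketch additionally drops the reference so the reader can be collected
        }
      }
    }

The single listener still guarantees that the last open iterator is closed at task
completion, while iterators for already-finished files become collectible as soon as the
next file is opened. The actual change follows.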
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index ac15456f0c3d..70ae8068a03a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -84,6 +84,8 @@ case class ParquetPartitionReaderFactory(
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
 
+  private val parquetReaderCallback = new ParquetReaderCallback()
+
   private def getFooter(file: PartitionedFile): ParquetMetadata = {
     val conf = broadcastedConf.value.value
     if (aggregation.isDefined || enableVectorizedReader) {
@@ -309,7 +311,8 @@ case class ParquetPartitionReaderFactory(
         reader, readDataSchema)
       val iter = new RecordReaderIterator(readerWithRowIndexes)
       // SPARK-23457 Register a task completion listener before `initialization`.
-      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      parquetReaderCallback.advanceFile(iter)
+      taskContext.foreach(parquetReaderCallback.initIfNotAlready)
       readerWithRowIndexes
     }
 
@@ -337,8 +340,39 @@ case class ParquetPartitionReaderFactory(
         capacity)
       val iter = new RecordReaderIterator(vectorizedReader)
       // SPARK-23457 Register a task completion listener before `initialization`.
-      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      parquetReaderCallback.advanceFile(iter)
+      taskContext.foreach(parquetReaderCallback.initIfNotAlready)
       logDebug(s"Appending $partitionSchema $partitionValues")
       vectorizedReader
     }
 }
+
+/**
+ * A callback class to handle the cleanup of Parquet readers.
+ *
+ * This class is used to ensure that the Parquet readers are closed properly when the task
+ * completes, and it also allows for the initialization of the reader callback only once per task.
+ */
+private class ParquetReaderCallback extends Serializable {
+  private var init: Boolean = false
+  private var iter: RecordReaderIterator[_] = null
+
+  def initIfNotAlready(taskContext: TaskContext): Unit = {
+    if (!init) {
+      taskContext.addTaskCompletionListener[Unit](_ => closeCurrent())
+      init = true
+    }
+  }
+
+  def advanceFile(iter: RecordReaderIterator[_]): Unit = {
+    closeCurrent()
+
+    this.iter = iter
+  }
+
+  def closeCurrent(): Unit = {
+    if (iter != null) {
+      iter.close()
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org