This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 31308a29eb fix: reduce catalog round-trips in `IcebergDocument.hasNext()` to improve result read performance (#4293)
31308a29eb is described below

commit 31308a29eb6877ac0ae19891d305472b6f80a504
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Wed Mar 18 11:52:57 2026 -0700

    fix: reduce catalog round-trips in `IcebergDocument.hasNext()` to improve result read performance (#4293)
    
    ### What changes were proposed in this PR?
    
    This PR addresses #4289 by optimizing `IcebergDocument.hasNext()` to
    minimize redundant catalog round-trips. By introducing a guard
    condition, we ensure `seekToUsableFile()` and its subsequent catalog
    calls are only triggered when the current record iterator is fully
    exhausted.
    
    1. If the current file has more records, return `true` immediately.
    2. Only if the current file is exhausted, check `usableFileIterator`.
    3. Only if `usableFileIterator` is also empty, call
    `seekToUsableFile()`.
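    
    The three steps above can be sketched with a simplified, hypothetical
    model. This is not the actual `IcebergDocument` code: `FileBackedIterator`,
    `seek()`, and `seekCount` are illustrative names, in-memory sequences stand
    in for data files, and `seekCount` stands in for catalog round-trips.
    
    ```scala
    // Hypothetical, simplified model for illustration; not the IcebergDocument code.
    final class FileBackedIterator(files: Seq[Seq[Int]], guarded: Boolean)
        extends Iterator[Int] {
      // Stands in for the number of catalog round-trips.
      var seekCount: Int = 0
      private var nextFileIndex: Int = 0
      private var fileIterator: Iterator[Seq[Int]] = seek()
      private var recordIterator: Iterator[Int] = Iterator.empty
    
      // Stand-in for seekToUsableFile(): hands out files not yet returned.
      private def seek(): Iterator[Seq[Int]] = {
        seekCount += 1
        val remaining = files.drop(nextFileIndex)
        nextFileIndex = files.length
        remaining.iterator
      }
    
      override def hasNext: Boolean = {
        // Step 1: the current file still has records, so skip the catalog.
        if (guarded && recordIterator.hasNext) return true
        // Steps 2 and 3: seek only when no known files remain.
        if (!fileIterator.hasNext) fileIterator = seek()
        while (!recordIterator.hasNext && fileIterator.hasNext) {
          recordIterator = fileIterator.next().iterator
        }
        recordIterator.hasNext
      }
    
      override def next(): Int = {
        if (!hasNext) throw new NoSuchElementException("no more records")
        recordIterator.next()
      }
    }
    ```
    
    In this model, reading the same files with `guarded = false` and
    `guarded = true` yields identical records, but only the guarded variant
    keeps `seekCount` independent of how many records the last file holds.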
    
    ### Any related issues, documentation, discussions?
    
    Fix #4289
    
    ### How was this PR tested?
    1. Import and use [Untitled workflow (9).json](https://github.com/user-attachments/files/25983397/Untitled.workflow.9.json).
    2. Use a CSV file containing 1M records.
    3. Set `storage.iceberg.table.commit.batch-size` to 1M (matching the
    total record count).
    4. Compare the result read time before and after the fix. In my run it
    dropped from 2m 45s to 36s.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    No.
    
    Co-authored-by: Chen Li <[email protected]>
    Co-authored-by: Xiaozhen Liu <[email protected]>
    Co-authored-by: Jiadong Bai <[email protected]>
---
 .../storage/result/iceberg/IcebergDocument.scala   |  6 ++
 .../result/iceberg/IcebergDocumentSpec.scala       | 78 +++++++++++++++++++++-
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e238ab7417..e10152cdae 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -258,6 +258,12 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
             return false
           }
 
+          // If the current file still has records, return immediately without touching the catalog.
+          // Only when the current file is exhausted do we check for more files and possibly refresh.
+          if (currentRecordIterator.hasNext) {
+            return true
+          }
+
           if (!usableFileIterator.hasNext) {
             usableFileIterator = seekToUsableFile()
           }
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index fe06a47d19..8fdf039f3e 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -19,9 +19,11 @@
 
 package org.apache.texera.amber.storage.result.iceberg
 
+import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
-import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
+import org.apache.texera.amber.core.storage.{DocumentFactory, IcebergCatalogInstance, VFSURIFactory}
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.iceberg.Table
 import org.apache.texera.amber.core.virtualidentity.{
   ExecutionIdentity,
   OperatorIdentity,
@@ -35,9 +37,11 @@ import org.apache.iceberg.data.Record
 import org.apache.iceberg.{Schema => IcebergSchema}
 import org.scalatest.BeforeAndAfterAll
 
+import java.lang.reflect.{InvocationHandler, Method, Proxy}
 import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
 
 class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfterAll {
 
@@ -99,6 +103,78 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
   }
 
+  it should "not trigger excessive catalog seeks when reading the last file (lazy file advancement)" in {
+    val batchSize = StorageConfig.icebergTableCommitBatchSize
+    val items = generateSampleItems().take(batchSize * 2)
+    val (batch1, batch2) = items.splitAt(batchSize)
+
+    // Write two separate batches to produce two committed data files.
+    // This also initialises `document.catalog` (lazy val) with the real catalog, which
+    // is why we open a fresh reader document below after injecting the spy.
+    val writer1 = document.writer(UUID.randomUUID().toString)
+    writer1.open(); batch1.foreach(writer1.putOne); writer1.close()
+
+    val writer2 = document.writer(UUID.randomUUID().toString)
+    writer2.open(); batch2.foreach(writer2.putOne); writer2.close()
+
+    val refreshCount = new AtomicInteger(0)
+    val realCatalog = IcebergCatalogInstance.getInstance()
+    IcebergCatalogInstance.replaceInstance(catalogWithRefreshSpy(realCatalog, refreshCount))
+    // Open a fresh reader: its `catalog` lazy val hasn't been initialised yet, so it
+    // will pick up the spy catalog on first access inside seekToUsableFile.
+    val readerDoc = getDocument
+    try {
+      val retrieved = readerDoc.get().toList
+      assert(
+        retrieved.toSet == items.toSet,
+        "All records from both files should be read correctly"
+      )
+      // With lazy file advancement seekToUsableFile() (and therefore table.refresh()) is called:
+      //   once on iterator creation, once when the last file is exhausted → 2 total.
+      // Without the fix it would be called once per hasNext() on the last file → O(batchSize).
+      assert(
+        refreshCount.get() <= 4,
+        s"table.refresh() should be called at most 4 times (lazy advancement), but was ${refreshCount.get()}"
+      )
+    } finally {
+      IcebergCatalogInstance.replaceInstance(realCatalog)
+    }
+  }
+
+  /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
+  private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
+    Proxy
+      .newProxyInstance(
+        classOf[Table].getClassLoader,
+        Array(classOf[Table]),
+        new InvocationHandler {
+          override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+            if (method.getName == "refresh") counter.incrementAndGet()
+            if (args == null) method.invoke(realTable) else method.invoke(realTable, args: _*)
+          }
+        }
+      )
+      .asInstanceOf[Table]
+
+  /** Returns a dynamic proxy for `realCatalog` that wraps every loaded `Table` with a refresh spy. */
+  private def catalogWithRefreshSpy(realCatalog: Catalog, counter: AtomicInteger): Catalog =
+    Proxy
+      .newProxyInstance(
+        classOf[Catalog].getClassLoader,
+        Array(classOf[Catalog]),
+        new InvocationHandler {
+          override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+            val result =
+              if (args == null) method.invoke(realCatalog) else method.invoke(realCatalog, args: _*)
+            if (method.getName == "loadTable" && result != null)
+              tableWithRefreshSpy(result.asInstanceOf[Table], counter)
+            else
+              result
+          }
+        }
+      )
+      .asInstanceOf[Catalog]
+
   override def generateSampleItems(): List[Tuple] = {
     val baseTuples = List(
       Tuple
