This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 31308a29eb fix: reduce catalog round-trips in `IcebergDocument.hasNext()` to improve result read performance (#4293)
31308a29eb is described below
commit 31308a29eb6877ac0ae19891d305472b6f80a504
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Wed Mar 18 11:52:57 2026 -0700
fix: reduce catalog round-trips in `IcebergDocument.hasNext()` to improve result read performance (#4293)
### What changes were proposed in this PR?
This PR addresses #4289 by optimizing `IcebergDocument.hasNext()` to avoid redundant catalog round-trips. Previously, once the reader reached the last data file, `usableFileIterator` was always empty, so every `hasNext()` call invoked `seekToUsableFile()` and refreshed the table through the catalog. A new guard condition ensures that `seekToUsableFile()` and its catalog calls run only after the current record iterator is fully exhausted:
1. If the current file has more records, return `true` immediately.
2. Only if the current file is exhausted, check `usableFileIterator`.
3. Only if `usableFileIterator` is also empty, call `seekToUsableFile()`.
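To illustrate why the guard matters, here is a minimal, self-contained model of the reader loop. `MultiFileReader`, `seekCalls`, `seeksToDrain`, and the in-memory `files` are hypothetical stand-ins, not Texera code: `seekToUsableFile()` here only counts a simulated catalog round-trip instead of refreshing an Iceberg table, and the loop that opens the next file after the seek is an assumption, since the diff shows only the first lines of `hasNext()`.

```scala
// Hypothetical, in-memory model of the reader loop in IcebergDocument.
// Each "file" is a Seq of records; seekToUsableFile() stands in for the
// catalog round-trip (table refresh) and just returns the unread files.
final class MultiFileReader[T](files: Seq[Seq[T]], guarded: Boolean) extends Iterator[T] {
  var seekCalls = 0 // number of simulated catalog round-trips
  private var nextFileIndex = 0 // first file not yet returned by a seek
  private var usableFileIterator: Iterator[Seq[T]] = Iterator.empty
  private var currentRecordIterator: Iterator[T] = Iterator.empty

  // Simulated catalog call: returns every file that has not been handed out yet.
  private def seekToUsableFile(): Iterator[Seq[T]] = {
    seekCalls += 1
    val remaining = files.drop(nextFileIndex)
    nextFileIndex = files.length
    remaining.iterator
  }

  override def hasNext: Boolean = {
    // The guard added by the fix: records left in the current file need no catalog access.
    if (guarded && currentRecordIterator.hasNext) return true
    // Without the guard, this seeks on every call once the last file has been opened.
    if (!usableFileIterator.hasNext) usableFileIterator = seekToUsableFile()
    // Assumed: advance to the next non-empty file, if any.
    while (!currentRecordIterator.hasNext && usableFileIterator.hasNext)
      currentRecordIterator = usableFileIterator.next().iterator
    currentRecordIterator.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more records")
    currentRecordIterator.next()
  }
}

object MultiFileReader {
  // Drains a reader over `files` and returns how many seeks it performed.
  def seeksToDrain[T](files: Seq[Seq[T]], guarded: Boolean): Int = {
    val reader = new MultiFileReader(files, guarded)
    while (reader.hasNext) reader.next()
    reader.seekCalls
  }
}
```

With a single 1,000-record file, the unguarded variant performs 2,001 simulated seeks (one per `hasNext()` call, including the call made inside `next()`), while the guarded variant performs 2: one to open the first file and one after the last file is exhausted. This mirrors the "2 versus O(batchSize)" expectation stated in the comments of the new unit test.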
### Any related issues, documentation, discussions?
Fix #4289
### How was this PR tested?
1. Import and use [Untitled workflow (9).json](https://github.com/user-attachments/files/25983397/Untitled.workflow.9.json).
2. Use a CSV file containing 1M records.
3. Set `storage.iceberg.table.commit.batch-size` to 1M (matching the total record count, so all records are committed in a single batch).
4. Compare result read performance before and after the fix. In my run it took 2m 45s before the fix and 36s after.
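Besides the manual benchmark, this commit adds a unit test to `IcebergDocumentSpec` that counts `table.refresh()` calls by wrapping the Iceberg `Catalog` and each loaded `Table` in JDK dynamic proxies. The sketch below isolates that counting pattern. `Refreshable` and `RefreshSpy` are hypothetical stand-ins for `org.apache.iceberg.Table` and the test's helper methods, and unwrapping `InvocationTargetException` is an addition that the test itself does not do.

```scala
import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method, Proxy}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical interface standing in for org.apache.iceberg.Table.
trait Refreshable {
  def refresh(): Unit
  def name(): String
}

object RefreshSpy {
  // Wraps `real` in a JDK dynamic proxy that counts refresh() calls and
  // forwards every call (counted or not) to the real object.
  def wrap(real: Refreshable, counter: AtomicInteger): Refreshable =
    Proxy
      .newProxyInstance(
        classOf[Refreshable].getClassLoader,
        Array[Class[_]](classOf[Refreshable]),
        new InvocationHandler {
          override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
            if (method.getName == "refresh") counter.incrementAndGet()
            try {
              // args is null for zero-argument methods.
              if (args == null) method.invoke(real) else method.invoke(real, args: _*)
            } catch {
              // Rethrow the real object's exception instead of the reflective wrapper.
              case e: InvocationTargetException => throw e.getCause
            }
          }
        }
      )
      .asInstanceOf[Refreshable]
}
```

Because `java.lang.reflect.Proxy` can only implement interfaces, this technique works for Iceberg's `Table` and `Catalog`, which are both interfaces; spying on a concrete class would need a different mechanism.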
### Was this PR authored or co-authored using generative AI tooling?
No.
Co-authored-by: Chen Li <[email protected]>
Co-authored-by: Xiaozhen Liu <[email protected]>
Co-authored-by: Jiadong Bai <[email protected]>
---
.../storage/result/iceberg/IcebergDocument.scala | 6 ++
.../result/iceberg/IcebergDocumentSpec.scala | 78 +++++++++++++++++++++-
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e238ab7417..e10152cdae 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -258,6 +258,12 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
return false
}
+ // If the current file still has records, return immediately without touching the catalog.
+ // Only when the current file is exhausted do we check for more files and possibly refresh.
+ if (currentRecordIterator.hasNext) {
+ return true
+ }
+
if (!usableFileIterator.hasNext) {
usableFileIterator = seekToUsableFile()
}
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index fe06a47d19..8fdf039f3e 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -19,9 +19,11 @@
package org.apache.texera.amber.storage.result.iceberg
+import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
-import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
+import org.apache.texera.amber.core.storage.{DocumentFactory, IcebergCatalogInstance, VFSURIFactory}
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.iceberg.Table
import org.apache.texera.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
@@ -35,9 +37,11 @@ import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
import org.scalatest.BeforeAndAfterAll
+import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.net.URI
import java.sql.Timestamp
import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfterAll {
@@ -99,6 +103,78 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
}
+ it should "not trigger excessive catalog seeks when reading the last file (lazy file advancement)" in {
+ val batchSize = StorageConfig.icebergTableCommitBatchSize
+ val items = generateSampleItems().take(batchSize * 2)
+ val (batch1, batch2) = items.splitAt(batchSize)
+
+ // Write two separate batches to produce two committed data files.
+ // This also initialises `document.catalog` (lazy val) with the real catalog, which
+ // is why we open a fresh reader document below after injecting the spy.
+ val writer1 = document.writer(UUID.randomUUID().toString)
+ writer1.open(); batch1.foreach(writer1.putOne); writer1.close()
+
+ val writer2 = document.writer(UUID.randomUUID().toString)
+ writer2.open(); batch2.foreach(writer2.putOne); writer2.close()
+
+ val refreshCount = new AtomicInteger(0)
+ val realCatalog = IcebergCatalogInstance.getInstance()
+ IcebergCatalogInstance.replaceInstance(catalogWithRefreshSpy(realCatalog, refreshCount))
+ // Open a fresh reader: its `catalog` lazy val hasn't been initialised yet, so it
+ // will pick up the spy catalog on first access inside seekToUsableFile.
+ val readerDoc = getDocument
+ try {
+ val retrieved = readerDoc.get().toList
+ assert(
+ retrieved.toSet == items.toSet,
+ "All records from both files should be read correctly"
+ )
+ // With lazy file advancement seekToUsableFile() (and therefore table.refresh()) is called:
+ // once on iterator creation, once when the last file is exhausted → 2 total.
+ // Without the fix it would be called once per hasNext() on the last file → O(batchSize).
+ assert(
+ refreshCount.get() <= 4,
+ s"table.refresh() should be called at most 4 times (lazy advancement), but was ${refreshCount.get()}"
+ )
+ } finally {
+ IcebergCatalogInstance.replaceInstance(realCatalog)
+ }
+ }
+
+ /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
+ private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
+ Proxy
+ .newProxyInstance(
+ classOf[Table].getClassLoader,
+ Array(classOf[Table]),
+ new InvocationHandler {
+ override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+ if (method.getName == "refresh") counter.incrementAndGet()
+ if (args == null) method.invoke(realTable) else method.invoke(realTable, args: _*)
+ }
+ }
+ )
+ .asInstanceOf[Table]
+
+ /** Returns a dynamic proxy for `realCatalog` that wraps every loaded `Table` with a refresh spy. */
+ private def catalogWithRefreshSpy(realCatalog: Catalog, counter: AtomicInteger): Catalog =
+ Proxy
+ .newProxyInstance(
+ classOf[Catalog].getClassLoader,
+ Array(classOf[Catalog]),
+ new InvocationHandler {
+ override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+ val result =
+ if (args == null) method.invoke(realCatalog) else method.invoke(realCatalog, args: _*)
+ if (method.getName == "loadTable" && result != null)
+ tableWithRefreshSpy(result.asInstanceOf[Table], counter)
+ else
+ result
+ }
+ }
+ )
+ .asInstanceOf[Catalog]
+
override def generateSampleItems(): List[Tuple] = {
val baseTuples = List(
Tuple