This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 2095b592ee docs+test(loop): document and guard OutputManager.reset_output_storage
2095b592ee is described below

commit 2095b592ee4aab7aa0b909c37f6534a21f75692a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 16:45:12 2026 -0700

    docs+test(loop): document and guard OutputManager.reset_output_storage
    
    Addresses PR #4206 review feedback
    (https://github.com/apache/texera/pull/4206#discussion_r3400851469): an
    earlier reply claimed reset_storage had been renamed to
    reset_output_storage, documented, and guarded, but that work was lost
    when the branch was rebased -- the method was back to a bare
    reset_storage with no docstring and no checks, and the reason
    truncation is safe lived only in the PR description.
    
    Re-apply on the consolidated branch:
    * Rename reset_storage -> reset_output_storage, and update main_loop's
      caller plus the two __init__/set_up comments (which still referred
      to a stale reset_loopend_storage name) to use the new name.
    * Add a docstring: what it does (drop+recreate the result AND state
      tables, reopen the writers), that it's called only by a Loop End
      worker once per iteration, and -- the rationale that previously
      lived only in the PR description -- WHY truncating live storage is
      safe: loops run in MATERIALIZED mode, so downstream operators don't
      read this output until the loop region completes, so no reader can
      observe an intermediate truncation.
    * Add the two previously-implicit precondition guards: exactly one
      output port, and the storage writer already set up
      (_storage_uri_base populated). Both raise a clear RuntimeError
      instead of silently resetting the wrong port or dereferencing None.
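    * test_output_manager.py: new TestResetOutputStorage covering the
      happy path (old writers closed, result+state docs recreated, writer
      reopened for the same port and base URI) and the guard failures (no
      output port, multiple output ports, storage writer not set up), with
      iceberg/thread collaborators mocked.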
---
 .../core/architecture/packaging/output_manager.py  | 39 ++++++++-
 amber/src/main/python/core/runnables/main_loop.py  |  2 +-
 .../architecture/packaging/test_output_manager.py  | 92 +++++++++++++++++++++-
 3 files changed, 127 insertions(+), 6 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index a84b651f78..a349fb369c 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -93,7 +93,7 @@ class OutputManager:
         ] = dict()
 
         # Loop-end operators have a single output port; remember its base
-        # URI so `reset_loopend_storage` can re-provision the iceberg
+        # URI so `reset_output_storage` can re-provision the iceberg
         # tables on each loop iteration.
         self._storage_uri_base: typing.Optional[str] = None
 
@@ -138,7 +138,7 @@ class OutputManager:
         state materialization on the same port. `storage_uri_base` is the
         port's base URI; the result and state URIs are derived from it.
         """
-        # Remember the base URI so `reset_loopend_storage` can re-provision
+        # Remember the base URI so `reset_output_storage` can re-provision
         # the iceberg tables on subsequent loop iterations.
         self._storage_uri_base = storage_uri_base
         document, _ = DocumentFactory.open_document(
@@ -234,8 +234,39 @@ class OutputManager:
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
-    def reset_storage(self) -> None:
-        port_id = self.get_port_ids()[0]
+    def reset_output_storage(self) -> None:
+        """Drop and recreate this operator's result and state tables, then
+        reopen the storage writers against the empty tables.
+
+        Called only by a Loop End worker, once per loop iteration (see the
+        ``LoopEndOperator`` branch in ``MainLoop.process_input_state``). Each
+        iteration must start from empty tables so the materialization the
+        downstream eventually reads holds only the final iteration's rows
+        rather than every iteration's rows concatenated.
+
+        Truncating live storage is safe here because a workflow containing a
+        loop runs in MATERIALIZED execution mode: downstream operators do not
+        start reading this output until the loop region has fully completed,
+        so no reader can observe an intermediate truncation.
+
+        Preconditions (always satisfied by a Loop End worker): the operator
+        has exactly one output port, and ``set_up_port_storage_writer`` has
+        already run for it (so ``_storage_uri_base`` is populated). Both are
+        checked so future misuse fails loudly instead of silently resetting
+        the wrong port or dereferencing ``None``.
+        """
+        port_ids = self.get_port_ids()
+        if len(port_ids) != 1:
+            raise RuntimeError(
+                f"reset_output_storage expects exactly one output port, "
+                f"but found {len(port_ids)}"
+            )
+        if self._storage_uri_base is None:
+            raise RuntimeError(
+                "reset_output_storage called before the output port's storage "
+                "writer was set up"
+            )
+        port_id = port_ids[0]
         storage_uri_base = self._storage_uri_base
         self.close_port_storage_writers()
         DocumentFactory.create_document(
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 95bca48daa..93b4af9873 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -264,7 +264,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
         if output_state is not None:
             executor = self.context.executor_manager.executor
             if isinstance(executor, LoopEndOperator):
-                self.context.output_manager.reset_storage()
+                self.context.output_manager.reset_output_storage()
             elif isinstance(executor, LoopStartOperator):
                 # A LoopStart stamps its own id/uri onto the state it emits.
                 (
diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index 95e03dca63..a8fec1b72d 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pytest
 
@@ -105,3 +105,93 @@ class TestSaveStateToStorageIfNeeded:
         thread_a.join.assert_called_once()
         thread_b.join.assert_called_once()
         assert output_manager._port_state_writers == {}
+
+
+class TestResetOutputStorage:
+    """Covers OutputManager.reset_output_storage, the per-iteration
+    result+state table reset a Loop End worker runs between loop
+    iterations.
+
+    The collaborators that touch real iceberg storage / writer threads
+    (DocumentFactory, close_port_storage_writers,
+    set_up_port_storage_writer) are replaced with spies so these tests
+    stay hermetic and assert the contract: drop+recreate both tables,
+    bracketed by closing the old writers and opening fresh ones, with
+    both preconditions enforced.
+    """
+
+    @pytest.fixture
+    def output_manager(self):
+        return OutputManager(worker_id="Worker:WF0-test-op-main-0")
+
+    @staticmethod
+    def _add_port_with_storage(om, port_id, uri, schema):
+        # Stand in for what add_output_port + set_up_port_storage_writer
+        # populate, without spinning up real iceberg tables and threads.
+        port = MagicMock()
+        port.get_schema.return_value = schema
+        om._ports[port_id] = port
+        om._storage_uri_base = uri
+
+    def test_recreates_result_and_state_tables_and_reopens_writer(self, output_manager):
+        port_id = PortIdentity(id=0, internal=False)
+        schema = MagicMock(name="schema")
+        self._add_port_with_storage(output_manager, port_id, "vfs:///base", schema)
+
+        output_manager.close_port_storage_writers = MagicMock()
+        output_manager.set_up_port_storage_writer = MagicMock()
+
+        with (
+            patch(
+                "core.architecture.packaging.output_manager.DocumentFactory"
+            ) as doc_factory,
+            patch(
+                "core.architecture.packaging.output_manager.VFSURIFactory"
+            ) as uri_factory,
+        ):
+            uri_factory.result_uri.return_value = "vfs:///base/result"
+            uri_factory.state_uri.return_value = "vfs:///base/state"
+            output_manager.reset_output_storage()
+
+        # Both the result and the state table are recreated, which drops
+        # the rows the previous loop iteration wrote.
+        recreated = {
+            call.args[0] for call in doc_factory.create_document.call_args_list
+        }
+        assert recreated == {"vfs:///base/result", "vfs:///base/state"}
+        # The old writers are flushed/closed first, and fresh writers are
+        # opened against the recreated tables afterwards.
+        output_manager.close_port_storage_writers.assert_called_once_with()
+        output_manager.set_up_port_storage_writer.assert_called_once_with(
+            port_id, "vfs:///base"
+        )
+
+    def test_raises_when_no_output_port(self, output_manager):
+        output_manager._storage_uri_base = "vfs:///base"
+        output_manager.close_port_storage_writers = MagicMock()
+        with patch("core.architecture.packaging.output_manager.DocumentFactory"):
+            with pytest.raises(RuntimeError, match="exactly one output port"):
+                output_manager.reset_output_storage()
+        # Must fail before touching storage.
+        output_manager.close_port_storage_writers.assert_not_called()
+
+    def test_raises_when_multiple_output_ports(self, output_manager):
+        schema = MagicMock()
+        self._add_port_with_storage(
+            output_manager, PortIdentity(id=0, internal=False), "vfs:///base", schema
+        )
+        # A second port makes the count != 1; the shared _storage_uri_base
+        # is already set, so the port-count guard is what must trip.
+        output_manager._ports[PortIdentity(id=1, internal=False)] = MagicMock()
+        with pytest.raises(RuntimeError, match="exactly one output port"):
+            output_manager.reset_output_storage()
+
+    def test_raises_when_storage_writer_not_set_up(self, output_manager):
+        # The port exists but no storage URI was assigned -- i.e.
+        # set_up_port_storage_writer never ran for it.
+        output_manager._ports[PortIdentity(id=0, internal=False)] = MagicMock()
+        output_manager.close_port_storage_writers = MagicMock()
+        with patch("core.architecture.packaging.output_manager.DocumentFactory"):
+            with pytest.raises(RuntimeError, match="storage writer was set up"):
+                output_manager.reset_output_storage()
+        output_manager.close_port_storage_writers.assert_not_called()

Reply via email to