aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3353869367
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
+
+ def _jump_to_loop_start(
+ self, executor: LoopEndOperator, controller_interface
+ ) -> None:
+ state = executor.state
+ controller_interface.jump_to_operator_region(
+ JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+ )
+ uri = state["LoopStartStateURI"]
+ # Strip the per-iteration scratch (`table`, `output`) and the
+ # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+ # user-visible loop state is written back to LoopStart's input.
+ for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+ state.pop(key, None)
Review Comment:
Most of the bullets here are already addressed by intermediate commits; this
commit (f22738ecb6) closes the remaining gaps.
| Concern | Status |
|---|---|
| Worker-name parsing (`split("-",1)[1].rsplit("-main-0",1)[0]`) | **Fixed in 512841a78b**: `_compute_loop_start_id` now delegates to `get_operator_id(...)` in `core/util/virtual_identity.py`, which is exhaustively tested in `test_virtual_identity.py` (canonical, hyphenated op id, non-main layer, digit-ending id, malformed inputs). The brittle inline parse is gone. |
| State-key stripping list `(LoopStartId, LoopStartStateURI, table, output)` | **Reduced in 30ba48c39f / 007a264b59 / 411d92f67**: `LoopStartId` / `LoopStartStateURI` / `loop_counter` now ride the `StateFrame` envelope, not user state. Only `(table, output)` (the user-exec runtime scratch) is still stripped; that strip is now tested. |
| First-port assumption (`next(iter(reader_runnables.values()))[0].uri`) | **Fixed in this commit**: `_compute_loop_start_id` raises `RuntimeError` if the input_manager reports more than one input port or more than one reader on the single port, instead of silently picking the dict iterator's first. |
| No tests | **Closed in this commit**: 7 new `test_main_loop.py` cases covering both methods: worker-id parse via `get_operator_id`, URI = state-channel of reader's result URI (so `VFSURIFactory.state_uri` isn't dropped), both defensive raises, the RPC with `target_operator_id` taken from the StateFrame envelope (never from user state), the scratch-key strip preserving user vars, and the exact iceberg-write contract (`create_document → writer("0") → put_one(State.to_tuple(0)) → close`). |
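To illustrate why the inline worker-name parse in the first row was brittle, here is a minimal sketch. It assumes worker ids have the shape `Worker:WF<workflow>-<operator id>-<layer>-<index>`, which this thread does not confirm, and `anchored_parse` is a hypothetical stand-in, not the actual `get_operator_id` from `core/util/virtual_identity.py`.

```python
import re

# Assumed worker-id shape (an assumption, not taken from the PR):
#   Worker:WF<workflow>-<operator id>-<layer>-<index>
_WORKER_ID = re.compile(r"^Worker:WF\d+-(?P<op>.+)-(?P<layer>\w+)-(?P<index>\d+)$")


def inline_parse(worker_id: str) -> str:
    """The inline expression from the original diff hunk."""
    return worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0]


def anchored_parse(worker_id: str) -> str:
    """Hypothetical anchored parse: rejects ids that do not fit the assumed shape."""
    match = _WORKER_ID.match(worker_id)
    if match is None:
        raise ValueError(f"malformed worker id: {worker_id!r}")
    return match.group("op")


if __name__ == "__main__":
    for wid in ("Worker:WF1-my-op-main-0", "Worker:WF1-my-op-globalAgg-0"):
        print(wid, "->", inline_parse(wid), "|", anchored_parse(wid))
```

Under that assumed shape, the inline parse only strips a literal `-main-0` suffix, so a non-main layer or a non-zero worker index leaks into the result, and a malformed id is returned without complaint. The anchored version returns the operator id in both cases shown and raises on input that does not match.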
The diff is scoped to `main_loop.py` (defensive guards only) and
`test_main_loop.py` (new cases). Loop integration coverage continues to live in
`amber/src/test/integration/.../LoopIntegrationSpec.scala`. All 23 tests in
`test_main_loop.py` pass.
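The defensive guards described above can be sketched roughly as follows. This is not the code from the commit: `single_reader` and its parameter names are hypothetical, and the input is modelled as a plain mapping from port to a list of readers. The sketch also rejects the empty case, which goes beyond what the comment states.

```python
from typing import Mapping, Sequence, TypeVar

R = TypeVar("R")


def single_reader(readers_by_port: Mapping[object, Sequence[R]]) -> R:
    """Return the only reader on the only port, or raise RuntimeError.

    Hypothetical helper: replaces next(iter(d.values()))[0], which would
    silently pick whichever port the dict happens to iterate first.
    """
    if len(readers_by_port) != 1:
        raise RuntimeError(
            f"expected exactly one input port, got {len(readers_by_port)}"
        )
    (readers,) = readers_by_port.values()
    if len(readers) != 1:
        raise RuntimeError(
            f"expected exactly one reader on the input port, got {len(readers)}"
        )
    return readers[0]


if __name__ == "__main__":
    print(single_reader({"port-0": ["reader-a"]}))
    try:
        single_reader({"port-0": ["reader-a"], "port-1": ["reader-b"]})
    except RuntimeError as err:
        print("rejected:", err)
```

Failing loudly here trades a small amount of flexibility for a clear error when the single-port, single-reader assumption stops holding.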