aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3337639407


##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> 
Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1

Review Comment:
   Addressed by moving `loop_counter` off `State` entirely (007a264b5). It is 
no longer a key in the `State` dict the operator is handed — the loop operators 
never read or mutate it. It now rides on the `StateFrame` transport envelope 
and the worker runtime owns it: `main_loop._process_state_frame` applies the 
`+1`/`-1` and handles the LoopStart/LoopEnd nested pass-through before the 
operator runs (so the generated LoopEnd is now consume-only). It is 
materialized/serialized as its own column via a new bilingual `StateStorage` 
(`content` STRING, `loop_counter` LONG) format, so `State` and its 
single-column schema stay pure user content. Operator-level counter coverage 
was relocated to `main_loop` runtime tests, with a new `StateStorageSpec` for 
the column round-trip.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to