aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3337639407
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1
Review Comment:
Addressed by moving `loop_counter` out of the `State` content dict entirely
(latest: 63d243353). The loop operators never read or mutate it. Instead, it
rides on the `StateFrame` transport envelope and is owned by the worker
runtime: `main_loop._process_state_frame` applies the `+1`/`-1` and handles the
LoopStart/LoopEnd nested pass-through before the operator runs, so the
generated LoopEnd is now consume-only.

For materialization and serialization, the counter is stored in its own
`loop_counter` column alongside `content`. `State.SCHEMA` is the two-column
schema and `State.to_tuple(loop_counter)` writes both columns, while
`from_tuple` returns the bare `State`; readers that need the counter read the
column directly. The user state JSON stays clean. Operator-level counter
coverage was relocated to the `main_loop` runtime tests.
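
To make the split described above concrete, here is a minimal, self-contained sketch. It is not the code from the PR: the pass-through rules, the `operator_kind` parameter, the free function `process_state_frame`, and the JSON encoding of `content` are all assumptions made for illustration. Only the names `State`, `StateFrame`, `SCHEMA`, `to_tuple`, `from_tuple`, and `loop_counter` come from the comment.

```python
import json
from dataclasses import dataclass
from typing import Tuple


class State(dict):
    """User-visible state content; it never carries a loop_counter key."""

    # Assumed two-column layout: user content plus the runtime-owned counter.
    SCHEMA = ("content", "loop_counter")

    def to_tuple(self, loop_counter: int = 0) -> Tuple[str, int]:
        # Write both columns; the counter stays out of the JSON content.
        return (json.dumps(self), loop_counter)

    @classmethod
    def from_tuple(cls, row: Tuple[str, int]) -> "State":
        # Return the bare State; callers that need the counter read row[1].
        return cls(json.loads(row[0]))


@dataclass
class StateFrame:
    """Transport envelope: the counter travels next to the state, not in it."""

    state: State
    loop_counter: int = 0


def process_state_frame(
    frame: StateFrame, operator_kind: str
) -> Tuple[StateFrame, bool]:
    """Return (outgoing frame, run_operator) under assumed nesting rules."""
    if operator_kind == "loop_start" and "LoopStartStateURI" in frame.state:
        # Assumed: a state from an enclosing loop gains one nesting level
        # and passes through without reaching the operator.
        return StateFrame(frame.state, frame.loop_counter + 1), False
    if operator_kind == "loop_end" and frame.loop_counter > 0:
        # Assumed: the state belongs to an outer loop, so unwind one level
        # and pass it through.
        return StateFrame(frame.state, frame.loop_counter - 1), False
    # Otherwise the operator itself handles (consumes) the state.
    return frame, True
```

In this sketch the operator only ever sees a `State` without the counter, which mirrors the point that the user state JSON stays clean and the runtime alone adjusts `loop_counter`.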