This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5449-2f9573be88e7f73e63e7f2f441187a5977fdb0a3
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 94170aef8559473c67eea7351ce5948ce858126c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 14 16:45:46 2026 -0700

    test(amber): add unit test coverage for OperatorExecution and RegionExecution (#5449)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of two previously-uncovered modules in
    `engine/architecture/controller/execution`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `OperatorExecutionSpec` | `OperatorExecution` | 14 passing, 1 pending |
    | `RegionExecutionSpec` | `RegionExecution` | 14 |
    
    Both spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned — `OperatorExecution`**
    
    | Surface | Contract |
    | --- | --- |
    | `initWorkerExecution(workerId)` | registers a fresh `WorkerExecution`
    under `workerId` and returns it |
    | `getWorkerExecution(workerId)` | returns the previously-initialized
    `WorkerExecution` |
    | `getWorkerIds` | empty for a fresh operator; otherwise the set of
    every initialized worker id |
    | `getState` (no workers) | `UNINITIALIZED` (empty-iterable fallthrough
    in `ExecutionUtils.aggregateStates`) |
    | `getState` (all COMPLETED) | `COMPLETED` |
    | `getState` (any RUNNING) | `RUNNING` |
    | `getState` (all UNINITIALIZED) | `UNINITIALIZED` |
    | `getStats` | per-port input / output metrics summed across workers
    (count + size per `portId`); time fields are likewise summed across
    workers; distinct ports stay separate |
    | `getStats` (no workers) | empty input / output metrics; zero time
    totals |
    | `isInputPortCompleted` / `isOutputPortCompleted` | `true` only when
    every worker reports the requested port as completed; input and output
    ports with the same `portId` are tracked independently |
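    
    A minimal sketch of the per-port summation contract above. The types
    and names here (`Metrics`, `PortReport`, `aggregatePerPort`) are
    hypothetical stand-ins, not the real Texera classes; the sketch only
    illustrates the arithmetic the tests expect, not the production code:
    
    ```scala
    object PortAggregationSketch {
      // Hypothetical stand-ins for the real metric types.
      final case class Metrics(count: Long, size: Long)
      final case class PortReport(portId: Int, metrics: Metrics)
    
      // Sum count and size per portId across every worker's reports.
      def aggregatePerPort(perWorker: Seq[Seq[PortReport]]): Map[Int, Metrics] =
        perWorker.flatten
          .groupBy(_.portId)
          .map {
            case (portId, reports) =>
              portId -> Metrics(
                reports.map(_.metrics.count).sum,
                reports.map(_.metrics.size).sum
              )
          }
    
      // Two workers report on port 0; only the second reports on port 1.
      val worker1: Seq[PortReport] = Seq(PortReport(0, Metrics(10L, 100L)))
      val worker2: Seq[PortReport] =
        Seq(PortReport(0, Metrics(4L, 40L)), PortReport(1, Metrics(5L, 50L)))
    
      // Port 0 is summed across both workers; port 1 stays separate.
      val aggregated: Map[Int, Metrics] = aggregatePerPort(Seq(worker1, worker2))
    }
    ```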
    
    **Behavior pinned — `RegionExecution`**
    
    | Surface | Contract |
    | --- | --- |
    | `initOperatorExecution(opId)` | registers a fresh `OperatorExecution`
    and returns it |
    | `initOperatorExecution(opId, Some(inherited))` | **deep-clones** the
    inherited `OperatorExecution`; mutations on the original do not leak
    into the clone |
    | second `initOperatorExecution(opId)` for the same id | throws
    `AssertionError` |
    | `getOperatorExecution` / `hasOperatorExecution` /
    `getAllOperatorExecutions` | retrieval semantics |
    | `initLinkExecution(link)` | registers a fresh `LinkExecution`; second
    call for the same link throws `AssertionError`; distinct links and their
    inner channel-executions stay independent |
    | `getStats` | one `OperatorMetrics` per registered `OperatorExecution`,
    keyed by `opId`; empty when no operator is registered |
    | `getState` / `isCompleted` | for a region with no ports, vacuously
    `COMPLETED` |
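    
    The "vacuously `COMPLETED`" row relies on `forall` returning `true`
    for an empty collection. A minimal sketch, assuming a hypothetical
    `Port` type with a `completed` flag (the real region ports are richer):
    
    ```scala
    object VacuousForallSketch {
      // Hypothetical port model, for illustration only.
      final case class Port(id: Int, completed: Boolean)
    
      // True when every port is completed; also true when there are no ports.
      def allCompleted(ports: Set[Port]): Boolean = ports.forall(_.completed)
    
      val emptyResult: Boolean = allCompleted(Set.empty)
      val mixedResult: Boolean = allCompleted(Set(Port(0, true), Port(1, false)))
    }
    ```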
    
    **Notes**
    
    While writing `OperatorExecutionSpec` I discovered a
    **docstring-vs-implementation mismatch** in
    `OperatorExecution.initWorkerExecution`:
    
    ```scala
    def initWorkerExecution(workerId: ActorVirtualIdentity): WorkerExecution = {
      assert(
        !workerExecutions.contains(workerId),  // <-- checks VALUES, not KEYS
        s"WorkerExecution already exists for workerId: $workerId"
      )
      workerExecutions.put(workerId, WorkerExecution())
      ...
    }
    ```
    
    `workerExecutions` is a `java.util.concurrent.ConcurrentHashMap` and
    `.contains(Object)` is the legacy `Hashtable` method that checks
    **values**, not **keys**. So the assertion never fires and the second
    call silently overwrites the prior `WorkerExecution` — contradicting the
    docstring (`"throws AssertionError"`).
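    
    The difference can be reproduced in isolation. A minimal sketch with a
    `ConcurrentHashMap[String, String]`; the key and value strings are
    illustrative placeholders, not the real worker types:
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    
    object ContainsVsContainsKey {
      val executions = new ConcurrentHashMap[String, String]()
      executions.put("w-1", "exec-1")
    
      // Legacy Hashtable-style method: searches the VALUES.
      val keyViaContains: Boolean = executions.contains("w-1")
      val valueViaContains: Boolean = executions.contains("exec-1")
    
      // The key lookup a duplicate-id guard would need.
      val keyViaContainsKey: Boolean = executions.containsKey("w-1")
    }
    ```
    
    Here `keyViaContains` is `false` even though `"w-1"` is a registered
    key, which is why a guard written as `!map.contains(key)` always passes.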
    
    This PR pins the **current** behavior with a characterization test and
    the **intended** behavior with `pendingUntilFixed`. Once the
    implementation is corrected (`containsKey`), CI surfaces the change
    twice: the `pendingUntilFixed` test fails because its body now passes,
    and the characterization test fails because the second call no longer
    overwrites. The fix itself is out of scope for a test-coverage PR.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5448.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowExecutionService/testOnly
      org.apache.texera.amber.engine.architecture.controller.execution.OperatorExecutionSpec
      org.apache.texera.amber.engine.architecture.controller.execution.RegionExecutionSpec"`:
      28 succeed, 1 pending (`pendingUntilFixed` for the duplicate-id
      assertion)
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Sonnet 4.5)
---
 .../execution/OperatorExecutionSpec.scala          | 304 +++++++++++++++++++++
 .../controller/execution/RegionExecutionSpec.scala | 236 ++++++++++++++++
 2 files changed, 540 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala
new file mode 100644
index 0000000000..e8b85391ea
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.worker.statistics.{
+  PortTupleMetricsMapping,
+  TupleMetrics,
+  WorkerState,
+  WorkerStatistics
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class OperatorExecutionSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixtures — small builders that keep tests readable
+  // ---------------------------------------------------------------------------
+
+  private def workerId(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name)
+
+  private def portTupleMetrics(portIdx: Int, count: Long, size: Long): PortTupleMetricsMapping =
+    PortTupleMetricsMapping(PortIdentity(portIdx), TupleMetrics(count, size))
+
+  /**
+    * Push `(state, stats)` onto an existing `WorkerExecution`. Production
+    * code applies updates only if the timestamp is newer than the
+    * previously-recorded one; we use a monotonically increasing nano-clock
+    * surrogate so each call wins.
+    */
+  private var clock: Long = 0L
+  private def applyUpdate(
+      worker: org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution,
+      state: WorkerState,
+      stats: WorkerStatistics
+  ): Unit = {
+    clock += 1
+    worker.update(clock, state, stats)
+  }
+  private def setState(
+      worker: org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution,
+      state: WorkerState
+  ): Unit = {
+    clock += 1
+    worker.update(clock, state)
+  }
+
+  // ---------------------------------------------------------------------------
+  // initWorkerExecution + getWorkerExecution + getWorkerIds
+  // ---------------------------------------------------------------------------
+
+  "OperatorExecution.initWorkerExecution" should
+    "register a fresh WorkerExecution and return it" in {
+    val opExec = OperatorExecution()
+    val w = workerId("w-1")
+    val workerExec = opExec.initWorkerExecution(w)
+    assert(workerExec != null)
+    assert(opExec.getWorkerExecution(w) eq workerExec)
+  }
+
+  // The class docstring claims `initWorkerExecution` throws
+  // `AssertionError` on a duplicate worker id, but the implementation's
+  // `workerExecutions.contains(workerId)` call resolves to Java
+  // `ConcurrentHashMap.contains(Object)`, which checks VALUES rather than
+  // KEYS — so the assertion never fires and the second call silently
+  // overwrites the prior WorkerExecution. We pin the CURRENT (broken)
+  // behavior here so a future fix is noticed in CI, and document the
+  // intended contract with `pendingUntilFixed` so the failure surfaces
+  // the day the implementation is corrected.
+
+  it should
+    "currently overwrite the previous WorkerExecution on a second init for the same id " +
+      "(characterization of the contains-by-value bug)" in {
+    val opExec = OperatorExecution()
+    val w = workerId("w-1")
+    val firstExec = opExec.initWorkerExecution(w)
+    val secondExec = opExec.initWorkerExecution(w)
+    assert(firstExec ne secondExec, "current impl replaces the prior WorkerExecution instance")
+    assert(opExec.getWorkerExecution(w) eq secondExec)
+  }
+
+  it should "(desired) throw AssertionError when initWorkerExecution is called twice for the same id" in pendingUntilFixed {
+    val opExec = OperatorExecution()
+    val w = workerId("w-1")
+    opExec.initWorkerExecution(w)
+    assertThrows[AssertionError] {
+      opExec.initWorkerExecution(w)
+    }
+  }
+
+  "OperatorExecution.getWorkerIds" should "be empty on a freshly constructed operator" in {
+    val opExec = OperatorExecution()
+    assert(opExec.getWorkerIds.isEmpty)
+  }
+
+  it should "contain every initialized worker id" in {
+    val opExec = OperatorExecution()
+    val w1 = workerId("w-1")
+    val w2 = workerId("w-2")
+    val w3 = workerId("w-3")
+    opExec.initWorkerExecution(w1)
+    opExec.initWorkerExecution(w2)
+    opExec.initWorkerExecution(w3)
+    assert(opExec.getWorkerIds == Set(w1, w2, w3))
+  }
+
+  // ---------------------------------------------------------------------------
+  // getState — aggregation via ExecutionUtils.aggregateStates
+  // ---------------------------------------------------------------------------
+
+  "OperatorExecution.getState (no workers)" should
+    "return UNINITIALIZED — empty Iterable falls through to the default branch" in {
+    val opExec = OperatorExecution()
+    assert(opExec.getState == WorkflowAggregatedState.UNINITIALIZED)
+  }
+
+  "OperatorExecution.getState (all workers COMPLETED)" should "return COMPLETED" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    setState(w1, WorkerState.COMPLETED)
+    setState(w2, WorkerState.COMPLETED)
+    assert(opExec.getState == WorkflowAggregatedState.COMPLETED)
+  }
+
+  "OperatorExecution.getState (any worker RUNNING)" should "return RUNNING" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    setState(w1, WorkerState.RUNNING)
+    setState(w2, WorkerState.COMPLETED)
+    assert(opExec.getState == WorkflowAggregatedState.RUNNING)
+  }
+
+  "OperatorExecution.getState (all workers UNINITIALIZED)" should "return UNINITIALIZED" in {
+    val opExec = OperatorExecution()
+    opExec.initWorkerExecution(workerId("w-1"))
+    opExec.initWorkerExecution(workerId("w-2"))
+    // Newly-constructed WorkerExecution is UNINITIALIZED by default; no
+    // update needed.
+    assert(opExec.getState == WorkflowAggregatedState.UNINITIALIZED)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getStats — port-metric aggregation + time sums
+  // ---------------------------------------------------------------------------
+
+  "OperatorExecution.getStats" should
+    "aggregate per-port input/output metrics across workers (counts + sizes sum per portId)" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    // worker-1 sees 10 tuples / 100 bytes on input port 0; 5 / 50 on output port 0
+    applyUpdate(
+      w1,
+      WorkerState.RUNNING,
+      WorkerStatistics(
+        inputTupleMetrics = Seq(portTupleMetrics(0, 10L, 100L)),
+        outputTupleMetrics = Seq(portTupleMetrics(0, 5L, 50L)),
+        dataProcessingTime = 7L,
+        controlProcessingTime = 3L,
+        idleTime = 1L
+      )
+    )
+    // worker-2 sees 4 tuples / 40 bytes on input port 0; 2 / 20 on output port 0
+    applyUpdate(
+      w2,
+      WorkerState.RUNNING,
+      WorkerStatistics(
+        inputTupleMetrics = Seq(portTupleMetrics(0, 4L, 40L)),
+        outputTupleMetrics = Seq(portTupleMetrics(0, 2L, 20L)),
+        dataProcessingTime = 11L,
+        controlProcessingTime = 13L,
+        idleTime = 17L
+      )
+    )
+
+    val metrics = opExec.getStats
+    // operatorState mirrors getState — both workers RUNNING → RUNNING
+    assert(metrics.operatorState == WorkflowAggregatedState.RUNNING)
+
+    val stats = metrics.operatorStatistics
+    val inAgg = stats.inputMetrics.find(_.portId == PortIdentity(0)).get
+    val outAgg = stats.outputMetrics.find(_.portId == PortIdentity(0)).get
+    assert(inAgg.tupleMetrics.count == 14L)
+    assert(inAgg.tupleMetrics.size == 140L)
+    assert(outAgg.tupleMetrics.count == 7L)
+    assert(outAgg.tupleMetrics.size == 70L)
+
+    // Time fields are summed across all workers
+    assert(stats.dataProcessingTime == 18L)
+    assert(stats.controlProcessingTime == 16L)
+    assert(stats.idleTime == 18L)
+  }
+
+  it should
+    "keep distinct ports separate when workers report metrics on different ports" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    applyUpdate(
+      w1,
+      WorkerState.RUNNING,
+      WorkerStatistics(
+        inputTupleMetrics = Seq(portTupleMetrics(0, 3L, 30L)),
+        outputTupleMetrics = Seq.empty,
+        dataProcessingTime = 0L,
+        controlProcessingTime = 0L,
+        idleTime = 0L
+      )
+    )
+    applyUpdate(
+      w2,
+      WorkerState.RUNNING,
+      WorkerStatistics(
+        inputTupleMetrics = Seq(portTupleMetrics(1, 5L, 50L)),
+        outputTupleMetrics = Seq.empty,
+        dataProcessingTime = 0L,
+        controlProcessingTime = 0L,
+        idleTime = 0L
+      )
+    )
+    val stats = opExec.getStats.operatorStatistics
+    val byPort = stats.inputMetrics.map(m => m.portId -> m.tupleMetrics).toMap
+    assert(byPort.keySet == Set(PortIdentity(0), PortIdentity(1)))
+    assert(byPort(PortIdentity(0)).count == 3L && byPort(PortIdentity(0)).size == 30L)
+    assert(byPort(PortIdentity(1)).count == 5L && byPort(PortIdentity(1)).size == 50L)
+  }
+
+  it should "report zero counts / empty metrics for a freshly-constructed operator with no workers" in {
+    val opExec = OperatorExecution()
+    val stats = opExec.getStats.operatorStatistics
+    assert(stats.inputMetrics.isEmpty)
+    assert(stats.outputMetrics.isEmpty)
+    assert(stats.dataProcessingTime == 0L)
+    assert(stats.controlProcessingTime == 0L)
+    assert(stats.idleTime == 0L)
+  }
+
+  // ---------------------------------------------------------------------------
+  // isInputPortCompleted / isOutputPortCompleted
+  // ---------------------------------------------------------------------------
+
+  "OperatorExecution.isInputPortCompleted" should
+    "return true only when every worker reports the port as completed" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    val port = PortIdentity(0)
+    // Initially no port executions touched — getInputPortExecution
+    // lazily creates them with completed = false.
+    assert(!opExec.isInputPortCompleted(port))
+    // Only worker-1 completes the port — operator still not complete.
+    w1.getInputPortExecution(port).setCompleted()
+    assert(!opExec.isInputPortCompleted(port))
+    // Both workers complete → operator completes.
+    w2.getInputPortExecution(port).setCompleted()
+    assert(opExec.isInputPortCompleted(port))
+  }
+
+  "OperatorExecution.isOutputPortCompleted" should
+    "return true only when every worker reports the port as completed" in {
+    val opExec = OperatorExecution()
+    val w1 = opExec.initWorkerExecution(workerId("w-1"))
+    val w2 = opExec.initWorkerExecution(workerId("w-2"))
+    val port = PortIdentity(0)
+    assert(!opExec.isOutputPortCompleted(port))
+    w1.getOutputPortExecution(port).setCompleted()
+    assert(!opExec.isOutputPortCompleted(port))
+    w2.getOutputPortExecution(port).setCompleted()
+    assert(opExec.isOutputPortCompleted(port))
+  }
+
+  it should "distinguish input port completion from output port completion (same portId)" in {
+    // Same portId on input vs output is tracked by independent
+    // WorkerPortExecution instances. Completing the input port must NOT
+    // also flip the output port — and vice versa.
+    val opExec = OperatorExecution()
+    val w = opExec.initWorkerExecution(workerId("w-1"))
+    val port = PortIdentity(0)
+    w.getInputPortExecution(port).setCompleted()
+    assert(opExec.isInputPortCompleted(port))
+    assert(!opExec.isOutputPortCompleted(port))
+  }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala
new file mode 100644
index 0000000000..4b7ef73bdf
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.scheduling.{Region, RegionIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegionExecutionSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixtures
+  // ---------------------------------------------------------------------------
+
+  private def physicalOpId(layered: String, layer: String = "main"): PhysicalOpIdentity =
+    PhysicalOpIdentity(OperatorIdentity(layered), layer)
+
+  private def physicalLink(
+      from: PhysicalOpIdentity,
+      to: PhysicalOpIdentity,
+      fromPort: Int = 0,
+      toPort: Int = 0
+  ): PhysicalLink =
+    PhysicalLink(from, PortIdentity(fromPort), to, PortIdentity(toPort))
+
+  private def channelId(from: String, to: String): ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity(from), ActorVirtualIdentity(to), isControl = false)
+
+  /**
+    * Empty `Region`: no operators, no links, no ports. Tests that pin
+    * `RegionExecution` behavior that does NOT touch `region.getPorts`
+    * (init / get / has / getAll for operators & links, plus `getStats`)
+    * do not need the heavy `PhysicalOp` fixture. The `getState` /
+    * `isCompleted` tests exercise the empty-ports path explicitly.
+    */
+  private def emptyRegion(id: Long = 0L): Region =
+    Region(
+      id = RegionIdentity(id),
+      physicalOps = Set.empty,
+      physicalLinks = Set.empty,
+      ports = Set.empty
+    )
+
+  // ---------------------------------------------------------------------------
+  // initOperatorExecution + getOperatorExecution + hasOperatorExecution
+  // ---------------------------------------------------------------------------
+
+  "RegionExecution.initOperatorExecution" should
+    "register a fresh OperatorExecution under the requested opId and return it" in {
+    val region = RegionExecution(emptyRegion())
+    val opId = physicalOpId("op-a")
+    val opExec = region.initOperatorExecution(opId)
+    assert(opExec != null)
+    assert(region.getOperatorExecution(opId) eq opExec)
+    assert(region.hasOperatorExecution(opId))
+  }
+
+  it should "throw AssertionError if called twice for the same opId" in {
+    val region = RegionExecution(emptyRegion())
+    val opId = physicalOpId("op-a")
+    region.initOperatorExecution(opId)
+    assertThrows[AssertionError] {
+      region.initOperatorExecution(opId)
+    }
+  }
+
+  it should "deep-clone the inherited OperatorExecution (state mutations on inherited do NOT leak)" in {
+    // Build an inherited OperatorExecution with one registered worker;
+    // pass it as inheritOperatorExecution. The clone must be a distinct
+    // instance — registering a NEW worker on the inherited copy must
+    // not show up in the cloned region's OperatorExecution.
+    val inherited = OperatorExecution()
+    inherited.initWorkerExecution(ActorVirtualIdentity("inherited-worker"))
+
+    val region = RegionExecution(emptyRegion())
+    val opId = physicalOpId("op-a")
+    val cloned = region.initOperatorExecution(opId, Some(inherited))
+
+    assert(cloned ne inherited, "deep clone must be a distinct instance")
+    assert(cloned.getWorkerIds == inherited.getWorkerIds)
+
+    // Mutate the original — clone must not see the change.
+    inherited.initWorkerExecution(ActorVirtualIdentity("post-clone-worker"))
+    assert(!cloned.getWorkerIds.contains(ActorVirtualIdentity("post-clone-worker")))
+  }
+
+  "RegionExecution.hasOperatorExecution" should "return false for an unknown opId" in {
+    val region = RegionExecution(emptyRegion())
+    assert(!region.hasOperatorExecution(physicalOpId("missing")))
+  }
+
+  "RegionExecution.getAllOperatorExecutions" should
+    "return every registered (opId, OperatorExecution) pair" in {
+    val region = RegionExecution(emptyRegion())
+    val opA = physicalOpId("op-a")
+    val opB = physicalOpId("op-b")
+    val execA = region.initOperatorExecution(opA)
+    val execB = region.initOperatorExecution(opB)
+    val all = region.getAllOperatorExecutions.toMap
+    assert(all.keySet == Set(opA, opB))
+    assert(all(opA) eq execA)
+    assert(all(opB) eq execB)
+  }
+
+  // ---------------------------------------------------------------------------
+  // initLinkExecution + getAllLinkExecutions
+  // ---------------------------------------------------------------------------
+
+  "RegionExecution.initLinkExecution" should
+    "register a fresh LinkExecution for the requested link and return it" in {
+    val region = RegionExecution(emptyRegion())
+    val link = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+    val linkExec = region.initLinkExecution(link)
+    assert(linkExec != null)
+    val all = region.getAllLinkExecutions.toMap
+    assert(all.contains(link))
+    assert(all(link) eq linkExec)
+  }
+
+  it should "throw AssertionError if called twice for the same link" in {
+    val region = RegionExecution(emptyRegion())
+    val link = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+    region.initLinkExecution(link)
+    assertThrows[AssertionError] {
+      region.initLinkExecution(link)
+    }
+  }
+
+  it should "track multiple distinct links independently" in {
+    val region = RegionExecution(emptyRegion())
+    val l1 = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+    val l2 = physicalLink(physicalOpId("op-a"), physicalOpId("op-c"))
+    val l3 = physicalLink(physicalOpId("op-b"), physicalOpId("op-c"))
+    region.initLinkExecution(l1)
+    region.initLinkExecution(l2)
+    region.initLinkExecution(l3)
+    assert(region.getAllLinkExecutions.toMap.keySet == Set(l1, l2, l3))
+  }
+
+  it should
+    "produce LinkExecutions that can be mutated independently (channel init on one does not show in the other)" in {
+    val region = RegionExecution(emptyRegion())
+    val l1 = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+    val l2 = physicalLink(physicalOpId("op-a"), physicalOpId("op-c"))
+    val link1 = region.initLinkExecution(l1)
+    val link2 = region.initLinkExecution(l2)
+    link1.initChannelExecution(channelId("a", "b"))
+    assert(link1.getAllChannelExecutions.size == 1)
+    assert(link2.getAllChannelExecutions.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getStats
+  // ---------------------------------------------------------------------------
+
+  "RegionExecution.getStats" should
+    "return one OperatorMetrics per registered OperatorExecution, keyed by opId" in {
+    val region = RegionExecution(emptyRegion())
+    val opA = physicalOpId("op-a")
+    val opB = physicalOpId("op-b")
+    region.initOperatorExecution(opA)
+    region.initOperatorExecution(opB)
+    val stats = region.getStats
+    assert(stats.keySet == Set(opA, opB))
+    // With no workers under either OperatorExecution, the aggregated
+    // state of each is UNINITIALIZED (per OperatorExecution.getState's
+    // empty-iterable fallthrough).
+    assert(stats(opA).operatorState == WorkflowAggregatedState.UNINITIALIZED)
+    assert(stats(opB).operatorState == WorkflowAggregatedState.UNINITIALIZED)
+  }
+
+  it should "be empty when no OperatorExecution has been registered" in {
+    val region = RegionExecution(emptyRegion())
+    assert(region.getStats.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getState / isCompleted
+  // ---------------------------------------------------------------------------
+
+  "RegionExecution.getState" should
+    "return COMPLETED for a region with no ports (vacuous forall on getPorts)" in {
+    // RegionExecution.getState iterates over region.getPorts; an empty
+    // set vacuously satisfies the forall, so the region is COMPLETED.
+    val region = RegionExecution(emptyRegion())
+    assert(region.getState == WorkflowAggregatedState.COMPLETED)
+  }
+
+  "RegionExecution.isCompleted" should "be true when getState == COMPLETED" in {
+    val region = RegionExecution(emptyRegion())
+    assert(region.isCompleted)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Instance independence
+  // ---------------------------------------------------------------------------
+  //
+  // Two RegionExecutions constructed from equal Regions hold their own
+  // mutable operator / link maps. Registering an operator on one must not
+  // be observable on the other — a regression here (e.g. accidentally
+  // sharing the map via a `val` on the wrapping Region) would corrupt
+  // controller bookkeeping across regions.
+
+  "Two RegionExecutions wrapping equal Regions" should
+    "hold independent operator-execution maps" in {
+    val r1 = RegionExecution(emptyRegion(7L))
+    val r2 = RegionExecution(emptyRegion(7L))
+    r1.initOperatorExecution(physicalOpId("op-a"))
+    assert(r1.hasOperatorExecution(physicalOpId("op-a")))
+    assert(!r2.hasOperatorExecution(physicalOpId("op-a")))
+  }
+}
