This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 94170aef85 test(amber): add unit test coverage for OperatorExecution and RegionExecution (#5449)
94170aef85 is described below
commit 94170aef8559473c67eea7351ce5948ce858126c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 14 16:45:46 2026 -0700
test(amber): add unit test coverage for OperatorExecution and RegionExecution (#5449)
### What changes were proposed in this PR?
Pins the behavior of two previously uncovered classes in `engine/architecture/controller/execution`. No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `OperatorExecutionSpec` | `OperatorExecution` | 15 (14 passing, 1 pending) |
| `RegionExecutionSpec` | `RegionExecution` | 14 |
Both spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned: `OperatorExecution`**
| Surface | Contract |
| --- | --- |
| `initWorkerExecution(workerId)` | registers a fresh `WorkerExecution` under `workerId` and returns it |
| `getWorkerExecution(workerId)` | returns the previously initialized `WorkerExecution` |
| `getWorkerIds` | empty for a fresh operator; otherwise the set of every initialized worker id |
| `getState` (no workers) | `UNINITIALIZED` (empty-iterable fallthrough in `ExecutionUtils.aggregateStates`) |
| `getState` (all COMPLETED) | `COMPLETED` |
| `getState` (any RUNNING) | `RUNNING` |
| `getState` (all UNINITIALIZED) | `UNINITIALIZED` |
| `getStats` | per-port input / output metrics summed across workers (count and size per `portId`); `dataProcessingTime`, `controlProcessingTime`, and `idleTime` are summed across workers; distinct ports stay separate |
| `getStats` (no workers) | empty input / output metrics; all time totals zero |
| `isInputPortCompleted` / `isOutputPortCompleted` | `true` only when every worker reports the requested port as completed; input and output ports with the same `portId` are tracked independently |
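The `getStats` and port-completion rows can also be read as code. The following is a minimal, self-contained Scala sketch of the contract as the table states it, and it is not the Amber implementation. `PortMetric`, `WorkerSnapshot`, `OperatorStats`, `aggregateStats`, and `inputPortCompleted` are hypothetical stand-ins, and ports are plain `Int` ids instead of `PortIdentity`.

```scala
object OperatorAggregationSketch {
  // Hypothetical stand-in for PortTupleMetricsMapping: one port's (count, size).
  final case class PortMetric(portId: Int, count: Long, size: Long)

  // Hypothetical stand-in for one worker's latest statistics, plus the input
  // ports that worker has marked completed.
  final case class WorkerSnapshot(
      inputs: Seq[PortMetric],
      outputs: Seq[PortMetric],
      dataTime: Long,
      controlTime: Long,
      idleTime: Long,
      completedInputPorts: Set[Int]
  )

  // Operator-level view: portId -> (summed count, summed size), plus time totals.
  final case class OperatorStats(
      inputs: Map[Int, (Long, Long)],
      outputs: Map[Int, (Long, Long)],
      dataTime: Long,
      controlTime: Long,
      idleTime: Long
  )

  // Group by portId and sum count and size per group, so distinct ports stay separate.
  private def sumPerPort(metrics: Seq[PortMetric]): Map[Int, (Long, Long)] =
    metrics.groupBy(_.portId).map { case (port, ms) =>
      (port, (ms.map(_.count).sum, ms.map(_.size).sum))
    }

  // Time fields are plain sums; zero workers gives empty maps and zero totals.
  def aggregateStats(workers: Seq[WorkerSnapshot]): OperatorStats =
    OperatorStats(
      inputs = sumPerPort(workers.flatMap(_.inputs)),
      outputs = sumPerPort(workers.flatMap(_.outputs)),
      dataTime = workers.map(_.dataTime).sum,
      controlTime = workers.map(_.controlTime).sum,
      idleTime = workers.map(_.idleTime).sum
    )

  // A port counts as completed only if every worker reports it completed.
  // forall over zero workers is vacuously true; the PR does not pin that case.
  def inputPortCompleted(workers: Seq[WorkerSnapshot], port: Int): Boolean =
    workers.forall(_.completedInputPorts.contains(port))

  // Metrics and times mirror the two-worker getStats fixture in
  // OperatorExecutionSpec; the completion flags are made up for illustration.
  val twoWorkers: Seq[WorkerSnapshot] = Seq(
    WorkerSnapshot(Seq(PortMetric(0, 10L, 100L)), Seq(PortMetric(0, 5L, 50L)), 7L, 3L, 1L, Set(0)),
    WorkerSnapshot(Seq(PortMetric(0, 4L, 40L)), Seq(PortMetric(0, 2L, 20L)), 11L, 13L, 17L, Set.empty)
  )
}
```

On the two-worker fixture, `aggregateStats(twoWorkers)` yields input port 0 → `(14, 140)` and output port 0 → `(7, 70)`. The time totals are 18 / 16 / 18. These are the same numbers the spec asserts for the real class.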
**Behavior pinned: `RegionExecution`**
| Surface | Contract |
| --- | --- |
| `initOperatorExecution(opId)` | registers a fresh `OperatorExecution` and returns it |
| `initOperatorExecution(opId, Some(inherited))` | **deep-clones** the inherited `OperatorExecution`; mutations on the original do not leak into the clone |
| second `initOperatorExecution(opId)` for the same id | throws `AssertionError` |
| `getOperatorExecution` / `hasOperatorExecution` / `getAllOperatorExecutions` | `getOperatorExecution` returns the registered instance; `hasOperatorExecution` is `false` for an unknown `opId`; `getAllOperatorExecutions` returns every registered `(opId, OperatorExecution)` pair |
| `initLinkExecution(link)` | registers a fresh `LinkExecution`; a second call for the same link throws `AssertionError`; distinct links and their inner channel executions stay independent |
| `getStats` | one `OperatorMetrics` per registered `OperatorExecution`, keyed by `opId`; empty when no operator is registered |
| `getState` / `isCompleted` | `COMPLETED` / `true` for a region with no ports (vacuous `forall` over `region.getPorts`) |
| two `RegionExecution`s over equal `Region`s | hold independent operator-execution maps |
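The `RegionExecution` rows can be sketched the same way. The classes below (`MiniOperatorExecution`, `MiniRegionExecution`, and the `completePort` helper) are hypothetical and keyed by `String` ids. They only illustrate three pinned contracts: a key-based duplicate check, deep-cloning of an inherited execution, and a port-less region that counts as vacuously completed.

```scala
import scala.collection.mutable

object RegionSketch {
  // Hypothetical stand-in for OperatorExecution: only tracks worker ids.
  final class MiniOperatorExecution {
    val workerIds: mutable.Set[String] = mutable.Set.empty[String]

    // Deep copy: the clone owns a separate Set, so later mutations on the
    // original are not visible through the clone.
    def deepCopy(): MiniOperatorExecution = {
      val copy = new MiniOperatorExecution
      copy.workerIds ++= workerIds
      copy
    }
  }

  // Hypothetical stand-in for RegionExecution over a fixed set of port ids.
  final class MiniRegionExecution(ports: Set[String]) {
    private val operators = mutable.LinkedHashMap.empty[String, MiniOperatorExecution]
    private val completedPorts = mutable.Set.empty[String]

    def initOperatorExecution(
        opId: String,
        inherit: Option[MiniOperatorExecution] = None
    ): MiniOperatorExecution = {
      // Scala's mutable.Map.contains checks KEYS, so a duplicate opId is rejected.
      assert(!operators.contains(opId), s"OperatorExecution already exists for $opId")
      val exec = inherit.map(_.deepCopy()).getOrElse(new MiniOperatorExecution)
      operators.update(opId, exec)
      exec
    }

    def hasOperatorExecution(opId: String): Boolean = operators.contains(opId)

    // Hypothetical helper: mark one port as completed.
    def completePort(port: String): Unit = completedPorts += port

    // forall over an empty port set is vacuously true.
    def isCompleted: Boolean = ports.forall(p => completedPorts.contains(p))
  }
}
```

Scala's `mutable.Map.contains` checks keys. That is the behavior the Java `ConcurrentHashMap.contains(Object)` call discussed in the Notes below does not have.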
**Notes**
While writing `OperatorExecutionSpec` I discovered a
**docstring-vs-implementation mismatch** in
`OperatorExecution.initWorkerExecution`:
```scala
def initWorkerExecution(workerId: ActorVirtualIdentity): WorkerExecution = {
  assert(
    !workerExecutions.contains(workerId), // <-- checks VALUES, not KEYS
    s"WorkerExecution already exists for workerId: $workerId"
  )
  workerExecutions.put(workerId, WorkerExecution())
  ...
}
```
`workerExecutions` is a `java.util.concurrent.ConcurrentHashMap`, and `ConcurrentHashMap.contains(Object)` is a legacy method kept for `Hashtable` compatibility that behaves like `containsValue`: it checks **values**, not **keys**. Because an `ActorVirtualIdentity` never equals a stored `WorkerExecution`, the assertion never fires, and a second call silently overwrites the prior `WorkerExecution`. This contradicts the docstring (`"throws AssertionError"`).
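The pitfall can be reproduced with the JDK alone. The snippet below uses plain `String`s in place of `ActorVirtualIdentity` and `WorkerExecution`, which is an assumption made for brevity. `ConcurrentHashMap.contains` and `containsKey` are the real `java.util.concurrent` methods.

```scala
import java.util.concurrent.ConcurrentHashMap

object ContainsPitfall {
  // Strings stand in for ActorVirtualIdentity (key) and WorkerExecution (value).
  val workerExecutions = new ConcurrentHashMap[String, String]()
  workerExecutions.put("w-1", "exec-1")

  // Legacy Hashtable-compatible method: same as containsValue, so a KEY lookup misses.
  val viaContains: Boolean = workerExecutions.contains("w-1")

  // What the assertion intended: a key lookup.
  val viaContainsKey: Boolean = workerExecutions.containsKey("w-1")

  // contains(...) does find the VALUE.
  val valueViaContains: Boolean = workerExecutions.contains("exec-1")
}
```

Here `viaContains` is `false` even though `"w-1"` is a key, while `viaContainsKey` is `true`.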
This PR pins the **current** behavior with a characterization test and the **intended** behavior with a `pendingUntilFixed` test. Once the implementation is corrected (e.g. by switching to `containsKey`), CI flags the change twice. The `pendingUntilFixed` test fails because its body no longer throws, and the characterization test fails because the second call now throws. The fix itself is out of scope for a test-coverage PR.
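The CI signal described above can be modeled without ScalaTest. In this sketch, `BuggyRegistry` mirrors the current `contains` check and `FixedRegistry` the proposed `containsKey` check. `pendingUntilFixedLike` is a hypothetical re-implementation of the rule ScalaTest documents for `pendingUntilFixed` (a throwing body is pending, a passing body fails). It is not ScalaTest's API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal

object PendingUntilFixedSketch {
  // Hypothetical stand-in for WorkerExecution.
  final class Exec

  // Hypothetical common interface so both variants can be exercised alike.
  trait Registry { def init(id: String): Exec }

  // Current shape: ConcurrentHashMap.contains checks VALUES, so the
  // assertion never fires and a second init overwrites the first.
  final class BuggyRegistry extends Registry {
    private val execs = new ConcurrentHashMap[String, Exec]()
    def init(id: String): Exec = {
      assert(!execs.contains(id), s"already exists: $id")
      val e = new Exec
      execs.put(id, e)
      e
    }
  }

  // Proposed shape: containsKey checks KEYS, so a duplicate id is rejected.
  final class FixedRegistry extends Registry {
    private val execs = new ConcurrentHashMap[String, Exec]()
    def init(id: String): Exec = {
      assert(!execs.containsKey(id), s"already exists: $id")
      val e = new Exec
      execs.put(id, e)
      e
    }
  }

  sealed trait Outcome
  case object Pending extends Outcome
  case object Failed extends Outcome

  // Models the documented pendingUntilFixed rule: a body that throws is
  // Pending; a body that completes normally Fails, signalling that the
  // wrapper should be removed.
  def pendingUntilFixedLike(body: => Unit): Outcome =
    try {
      body
      Failed
    } catch {
      case NonFatal(_) => Pending
    }

  // The "desired" test body: a second init for the same id must throw
  // AssertionError; otherwise this body itself throws, like assertThrows.
  def desiredBehavior(registry: Registry): Unit = {
    registry.init("w-1")
    val threw =
      try {
        registry.init("w-1")
        false
      } catch {
        case _: AssertionError => true
      }
    if (!threw) throw new IllegalStateException("expected AssertionError")
  }
}
```

Against `BuggyRegistry`, the desired test is reported as pending. Against `FixedRegistry`, it fails, which is the prompt to drop the wrapper. The characterization check flips at the same moment: with the buggy registry, two `init` calls return distinct instances.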
### Any related issues, documentation, discussions?
Closes #5448.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.controller.execution.OperatorExecutionSpec org.apache.texera.amber.engine.architecture.controller.execution.RegionExecutionSpec"`: 28 pass, 1 pending (the `pendingUntilFixed` duplicate-id assertion)
- `sbt scalafmtCheckAll` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)
---
.../execution/OperatorExecutionSpec.scala | 304 +++++++++++++++++++++
.../controller/execution/RegionExecutionSpec.scala | 236 ++++++++++++++++
2 files changed, 540 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala
new file mode 100644
index 0000000000..e8b85391ea
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.worker.statistics.{
+ PortTupleMetricsMapping,
+ TupleMetrics,
+ WorkerState,
+ WorkerStatistics
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class OperatorExecutionSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixtures: small builders that keep tests readable
+ // ---------------------------------------------------------------------------
+
+ private def workerId(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name)
+
+ private def portTupleMetrics(portIdx: Int, count: Long, size: Long): PortTupleMetricsMapping =
+ PortTupleMetricsMapping(PortIdentity(portIdx), TupleMetrics(count, size))
+
+ /**
+ * Push `(state, stats)` onto an existing `WorkerExecution`. Production
+ * code applies updates only if the timestamp is newer than the
+ * previously-recorded one; we use a monotonically increasing nano-clock
+ * surrogate so each call wins.
+ */
+ private var clock: Long = 0L
+ private def applyUpdate(
+ worker: org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution,
+ state: WorkerState,
+ stats: WorkerStatistics
+ ): Unit = {
+ clock += 1
+ worker.update(clock, state, stats)
+ }
+ private def setState(
+ worker: org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution,
+ state: WorkerState
+ ): Unit = {
+ clock += 1
+ worker.update(clock, state)
+ }
+
+ // ---------------------------------------------------------------------------
+ // initWorkerExecution + getWorkerExecution + getWorkerIds
+ // ---------------------------------------------------------------------------
+
+ "OperatorExecution.initWorkerExecution" should
+ "register a fresh WorkerExecution and return it" in {
+ val opExec = OperatorExecution()
+ val w = workerId("w-1")
+ val workerExec = opExec.initWorkerExecution(w)
+ assert(workerExec != null)
+ assert(opExec.getWorkerExecution(w) eq workerExec)
+ }
+
+ // The class docstring claims `initWorkerExecution` throws
+ // `AssertionError` on a duplicate worker id, but the implementation's
+ // `workerExecutions.contains(workerId)` call resolves to Java
+ // `ConcurrentHashMap.contains(Object)`, which checks VALUES rather than
+ // KEYS — so the assertion never fires and the second call silently
+ // overwrites the prior WorkerExecution. We pin the CURRENT (broken)
+ // behavior here so a future fix is noticed in CI, and document the
+ // intended contract with `pendingUntilFixed` so the failure surfaces
+ // the day the implementation is corrected.
+
+ it should
+ "currently overwrite the previous WorkerExecution on a second init for the same id " +
+ "(characterization of the contains-by-value bug)" in {
+ val opExec = OperatorExecution()
+ val w = workerId("w-1")
+ val firstExec = opExec.initWorkerExecution(w)
+ val secondExec = opExec.initWorkerExecution(w)
+ assert(firstExec ne secondExec, "current impl replaces the prior WorkerExecution instance")
+ assert(opExec.getWorkerExecution(w) eq secondExec)
+ }
+
+ it should "(desired) throw AssertionError when initWorkerExecution is called twice for the same id" in pendingUntilFixed {
+ val opExec = OperatorExecution()
+ val w = workerId("w-1")
+ opExec.initWorkerExecution(w)
+ assertThrows[AssertionError] {
+ opExec.initWorkerExecution(w)
+ }
+ }
+
+ "OperatorExecution.getWorkerIds" should "be empty on a freshly constructed operator" in {
+ val opExec = OperatorExecution()
+ assert(opExec.getWorkerIds.isEmpty)
+ }
+
+ it should "contain every initialized worker id" in {
+ val opExec = OperatorExecution()
+ val w1 = workerId("w-1")
+ val w2 = workerId("w-2")
+ val w3 = workerId("w-3")
+ opExec.initWorkerExecution(w1)
+ opExec.initWorkerExecution(w2)
+ opExec.initWorkerExecution(w3)
+ assert(opExec.getWorkerIds == Set(w1, w2, w3))
+ }
+
+ // ---------------------------------------------------------------------------
+ // getState: aggregation via ExecutionUtils.aggregateStates
+ // ---------------------------------------------------------------------------
+
+ "OperatorExecution.getState (no workers)" should
+ "return UNINITIALIZED (empty Iterable falls through to the default branch)" in {
+ val opExec = OperatorExecution()
+ assert(opExec.getState == WorkflowAggregatedState.UNINITIALIZED)
+ }
+
+ "OperatorExecution.getState (all workers COMPLETED)" should "return COMPLETED" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ setState(w1, WorkerState.COMPLETED)
+ setState(w2, WorkerState.COMPLETED)
+ assert(opExec.getState == WorkflowAggregatedState.COMPLETED)
+ }
+
+ "OperatorExecution.getState (any worker RUNNING)" should "return RUNNING" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ setState(w1, WorkerState.RUNNING)
+ setState(w2, WorkerState.COMPLETED)
+ assert(opExec.getState == WorkflowAggregatedState.RUNNING)
+ }
+
+ "OperatorExecution.getState (all workers UNINITIALIZED)" should "return UNINITIALIZED" in {
+ val opExec = OperatorExecution()
+ opExec.initWorkerExecution(workerId("w-1"))
+ opExec.initWorkerExecution(workerId("w-2"))
+ // Newly-constructed WorkerExecution is UNINITIALIZED by default; no
+ // update needed.
+ assert(opExec.getState == WorkflowAggregatedState.UNINITIALIZED)
+ }
+
+ // ---------------------------------------------------------------------------
+ // getStats: port-metric aggregation + time sums
+ // ---------------------------------------------------------------------------
+
+ "OperatorExecution.getStats" should
+ "aggregate per-port input/output metrics across workers (counts + sizes sum per portId)" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ // worker-1 sees 10 tuples / 100 bytes on input port 0; 5 / 50 on output port 0
+ applyUpdate(
+ w1,
+ WorkerState.RUNNING,
+ WorkerStatistics(
+ inputTupleMetrics = Seq(portTupleMetrics(0, 10L, 100L)),
+ outputTupleMetrics = Seq(portTupleMetrics(0, 5L, 50L)),
+ dataProcessingTime = 7L,
+ controlProcessingTime = 3L,
+ idleTime = 1L
+ )
+ )
+ // worker-2 sees 4 tuples / 40 bytes on input port 0; 2 / 20 on output port 0
+ applyUpdate(
+ w2,
+ WorkerState.RUNNING,
+ WorkerStatistics(
+ inputTupleMetrics = Seq(portTupleMetrics(0, 4L, 40L)),
+ outputTupleMetrics = Seq(portTupleMetrics(0, 2L, 20L)),
+ dataProcessingTime = 11L,
+ controlProcessingTime = 13L,
+ idleTime = 17L
+ )
+ )
+
+ val metrics = opExec.getStats
+ // operatorState mirrors getState — both workers RUNNING → RUNNING
+ assert(metrics.operatorState == WorkflowAggregatedState.RUNNING)
+
+ val stats = metrics.operatorStatistics
+ val inAgg = stats.inputMetrics.find(_.portId == PortIdentity(0)).get
+ val outAgg = stats.outputMetrics.find(_.portId == PortIdentity(0)).get
+ assert(inAgg.tupleMetrics.count == 14L)
+ assert(inAgg.tupleMetrics.size == 140L)
+ assert(outAgg.tupleMetrics.count == 7L)
+ assert(outAgg.tupleMetrics.size == 70L)
+
+ // Time fields are summed across all workers
+ assert(stats.dataProcessingTime == 18L)
+ assert(stats.controlProcessingTime == 16L)
+ assert(stats.idleTime == 18L)
+ }
+
+ it should
+ "keep distinct ports separate when workers report metrics on different ports" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ applyUpdate(
+ w1,
+ WorkerState.RUNNING,
+ WorkerStatistics(
+ inputTupleMetrics = Seq(portTupleMetrics(0, 3L, 30L)),
+ outputTupleMetrics = Seq.empty,
+ dataProcessingTime = 0L,
+ controlProcessingTime = 0L,
+ idleTime = 0L
+ )
+ )
+ applyUpdate(
+ w2,
+ WorkerState.RUNNING,
+ WorkerStatistics(
+ inputTupleMetrics = Seq(portTupleMetrics(1, 5L, 50L)),
+ outputTupleMetrics = Seq.empty,
+ dataProcessingTime = 0L,
+ controlProcessingTime = 0L,
+ idleTime = 0L
+ )
+ )
+ val stats = opExec.getStats.operatorStatistics
+ val byPort = stats.inputMetrics.map(m => m.portId -> m.tupleMetrics).toMap
+ assert(byPort.keySet == Set(PortIdentity(0), PortIdentity(1)))
+ assert(byPort(PortIdentity(0)).count == 3L && byPort(PortIdentity(0)).size == 30L)
+ assert(byPort(PortIdentity(1)).count == 5L && byPort(PortIdentity(1)).size == 50L)
+ }
+
+ it should "report zero counts / empty metrics for a freshly-constructed operator with no workers" in {
+ val opExec = OperatorExecution()
+ val stats = opExec.getStats.operatorStatistics
+ assert(stats.inputMetrics.isEmpty)
+ assert(stats.outputMetrics.isEmpty)
+ assert(stats.dataProcessingTime == 0L)
+ assert(stats.controlProcessingTime == 0L)
+ assert(stats.idleTime == 0L)
+ }
+
+ // ---------------------------------------------------------------------------
+ // isInputPortCompleted / isOutputPortCompleted
+ // ---------------------------------------------------------------------------
+
+ "OperatorExecution.isInputPortCompleted" should
+ "return true only when every worker reports the port as completed" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ val port = PortIdentity(0)
+ // Initially no port executions touched — getInputPortExecution
+ // lazily creates them with completed = false.
+ assert(!opExec.isInputPortCompleted(port))
+ // Only worker-1 completes the port — operator still not complete.
+ w1.getInputPortExecution(port).setCompleted()
+ assert(!opExec.isInputPortCompleted(port))
+ // Both workers complete → operator completes.
+ w2.getInputPortExecution(port).setCompleted()
+ assert(opExec.isInputPortCompleted(port))
+ }
+
+ "OperatorExecution.isOutputPortCompleted" should
+ "return true only when every worker reports the port as completed" in {
+ val opExec = OperatorExecution()
+ val w1 = opExec.initWorkerExecution(workerId("w-1"))
+ val w2 = opExec.initWorkerExecution(workerId("w-2"))
+ val port = PortIdentity(0)
+ assert(!opExec.isOutputPortCompleted(port))
+ w1.getOutputPortExecution(port).setCompleted()
+ assert(!opExec.isOutputPortCompleted(port))
+ w2.getOutputPortExecution(port).setCompleted()
+ assert(opExec.isOutputPortCompleted(port))
+ }
+
+ it should "distinguish input port completion from output port completion (same portId)" in {
+ // Same portId on input vs output is tracked by independent
+ // WorkerPortExecution instances. Completing the input port must NOT
+ // also flip the output port — and vice versa.
+ val opExec = OperatorExecution()
+ val w = opExec.initWorkerExecution(workerId("w-1"))
+ val port = PortIdentity(0)
+ w.getInputPortExecution(port).setCompleted()
+ assert(opExec.isInputPortCompleted(port))
+ assert(!opExec.isOutputPortCompleted(port))
+ }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala
new file mode 100644
index 0000000000..4b7ef73bdf
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecutionSpec.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.execution
+
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.architecture.scheduling.{Region, RegionIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegionExecutionSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixtures
+ // ---------------------------------------------------------------------------
+
+ private def physicalOpId(logicalOpId: String, layer: String = "main"): PhysicalOpIdentity =
+ PhysicalOpIdentity(OperatorIdentity(logicalOpId), layer)
+
+ private def physicalLink(
+ from: PhysicalOpIdentity,
+ to: PhysicalOpIdentity,
+ fromPort: Int = 0,
+ toPort: Int = 0
+ ): PhysicalLink =
+ PhysicalLink(from, PortIdentity(fromPort), to, PortIdentity(toPort))
+
+ private def channelId(from: String, to: String): ChannelIdentity =
+ ChannelIdentity(ActorVirtualIdentity(from), ActorVirtualIdentity(to), isControl = false)
+
+ /**
+ * Empty `Region`: no operators, no links, no ports. Pinning the
+ * `RegionExecution` behavior that does NOT touch `region.getPorts`
+ * (init / get / has / getAll for operators and links, plus `getStats`)
+ * does not need the heavy `PhysicalOp` fixture. The `getState` /
+ * `isCompleted` tests exercise the empty-ports path explicitly.
+ */
+ private def emptyRegion(id: Long = 0L): Region =
+ Region(
+ id = RegionIdentity(id),
+ physicalOps = Set.empty,
+ physicalLinks = Set.empty,
+ ports = Set.empty
+ )
+
+ // ---------------------------------------------------------------------------
+ // initOperatorExecution + getOperatorExecution + hasOperatorExecution
+ // ---------------------------------------------------------------------------
+
+ "RegionExecution.initOperatorExecution" should
+ "register a fresh OperatorExecution under the requested opId and return it" in {
+ val region = RegionExecution(emptyRegion())
+ val opId = physicalOpId("op-a")
+ val opExec = region.initOperatorExecution(opId)
+ assert(opExec != null)
+ assert(region.getOperatorExecution(opId) eq opExec)
+ assert(region.hasOperatorExecution(opId))
+ }
+
+ it should "throw AssertionError if called twice for the same opId" in {
+ val region = RegionExecution(emptyRegion())
+ val opId = physicalOpId("op-a")
+ region.initOperatorExecution(opId)
+ assertThrows[AssertionError] {
+ region.initOperatorExecution(opId)
+ }
+ }
+
+ it should "deep-clone the inherited OperatorExecution (state mutations on inherited do NOT leak)" in {
+ // Build an inherited OperatorExecution with one registered worker;
+ // pass it as inheritOperatorExecution. The clone must be a distinct
+ // instance — registering a NEW worker on the inherited copy must
+ // not show up in the cloned region's OperatorExecution.
+ val inherited = OperatorExecution()
+ inherited.initWorkerExecution(ActorVirtualIdentity("inherited-worker"))
+
+ val region = RegionExecution(emptyRegion())
+ val opId = physicalOpId("op-a")
+ val cloned = region.initOperatorExecution(opId, Some(inherited))
+
+ assert(cloned ne inherited, "deep clone must be a distinct instance")
+ assert(cloned.getWorkerIds == inherited.getWorkerIds)
+
+ // Mutate the original — clone must not see the change.
+ inherited.initWorkerExecution(ActorVirtualIdentity("post-clone-worker"))
+ assert(!cloned.getWorkerIds.contains(ActorVirtualIdentity("post-clone-worker")))
+ }
+
+ "RegionExecution.hasOperatorExecution" should "return false for an unknown opId" in {
+ val region = RegionExecution(emptyRegion())
+ assert(!region.hasOperatorExecution(physicalOpId("missing")))
+ }
+
+ "RegionExecution.getAllOperatorExecutions" should
+ "return every registered (opId, OperatorExecution) pair" in {
+ val region = RegionExecution(emptyRegion())
+ val opA = physicalOpId("op-a")
+ val opB = physicalOpId("op-b")
+ val execA = region.initOperatorExecution(opA)
+ val execB = region.initOperatorExecution(opB)
+ val all = region.getAllOperatorExecutions.toMap
+ assert(all.keySet == Set(opA, opB))
+ assert(all(opA) eq execA)
+ assert(all(opB) eq execB)
+ }
+
+ // ---------------------------------------------------------------------------
+ // initLinkExecution + getAllLinkExecutions
+ // ---------------------------------------------------------------------------
+
+ "RegionExecution.initLinkExecution" should
+ "register a fresh LinkExecution for the requested link and return it" in {
+ val region = RegionExecution(emptyRegion())
+ val link = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+ val linkExec = region.initLinkExecution(link)
+ assert(linkExec != null)
+ val all = region.getAllLinkExecutions.toMap
+ assert(all.contains(link))
+ assert(all(link) eq linkExec)
+ }
+
+ it should "throw AssertionError if called twice for the same link" in {
+ val region = RegionExecution(emptyRegion())
+ val link = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+ region.initLinkExecution(link)
+ assertThrows[AssertionError] {
+ region.initLinkExecution(link)
+ }
+ }
+
+ it should "track multiple distinct links independently" in {
+ val region = RegionExecution(emptyRegion())
+ val l1 = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+ val l2 = physicalLink(physicalOpId("op-a"), physicalOpId("op-c"))
+ val l3 = physicalLink(physicalOpId("op-b"), physicalOpId("op-c"))
+ region.initLinkExecution(l1)
+ region.initLinkExecution(l2)
+ region.initLinkExecution(l3)
+ assert(region.getAllLinkExecutions.toMap.keySet == Set(l1, l2, l3))
+ }
+
+ it should
+ "produce LinkExecutions that can be mutated independently (channel init on one does not show in the other)" in {
+ val region = RegionExecution(emptyRegion())
+ val l1 = physicalLink(physicalOpId("op-a"), physicalOpId("op-b"))
+ val l2 = physicalLink(physicalOpId("op-a"), physicalOpId("op-c"))
+ val link1 = region.initLinkExecution(l1)
+ val link2 = region.initLinkExecution(l2)
+ link1.initChannelExecution(channelId("a", "b"))
+ assert(link1.getAllChannelExecutions.size == 1)
+ assert(link2.getAllChannelExecutions.isEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // getStats
+ // ---------------------------------------------------------------------------
+
+ "RegionExecution.getStats" should
+ "return one OperatorMetrics per registered OperatorExecution, keyed by opId" in {
+ val region = RegionExecution(emptyRegion())
+ val opA = physicalOpId("op-a")
+ val opB = physicalOpId("op-b")
+ region.initOperatorExecution(opA)
+ region.initOperatorExecution(opB)
+ val stats = region.getStats
+ assert(stats.keySet == Set(opA, opB))
+ // With no workers under either OperatorExecution, the aggregated
+ // state of each is UNINITIALIZED (per OperatorExecution.getState's
+ // empty-iterable fallthrough).
+ assert(stats(opA).operatorState == WorkflowAggregatedState.UNINITIALIZED)
+ assert(stats(opB).operatorState == WorkflowAggregatedState.UNINITIALIZED)
+ }
+
+ it should "be empty when no OperatorExecution has been registered" in {
+ val region = RegionExecution(emptyRegion())
+ assert(region.getStats.isEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // getState / isCompleted
+ // ---------------------------------------------------------------------------
+
+ "RegionExecution.getState" should
+ "return COMPLETED for a region with no ports (vacuous forall on getPorts)" in {
+ // RegionExecution.getState iterates over region.getPorts; an empty
+ // set vacuously satisfies the forall, so the region is COMPLETED.
+ val region = RegionExecution(emptyRegion())
+ assert(region.getState == WorkflowAggregatedState.COMPLETED)
+ }
+
+ "RegionExecution.isCompleted" should "be true when getState == COMPLETED" in {
+ val region = RegionExecution(emptyRegion())
+ assert(region.isCompleted)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Instance independence
+ // ---------------------------------------------------------------------------
+ //
+ // Two RegionExecutions constructed from equal Regions hold their own
+ // mutable operator / link maps. Registering an operator on one must not
+ // be observable on the other — a regression here (e.g. accidentally
+ // sharing the map via a `val` on the wrapping Region) would corrupt
+ // controller bookkeeping across regions.
+
+ "Two RegionExecutions wrapping equal Regions" should
+ "hold independent operator-execution maps" in {
+ val r1 = RegionExecution(emptyRegion(7L))
+ val r2 = RegionExecution(emptyRegion(7L))
+ r1.initOperatorExecution(physicalOpId("op-a"))
+ assert(r1.hasOperatorExecution(physicalOpId("op-a")))
+ assert(!r2.hasOperatorExecution(physicalOpId("op-a")))
+ }
+}