This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new f5cf00a8b1 test(workflow-operator): add unit test coverage for
map/filter/flatmap abstract LogicalOp bases (#5796)
f5cf00a8b1 is described below
commit f5cf00a8b1b7a91d18b2505b4b3de3dfcfdcbdd2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 19 22:56:58 2026 -0700
test(workflow-operator): add unit test coverage for map/filter/flatmap
abstract LogicalOp bases (#5796)
### What changes were proposed in this PR?
Pin behavior of three tiny abstract `LogicalOp` bases in
`common/workflow-operator/`. Two of them (`MapOpDesc`, `FilterOpDesc`)
carry a `runtimeReconfiguration` override that delegates to
`newOpDesc.getPhysicalOp` and wraps the result in `Success((_, None))` —
drift in that delegation (swapped arg order, accidental `Failure` wrap,
surprise `StateTransferFunc`) would land silently. No production-code
changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `FlatMapOpDescSpec` | `FlatMapOpDesc` | 4 |
| `MapOpDescSpec` | `MapOpDesc` | 6 |
| `FilterOpDescSpec` | `FilterOpDesc` | 6 |
All three spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned (per descriptor)**
| Surface | Contract |
| --- | --- |
| Abstract-class shape | `Modifier.isAbstract(classOf[X])` |
| LogicalOp inheritance | upcast compiles; `case _: LogicalOp` matches a
concrete subclass |
| `runtimeReconfiguration` (Map/Filter only) | delegates to
`newOpDesc.getPhysicalOp(workflowId, executionId)` |
| Argument isolation | `oldOpDesc.getPhysicalOp` is **not** called |
| Result wrapping | returns `Success` with `(_, None)` for the
`StateTransferFunc` slot |
| Exception propagation | a throw from `newOpDesc.getPhysicalOp` is
**not** caught — it propagates up |
The specs use minimal `StubMapDesc` / `StubFilterDesc` subclasses that
record every `getPhysicalOp` call so the delegation is observable
end-to-end. Sentinel returns use `null.asInstanceOf[PhysicalOp]` — the
production code only passes the return value to `Success(...)` without
inspecting it, so the cast is safe for these tests.
### Any related issues, documentation, discussions?
Closes #5793.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt \"WorkflowOperator/testOnly
org.apache.texera.amber.operator.flatmap.FlatMapOpDescSpec
org.apache.texera.amber.operator.map.MapOpDescSpec
org.apache.texera.amber.operator.filter.FilterOpDescSpec\"` — 16 tests,
all green
- `sbt \"WorkflowOperator/Test/scalafmtCheck\"` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7 [1M context])
---
.../amber/operator/filter/FilterOpDescSpec.scala | 116 ++++++++++++++++++++
.../amber/operator/flatmap/FlatMapOpDescSpec.scala | 78 ++++++++++++++
.../texera/amber/operator/map/MapOpDescSpec.scala | 120 +++++++++++++++++++++
3 files changed, 314 insertions(+)
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala
new file mode 100644
index 0000000000..5200eb3490
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.filter
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FilterOpDescSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // Test-only concrete subclass — same shape as the MapOpDesc stub; the
+ // two abstract bases share the same `runtimeReconfiguration` contract.
+ //
---------------------------------------------------------------------------
+
+ private class StubFilterDesc extends FilterOpDesc {
+ var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = {
+ calls = calls :+ ((workflowId, executionId))
+ null.asInstanceOf[PhysicalOp]
+ }
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ "StubFilter",
+ "stub filter",
+ OperatorGroupConstants.UTILITY_GROUP,
+ inputPorts = List.empty,
+ outputPorts = List.empty
+ )
+ }
+
+ private val workflowId = WorkflowIdentity(7L)
+ private val executionId = ExecutionIdentity(11L)
+
+ //
---------------------------------------------------------------------------
+ // Abstract-class shape
+ //
---------------------------------------------------------------------------
+
+ "FilterOpDesc" should "be declared abstract (cannot be instantiated
directly)" in {
+ assert(Modifier.isAbstract(classOf[FilterOpDesc].getModifiers))
+ }
+
+ //
---------------------------------------------------------------------------
+ // Inheritance — FilterOpDesc is a LogicalOp
+ //
---------------------------------------------------------------------------
+
+ it should "extend LogicalOp (compile-time enforced)" in {
+ val s: LogicalOp = new StubFilterDesc
+ assert(s != null)
+ }
+
+ //
---------------------------------------------------------------------------
+ // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc
+ //
---------------------------------------------------------------------------
+
+ "FilterOpDesc.runtimeReconfiguration" should
+ "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution
ids" in {
+ val oldDesc = new StubFilterDesc
+ val newDesc = new StubFilterDesc
+ val result = oldDesc.runtimeReconfiguration(workflowId, executionId,
oldDesc, newDesc)
+ assert(result.isSuccess)
+ assert(newDesc.calls == List((workflowId, executionId)))
+ }
+
+ it should "not call oldOpDesc.getPhysicalOp" in {
+ val oldDesc = new StubFilterDesc
+ val newDesc = new StubFilterDesc
+ oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+ assert(oldDesc.calls == Nil)
+ }
+
+ it should "return None for the StateTransferFunc slot" in {
+ val oldDesc = new StubFilterDesc
+ val newDesc = new StubFilterDesc
+ val result = oldDesc.runtimeReconfiguration(workflowId, executionId,
oldDesc, newDesc)
+ val (_, transferOpt) = result.get
+ assert(transferOpt.isEmpty)
+ }
+
+ it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch
them)" in {
+ val oldDesc = new StubFilterDesc
+ val throwingDesc = new StubFilterDesc {
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = throw new RuntimeException("sentinel:newDesc")
+ }
+ val ex = intercept[RuntimeException] {
+ oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc,
throwingDesc)
+ }
+ assert(ex.getMessage == "sentinel:newDesc")
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala
new file mode 100644
index 0000000000..937c7a4c64
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.flatmap
+
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FlatMapOpDescSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // Test-only concrete subclass
+ //
---------------------------------------------------------------------------
+
+ private class StubFlatMap extends FlatMapOpDesc {
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ "StubFlatMap",
+ "stub flatmap",
+ OperatorGroupConstants.UTILITY_GROUP,
+ inputPorts = List.empty,
+ outputPorts = List.empty
+ )
+ }
+
+ //
---------------------------------------------------------------------------
+ // Abstract-class shape
+ //
---------------------------------------------------------------------------
+
+ "FlatMapOpDesc" should "be declared abstract (cannot be instantiated
directly)" in {
+ assert(Modifier.isAbstract(classOf[FlatMapOpDesc].getModifiers))
+ }
+
+ //
---------------------------------------------------------------------------
+ // Inheritance — FlatMapOpDesc is a LogicalOp
+ //
---------------------------------------------------------------------------
+
+ it should "extend LogicalOp (compile-time enforced)" in {
+ val s: LogicalOp = new StubFlatMap
+ assert(s != null)
+ }
+
+ it should "match the LogicalOp type-pattern" in {
+ val any: AnyRef = new StubFlatMap
+ val matched = any match {
+ case _: LogicalOp => true
+ case _ => false
+ }
+ assert(matched)
+ }
+
+ it should "match the FlatMapOpDesc type-pattern" in {
+ val any: AnyRef = new StubFlatMap
+ val matched = any match {
+ case _: FlatMapOpDesc => true
+ case _ => false
+ }
+ assert(matched)
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala
new file mode 100644
index 0000000000..aa61154ca8
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.map
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class MapOpDescSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // Test-only concrete subclass — records every call to `getPhysicalOp`
+ // and returns a stable sentinel so the spec can verify the delegation.
+ //
+ // Casting `null` to PhysicalOp is acceptable here: the production
+ // `runtimeReconfiguration` just wraps the return value in
+ // `Success((_, None))` without inspecting it.
+ //
---------------------------------------------------------------------------
+
+ private class StubMapDesc extends MapOpDesc {
+ var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = {
+ calls = calls :+ ((workflowId, executionId))
+ null.asInstanceOf[PhysicalOp]
+ }
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ "StubMap",
+ "stub map",
+ OperatorGroupConstants.UTILITY_GROUP,
+ inputPorts = List.empty,
+ outputPorts = List.empty
+ )
+ }
+
+ private val workflowId = WorkflowIdentity(7L)
+ private val executionId = ExecutionIdentity(11L)
+
+ //
---------------------------------------------------------------------------
+ // Abstract-class shape
+ //
---------------------------------------------------------------------------
+
+ "MapOpDesc" should "be declared abstract (cannot be instantiated directly)"
in {
+ assert(Modifier.isAbstract(classOf[MapOpDesc].getModifiers))
+ }
+
+ //
---------------------------------------------------------------------------
+ // Inheritance — MapOpDesc is a LogicalOp
+ //
---------------------------------------------------------------------------
+
+ it should "extend LogicalOp (compile-time enforced)" in {
+ val s: LogicalOp = new StubMapDesc
+ assert(s != null)
+ }
+
+ //
---------------------------------------------------------------------------
+ // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc
+ //
---------------------------------------------------------------------------
+
+ "MapOpDesc.runtimeReconfiguration" should
+ "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution
ids" in {
+ val oldDesc = new StubMapDesc
+ val newDesc = new StubMapDesc
+ val result = oldDesc.runtimeReconfiguration(workflowId, executionId,
oldDesc, newDesc)
+ assert(result.isSuccess)
+ assert(newDesc.calls == List((workflowId, executionId)))
+ }
+
+ it should "not call oldOpDesc.getPhysicalOp" in {
+ val oldDesc = new StubMapDesc
+ val newDesc = new StubMapDesc
+ oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+ assert(oldDesc.calls == Nil)
+ }
+
+ it should "return None for the StateTransferFunc slot" in {
+ val oldDesc = new StubMapDesc
+ val newDesc = new StubMapDesc
+ val result = oldDesc.runtimeReconfiguration(workflowId, executionId,
oldDesc, newDesc)
+ val (_, transferOpt) = result.get
+ assert(transferOpt.isEmpty)
+ }
+
+ it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch
them)" in {
+ val oldDesc = new StubMapDesc
+ val throwingDesc = new StubMapDesc {
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp = throw new RuntimeException("sentinel:newDesc")
+ }
+ val ex = intercept[RuntimeException] {
+ oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc,
throwingDesc)
+ }
+ assert(ex.getMessage == "sentinel:newDesc")
+ }
+}