This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new f2d5737059 test(workflow-operator): add unit test coverage for sort
and set operator descriptors (#5825)
f2d5737059 is described below
commit f2d573705981d98ccb946f88182fa59cb82c8614
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 17:37:50 2026 -0700
test(workflow-operator): add unit test coverage for sort and set operator
descriptors (#5825)
### What changes were proposed in this PR?
Pin behavior of three previously-untested descriptors that reorder rows
or combine relations, in `common/workflow-operator/`. No production-code
changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `StableMergeSortOpDescSpec` | `StableMergeSortOpDesc` | 3 |
| `SortPartitionsOpDescSpec` | `SortPartitionsOpDesc` | 3 |
| `SymmetricDifferenceOpDescSpec` | `SymmetricDifferenceOpDesc` | 5 |
All three spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | exact name + group (`SORT_GROUP` / `SET_GROUP`); blocking output port (sort/set must observe all rows); `SymmetricDifference` advertises two inputs `PortIdentity(0)`/`PortIdentity(1)` |
| `getPhysicalOp` wiring | `opExecInitInfo` pattern-matches `OpExecWithClassName` with the exact executor FQCN; port **identities** carried forward (`keySet`, not counts) |
| `StableMergeSort` | non-parallelizable many-to-one op; deserializes its `List[SortCriteriaUnit]` sort keys |
| `SortPartitions` | `RangePartition(List(attr), domainMin, domainMax)` partition requirement; field round-trip |
| `SymmetricDifference` | `HashPartition` requirement on both inputs; schema propagation passes the shared input schema through and throws `IllegalArgumentException` when the two inputs' schemas differ |
### Any related issues, documentation, discussions?
Closes #5822.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.sort.StableMergeSortOpDescSpec
org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDescSpec
org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpDescSpec"`
— 10 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
"WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])
---
.../operator/sort/StableMergeSortOpDescSpec.scala | 70 +++++++++++++++++
.../sortPartitions/SortPartitionsOpDescSpec.scala | 80 +++++++++++++++++++
.../SymmetricDifferenceOpDescSpec.scala | 89 ++++++++++++++++++++++
3 files changed, 239 insertions(+)
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala
new file mode 100644
index 0000000000..933549a3b3
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sort
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Unit tests for [[StableMergeSortOpDesc]].
+ *
+ * Covers the advertised operator metadata, the physical-op wiring produced by
+ * `getPhysicalOp`, and JSON deserialization of the sort keys through [[LogicalOp]].
+ */
+class StableMergeSortOpDescSpec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities passed to getPhysicalOp in the wiring test.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "StableMergeSortOpDesc.operatorInfo" should
+    "advertise the name, Sort group, and a single blocking output" in {
+    val info = (new StableMergeSortOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Stable Merge Sort"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    // A stable sort must observe all rows before emitting, so the output blocks.
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  "StableMergeSortOpDesc.getPhysicalOp" should
+    "be a non-parallelizable many-to-one op wiring StableMergeSortOpExec" in {
+    val op = new StableMergeSortOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.parallelizable shouldBe false
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.sort.StableMergeSortOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    // Compare port identities (key sets), not just port counts.
+    physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "StableMergeSortOpDesc" should
+    "deserialize its sort keys (List of SortCriteriaUnit) through the polymorphic base" in {
+    val json =
+      """{"operatorType":"StableMergeSort","keys":[{"attribute":"age","sortPreference":"DESC"}]}"""
+    // Match on the runtime type instead of casting: a wrong subtype is then reported as a
+    // test failure naming the offending value rather than as a ClassCastException.
+    objectMapper.readValue(json, classOf[LogicalOp]) match {
+      case desc: StableMergeSortOpDesc =>
+        desc.keys should have size 1
+        desc.keys.head.attributeName shouldBe "age"
+        desc.keys.head.sortPreference shouldBe SortPreference.DESC
+      case other => fail(s"expected StableMergeSortOpDesc, got $other")
+    }
+  }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala
new file mode 100644
index 0000000000..d9ae20b14a
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sortPartitions
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.RangePartition
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Unit tests for [[SortPartitionsOpDesc]].
+ *
+ * Checks the advertised operator metadata, the executor class and range-partition
+ * requirement produced by `getPhysicalOp`, and a JSON round trip through [[LogicalOp]].
+ */
+class SortPartitionsOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  /** Creates a descriptor with the given sort attribute and domain bounds set. */
+  private def newDesc(attr: String, min: Long, max: Long): SortPartitionsOpDesc = {
+    val descriptor = new SortPartitionsOpDesc
+    descriptor.sortAttributeName = attr
+    descriptor.domainMin = min
+    descriptor.domainMax = max
+    descriptor
+  }
+
+  "SortPartitionsOpDesc.operatorInfo" should
+    "advertise the name, Sort group, and a single blocking output" in {
+    val operatorInfo = (new SortPartitionsOpDesc).operatorInfo
+    operatorInfo.userFriendlyName shouldBe "Sort Partitions"
+    operatorInfo.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP
+    operatorInfo.inputPorts should have length 1
+    operatorInfo.outputPorts should have length 1
+    operatorInfo.outputPorts.head.blocking shouldBe true
+  }
+
+  "SortPartitionsOpDesc.getPhysicalOp" should
+    "wire SortPartitionsOpExec and require a RangePartition over the sort attribute/domain" in {
+    val logical = newDesc("score", 0L, 100L)
+    val physicalOp = logical.getPhysicalOp(workflowId, executionId)
+
+    physicalOp.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+
+    // Port identities of the physical op must equal those declared in operatorInfo.
+    physicalOp.inputPorts.keySet shouldBe logical.operatorInfo.inputPorts.map(_.id).toSet
+    physicalOp.outputPorts.keySet shouldBe logical.operatorInfo.outputPorts.map(_.id).toSet
+
+    val expectedRequirement = List(Option(RangePartition(List("score"), 0L, 100L)))
+    physicalOp.partitionRequirement shouldBe expectedRequirement
+  }
+
+  "SortPartitionsOpDesc" should
+    "round-trip its attribute/domain fields through the polymorphic base" in {
+    val serialized = objectMapper.writeValueAsString(newDesc("age", 1L, 99L))
+    val decoded = objectMapper.readValue(serialized, classOf[LogicalOp])
+    // Assert the concrete type first so the cast below cannot fail unexpectedly.
+    decoded shouldBe a[SortPartitionsOpDesc]
+    val roundTripped = decoded.asInstanceOf[SortPartitionsOpDesc]
+    roundTripped.sortAttributeName shouldBe "age"
+    roundTripped.domainMin shouldBe 1L
+    roundTripped.domainMax shouldBe 99L
+  }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala
new file mode 100644
index 0000000000..e6e2995877
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.symmetricDifference
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Unit tests for [[SymmetricDifferenceOpDesc]].
+ *
+ * Checks the advertised operator metadata, the executor class and hash-partition
+ * requirement produced by `getPhysicalOp`, schema propagation for matching and
+ * mismatching inputs, and a JSON round trip through [[LogicalOp]].
+ */
+class SymmetricDifferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // Two schemas that differ in both attribute name and attribute type.
+  private val schemaA = Schema().add(new Attribute("a", AttributeType.STRING))
+  private val schemaDifferent = Schema().add(new Attribute("b", AttributeType.INTEGER))
+
+  "SymmetricDifferenceOpDesc.operatorInfo" should
+    "advertise the name, Set group, two inputs, and a single blocking output" in {
+    val operatorInfo = (new SymmetricDifferenceOpDesc).operatorInfo
+    operatorInfo.userFriendlyName shouldBe "SymmetricDifference"
+    operatorInfo.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    val declaredInputIds = operatorInfo.inputPorts.map(_.id)
+    declaredInputIds shouldBe List(PortIdentity(0), PortIdentity(1))
+    operatorInfo.outputPorts should have length 1
+    operatorInfo.outputPorts.head.blocking shouldBe true
+  }
+
+  "SymmetricDifferenceOpDesc.getPhysicalOp" should
+    "wire SymmetricDifferenceOpExec, carry port identities, and require HashPartition on both inputs" in {
+    val logical = new SymmetricDifferenceOpDesc
+    val physicalOp = logical.getPhysicalOp(workflowId, executionId)
+
+    physicalOp.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpExec"
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+
+    // Port identities of the physical op must equal those declared in operatorInfo.
+    physicalOp.inputPorts.keySet shouldBe logical.operatorInfo.inputPorts.map(_.id).toSet
+    physicalOp.outputPorts.keySet shouldBe logical.operatorInfo.outputPorts.map(_.id).toSet
+
+    // One HashPartition requirement per input port.
+    physicalOp.partitionRequirement shouldBe List.fill(2)(Option(HashPartition()))
+  }
+
+  "SymmetricDifferenceOpDesc schema propagation" should
+    "pass the shared input schema through to the output port" in {
+    val logical = new SymmetricDifferenceOpDesc
+    val physicalOp = logical.getPhysicalOp(workflowId, executionId)
+    val sameSchemaInputs = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaA)
+    val propagated = physicalOp.propagateSchema.func(sameSchemaInputs)
+    propagated.keySet shouldBe logical.operatorInfo.outputPorts.map(_.id).toSet
+    propagated.values.toSet shouldBe Set(schemaA)
+  }
+
+  it should "reject inputs whose schemas differ" in {
+    val physicalOp = (new SymmetricDifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    val mismatchedInputs = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaDifferent)
+    an[IllegalArgumentException] should be thrownBy {
+      physicalOp.propagateSchema.func(mismatchedInputs)
+    }
+  }
+
+  "SymmetricDifferenceOpDesc" should
+    "round-trip through the polymorphic base (pins the SymmetricDifference discriminator)" in {
+    // The operator has no config fields, so this pins the @JsonSubTypes
+    // discriminator + type resolution, on par with the other specs in this PR.
+    val serialized = objectMapper.writeValueAsString(new SymmetricDifferenceOpDesc)
+    val decoded = objectMapper.readValue(serialized, classOf[LogicalOp])
+    decoded shouldBe a[SymmetricDifferenceOpDesc]
+  }
+}