This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new f2d5737059 test(workflow-operator): add unit test coverage for sort and set operator descriptors (#5825)
f2d5737059 is described below

commit f2d573705981d98ccb946f88182fa59cb82c8614
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 17:37:50 2026 -0700

    test(workflow-operator): add unit test coverage for sort and set operator descriptors (#5825)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three previously-untested descriptors that reorder rows
    or combine relations, in `common/workflow-operator/`. No production-code
    changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `StableMergeSortOpDescSpec` | `StableMergeSortOpDesc` | 3 |
    | `SortPartitionsOpDescSpec` | `SortPartitionsOpDesc` | 3 |
    | `SymmetricDifferenceOpDescSpec` | `SymmetricDifferenceOpDesc` | 4 |
    
    All three spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | exact name + group (`SORT_GROUP` / `SET_GROUP`); blocking output port (sort/set must observe all rows); `SymmetricDifference` advertises two inputs `PortIdentity(0)`/`PortIdentity(1)` |
    | `getPhysicalOp` wiring | `opExecInitInfo` pattern-matches `OpExecWithClassName` with the exact executor FQCN; port **identities** carried forward (`keySet`, not counts) |
    | `StableMergeSort` | non-parallelizable many-to-one op; deserializes its `List[SortCriteriaUnit]` sort keys |
    | `SortPartitions` | `RangePartition(List(attr), domainMin, domainMax)` partition requirement; field round-trip |
    | `SymmetricDifference` | `HashPartition` requirement on both inputs; schema propagation passes the shared input schema through and throws `IllegalArgumentException` when the two inputs' schemas differ |
    
    ### Any related issues, documentation, discussions?
    
    Closes #5822.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.sort.StableMergeSortOpDescSpec
    org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDescSpec
    org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpDescSpec"`
    — 10 tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
    "WorkflowOperator/Test/scalafix --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../operator/sort/StableMergeSortOpDescSpec.scala  | 70 +++++++++++++++++
 .../sortPartitions/SortPartitionsOpDescSpec.scala  | 80 +++++++++++++++++++
 .../SymmetricDifferenceOpDescSpec.scala            | 89 ++++++++++++++++++++++
 3 files changed, 239 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala
new file mode 100644
index 0000000000..933549a3b3
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sort
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Pins the operator metadata, physical-plan wiring, and polymorphic JSON
+  * deserialization of [[StableMergeSortOpDesc]].
+  */
+class StableMergeSortOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "StableMergeSortOpDesc.operatorInfo" should
+    "advertise the name, Sort group, and a single blocking output" in {
+    val info = (new StableMergeSortOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Stable Merge Sort"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    // A stable sort must observe all rows before emitting, so the output blocks.
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  "StableMergeSortOpDesc.getPhysicalOp" should
+    "be a non-parallelizable many-to-one op wiring StableMergeSortOpExec" in {
+    val op = new StableMergeSortOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    // Only `parallelizable` is pinned here; the partitioning side of
+    // "many-to-one" is not asserted by this test.
+    physical.parallelizable shouldBe false
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.sort.StableMergeSortOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    // Compare port identities, not just counts.
+    physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "StableMergeSortOpDesc" should
+    "deserialize its sort keys (List of SortCriteriaUnit) through the polymorphic base" in {
+    // The JSON key `attribute` is expected to populate `attributeName`.
+    val json =
+      """{"operatorType":"StableMergeSort","keys":[{"attribute":"age","sortPreference":"DESC"}]}"""
+    val restored = objectMapper.readValue(json, classOf[LogicalOp])
+    // Check the subtype before casting so a wrong discriminator fails with a
+    // matcher message rather than a bare ClassCastException.
+    restored shouldBe a[StableMergeSortOpDesc]
+    val desc = restored.asInstanceOf[StableMergeSortOpDesc]
+    desc.keys should have size 1
+    desc.keys.head.attributeName shouldBe "age"
+    desc.keys.head.sortPreference shouldBe SortPreference.DESC
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala
new file mode 100644
index 0000000000..d9ae20b14a
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sortPartitions
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.RangePartition
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class SortPartitionsOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  /**
+    * Builds a descriptor with the given sort attribute and domain bounds
+    * (`domainMin` / `domainMax`) already assigned.
+    */
+  private def newDesc(attr: String, min: Long, max: Long): SortPartitionsOpDesc = {
+    val configured = new SortPartitionsOpDesc
+    configured.sortAttributeName = attr
+    configured.domainMin = min
+    configured.domainMax = max
+    configured
+  }
+
+  "SortPartitionsOpDesc.operatorInfo" should
+    "advertise the name, Sort group, and a single blocking output" in {
+    val meta = new SortPartitionsOpDesc().operatorInfo
+    meta.userFriendlyName shouldBe "Sort Partitions"
+    meta.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP
+    meta.inputPorts should have size 1
+    // Exactly one output port, and that port is blocking.
+    meta.outputPorts.map(_.blocking) shouldBe List(true)
+  }
+
+  "SortPartitionsOpDesc.getPhysicalOp" should
+    "wire SortPartitionsOpExec and require a RangePartition over the sort attribute/domain" in {
+    val desc = newDesc("score", 0L, 100L)
+    val physicalOp = desc.getPhysicalOp(workflowId, executionId)
+
+    physicalOp.opExecInitInfo match {
+      case OpExecWithClassName(execClass, serializedDesc) =>
+        execClass shouldBe "org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpExec"
+        serializedDesc should not be empty
+      case unexpected => fail(s"expected OpExecWithClassName, got $unexpected")
+    }
+
+    // Port identities (not just counts) must match the logical operator's ports.
+    physicalOp.inputPorts.keySet shouldBe desc.operatorInfo.inputPorts.map(_.id).toSet
+    physicalOp.outputPorts.keySet shouldBe desc.operatorInfo.outputPorts.map(_.id).toSet
+
+    // The input must be range-partitioned on the sort attribute using the
+    // configured domain bounds.
+    physicalOp.partitionRequirement shouldBe List(Some(RangePartition(List("score"), 0L, 100L)))
+  }
+
+  "SortPartitionsOpDesc" should
+    "round-trip its attribute/domain fields through the polymorphic base" in {
+    val serialized = objectMapper.writeValueAsString(newDesc("age", 1L, 99L))
+    val restored = objectMapper.readValue(serialized, classOf[LogicalOp])
+    restored shouldBe a[SortPartitionsOpDesc]
+    val roundTripped = restored.asInstanceOf[SortPartitionsOpDesc]
+    (roundTripped.sortAttributeName, roundTripped.domainMin, roundTripped.domainMax) shouldBe
+      (("age", 1L, 99L))
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala
new file mode 100644
index 0000000000..e6e2995877
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.symmetricDifference
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class SymmetricDifferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // Single-column fixtures: `schemaA` feeds both ports in the happy path, and
+  // `schemaDifferent` differs from it in both attribute name and type.
+  private val schemaA = Schema().add(new Attribute("a", AttributeType.STRING))
+  private val schemaDifferent = Schema().add(new Attribute("b", AttributeType.INTEGER))
+
+  "SymmetricDifferenceOpDesc.operatorInfo" should
+    "advertise the name, Set group, two inputs, and a single blocking output" in {
+    val meta = new SymmetricDifferenceOpDesc().operatorInfo
+    meta.userFriendlyName shouldBe "SymmetricDifference"
+    meta.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    meta.inputPorts.map(_.id) shouldBe List(PortIdentity(0), PortIdentity(1))
+    // Exactly one output port, and that port is blocking.
+    meta.outputPorts.map(_.blocking) shouldBe List(true)
+  }
+
+  "SymmetricDifferenceOpDesc.getPhysicalOp" should
+    "wire SymmetricDifferenceOpExec, carry port identities, and require HashPartition on both inputs" in {
+    val desc = new SymmetricDifferenceOpDesc
+    val physicalOp = desc.getPhysicalOp(workflowId, executionId)
+
+    physicalOp.opExecInitInfo match {
+      case OpExecWithClassName(execClass, _) =>
+        execClass shouldBe
+          "org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpExec"
+      case unexpected => fail(s"expected OpExecWithClassName, got $unexpected")
+    }
+
+    physicalOp.inputPorts.keySet shouldBe desc.operatorInfo.inputPorts.map(_.id).toSet
+    physicalOp.outputPorts.keySet shouldBe desc.operatorInfo.outputPorts.map(_.id).toSet
+    // One default HashPartition requirement per input port.
+    physicalOp.partitionRequirement shouldBe List.fill(2)(Some(HashPartition()))
+  }
+
+  "SymmetricDifferenceOpDesc schema propagation" should
+    "pass the shared input schema through to the output port" in {
+    val desc = new SymmetricDifferenceOpDesc
+    val propagate = desc.getPhysicalOp(workflowId, executionId).propagateSchema.func
+    val result = propagate(Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaA))
+    result.keySet shouldBe desc.operatorInfo.outputPorts.map(_.id).toSet
+    result.values.toSet shouldBe Set(schemaA)
+  }
+
+  it should "reject inputs whose schemas differ" in {
+    val propagate =
+      new SymmetricDifferenceOpDesc().getPhysicalOp(workflowId, executionId).propagateSchema.func
+    an[IllegalArgumentException] should be thrownBy propagate(
+      Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaDifferent)
+    )
+  }
+
+  "SymmetricDifferenceOpDesc" should
+    "round-trip through the polymorphic base (pins the SymmetricDifference discriminator)" in {
+    // The operator carries no configuration fields, so a round-trip can only
+    // pin the @JsonSubTypes discriminator and subtype resolution.
+    val restored = objectMapper.readValue(
+      objectMapper.writeValueAsString(new SymmetricDifferenceOpDesc),
+      classOf[LogicalOp]
+    )
+    restored shouldBe a[SymmetricDifferenceOpDesc]
+  }
+}

Reply via email to