This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 58824343df test(workflow-operator): add unit test coverage for
IntervalJoin, JoinUtils, and OperatorGroupConstants (#5845)
58824343df is described below
commit 58824343dfd0f25ba9b79f34017743d6b3cdcdf3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 23:49:49 2026 -0700
test(workflow-operator): add unit test coverage for IntervalJoin,
JoinUtils, and OperatorGroupConstants (#5845)
### What changes were proposed in this PR?
Pin behavior of three join/utility classes in
`common/workflow-operator/`. No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `IntervalJoinOpDescSpec` | `IntervalJoinOpDesc` | 5 |
| `JoinUtilsSpec` | `JoinUtils` (object) | 3 |
| `OperatorGroupConstantsSpec` | `OperatorGroupConstants` (object) | 4 |
**Behavior pinned — `IntervalJoinOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Interval Join`, JOIN_GROUP; two ordered inputs
(`left table` at `PortIdentity()`, `right table` at `PortIdentity(1)`
depending on the left); one output |
| Field defaults | join-key attrs `null`; `constant == 10`;
`includeLeftBound`/`includeRightBound == true` |
| `getPhysicalOp` | wires `IntervalJoinOpExec`; port identities carried;
`HashPartition` requirement on each join key |
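| Schema propagation | merges left ⧺ right schemas, suffixing a
conflicting attribute with `#@1` |
| JSON round-trip | join keys, `constant`, and both bound flags survive
serialization through the polymorphic `LogicalOp` base |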
**Behavior pinned — `JoinUtils.joinTuples`**
| Surface | Contract |
| --- | --- |
| Concatenation | left + right fields merged |
| Skip | the named join-key attribute is dropped from the right side |
| Conflict rename | a right-side name collision is renamed with a `#@1`
suffix |
**Behavior pinned — `OperatorGroupConstants`**
| Surface | Contract |
| --- | --- |
| Constant values | the canonical group-name strings (`Data Input`,
`Join`, `Set`, … `Control Block`) |
| `OperatorGroupOrderList` | starts at `Data Input`, ends at `Control
Block`; relational subgroups (Join/Set/Aggregate/Sort) nested under
`Data Cleaning`; visualization subgroups nested under `Visualization` in
panel order |
### Any related issues, documentation, discussions?
Closes #5840.
### How was this PR tested?
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDescSpec
org.apache.texera.amber.operator.hashJoin.JoinUtilsSpec
org.apache.texera.amber.operator.metadata.OperatorGroupConstantsSpec"` —
12 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
"WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])
---
.../amber/operator/hashJoin/JoinUtilsSpec.scala | 93 ++++++++++++++++
.../intervalJoin/IntervalJoinOpDescSpec.scala | 121 +++++++++++++++++++++
.../metadata/OperatorGroupConstantsSpec.scala | 92 ++++++++++++++++
3 files changed, 306 insertions(+)
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
new file mode 100644
index 0000000000..04e2af16f8
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.hashJoin
+
+import org.apache.texera.amber.core.tuple.{
+ Attribute,
+ AttributeType,
+ Schema,
+ SchemaEnforceable,
+ Tuple
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/** Pins `JoinUtils.joinTuples`: left/right field concatenation, skipping a named right-side
+  * attribute, and renaming a right-side name collision with a `#@1` suffix.
+  */
+class JoinUtilsSpec extends AnyFlatSpec with Matchers {
+
+  /** Builds a schema from `(name, type)` pairs, adding the attributes in argument order. */
+  private def schemaOf(attrs: (String, AttributeType)*): Schema =
+    attrs.foldLeft(Schema())((s, a) => s.add(new Attribute(a._1, a._2)))
+
+  /** Builds a tuple over `schema`, assigning each `(attribute name, value)` pair in order. */
+  private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+    val b = Tuple.builder(schema)
+    values.foreach { case (name, v) => b.add(schema.getAttribute(name), v) }
+    b.build()
+  }
+
+  "JoinUtils.joinTuples" should "concatenate the left and right tuple fields" in {
+    val leftSchema = schemaOf("a" -> AttributeType.STRING, "b" -> AttributeType.INTEGER)
+    val rightSchema = schemaOf("c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av", "b" -> Integer.valueOf(1)),
+      tupleOf(rightSchema, "c" -> "cv")
+    )
+    joined.getFields.length shouldBe 3
+    // The joined result is cast to SchemaEnforceable so that a schema can be applied and the
+    // fields read back by name.
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(
+        schemaOf(
+          "a" -> AttributeType.STRING,
+          "b" -> AttributeType.INTEGER,
+          "c" -> AttributeType.STRING
+        )
+      )
+    enforced.getField[String]("a") shouldBe "av"
+    // Also check the middle (left-side) field so every concatenated value is verified.
+    enforced.getField[Integer]("b") shouldBe Integer.valueOf(1)
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should "skip the named attribute (the join key) from the right tuple" in {
+    val leftSchema = schemaOf("a" -> AttributeType.STRING)
+    val rightSchema = schemaOf("k" -> AttributeType.STRING, "c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av"),
+      tupleOf(rightSchema, "k" -> "kv", "c" -> "cv"),
+      skipAttributeName = Some("k")
+    )
+    // Only "a" and "c" are expected: the right-side "k" is the skipped attribute.
+    joined.getFields.length shouldBe 2
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(schemaOf("a" -> AttributeType.STRING, "c" -> AttributeType.STRING))
+    enforced.getField[String]("a") shouldBe "av"
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should "rename a right-side name conflict with a #@1 suffix" in {
+    // Both sides share the same single-attribute schema, so "x" collides.
+    val schema = schemaOf("x" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(schema, "x" -> "L"),
+      tupleOf(schema, "x" -> "R")
+    )
+    joined.getFields.length shouldBe 2
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(schemaOf("x" -> AttributeType.STRING, "x#@1" -> AttributeType.STRING))
+    enforced.getField[String]("x") shouldBe "L"
+    enforced.getField[String]("x#@1") shouldBe "R"
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
new file mode 100644
index 0000000000..01d9d2e10b
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.intervalJoin
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/** Pins `IntervalJoinOpDesc`: operator metadata, field defaults, the physical op it builds,
+  * output-schema propagation, and JSON round-tripping through `LogicalOp`.
+  */
+class IntervalJoinOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // Set the join keys + a concrete (non-null) Option for timeIntervalType before any
+  // path that serializes `this` (getPhysicalOp / round-trip).
+  private def configured(): IntervalJoinOpDesc = {
+    val d = new IntervalJoinOpDesc
+    d.leftAttributeName = "lk"
+    d.rightAttributeName = "rk"
+    d.timeIntervalType = None
+    d
+  }
+
+  "IntervalJoinOpDesc.operatorInfo" should
+    "advertise two ordered inputs (left then right) in the Join group" in {
+    val info = (new IntervalJoinOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Interval Join"
+    info.operatorGroupName shouldBe OperatorGroupConstants.JOIN_GROUP
+    info.inputPorts should have length 2
+    info.inputPorts.head.id shouldBe PortIdentity()
+    info.inputPorts.head.displayName shouldBe "left table"
+    info.inputPorts.last.id shouldBe PortIdentity(1)
+    info.inputPorts.last.displayName shouldBe "right table"
+    // The right input declares a dependency on port 0.
+    info.inputPorts.last.dependencies shouldBe List(PortIdentity(0))
+    info.outputPorts should have length 1
+  }
+
+  "IntervalJoinOpDesc" should
+    "default the join-key attributes to null and the bounds/constant to their defaults" in {
+    val d = new IntervalJoinOpDesc
+    d.leftAttributeName shouldBe null
+    d.rightAttributeName shouldBe null
+    d.constant shouldBe 10L
+    d.includeLeftBound shouldBe true
+    d.includeRightBound shouldBe true
+  }
+
+  "IntervalJoinOpDesc.getPhysicalOp" should
+    "wire IntervalJoinOpExec, carry port identities, and require HashPartition on each join key" in {
+    val op = configured()
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    // The physical op must expose exactly the port ids declared by operatorInfo.
+    physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    // One hash-partition requirement per input, keyed on the configured join attribute.
+    physical.partitionRequirement shouldBe List(
+      Option(HashPartition(List("lk"))),
+      Option(HashPartition(List("rk")))
+    )
+  }
+
+  "IntervalJoinOpDesc schema propagation" should
+    "merge the left and right schemas, suffixing a conflicting attribute with #@1" in {
+    val op = configured()
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    // Both inputs carry an attribute named "k", which forces the rename on the right side.
+    val leftSchema = Schema()
+      .add(new Attribute("a", AttributeType.STRING))
+      .add(new Attribute("k", AttributeType.LONG))
+    val rightSchema = Schema()
+      .add(new Attribute("b", AttributeType.STRING))
+      .add(new Attribute("k", AttributeType.LONG))
+    val out = physical.propagateSchema.func(
+      Map(PortIdentity() -> leftSchema, PortIdentity(1) -> rightSchema)
+    )
+    out.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    out(op.operatorInfo.outputPorts.head.id).getAttributes.map(_.getName) shouldBe
+      List("a", "k", "b", "k#@1")
+  }
+
+  "IntervalJoinOpDesc" should "round-trip its fields through the polymorphic base" in {
+    val d = configured()
+    // Use non-default values so the assertions below cannot pass on defaults alone.
+    d.constant = 42L
+    d.includeLeftBound = false
+    d.includeRightBound = false
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp])
+    restored shouldBe a[IntervalJoinOpDesc]
+    val ij = restored.asInstanceOf[IntervalJoinOpDesc]
+    ij.leftAttributeName shouldBe "lk"
+    ij.rightAttributeName shouldBe "rk"
+    ij.constant shouldBe 42L
+    ij.includeLeftBound shouldBe false
+    ij.includeRightBound shouldBe false
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
new file mode 100644
index 0000000000..94ef1a040c
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/** Pins the group-name string constants in `OperatorGroupConstants` and the ordering and
+  * nesting of `OperatorGroupOrderList`.
+  */
+class OperatorGroupConstantsSpec extends AnyFlatSpec with Matchers {
+
+  import OperatorGroupConstants._
+
+  "OperatorGroupConstants" should "pin the canonical group-name string value of every constant" in {
+    INPUT_GROUP shouldBe "Data Input"
+    DATABASE_GROUP shouldBe "Database Connector"
+    SEARCH_GROUP shouldBe "Search"
+    CLEANING_GROUP shouldBe "Data Cleaning"
+    JOIN_GROUP shouldBe "Join"
+    SET_GROUP shouldBe "Set"
+    AGGREGATE_GROUP shouldBe "Aggregate"
+    SORT_GROUP shouldBe "Sort"
+    UTILITY_GROUP shouldBe "Utilities"
+    API_GROUP shouldBe "External API"
+    VISUALIZATION_GROUP shouldBe "Visualization"
+    VISUALIZATION_BASIC_GROUP shouldBe "Basic"
+    VISUALIZATION_STATISTICAL_GROUP shouldBe "Statistical"
+    VISUALIZATION_SCIENTIFIC_GROUP shouldBe "Scientific"
+    VISUALIZATION_FINANCIAL_GROUP shouldBe "Financial"
+    VISUALIZATION_MEDIA_GROUP shouldBe "Media"
+    VISUALIZATION_ADVANCED_GROUP shouldBe "Advanced"
+    MACHINE_LEARNING_GROUP shouldBe "Machine Learning"
+    ADVANCED_SKLEARN_GROUP shouldBe "Advanced Sklearn"
+    HUGGINGFACE_GROUP shouldBe "Hugging Face"
+    SKLEARN_GROUP shouldBe "Sklearn"
+    SKLEARN_TRAINING_GROUP shouldBe "Sklearn Training"
+    UDF_GROUP shouldBe "User-defined Functions"
+    PYTHON_GROUP shouldBe "Python"
+    JAVA_GROUP shouldBe "Java"
+    R_GROUP shouldBe "R"
+    MACHINE_LEARNING_GENERAL_GROUP shouldBe "Machine Learning General"
+    CONTROL_GROUP shouldBe "Control Block"
+  }
+
+  "OperatorGroupOrderList" should "start at Data Input, contain Visualization, and place Control Block last" in {
+    // Only the top-level group names are inspected here; nesting is covered by the tests below.
+    val names = OperatorGroupOrderList.map(_.groupName)
+    names.head shouldBe INPUT_GROUP
+    names.last shouldBe CONTROL_GROUP
+    names should contain(VISUALIZATION_GROUP)
+  }
+
+  it should "nest the relational subgroups (Join/Set/Aggregate/Sort) under Data Cleaning" in {
+    // Fail with an explicit message if the parent group is absent from the list.
+    val cleaning = OperatorGroupOrderList
+      .find(_.groupName == CLEANING_GROUP)
+      .getOrElse(fail("Data Cleaning group missing from OperatorGroupOrderList"))
+    cleaning.children.map(_.groupName) shouldBe List(
+      JOIN_GROUP,
+      SET_GROUP,
+      AGGREGATE_GROUP,
+      SORT_GROUP
+    )
+  }
+
+  it should "nest the visualization subgroups under Visualization (in panel order)" in {
+    val viz = OperatorGroupOrderList
+      .find(_.groupName == VISUALIZATION_GROUP)
+      .getOrElse(fail("Visualization group missing from OperatorGroupOrderList"))
+    // List equality makes this assertion order-sensitive.
+    viz.children.map(_.groupName) shouldBe List(
+      VISUALIZATION_BASIC_GROUP,
+      VISUALIZATION_STATISTICAL_GROUP,
+      VISUALIZATION_SCIENTIFIC_GROUP,
+      VISUALIZATION_FINANCIAL_GROUP,
+      VISUALIZATION_MEDIA_GROUP,
+      VISUALIZATION_ADVANCED_GROUP
+    )
+  }
+}