This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5845-6d31f46ecfe894fb04f783972c8f9566afb5a16b in repository https://gitbox.apache.org/repos/asf/texera.git
commit 58824343dfd0f25ba9b79f34017743d6b3cdcdf3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 23:49:49 2026 -0700

test(workflow-operator): add unit test coverage for IntervalJoin, JoinUtils, and OperatorGroupConstants (#5845)

### What changes were proposed in this PR?

Pin the behavior of three join/utility classes in `common/workflow-operator/`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `IntervalJoinOpDescSpec` | `IntervalJoinOpDesc` | 5 |
| `JoinUtilsSpec` | `JoinUtils` (object) | 3 |
| `OperatorGroupConstantsSpec` | `OperatorGroupConstants` (object) | 4 |

**Behavior pinned — `IntervalJoinOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Interval Join`, `JOIN_GROUP`; two ordered inputs (`left table` at `PortIdentity()`, then `right table` at `PortIdentity(1)`, which depends on the left port); one output |
| Field defaults | join-key attributes `null`; `constant == 10`; `includeLeftBound`/`includeRightBound == true` |
| `getPhysicalOp` | wires `IntervalJoinOpExec`; port identities carried; `HashPartition` requirement on each join key |
| Schema propagation | concatenates the left and right schemas (left attributes first), suffixing a conflicting attribute with `#@1` |
| Serialization round-trip | join keys, `constant`, and both bound flags survive a JSON round-trip through the polymorphic `LogicalOp` base |

**Behavior pinned — `JoinUtils.joinTuples`**

| Surface | Contract |
| --- | --- |
| Concatenation | left + right fields merged |
| Skip | the named join-key attribute is dropped from the right side |
| Conflict rename | a right-side name collision is renamed with a `#@1` suffix |

**Behavior pinned — `OperatorGroupConstants`**

| Surface | Contract |
| --- | --- |
| Constant values | the canonical group-name strings (`Data Input`, `Join`, `Set`, … `Control Block`) |
| `OperatorGroupOrderList` | starts at `Data Input` and ends at `Control Block`; relational subgroups (Join/Set/Aggregate/Sort) nested under `Data Cleaning`; visualization subgroups nested under `Visualization` in panel order |

### Any related issues, documentation, discussions?

Closes #5840.

### How was this PR tested?

- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDescSpec org.apache.texera.amber.operator.hashJoin.JoinUtilsSpec org.apache.texera.amber.operator.metadata.OperatorGroupConstantsSpec"` — 12 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../amber/operator/hashJoin/JoinUtilsSpec.scala    |  93 ++++++++++++++++
 .../intervalJoin/IntervalJoinOpDescSpec.scala      | 121 +++++++++++++++++++++
 .../metadata/OperatorGroupConstantsSpec.scala      |  92 ++++++++++++++++
 3 files changed, 306 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
new file mode 100644
index 0000000000..04e2af16f8
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.hashJoin
+
+import org.apache.texera.amber.core.tuple.{
+  Attribute,
+  AttributeType,
+  Schema,
+  SchemaEnforceable,
+  Tuple
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class JoinUtilsSpec extends AnyFlatSpec with Matchers {
+
+  private def schemaOf(attrs: (String, AttributeType)*): Schema =
+    attrs.foldLeft(Schema())((s, a) => s.add(new Attribute(a._1, a._2)))
+
+  private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+    val b = Tuple.builder(schema)
+    values.foreach { case (name, v) => b.add(schema.getAttribute(name), v) }
+    b.build()
+  }
+
+  "JoinUtils.joinTuples" should "concatenate the left and right tuple fields" in {
+    val leftSchema = schemaOf("a" -> AttributeType.STRING, "b" -> AttributeType.INTEGER)
+    val rightSchema = schemaOf("c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av", "b" -> Integer.valueOf(1)),
+      tupleOf(rightSchema, "c" -> "cv")
+    )
+    joined.getFields.length shouldBe 3
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(
+        schemaOf(
+          "a" -> AttributeType.STRING,
+          "b" -> AttributeType.INTEGER,
+          "c" -> AttributeType.STRING
+        )
+      )
+    enforced.getField[String]("a") shouldBe "av"
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should "skip the named attribute (the join key) from the right tuple" in {
+    val leftSchema = schemaOf("a" -> AttributeType.STRING)
+    val rightSchema = schemaOf("k" -> AttributeType.STRING, "c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av"),
+      tupleOf(rightSchema, "k" -> "kv", "c" -> "cv"),
+      skipAttributeName = Some("k")
+    )
+    joined.getFields.length shouldBe 2
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(schemaOf("a" -> AttributeType.STRING, "c" -> AttributeType.STRING))
+    enforced.getField[String]("a") shouldBe "av"
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should "rename a right-side name conflict with a #@1 suffix" in {
+    val schema = schemaOf("x" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(schema, "x" -> "L"),
+      tupleOf(schema, "x" -> "R")
+    )
+    joined.getFields.length shouldBe 2
+    val enforced = joined
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(schemaOf("x" -> AttributeType.STRING, "x#@1" -> AttributeType.STRING))
+    enforced.getField[String]("x") shouldBe "L"
+    enforced.getField[String]("x#@1") shouldBe "R"
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
new file mode 100644
index 0000000000..01d9d2e10b
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.intervalJoin
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class IntervalJoinOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // Set the join keys + a concrete (non-null) Option for timeIntervalType before any
+  // path that serializes `this` (getPhysicalOp / round-trip).
+  private def configured(): IntervalJoinOpDesc = {
+    val d = new IntervalJoinOpDesc
+    d.leftAttributeName = "lk"
+    d.rightAttributeName = "rk"
+    d.timeIntervalType = None
+    d
+  }
+
+  "IntervalJoinOpDesc.operatorInfo" should
+    "advertise two ordered inputs (left then right) in the Join group" in {
+    val info = (new IntervalJoinOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Interval Join"
+    info.operatorGroupName shouldBe OperatorGroupConstants.JOIN_GROUP
+    info.inputPorts should have length 2
+    info.inputPorts.head.id shouldBe PortIdentity()
+    info.inputPorts.head.displayName shouldBe "left table"
+    info.inputPorts.last.id shouldBe PortIdentity(1)
+    info.inputPorts.last.displayName shouldBe "right table"
+    info.inputPorts.last.dependencies shouldBe List(PortIdentity(0))
+    info.outputPorts should have length 1
+  }
+
+  "IntervalJoinOpDesc" should
+    "default the join-key attributes to null and the bounds/constant to their defaults" in {
+    val d = new IntervalJoinOpDesc
+    d.leftAttributeName shouldBe null
+    d.rightAttributeName shouldBe null
+    d.constant shouldBe 10L
+    d.includeLeftBound shouldBe true
+    d.includeRightBound shouldBe true
+  }
+
+  "IntervalJoinOpDesc.getPhysicalOp" should
+    "wire IntervalJoinOpExec, carry port identities, and require HashPartition on each join key" in {
+    val op = configured()
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    physical.partitionRequirement shouldBe List(
+      Option(HashPartition(List("lk"))),
+      Option(HashPartition(List("rk")))
+    )
+  }
+
+  "IntervalJoinOpDesc schema propagation" should
+    "merge the left and right schemas, suffixing a conflicting attribute with #@1" in {
+    val op = configured()
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    val leftSchema = Schema()
+      .add(new Attribute("a", AttributeType.STRING))
+      .add(new Attribute("k", AttributeType.LONG))
+    val rightSchema = Schema()
+      .add(new Attribute("b", AttributeType.STRING))
+      .add(new Attribute("k", AttributeType.LONG))
+    val out = physical.propagateSchema.func(
+      Map(PortIdentity() -> leftSchema, PortIdentity(1) -> rightSchema)
+    )
+    out.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    out(op.operatorInfo.outputPorts.head.id).getAttributes.map(_.getName) shouldBe
+      List("a", "k", "b", "k#@1")
+  }
+
+  "IntervalJoinOpDesc" should "round-trip its fields through the polymorphic base" in {
+    val d = configured()
+    d.constant = 42L
+    d.includeLeftBound = false
+    d.includeRightBound = false
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp])
+    restored shouldBe a[IntervalJoinOpDesc]
+    val ij = restored.asInstanceOf[IntervalJoinOpDesc]
+    ij.leftAttributeName shouldBe "lk"
+    ij.rightAttributeName shouldBe "rk"
+    ij.constant shouldBe 42L
+    ij.includeLeftBound shouldBe false
+    ij.includeRightBound shouldBe false
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
new file mode 100644
index 0000000000..94ef1a040c
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class OperatorGroupConstantsSpec extends AnyFlatSpec with Matchers {
+
+  import OperatorGroupConstants._
+
+  "OperatorGroupConstants" should "pin the canonical group-name string value of every constant" in {
+    INPUT_GROUP shouldBe "Data Input"
+    DATABASE_GROUP shouldBe "Database Connector"
+    SEARCH_GROUP shouldBe "Search"
+    CLEANING_GROUP shouldBe "Data Cleaning"
+    JOIN_GROUP shouldBe "Join"
+    SET_GROUP shouldBe "Set"
+    AGGREGATE_GROUP shouldBe "Aggregate"
+    SORT_GROUP shouldBe "Sort"
+    UTILITY_GROUP shouldBe "Utilities"
+    API_GROUP shouldBe "External API"
+    VISUALIZATION_GROUP shouldBe "Visualization"
+    VISUALIZATION_BASIC_GROUP shouldBe "Basic"
+    VISUALIZATION_STATISTICAL_GROUP shouldBe "Statistical"
+    VISUALIZATION_SCIENTIFIC_GROUP shouldBe "Scientific"
+    VISUALIZATION_FINANCIAL_GROUP shouldBe "Financial"
+    VISUALIZATION_MEDIA_GROUP shouldBe "Media"
+    VISUALIZATION_ADVANCED_GROUP shouldBe "Advanced"
+    MACHINE_LEARNING_GROUP shouldBe "Machine Learning"
+    ADVANCED_SKLEARN_GROUP shouldBe "Advanced Sklearn"
+    HUGGINGFACE_GROUP shouldBe "Hugging Face"
+    SKLEARN_GROUP shouldBe "Sklearn"
+    SKLEARN_TRAINING_GROUP shouldBe "Sklearn Training"
+    UDF_GROUP shouldBe "User-defined Functions"
+    PYTHON_GROUP shouldBe "Python"
+    JAVA_GROUP shouldBe "Java"
+    R_GROUP shouldBe "R"
+    MACHINE_LEARNING_GENERAL_GROUP shouldBe "Machine Learning General"
+    CONTROL_GROUP shouldBe "Control Block"
+  }
+
+  "OperatorGroupOrderList" should "start at Data Input, contain Visualization, and place Control Block last" in {
+    val names = OperatorGroupOrderList.map(_.groupName)
+    names.head shouldBe INPUT_GROUP
+    names.last shouldBe CONTROL_GROUP
+    names should contain(VISUALIZATION_GROUP)
+  }
+
+  it should "nest the relational subgroups (Join/Set/Aggregate/Sort) under Data Cleaning" in {
+    val cleaning = OperatorGroupOrderList
+      .find(_.groupName == CLEANING_GROUP)
+      .getOrElse(fail("Data Cleaning group missing from OperatorGroupOrderList"))
+    cleaning.children.map(_.groupName) shouldBe List(
+      JOIN_GROUP,
+      SET_GROUP,
+      AGGREGATE_GROUP,
+      SORT_GROUP
+    )
+  }
+
+  it should "nest the visualization subgroups under Visualization (in panel order)" in {
+    val viz = OperatorGroupOrderList
+      .find(_.groupName == VISUALIZATION_GROUP)
+      .getOrElse(fail("Visualization group missing from OperatorGroupOrderList"))
+    viz.children.map(_.groupName) shouldBe List(
+      VISUALIZATION_BASIC_GROUP,
+      VISUALIZATION_STATISTICAL_GROUP,
+      VISUALIZATION_SCIENTIFIC_GROUP,
+      VISUALIZATION_FINANCIAL_GROUP,
+      VISUALIZATION_MEDIA_GROUP,
+      VISUALIZATION_ADVANCED_GROUP
+    )
+  }
+}
