This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ca6bb2aef test(workflow-operator): add unit test coverage for row-count-shaping operator descriptors (#5814)
8ca6bb2aef is described below

commit 8ca6bb2aef8a4bace84721941a2254d176aaeb49
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 00:19:16 2026 -0700

    test(workflow-operator): add unit test coverage for row-count-shaping operator descriptors (#5814)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three previously-untested descriptors that shape/reduce
    output row counts in `common/workflow-operator/`. No production-code
    changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `LimitOpDescSpec` | `LimitOpDesc` | 5 |
    | `RandomKSamplingOpDescSpec` | `RandomKSamplingOpDesc` | 3 |
    | `ReservoirSamplingOpDescSpec` | `ReservoirSamplingOpDesc` | 3 |
    
    All three spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned — `LimitOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Limit`, `CLEANING_GROUP`, 1-in/1-out, `supportReconfiguration == true` |
    | Polymorphic deserialize | `{"operatorType":"Limit","limit":N}` via `classOf[LogicalOp]` yields a `LimitOpDesc` with `limit == N` |
    | `getPhysicalOp` | non-parallelizable; wires `LimitOpExec`; ports carried forward |
    | `runtimeReconfiguration` | returns `Success` with a `StateTransferFunc`; the func copies the running `count` from the old `LimitOpExec` to the new one (exercised end-to-end with two real exec instances) |
    
    **Behavior pinned — `RandomKSamplingOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Random K Sampling`, `UTILITY_GROUP`, `supportReconfiguration == true` |
    | `percentage` round-trip | serializes under the spaced wire-key `random k sample percentage`; survives a polymorphic round-trip |
    | `getPhysicalOp` | wires `RandomKSamplingOpExec`; ports carried forward |
    
    **Behavior pinned — `ReservoirSamplingOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Reservoir Sampling`, `UTILITY_GROUP`, `supportReconfiguration == false` (the intentional difference vs RandomKSampling — pinned so a future "fix" that flips it is caught) |
    | `k` round-trip | serializes under the wire-key `number of item sampled in reservoir sampling` |
    | `getPhysicalOp` | wires `ReservoirSamplingOpExec`; ports carried forward |
    
    ### Any related issues, documentation, discussions?
    
    Closes #5807.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.limit.LimitOpDescSpec
    org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpDescSpec
    org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpDescSpec"`
    — 11 tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
    "WorkflowOperator/Test/scalafix --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../amber/operator/limit/LimitOpDescSpec.scala     | 97 ++++++++++++++++++++++
 .../RandomKSamplingOpDescSpec.scala                | 76 +++++++++++++++++
 .../ReservoirSamplingOpDescSpec.scala              | 76 +++++++++++++++++
 3 files changed, 249 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
new file mode 100644
index 0000000000..f01e8f63f0
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.limit
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import scala.util.Success
+
+class LimitOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // LogicalOp carries @JsonTypeInfo(property = "operatorType"); deserialize 
via the
+  // base type with the registered discriminator name "Limit".
+  private def limitDesc(n: Int): LimitOpDesc =
+    objectMapper
+      .readValue(s"""{"operatorType":"Limit","limit":$n}""", 
classOf[LogicalOp])
+      .asInstanceOf[LimitOpDesc]
+
+  "LimitOpDesc.operatorInfo" should
+    "advertise the name, Cleaning group, and reconfiguration support" in {
+    val info = (new LimitOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Limit"
+    info.operatorDescription shouldBe "Limit the number of output rows"
+    info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.supportReconfiguration shouldBe true
+  }
+
+  "LimitOpDesc" should "deserialize the limit field through the polymorphic 
base" in {
+    limitDesc(42).limit shouldBe 42
+  }
+
+  "LimitOpDesc.getPhysicalOp" should "be non-parallelizable and wire 
LimitOpExec" in {
+    val physical = limitDesc(10).getPhysicalOp(workflowId, executionId)
+    physical.parallelizable shouldBe false
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.limit.LimitOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "carry forward the operatorInfo input/output port identities" in {
+    val op = limitDesc(10)
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    // Pin the actual port identities (not just counts).
+    physical.inputPorts.keySet shouldBe 
op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
op.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "LimitOpDesc.runtimeReconfiguration" should
+    "return Success with a state-transfer func that copies the running row 
count" in {
+    val desc = limitDesc(5)
+    val result = desc.runtimeReconfiguration(workflowId, executionId, desc, 
desc)
+    result shouldBe a[Success[_]]
+    val (_, transferOpt) = result.get
+    transferOpt should not be empty
+
+    // Exercise the state-transfer func: a freshly-created exec starts at 
count 0;
+    // the func must copy the old exec's count into the new one.
+    val descJson = """{"operatorType":"Limit","limit":5}"""
+    val oldExec = new LimitOpExec(descJson)
+    oldExec.count = 3
+    val newExec = new LimitOpExec(descJson)
+    newExec.count shouldBe 0
+    val transfer = transferOpt.get
+    transfer(oldExec, newExec)
+    newExec.count shouldBe 3
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
new file mode 100644
index 0000000000..94d450a955
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.randomksampling
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class RandomKSamplingOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // The percentage field uses a spaced @JsonProperty wire-key.
+  private val WireKey = "random k sample percentage"
+
+  "RandomKSamplingOpDesc.operatorInfo" should
+    "advertise the name, Utility group, and reconfiguration support" in {
+    val info = (new RandomKSamplingOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Random K Sampling"
+    info.operatorDescription shouldBe "random sampling with given percentage"
+    info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.supportReconfiguration shouldBe true
+  }
+
+  "RandomKSamplingOpDesc" should
+    "serialize percentage under its spaced wire-key and round-trip it" in {
+    val d = new RandomKSamplingOpDesc
+    d.percentage = 25
+    val json = objectMapper.writeValueAsString(d)
+    val tree = objectMapper.readTree(json)
+    tree.has(WireKey) shouldBe true
+    tree.get(WireKey).asInt shouldBe 25
+    val restored = objectMapper.readValue(json, classOf[LogicalOp])
+    restored shouldBe a[RandomKSamplingOpDesc]
+    restored.asInstanceOf[RandomKSamplingOpDesc].percentage shouldBe 25
+  }
+
+  "RandomKSamplingOpDesc.getPhysicalOp" should
+    "wire the RandomKSamplingOpExec class name and carry ports" in {
+    val d = new RandomKSamplingOpDesc
+    d.percentage = 50
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe 
"org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.inputPorts.keySet shouldBe 
d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
new file mode 100644
index 0000000000..0c903d6ce8
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.reservoirsampling
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class ReservoirSamplingOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private val WireKey = "number of item sampled in reservoir sampling"
+
+  "ReservoirSamplingOpDesc.operatorInfo" should
+    "advertise the name, Utility group, and (intentionally) NOT support 
reconfiguration" in {
+    val info = (new ReservoirSamplingOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Reservoir Sampling"
+    info.operatorDescription shouldBe "Reservoir Sampling with k items being 
kept randomly"
+    info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    // ReservoirSampling does not opt into reconfiguration (unlike 
RandomKSampling),
+    // so it inherits the OperatorInfo default of false.
+    info.supportReconfiguration shouldBe false
+  }
+
+  "ReservoirSamplingOpDesc" should "serialize k under its wire-key and 
round-trip it" in {
+    val d = new ReservoirSamplingOpDesc
+    d.k = 100
+    val json = objectMapper.writeValueAsString(d)
+    val tree = objectMapper.readTree(json)
+    tree.has(WireKey) shouldBe true
+    tree.get(WireKey).asInt shouldBe 100
+    val restored = objectMapper.readValue(json, classOf[LogicalOp])
+    restored shouldBe a[ReservoirSamplingOpDesc]
+    restored.asInstanceOf[ReservoirSamplingOpDesc].k shouldBe 100
+  }
+
+  "ReservoirSamplingOpDesc.getPhysicalOp" should
+    "wire the ReservoirSamplingOpExec class name and carry ports" in {
+    val d = new ReservoirSamplingOpDesc
+    d.k = 10
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe 
"org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.inputPorts.keySet shouldBe 
d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+}

Reply via email to