This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5657-190823f7562ba8c0bb2a515b0ce4823cf640e049 in repository https://gitbox.apache.org/repos/asf/texera.git
commit e0a96478817a5c7a2f585de1952e40f7c8ba534f Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Jun 12 17:22:16 2026 -0700 test(workflow-operator): add unit test coverage for AutoClosingIterator and UnionOpExec (#5657) ### What changes were proposed in this PR? Pin behavior of two previously-uncovered helpers in `common/workflow-operator`. No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `AutoClosingIteratorSpec` | `AutoClosingIterator` | 10 | | `UnionOpExecSpec` | `UnionOpExec` | 7 | Both spec files follow the `<srcClassName>Spec.scala` one-to-one convention. **Behavior pinned — `AutoClosingIterator`** | Surface | Contract | | --- | --- | | `hasNext` (non-empty source) | `true`; `onClose` not invoked | | `hasNext` (exhausted source) | `false`; `onClose` invoked exactly once | | Repeated `hasNext` after exhaustion | does NOT re-fire `onClose` (`alreadyClosed` guard) | | `next()` | delegates straight to the wrapped iterator (in order) | | Full traversal via `toList` | yields every element; `onClose` fires once at the end | | Already-empty source | first `hasNext` returns `false` and fires `onClose` | | Mid-iteration | `onClose` stays un-fired between elements | | `onClose` throws | exception propagates (no swallowing) | | `onClose` throws + retry | current impl re-fires `onClose` (assignment to `alreadyClosed` runs AFTER `onClose()`); characterization pins this behavior | **Behavior pinned — `UnionOpExec`** | Surface | Contract | | --- | --- | | `processTuple(tuple, port = 0)` | yields a single-element iterator containing the tuple | | `processTuple` (any port) | port-agnostic — same tuple passes through for ports 0, 1, 5, 99, MaxValue, -1 | | Tuple identity | pass-through preserves the exact `Tuple` reference (no copy) | | Successive calls | each returns an independent fresh iterator (no shared cursor) | | Per-call iterator | yields exactly one element | | `null` tuple | passes through as `null` (the impl does not 
null-check) | | Type contract | `UnionOpExec` is an `OperatorExecutor` | ### Any related issues, documentation, discussions? Closes #5653. ### How was this PR tested? Pure unit-test additions; verified locally with: - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.source.scan.AutoClosingIteratorSpec org.apache.texera.amber.operator.union.UnionOpExecSpec"` — 17 tests, all green - `sbt scalafmtCheckAll` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.7 [1M context]) --- .../source/scan/AutoClosingIteratorSpec.scala | 154 +++++++++++++++++++++ .../amber/operator/union/UnionOpExecSpec.scala | 114 +++++++++++++++ 2 files changed, 268 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala new file mode 100644 index 0000000000..1bd1dc5ed3 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.scan + +import org.scalatest.flatspec.AnyFlatSpec + +class AutoClosingIteratorSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // hasNext + onClose firing semantics: onClose fires only from a hasNext that returns false + // --------------------------------------------------------------------------- + + "AutoClosingIterator.hasNext (non-empty underlying)" should + "return true and NOT invoke onClose" in { + var closed = false + val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => closed = true) + assert(it.hasNext) + assert(!closed, "onClose must not fire while elements remain") + } + + "AutoClosingIterator.hasNext (exhausted underlying)" should + "return false and invoke onClose exactly once" in { + var closeCount = 0 + val it = new AutoClosingIterator[Int](Iterator.empty, () => closeCount += 1) + assert(!it.hasNext) + assert(closeCount == 1) + } + + it should "NOT invoke onClose again on a second hasNext after exhaustion" in { + var closeCount = 0 + val it = new AutoClosingIterator[Int](Iterator.empty, () => closeCount += 1) + assert(!it.hasNext) + assert(!it.hasNext) + assert(!it.hasNext) + assert(closeCount == 1, s"onClose must fire exactly once, got $closeCount calls") + } + + // --------------------------------------------------------------------------- + // next() — delegates straight through (this case never calls hasNext, so onClose is not observed) + // --------------------------------------------------------------------------- + + "AutoClosingIterator.next" should "delegate to the wrapped iterator (in order)" in { + val it = new AutoClosingIterator[Int](Iterator(10, 20, 30), () => ()) + assert(it.next() == 10) + assert(it.next() == 20) + assert(it.next() == 30) + } + + // --------------------------------------------------------------------------- + // Full traversal — onClose fires exactly once at the end (toList keeps calling hasNext/next until exhaustion) + //
--------------------------------------------------------------------------- + + "AutoClosingIterator full traversal" should + "yield every element of the wrapped iterator in order" in { + val it = new AutoClosingIterator[Int](Iterator(1, 2, 3, 4, 5), () => ()) + assert(it.toList == List(1, 2, 3, 4, 5)) + } + + it should "fire onClose exactly once when toList finishes consuming" in { + var closeCount = 0 + val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => closeCount += 1) + val _ = it.toList + assert(closeCount == 1, s"expected single onClose firing, got $closeCount") + } + + // --------------------------------------------------------------------------- + // Already-empty source — close fires on the very first hasNext call (same Iterator.empty input as the exhausted-underlying case, asserted via a Boolean flag instead of a count) + // --------------------------------------------------------------------------- + + "AutoClosingIterator over an already-empty source" should + "fire onClose on the very first hasNext call" in { + var fired = false + val it = new AutoClosingIterator[Int](Iterator.empty, () => fired = true) + val _ = it.hasNext + assert(fired, "onClose must fire when the source is already empty") + } + + // --------------------------------------------------------------------------- + // Mid-iteration close behavior — onClose does NOT fire before exhaustion + // --------------------------------------------------------------------------- + + "AutoClosingIterator (mid-iteration)" should + "leave onClose un-fired between elements (only fires after hasNext returns false)" in { + var fired = false + val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => fired = true) + // Step-by-step assertion: after each hasNext that returns TRUE, + // onClose MUST still be un-fired. Only the hasNext that returns + // false may flip `fired`. A bug that prematurely closed during a + // truthy hasNext would surface here, not just at the loop's exit.
+ assert(it.hasNext); assert(!fired, "onClose must not fire while element 1 is reachable") + assert(it.next() == 1) + assert(it.hasNext); assert(!fired, "onClose must not fire while element 2 is reachable") + assert(it.next() == 2) + assert(it.hasNext); assert(!fired, "onClose must not fire while element 3 is reachable") + assert(it.next() == 3) + // The final hasNext returns false — THIS is the call that fires onClose. + assert(!it.hasNext) + assert(fired, "after hasNext first returns false, onClose must have fired") + } + + // --------------------------------------------------------------------------- + // onClose exception propagation — an exception thrown by onClose escapes hasNext with its message intact + // --------------------------------------------------------------------------- + + "AutoClosingIterator" should + "propagate exceptions thrown from onClose (no swallowing)" in { + val it = new AutoClosingIterator[Int]( + Iterator.empty, + () => throw new IllegalStateException("close failed") + ) + val ex = intercept[IllegalStateException] { + it.hasNext + } + assert(ex.getMessage == "close failed") + } + + it should + "re-invoke onClose on a retry when the previous onClose threw (alreadyClosed is set AFTER onClose)" in { + // Reading the production code: `alreadyClosed = true` runs AFTER + // `onClose()`. So if onClose throws, alreadyClosed stays false and + // a second hasNext will re-invoke onClose. This is the OPPOSITE of + // an "alreadyClosed once close was attempted" contract — characterize + // the current (brittle) behavior so a refactor that swaps the order + // (running `alreadyClosed = true` BEFORE `onClose()`) surfaces here. Such a refactor would presumably need the second intercept updated too, not just the expected count.
+ var closeCount = 0 + val it = new AutoClosingIterator[Int]( + Iterator.empty, + () => { + closeCount += 1 + throw new RuntimeException("boom") + } + ) + intercept[RuntimeException] { it.hasNext } + intercept[RuntimeException] { it.hasNext } // current impl re-runs onClose, which throws again + assert(closeCount == 2, s"current impl re-fires onClose on retry; got $closeCount") + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala new file mode 100644 index 0000000000..59a4db213b --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.texera.amber.operator.union + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.scalatest.flatspec.AnyFlatSpec + +class UnionOpExecSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Fixture builders — single INTEGER column `v`; `tuple(v)` runs the Tuple builder on every call + // --------------------------------------------------------------------------- + + private val attr = new Attribute("v", AttributeType.INTEGER) + private val schema: Schema = Schema().add(attr) + private def tuple(v: Int): Tuple = + Tuple.builder(schema).add(attr, Integer.valueOf(v)).build() + + // --------------------------------------------------------------------------- + // Pass-through semantics — processTuple is expected to emit its input tuple unchanged + // --------------------------------------------------------------------------- + + "UnionOpExec.processTuple" should + "yield a single-element iterator containing the input tuple" in { + val exec = new UnionOpExec + val t = tuple(42) + val out = exec.processTuple(t, port = 0).toList + assert(out == List(t)) + } + + it should "preserve the exact Tuple instance (pass-through, no copy)" in { + val exec = new UnionOpExec + val t = tuple(7) + val out = exec.processTuple(t, port = 0).toList + assert(out.size == 1) + assert(out.head eq t, "pass-through must return the same Tuple reference") + } + + // --------------------------------------------------------------------------- + // Port-agnostic behavior — union merges streams regardless of port id (unusual ids such as Int.MaxValue and -1 included) + // --------------------------------------------------------------------------- + + it should "yield the same tuple regardless of which input port it arrived on" in { + val exec = new UnionOpExec + val t = tuple(1) + val portsTested = List(0, 1, 5, 99, Int.MaxValue, -1) + portsTested.foreach { p => + assert(exec.processTuple(t, port = p).toList == List(t), s"port=$p must pass through") + } + } + + // --------------------------------------------------------------------------- + // Repeated calls —
no state leakage + // --------------------------------------------------------------------------- + + it should "return an independent fresh iterator on each call (no shared cursor)" in { + val exec = new UnionOpExec + val a = tuple(1) + val b = tuple(2) + val itA = exec.processTuple(a, port = 0) + val itB = exec.processTuple(b, port = 1) + // Both iterators exist before either is consumed, so a shared cursor or buffer would surface as itA yielding b or itB coming back empty. + assert(itA.toList == List(a)) + assert(itB.toList == List(b)) + } + + it should "produce exactly one element per processTuple call" in { + val exec = new UnionOpExec + val t = tuple(1) + val iter = exec.processTuple(t, port = 0) + assert(iter.hasNext) + iter.next() + assert(!iter.hasNext, "iterator must be exhausted after the single pass-through") + } + + // --------------------------------------------------------------------------- + // Null tuple — pass-through is unconditional + // --------------------------------------------------------------------------- + + it should "pass-through a null tuple unchanged (the impl does not null-check)" in { + // Pin current behavior: `Iterator(tuple)` with `tuple = null` yields + // an iterator containing `null`. If a future change adds a null- + // check, that's a behavior change worth catching. + val exec = new UnionOpExec + val out = exec.processTuple(null, port = 0).toList + assert(out == List(null)) + } + + // --------------------------------------------------------------------------- + // Type contract — UnionOpExec is an OperatorExecutor (the typed `val exec` binding below would not compile otherwise) + // --------------------------------------------------------------------------- + + "UnionOpExec" should "be an OperatorExecutor (compile-time enforced)" in { + val exec: org.apache.texera.amber.core.executor.OperatorExecutor = new UnionOpExec + assert(exec.processTuple(tuple(1), port = 0).toList.size == 1) + } +}
