This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ae9b35f12 test(workflow-operator): add unit test coverage for filter-family operator executors (#5656)
7ae9b35f12 is described below

commit 7ae9b35f12748616daf7bcc925fdde2e5def5187
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 17:30:08 2026 -0700

    test(workflow-operator): add unit test coverage for filter-family operator executors (#5656)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of four previously-uncovered modules in the `FilterOpExec`
    inheritance hierarchy in `common/workflow-operator`. No production-code
    changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `FilterOpExecSpec` | `FilterOpExec` (abstract base) | 9 |
    | `RegexOpExecSpec` | `RegexOpExec` | 8 |
    | `SubstringSearchOpExecSpec` | `SubstringSearchOpExec` | 10 |
    | `RandomKSamplingOpExecSpec` | `RandomKSamplingOpExec` | 7 |
    
    All four spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention. `SpecializedFilterOpExec` already has its own spec; this PR
    covers the rest of the family.
    
    **Behavior pinned — `FilterOpExec`**
    
    | Surface | Contract |
    | --- | --- |
    | `processTuple` (matching predicate) | yields the input tuple as a single-element iterator |
    | `processTuple` (non-matching predicate) | yields an empty iterator |
    | `processTuple` | passes the actual tuple instance to the predicate; ignores the `port` argument |
    | `setFilterFunc` | swapping the predicate changes the next `processTuple` result; value-aware predicates branch per-tuple |
    | Type contract | `FilterOpExec` is a `Serializable OperatorExecutor` |
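
    As a rough illustration of the contract in the table above, here is
    a minimal sketch. It is not Texera's actual `FilterOpExec` source:
    `Row` and `SketchFilterExec` are hypothetical stand-ins, and only
    `setFilterFunc`, `processTuple`, and the unused `port` argument are
    taken from this PR.

    ```scala
    // Hypothetical stand-in for the pinned contract; not the production class.
    object FilterContractSketch {
      final case class Row(v: Int)

      abstract class SketchFilterExec extends Serializable {
        private var filterFunc: Row => Boolean = _ => true

        def setFilterFunc(f: Row => Boolean): Unit = {
          filterFunc = f
        }

        // `port` is accepted but deliberately ignored.
        def processTuple(row: Row, port: Int): Iterator[Row] =
          if (filterFunc(row)) Iterator.single(row) else Iterator.empty
      }

      def main(args: Array[String]): Unit = {
        val exec = new SketchFilterExec {}
        exec.setFilterFunc(_.v % 2 == 0)
        assert(exec.processTuple(Row(2), port = 0).toList == List(Row(2)))
        assert(exec.processTuple(Row(3), port = 7).isEmpty)
      }
    }
    ```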
    
    **Behavior pinned — `RegexOpExec`**
    
    | Surface | Contract |
    | --- | --- |
    | matching regex | yields the tuple |
    | find-semantics | unanchored substring match (not full-string `matches`) |
    | `caseInsensitive = true` / `false` | matches case-(in)sensitively |
    | invalid regex string | construction succeeds (lazy `Pattern`); `PatternSyntaxException` surfaces on first `processTuple` |
    | repeated invocations | pattern stays cached; results are stable |
    | malformed descriptor JSON | construction throws `JsonProcessingException` |
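
    The difference between find-semantics and full-string `matches`,
    and the effect of lazy compilation, can be sketched with plain
    `java.util.regex`. `LazyRegex` is a hypothetical stand-in, not the
    production `RegexOpExec`; it only assumes the pattern is held in a
    `lazy val`, as the table describes.

    ```scala
    import java.util.regex.{Pattern, PatternSyntaxException}

    object RegexSemanticsSketch {
      // Hypothetical stand-in: compiles the pattern on first use only.
      final class LazyRegex(regex: String, caseInsensitive: Boolean) {
        lazy val pattern: Pattern =
          Pattern.compile(regex, if (caseInsensitive) Pattern.CASE_INSENSITIVE else 0)

        def found(value: String): Boolean = pattern.matcher(value).find()
      }

      def main(args: Array[String]): Unit = {
        val abc = new LazyRegex("abc", caseInsensitive = false)
        assert(abc.found("xx abc xx")) // find: unanchored
        assert(!abc.pattern.matcher("xx abc xx").matches()) // matches: whole string

        // Construction succeeds; the syntax error surfaces on first use.
        val bad = new LazyRegex("[", caseInsensitive = false)
        val threw =
          try { bad.found("anything"); false }
          catch { case _: PatternSyntaxException => true }
        assert(threw)
      }
    }
    ```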
    
    **Behavior pinned — `SubstringSearchOpExec`**
    
    | Surface | Contract |
    | --- | --- |
    | substring present / absent | yields tuple / nothing |
    | position in value (start / middle / end) | irrelevant (`String.contains` semantics) |
    | `isCaseSensitive = true` / `false` | case-(in)sensitive (both sides lowercased before `String.contains`) |
    | empty substring | matches every value, including the empty string |
    | repeated invocations | results stable |
    | malformed descriptor JSON | construction throws `JsonProcessingException` |
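
    The matching rule in the table above amounts to roughly the
    following. `containsSubstring` is a hypothetical helper written for
    illustration, assuming the case-insensitive path lowercases both
    sides before calling `String.contains`.

    ```scala
    object SubstringSemanticsSketch {
      // Hypothetical helper mirroring the pinned behavior; not production code.
      def containsSubstring(value: String, substring: String, isCaseSensitive: Boolean): Boolean =
        if (isCaseSensitive) value.contains(substring)
        else value.toLowerCase.contains(substring.toLowerCase)

      def main(args: Array[String]): Unit = {
        assert(containsSubstring("hello world", "HELLO", isCaseSensitive = false))
        assert(!containsSubstring("hello world", "HELLO", isCaseSensitive = true))
        // The empty substring is contained in every value, including "".
        assert(containsSubstring("", "", isCaseSensitive = true))
      }
    }
    ```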
    
    **Behavior pinned — `RandomKSamplingOpExec`**
    
    | Surface | Contract |
    | --- | --- |
    | `percentage = 100` | accepts every tuple (1000-sample run) |
    | `percentage = 0` | rejects every tuple (1000-sample run) |
    | Same `workerCount` + `percentage` | identical emission count across two fresh instances (deterministic seed) |
    | `percentage = 50` | approximately half pass (within ±150 of 1000 over 2000 draws) |
    | Different `workerCount` | divergent emission sequences (the seed is `workerCount`) |
    | malformed descriptor JSON | construction throws `JsonProcessingException` |
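
    The sampling behavior above can be sketched as follows.
    `SketchSampler` and `emissions` are hypothetical names; the
    predicate `(percentage / 100.0) >= rand.nextDouble()` and the
    `workerCount` seed are taken from the spec's comments, while the
    use of `scala.util.Random` is an assumption about the generator.

    ```scala
    import scala.util.Random

    object SamplingSketch {
      // Hypothetical stand-in; seeded with workerCount as the spec describes.
      final class SketchSampler(percentage: Int, workerCount: Int) {
        private val rand = new Random(workerCount)
        def accept(): Boolean = (percentage / 100.0) >= rand.nextDouble()
      }

      def emissions(percentage: Int, workerCount: Int, draws: Int): Vector[Boolean] = {
        val sampler = new SketchSampler(percentage, workerCount)
        Vector.fill(draws)(sampler.accept())
      }

      def main(args: Array[String]): Unit = {
        // nextDouble() lies in [0.0, 1.0), so 1.0 >= nextDouble() always holds.
        assert(emissions(100, workerCount = 7, draws = 1000).forall(identity))
        // Same seed, same stream, hence the same emission sequence.
        assert(emissions(50, workerCount = 13, draws = 200) ==
          emissions(50, workerCount = 13, draws = 200))
        // Binomial(2000, 0.5): sigma = sqrt(2000 * 0.5 * 0.5), about 22.4,
        // so 3 sigma is about 67 and the ±150 band is comfortably wider.
        val threeSigma = 3 * math.sqrt(2000 * 0.5 * 0.5)
        assert(threeSigma < 150)
      }
    }
    ```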
    
    `FilterOpExec` is abstract, so the spec uses a minimal test-only
    concrete subclass that exposes `setFilterFunc` for behavior-only
    assertions. The three subclass specs build descriptor JSON via
    `objectMapper.writeValueAsString` of a fresh `*OpDesc` (same fixture
    pattern as the existing `SpecializedFilterOpExecSpec`).
    
    ### Any related issues, documentation, discussions?
    
    Closes #5652.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.filter.FilterOpExecSpec
    org.apache.texera.amber.operator.regex.RegexOpExecSpec
    org.apache.texera.amber.operator.substringSearch.SubstringSearchOpExecSpec
    org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExecSpec"`
    — 34 tests, all green
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../amber/operator/filter/FilterOpExecSpec.scala   | 130 +++++++++++++++++++
 .../RandomKSamplingOpExecSpec.scala                | 129 +++++++++++++++++++
 .../amber/operator/regex/RegexOpExecSpec.scala     | 139 +++++++++++++++++++++
 .../SubstringSearchOpExecSpec.scala                | 133 ++++++++++++++++++++
 4 files changed, 531 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
new file mode 100644
index 0000000000..49935866ca
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.filter
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FilterOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test harness — FilterOpExec is abstract. The existing
+  // SpecializedFilterOpExecSpec covers the full SpecializedFilterOpExec
+  // (predicate parsing + many AttributeType cases); here we exercise the
+  // base trait's contract directly via a minimal concrete subclass that
+  // exposes setFilterFunc.
+  // ---------------------------------------------------------------------------
+
+  private class BareFilterOpExec extends FilterOpExec
+
+  // A one-attribute tuple is enough to drive predicate evaluation.
+  private val attr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+  // ---------------------------------------------------------------------------
+  // processTuple — pass-through when predicate matches
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec.processTuple" should
+    "yield the input tuple when filterFunc returns true" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val t = tuple(42)
+    val out = exec.processTuple(t, port = 0).toList
+    assert(out == List(t))
+  }
+
+  it should "yield an empty Iterator when filterFunc returns false" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => false)
+    val out = exec.processTuple(tuple(42), port = 0).toList
+    assert(out.isEmpty)
+  }
+
+  it should "evaluate filterFunc against the actual tuple (not a copy or null)" in {
+    // Capture what the predicate sees — must equal the argument passed in.
+    val exec = new BareFilterOpExec
+    var seen: Tuple = null
+    exec.setFilterFunc { t =>
+      seen = t
+      true
+    }
+    val t = tuple(7)
+    val _ = exec.processTuple(t, port = 0).toList
+    assert(seen eq t, "filterFunc must receive the same Tuple instance")
+  }
+
+  it should "ignore the port argument (port-agnostic by contract)" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val t = tuple(1)
+    val portsTested = List(0, 1, 7, Int.MaxValue, -1)
+    portsTested.foreach { p =>
+      assert(exec.processTuple(t, port = p).toList == List(t), s"port=$p")
+    }
+  }
+
+  it should "produce an iterator with exactly one element when the predicate matches" in {
+    // The contract is "single tuple" — not a multi-element iterator.
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val iter = exec.processTuple(tuple(1), port = 0)
+    assert(iter.hasNext)
+    iter.next()
+    assert(!iter.hasNext, "iterator must be exhausted after the single match")
+  }
+
+  // ---------------------------------------------------------------------------
+  // setFilterFunc — swapping the predicate
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec.setFilterFunc" should
+    "swap the predicate for subsequent processTuple calls" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    assert(exec.processTuple(tuple(1), port = 0).toList.size == 1)
+    exec.setFilterFunc(_ => false)
+    assert(exec.processTuple(tuple(1), port = 0).toList.isEmpty)
+  }
+
+  it should "accept a value-aware predicate that branches on the tuple's content" in {
+    // Pin that the predicate is genuinely consulted per-tuple (not memoized).
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(t => t.getField[Integer](0).intValue() % 2 == 0)
+    assert(exec.processTuple(tuple(2), port = 0).toList == List(tuple(2)))
+    assert(exec.processTuple(tuple(3), port = 0).toList.isEmpty)
+    assert(exec.processTuple(tuple(4), port = 0).toList == List(tuple(4)))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Serializable conformance — FilterOpExec extends Serializable so
+  // executors can ship over the wire.
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec" should "be a Serializable OperatorExecutor (compile-time enforced)" in {
+    val exec: java.io.Serializable = new BareFilterOpExec
+    assert(exec != null)
+    val asOpExec: org.apache.texera.amber.core.executor.OperatorExecutor =
+      new BareFilterOpExec
+    assert(asOpExec != null)
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
new file mode 100644
index 0000000000..ea4f78fe0f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.randomksampling
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RandomKSamplingOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+  private def descJson(percentage: Int): String = {
+    val desc = new RandomKSamplingOpDesc
+    desc.percentage = percentage
+    objectMapper.writeValueAsString(desc)
+  }
+
+  /** Run `count` tuples through `exec` and return how many it emitted. */
+  private def emittedCount(exec: RandomKSamplingOpExec, count: Int): Int =
+    (1 to count).count(i => exec.processTuple(tuple(i), port = 0).nonEmpty)
+
+  // ---------------------------------------------------------------------------
+  // Boundary cases — 0% and 100%
+  // ---------------------------------------------------------------------------
+  //
+  // The predicate is `(desc.percentage / 100.0) >= rand.nextDouble()`.
+  // `Random.nextDouble()` returns a value in `[0.0, 1.0)`.
+  //
+  //   - At 100% (`1.0`), `1.0 >= rand.nextDouble()` always holds → accept all.
+  //   - At 0%   (`0.0`), `0.0 >= rand.nextDouble()` holds iff `nextDouble()`
+  //     returns `0.0`. The probability of that is 1 / 2^53 ≈ 10^-16 — for
+  //     practical purposes, reject all.
+
+  "RandomKSamplingOpExec with percentage = 100" should "accept every tuple" in {
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 100), idx = 0, workerCount = 7)
+    assert(emittedCount(exec, 1000) == 1000)
+  }
+
+  "RandomKSamplingOpExec with percentage = 0" should "reject every tuple" in {
+    // Edge case (`rand.nextDouble() == 0.0` would let one through) is
+    // astronomically improbable, and with a fixed seed the 1000 draws are
+    // deterministic: a given seed either hits that case on every run or
+    // never does. For percentage 0 the implementation emits nothing.
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 0), idx = 0, workerCount = 7)
+    assert(emittedCount(exec, 1000) == 0)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Determinism — seed = workerCount, so the same (workerCount,
+  // percentage, input-count) produces the same emission count across runs.
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec with the same workerCount and percentage" should
+    "produce the same emission count across two fresh instances (deterministic seed)" in {
+    val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 13)
+    val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 1, workerCount = 13)
+    val countA = emittedCount(a, 200)
+    val countB = emittedCount(b, 200)
+    assert(countA == countB, s"deterministic seed should give equal counts, got $countA vs $countB")
+  }
+
+  it should "yield approximately the requested fraction over a large sample" in {
+    // At 50% over 2000 tuples, the expected emission count is ~1000.
+    // For a binomial(2000, 0.5), 3σ is ~67 — allow a ±150 band so the
+    // case is well clear of stochastic flakiness while still catching
+    // gross deviations (e.g. percentage being ignored).
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+    val n = emittedCount(exec, 2000)
+    assert(n >= 850 && n <= 1150, s"expected ~1000 emissions at 50%, got $n")
+  }
+
+  // ---------------------------------------------------------------------------
+  // Different worker seeds — different streams
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec with different workerCount values" should
+    "draw different sequences (the seed is workerCount)" in {
+    // Two executors with the same percentage but different workerCount
+    // should not produce IDENTICAL emission sequences over a meaningful
+    // sample — the seed is workerCount, so the streams diverge.
+    val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+    val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 2)
+    val emissionsA = (1 to 100).map(i => execEmit(a, i))
+    val emissionsB = (1 to 100).map(i => execEmit(b, i))
+    assert(
+      emissionsA != emissionsB,
+      "different workerCount seeds must produce different emission sequences"
+    )
+  }
+
+  private def execEmit(exec: RandomKSamplingOpExec, i: Int): Boolean =
+    exec.processTuple(tuple(i), port = 0).nonEmpty
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new RandomKSamplingOpExec("{not valid", idx = 0, workerCount = 1)
+    }
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
new file mode 100644
index 0000000000..3d93539876
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.regex
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegexOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders — mirrors the pattern used by SpecializedFilterOpExecSpec
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("body", AttributeType.STRING)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(text: String): Tuple =
+    Tuple.builder(schema).add(attr, text).build()
+
+  private def descJson(regex: String, caseInsensitive: Boolean = false): String = {
+    val desc = new RegexOpDesc
+    desc.attribute = "body"
+    desc.regex = regex
+    desc.caseInsensitive = caseInsensitive
+    objectMapper.writeValueAsString(desc)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Pattern matching — find-semantics (substring match, not full match)
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec" should "yield the input tuple when the regex matches the attribute" in {
+    val exec = new RegexOpExec(descJson(regex = "hello"))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "use find-semantics — a pattern matches if it appears anywhere in the value" in {
+    // `Matcher.find` succeeds on a substring; it is not anchored. Pin
+    // this so a future refactor that switched to `matches` (full-string)
+    // would surface here.
+    val exec = new RegexOpExec(descJson(regex = "abc"))
+    val t = tuple("xx abc xx")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "yield nothing when the regex does not match" in {
+    val exec = new RegexOpExec(descJson(regex = "foo"))
+    assert(exec.processTuple(tuple("bar baz"), port = 0).toList.isEmpty)
+  }
+
+  it should "yield the tuple when the regex character class matches at least one char" in {
+    val exec = new RegexOpExec(descJson(regex = "\\d+"))
+    assert(exec.processTuple(tuple("answer is 42 plus"), port = 0).toList.size == 1)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Case sensitivity
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec with caseInsensitive = true" should "match case-insensitively" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = true))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  "RegexOpExec with caseInsensitive = false" should
+    "NOT match when the case differs (default behavior)" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should "still match identical case under case-sensitive mode" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+    val t = tuple("Say HELLO!")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Pattern compilation laziness — `pattern` is a lazy val; pin that
+  // construction does not eagerly compile (so a bad regex doesn't blow
+  // up at the wrong time).
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec" should
+    "tolerate construction with an invalid regex (compilation is lazy on `pattern`)" in {
+    // `[` is an invalid character class — but the pattern is lazily
+    // compiled inside `matchRegex`. The constructor must succeed; the
+    // failure only surfaces on the first processTuple call.
+    val exec = new RegexOpExec(descJson(regex = "["))
+    intercept[java.util.regex.PatternSyntaxException] {
+      exec.processTuple(tuple("anything"), port = 0).toList
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Repeated invocations — pattern stays cached (lazy val), behavior stable
+  // ---------------------------------------------------------------------------
+
+  it should "produce stable results across repeated processTuple calls (pattern cached)" in {
+    val exec = new RegexOpExec(descJson(regex = "match"))
+    val hit = tuple("match here")
+    val miss = tuple("no signal")
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+    assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure surfaces during construction
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    // The constructor calls objectMapper.readValue; malformed JSON must
+    // propagate as a Jackson parse exception, not silently fall through
+    // to a half-constructed executor.
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new RegexOpExec("{not valid json")
+    }
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
new file mode 100644
index 0000000000..83fd90fee0
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.substringSearch
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class SubstringSearchOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("body", AttributeType.STRING)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(text: String): Tuple =
+    Tuple.builder(schema).add(attr, text).build()
+
+  private def descJson(substring: String, isCaseSensitive: Boolean = false): String = {
+    val desc = new SubstringSearchOpDesc
+    desc.attribute = "body"
+    desc.substring = substring
+    desc.isCaseSensitive = isCaseSensitive
+    objectMapper.writeValueAsString(desc)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Substring detection — match / no-match
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec" should "yield the input tuple when the substring is present" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "hello"))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "yield nothing when the substring is absent" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "missing"))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should
+    "match when the substring sits anywhere in the value (start / middle / end)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "abc"))
+    assert(exec.processTuple(tuple("abc xx"), port = 0).toList.nonEmpty)
+    assert(exec.processTuple(tuple("xx abc xx"), port = 0).toList.nonEmpty)
+    assert(exec.processTuple(tuple("xx abc"), port = 0).toList.nonEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Case sensitivity
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec with isCaseSensitive = true" should
+    "match case-sensitively (case mismatch is rejected)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should "yield the tuple when the case matches under case-sensitive mode" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+    val t = tuple("Say HELLO loudly")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  "SubstringSearchOpExec with isCaseSensitive = false" should
+    "match case-insensitively (production lowercases both sides before String.contains)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = false))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "still match identical-case values under case-insensitive mode" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "world", isCaseSensitive = false))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Edge: empty substring
+  // ---------------------------------------------------------------------------
+
+  it should "treat the empty substring as matching every value (Java String.contains(\"\") == true)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = ""))
+    val t = tuple("any non-empty text")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+    // Even an empty value contains the empty substring.
+    val empty = tuple("")
+    assert(exec.processTuple(empty, port = 0).toList == List(empty))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Repeated invocations — predicate stays stable
+  // ---------------------------------------------------------------------------
+
+  it should "produce stable results across repeated processTuple calls" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "match"))
+    val hit = tuple("match here")
+    val miss = tuple("no signal")
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+    assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new SubstringSearchOpExec("{not valid")
+    }
+  }
+}
