This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 7ae9b35f12 test(workflow-operator): add unit test coverage for
filter-family operator executors (#5656)
7ae9b35f12 is described below
commit 7ae9b35f12748616daf7bcc925fdde2e5def5187
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 17:30:08 2026 -0700
test(workflow-operator): add unit test coverage for filter-family operator
executors (#5656)
### What changes were proposed in this PR?
Pin behavior of four previously-uncovered modules in the `FilterOpExec`
inheritance hierarchy in `common/workflow-operator`. No production-code
changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `FilterOpExecSpec` | `FilterOpExec` (abstract base) | 8 |
| `RegexOpExecSpec` | `RegexOpExec` | 10 |
| `SubstringSearchOpExecSpec` | `SubstringSearchOpExec` | 10 |
| `RandomKSamplingOpExecSpec` | `RandomKSamplingOpExec` | 6 |
All four spec files follow the `<srcClassName>Spec.scala` one-to-one
convention. `SpecializedFilterOpExec` already has its own spec; this PR
covers the rest of the family.
**Behavior pinned — `FilterOpExec`**
| Surface | Contract |
| --- | --- |
| `processTuple` (matching predicate) | yields the input tuple as a single-element iterator |
| `processTuple` (non-matching predicate) | yields an empty iterator |
| `processTuple` | passes the actual tuple instance to the predicate; ignores the `port` argument |
| `setFilterFunc` | swapping the predicate changes the next `processTuple` result; value-aware predicates branch per-tuple |
| Type contract | `FilterOpExec` is a `Serializable OperatorExecutor` |
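
The contract in the table above can be illustrated with a minimal, self-contained sketch. `SketchFilterExec` and `BareExec` are hypothetical stand-ins written for this note, not the Texera classes, and a generic element type replaces `Tuple` only to keep the example dependency-free.

```scala
// Hypothetical stand-in for an abstract filter executor (not the real FilterOpExec).
abstract class SketchFilterExec[T] extends Serializable {
  // Default predicate rejects everything until setFilterFunc is called (an assumption of this sketch).
  private var filterFunc: T => Boolean = (_: T) => false

  def setFilterFunc(f: T => Boolean): Unit = { filterFunc = f }

  // The port argument is accepted but never consulted.
  def processTuple(tuple: T, port: Int): Iterator[T] =
    if (filterFunc(tuple)) Iterator.single(tuple) else Iterator.empty
}

object FilterSketch {
  // Minimal concrete subclass, analogous to a test-only harness.
  final class BareExec extends SketchFilterExec[Int]

  // Returns the output for a matching input and for a non-matching input.
  def demo(): (List[Int], List[Int]) = {
    val exec = new BareExec
    exec.setFilterFunc(_ % 2 == 0)
    (exec.processTuple(2, port = 0).toList, exec.processTuple(3, port = 7).toList)
  }
}
```

Under these assumptions, `FilterSketch.demo()` pairs a single-element list for the matching input with an empty list for the non-matching one, regardless of the port value.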
**Behavior pinned — `RegexOpExec`**
| Surface | Contract |
| --- | --- |
| matching regex | yields the tuple |
| find-semantics | unanchored substring match (not full-string `matches`) |
| `caseInsensitive = true` / `false` | matches case-(in)sensitively |
| invalid regex string | construction succeeds (lazy `Pattern`); `PatternSyntaxException` surfaces on first `processTuple` |
| repeated invocations | pattern stays cached; results are stable |
| malformed descriptor JSON | construction throws `JsonProcessingException` |
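
The find-semantics and lazy-compilation rows can be reproduced with plain `java.util.regex`. The sketch below is a hypothetical stand-in (`SketchRegexFilter` is not the real `RegexOpExec`), and mapping `caseInsensitive` to `Pattern.CASE_INSENSITIVE` is an assumption about how such a flag is typically wired.

```scala
import java.util.regex.{Pattern, PatternSyntaxException}
import scala.util.Try

// Hypothetical stand-in for a regex filter; not the real RegexOpExec.
final class SketchRegexFilter(regex: String, caseInsensitive: Boolean) {
  // Compiled on first use, then cached for later calls.
  private lazy val pattern: Pattern =
    Pattern.compile(regex, if (caseInsensitive) Pattern.CASE_INSENSITIVE else 0)

  // find() succeeds on any matching substring; matches() would need the whole input to match.
  def accepts(value: String): Boolean = pattern.matcher(value).find()
}

object RegexSketch {
  // Returns (unanchored find result, full-string matches result, invalid regex fails only on use).
  def demo(): (Boolean, Boolean, Boolean) = {
    val unanchored = new SketchRegexFilter("abc", caseInsensitive = false).accepts("xx abc xx")
    val fullMatch = Pattern.compile("abc").matcher("xx abc xx").matches()
    // Construction with an invalid regex succeeds because nothing is compiled yet.
    val bad = new SketchRegexFilter("[", caseInsensitive = false)
    val failsOnUse = Try(bad.accepts("anything")).failed.toOption
      .exists(_.isInstanceOf[PatternSyntaxException])
    (unanchored, fullMatch, failsOnUse)
  }
}
```

Here `find()` accepts `"xx abc xx"` while `matches()` rejects it, which is the distinction the spec pins.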
**Behavior pinned — `SubstringSearchOpExec`**
| Surface | Contract |
| --- | --- |
| substring present / absent | yields tuple / nothing |
| position in value (start / middle / end) | irrelevant; `String.contains` semantics |
| `isCaseSensitive = true` / `false` | case-sensitive / case-insensitive (when `false`, both sides are lowercased before `String.contains`) |
| empty substring | matches every value, including the empty string |
| repeated invocations | results stable |
| malformed descriptor JSON | construction throws `JsonProcessingException` |
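
As a rough illustration of the rows above, the matching rule can be written as a single function. `SubstringSketch.accepts` is a hypothetical helper for this note; the production executor may structure the check differently.

```scala
object SubstringSketch {
  // Hypothetical helper: case-sensitive mode compares as-is,
  // case-insensitive mode lowercases both sides before String.contains.
  def accepts(value: String, substring: String, isCaseSensitive: Boolean): Boolean =
    if (isCaseSensitive) value.contains(substring)
    else value.toLowerCase.contains(substring.toLowerCase)
}
```

With this rule, `"HELLO"` is found in `"hello world"` only when `isCaseSensitive` is `false`, and the empty substring is found in every value, including `""`.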
**Behavior pinned — `RandomKSamplingOpExec`**
| Surface | Contract |
| --- | --- |
| `percentage = 100` | accepts every tuple (1000-sample run) |
| `percentage = 0` | rejects every tuple (1000-sample run) |
| Same `workerCount` + `percentage` | identical emission count across two fresh instances (deterministic seed) |
| `percentage = 50` | approximately half pass (within ±150 of 1000 over 2000 draws) |
| Different `workerCount` | divergent emission sequences (the seed is `workerCount`) |
| malformed descriptor JSON | construction throws `JsonProcessingException` |
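
The sampling rows can be sketched without any Texera dependencies. The predicate below is the one quoted in the spec's comments; `SketchSampler` is a hypothetical stand-in, and the use of `scala.util.Random` seeded with `workerCount` is an assumption based on the "seed is `workerCount`" wording. The `threeSigma` helper reproduces the arithmetic behind the ±150 tolerance.

```scala
import scala.util.Random

// Hypothetical stand-in for a percentage sampler; not the real RandomKSamplingOpExec.
final class SketchSampler(percentage: Int, workerCount: Int) {
  // Assumption: the generator is seeded with workerCount, so runs are reproducible.
  private val rand = new Random(workerCount)

  // Accept when the drawn value in [0.0, 1.0) does not exceed the requested fraction.
  def accepts(): Boolean = (percentage / 100.0) >= rand.nextDouble()
}

object SamplingSketch {
  // Number of accepted draws out of `draws` attempts on a fresh sampler.
  def emitted(percentage: Int, workerCount: Int, draws: Int): Int = {
    val sampler = new SketchSampler(percentage, workerCount)
    (1 to draws).count(_ => sampler.accepts())
  }

  // Three standard deviations of a Binomial(n, p) count.
  def threeSigma(n: Int, p: Double): Double = 3 * math.sqrt(n * p * (1 - p))
}
```

For 2000 draws at 50%, `threeSigma(2000, 0.5)` is about 67, so a ±150 band around 1000 sits well outside ordinary sampling noise.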
`FilterOpExec` is abstract, so the spec uses a minimal test-only
concrete subclass that exposes `setFilterFunc` for behavior-only
assertions. The three subclass specs build descriptor JSON via
`objectMapper.writeValueAsString` of a fresh `*OpDesc` (same fixture
pattern as the existing `SpecializedFilterOpExecSpec`).
### Any related issues, documentation, discussions?
Closes #5652.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.filter.FilterOpExecSpec org.apache.texera.amber.operator.regex.RegexOpExecSpec org.apache.texera.amber.operator.substringSearch.SubstringSearchOpExecSpec org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExecSpec"`: 34 tests, all green
- `sbt scalafmtCheckAll`: clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7 [1M context])
---
.../amber/operator/filter/FilterOpExecSpec.scala | 130 +++++++++++++++++++
.../RandomKSamplingOpExecSpec.scala | 129 +++++++++++++++++++
.../amber/operator/regex/RegexOpExecSpec.scala | 139 +++++++++++++++++++++
.../SubstringSearchOpExecSpec.scala | 133 ++++++++++++++++++++
4 files changed, 531 insertions(+)
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
new file mode 100644
index 0000000000..49935866ca
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.filter
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FilterOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Test harness — FilterOpExec is abstract. The existing
+ // SpecializedFilterOpExecSpec covers the full SpecializedFilterOpExec
+ // (predicate parsing + many AttributeType cases); here we exercise the
+ // base trait's contract directly via a minimal concrete subclass that
+ // exposes setFilterFunc.
+ // ---------------------------------------------------------------------------
+
+ private class BareFilterOpExec extends FilterOpExec
+
+ // A one-attribute tuple is enough to drive predicate evaluation.
+ private val attr = new Attribute("v", AttributeType.INTEGER)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(v: Int): Tuple =
+ Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+ // ---------------------------------------------------------------------------
+ // processTuple — pass-through when predicate matches
+ // ---------------------------------------------------------------------------
+
+ "FilterOpExec.processTuple" should
+ "yield the input tuple when filterFunc returns true" in {
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(_ => true)
+ val t = tuple(42)
+ val out = exec.processTuple(t, port = 0).toList
+ assert(out == List(t))
+ }
+
+ it should "yield an empty Iterator when filterFunc returns false" in {
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(_ => false)
+ val out = exec.processTuple(tuple(42), port = 0).toList
+ assert(out.isEmpty)
+ }
+
+ it should "evaluate filterFunc against the actual tuple (not a copy or null)" in {
+ // Capture what the predicate sees — must equal the argument passed in.
+ val exec = new BareFilterOpExec
+ var seen: Tuple = null
+ exec.setFilterFunc { t =>
+ seen = t
+ true
+ }
+ val t = tuple(7)
+ val _ = exec.processTuple(t, port = 0).toList
+ assert(seen eq t, "filterFunc must receive the same Tuple instance")
+ }
+
+ it should "ignore the port argument (port-agnostic by contract)" in {
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(_ => true)
+ val t = tuple(1)
+ val portsTested = List(0, 1, 7, Int.MaxValue, -1)
+ portsTested.foreach { p =>
+ assert(exec.processTuple(t, port = p).toList == List(t), s"port=$p")
+ }
+ }
+
+ it should "produce an iterator with exactly one element when the predicate matches" in {
+ // The contract is "single tuple" — not a multi-element iterator.
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(_ => true)
+ val iter = exec.processTuple(tuple(1), port = 0)
+ assert(iter.hasNext)
+ iter.next()
+ assert(!iter.hasNext, "iterator must be exhausted after the single match")
+ }
+
+ // ---------------------------------------------------------------------------
+ // setFilterFunc — swapping the predicate
+ // ---------------------------------------------------------------------------
+
+ "FilterOpExec.setFilterFunc" should
+ "swap the predicate for subsequent processTuple calls" in {
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(_ => true)
+ assert(exec.processTuple(tuple(1), port = 0).toList.size == 1)
+ exec.setFilterFunc(_ => false)
+ assert(exec.processTuple(tuple(1), port = 0).toList.isEmpty)
+ }
+
+ it should "accept a value-aware predicate that branches on the tuple's content" in {
+ // Pin that the predicate is genuinely consulted per-tuple (not memoized).
+ val exec = new BareFilterOpExec
+ exec.setFilterFunc(t => t.getField[Integer](0).intValue() % 2 == 0)
+ assert(exec.processTuple(tuple(2), port = 0).toList == List(tuple(2)))
+ assert(exec.processTuple(tuple(3), port = 0).toList.isEmpty)
+ assert(exec.processTuple(tuple(4), port = 0).toList == List(tuple(4)))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Serializable conformance — FilterOpExec extends Serializable so
+ // executors can ship over the wire.
+ // ---------------------------------------------------------------------------
+
+ "FilterOpExec" should "be a Serializable OperatorExecutor (compile-time enforced)" in {
+ val exec: java.io.Serializable = new BareFilterOpExec
+ assert(exec != null)
+ val asOpExec: org.apache.texera.amber.core.executor.OperatorExecutor =
+ new BareFilterOpExec
+ assert(asOpExec != null)
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
new file mode 100644
index 0000000000..ea4f78fe0f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.randomksampling
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RandomKSamplingOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixture builders
+ // ---------------------------------------------------------------------------
+
+ private val attr = new Attribute("v", AttributeType.INTEGER)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(v: Int): Tuple =
+ Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+ private def descJson(percentage: Int): String = {
+ val desc = new RandomKSamplingOpDesc
+ desc.percentage = percentage
+ objectMapper.writeValueAsString(desc)
+ }
+
+ /** Run `count` tuples through `exec` and return how many it emitted. */
+ private def emittedCount(exec: RandomKSamplingOpExec, count: Int): Int =
+ (1 to count).count(i => exec.processTuple(tuple(i), port = 0).nonEmpty)
+
+ // ---------------------------------------------------------------------------
+ // Boundary cases — 0% and 100%
+ // ---------------------------------------------------------------------------
+ //
+ // The predicate is `(desc.percentage / 100.0) >= rand.nextDouble()`.
+ // `Random.nextDouble()` returns a value in `[0.0, 1.0)`.
+ //
+ // - At 100% (`1.0`), `1.0 >= rand.nextDouble()` always holds → accept all.
+ // - At 0% (`0.0`), `0.0 >= rand.nextDouble()` holds iff `nextDouble()`
+ // returns `0.0`. The probability of that is 1 / 2^53 ≈ 10^-16 — for
+ // practical purposes, reject all.
+
+ "RandomKSamplingOpExec with percentage = 100" should "accept every tuple" in {
+ val exec = new RandomKSamplingOpExec(descJson(percentage = 100), idx = 0, workerCount = 7)
+ assert(emittedCount(exec, 1000) == 1000)
+ }
+
+ "RandomKSamplingOpExec with percentage = 0" should "reject every tuple" in {
+ // The edge case (`rand.nextDouble() == 0.0` would let one tuple through) is
+ // astronomically improbable. Because the seed is fixed, the 1000 draws are
+ // deterministic, so this test cannot flake: it either always passes or
+ // always fails, and with this seed it passes (every tuple is rejected).
+ val exec = new RandomKSamplingOpExec(descJson(percentage = 0), idx = 0, workerCount = 7)
+ assert(emittedCount(exec, 1000) == 0)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Determinism — seed = workerCount, so the same (workerCount,
+ // percentage, input-count) produces the same emission count across runs.
+ // ---------------------------------------------------------------------------
+
+ "RandomKSamplingOpExec with the same workerCount and percentage" should
+ "produce the same emission count across two fresh instances (deterministic seed)" in {
+ val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 13)
+ val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 1, workerCount = 13)
+ val countA = emittedCount(a, 200)
+ val countB = emittedCount(b, 200)
+ assert(countA == countB, s"deterministic seed should give equal counts, got $countA vs $countB")
+ }
+
+ it should "yield approximately the requested fraction over a large sample" in {
+ // At 50% over 2000 tuples, the expected emission count is ~1000.
+ // For a binomial(2000, 0.5), 3σ is ~67 — allow a ±150 band so the
+ // case is well clear of stochastic flakiness while still catching
+ // gross deviations (e.g. percentage being ignored).
+ val exec = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+ val n = emittedCount(exec, 2000)
+ assert(n >= 850 && n <= 1150, s"expected ~1000 emissions at 50%, got $n")
+ }
+
+ // ---------------------------------------------------------------------------
+ // Different worker seeds — different streams
+ // ---------------------------------------------------------------------------
+
+ "RandomKSamplingOpExec with different workerCount values" should
+ "draw different sequences (the seed is workerCount)" in {
+ // Two executors with the same percentage but different workerCount
+ // should not produce IDENTICAL emission sequences over a meaningful
+ // sample — the seed is workerCount, so the streams diverge.
+ val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+ val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 2)
+ val emissionsA = (1 to 100).map(i => execEmit(a, i))
+ val emissionsB = (1 to 100).map(i => execEmit(b, i))
+ assert(
+ emissionsA != emissionsB,
+ "different workerCount seeds must produce different emission sequences"
+ )
+ }
+
+ private def execEmit(exec: RandomKSamplingOpExec, i: Int): Boolean =
+ exec.processTuple(tuple(i), port = 0).nonEmpty
+
+ // ---------------------------------------------------------------------------
+ // Descriptor parse failure
+ // ---------------------------------------------------------------------------
+
+ "RandomKSamplingOpExec construction" should
+ "throw on malformed descriptor JSON" in {
+ intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+ new RandomKSamplingOpExec("{not valid", idx = 0, workerCount = 1)
+ }
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
new file mode 100644
index 0000000000..3d93539876
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.regex
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegexOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixture builders — mirrors the pattern used by SpecializedFilterOpExecSpec
+ // ---------------------------------------------------------------------------
+
+ private val attr = new Attribute("body", AttributeType.STRING)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(text: String): Tuple =
+ Tuple.builder(schema).add(attr, text).build()
+
+ private def descJson(regex: String, caseInsensitive: Boolean = false): String = {
+ val desc = new RegexOpDesc
+ desc.attribute = "body"
+ desc.regex = regex
+ desc.caseInsensitive = caseInsensitive
+ objectMapper.writeValueAsString(desc)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Pattern matching — find-semantics (substring match, not full match)
+ // ---------------------------------------------------------------------------
+
+ "RegexOpExec" should "yield the input tuple when the regex matches the attribute" in {
+ val exec = new RegexOpExec(descJson(regex = "hello"))
+ val t = tuple("hello world")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ it should "use find-semantics: a pattern matches if it appears anywhere in the value" in {
+ // `Matcher.find` succeeds on a substring; it is not anchored. Pin
+ // this so a future refactor that switched to `matches` (full-string)
+ // would surface here.
+ val exec = new RegexOpExec(descJson(regex = "abc"))
+ val t = tuple("xx abc xx")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ it should "yield nothing when the regex does not match" in {
+ val exec = new RegexOpExec(descJson(regex = "foo"))
+ assert(exec.processTuple(tuple("bar baz"), port = 0).toList.isEmpty)
+ }
+
+ it should "yield the tuple when the regex character class matches at least one char" in {
+ val exec = new RegexOpExec(descJson(regex = "\\d+"))
+ assert(exec.processTuple(tuple("answer is 42 plus"), port = 0).toList.size == 1)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Case sensitivity
+ // ---------------------------------------------------------------------------
+
+ "RegexOpExec with caseInsensitive = true" should "match case-insensitively" in {
+ val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = true))
+ val t = tuple("hello world")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ "RegexOpExec with caseInsensitive = false" should
+ "NOT match when the case differs (default behavior)" in {
+ val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+ assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+ }
+
+ it should "still match identical case under case-sensitive mode" in {
+ val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+ val t = tuple("Say HELLO!")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Pattern compilation laziness — `pattern` is a lazy val; pin that
+ // construction does not eagerly compile (so a bad regex doesn't blow
+ // up at the wrong time).
+ // ---------------------------------------------------------------------------
+
+ "RegexOpExec" should
+ "tolerate construction with an invalid regex (compilation is lazy on `pattern`)" in {
+ // `[` is an invalid character class — but the pattern is lazily
+ // compiled inside `matchRegex`. The constructor must succeed; the
+ // failure only surfaces on the first processTuple call.
+ val exec = new RegexOpExec(descJson(regex = "["))
+ intercept[java.util.regex.PatternSyntaxException] {
+ exec.processTuple(tuple("anything"), port = 0).toList
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Repeated invocations — pattern stays cached (lazy val), behavior stable
+ // ---------------------------------------------------------------------------
+
+ it should "produce stable results across repeated processTuple calls (pattern cached)" in {
+ val exec = new RegexOpExec(descJson(regex = "match"))
+ val hit = tuple("match here")
+ val miss = tuple("no signal")
+ assert(exec.processTuple(hit, port = 0).toList == List(hit))
+ assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+ assert(exec.processTuple(hit, port = 0).toList == List(hit))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Descriptor parse failure surfaces during construction
+ // ---------------------------------------------------------------------------
+
+ "RegexOpExec construction" should
+ "throw on malformed descriptor JSON" in {
+ // The constructor calls objectMapper.readValue; malformed JSON must
+ // propagate as a Jackson parse exception, not silently fall through
+ // to a half-constructed executor.
+ intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+ new RegexOpExec("{not valid json")
+ }
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
new file mode 100644
index 0000000000..83fd90fee0
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.substringSearch
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class SubstringSearchOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixture builders
+ // ---------------------------------------------------------------------------
+
+ private val attr = new Attribute("body", AttributeType.STRING)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(text: String): Tuple =
+ Tuple.builder(schema).add(attr, text).build()
+
+ private def descJson(substring: String, isCaseSensitive: Boolean = false): String = {
+ val desc = new SubstringSearchOpDesc
+ desc.attribute = "body"
+ desc.substring = substring
+ desc.isCaseSensitive = isCaseSensitive
+ objectMapper.writeValueAsString(desc)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Substring detection — match / no-match
+ // ---------------------------------------------------------------------------
+
+ "SubstringSearchOpExec" should "yield the input tuple when the substring is present" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "hello"))
+ val t = tuple("hello world")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ it should "yield nothing when the substring is absent" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "missing"))
+ assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+ }
+
+ it should
+ "match when the substring sits anywhere in the value (start / middle / end)" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "abc"))
+ assert(exec.processTuple(tuple("abc xx"), port = 0).toList.nonEmpty)
+ assert(exec.processTuple(tuple("xx abc xx"), port = 0).toList.nonEmpty)
+ assert(exec.processTuple(tuple("xx abc"), port = 0).toList.nonEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Case sensitivity
+ // ---------------------------------------------------------------------------
+
+ "SubstringSearchOpExec with isCaseSensitive = true" should
+ "match case-sensitively (case mismatch is rejected)" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+ assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+ }
+
+ it should "yield the tuple when the case matches under case-sensitive mode" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+ val t = tuple("Say HELLO loudly")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ "SubstringSearchOpExec with isCaseSensitive = false" should
+ "match case-insensitively (production lowercases both sides before String.contains)" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = false))
+ val t = tuple("hello world")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ it should "still match identical-case values under case-insensitive mode" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "world", isCaseSensitive = false))
+ val t = tuple("hello world")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Edge: empty substring
+ // ---------------------------------------------------------------------------
+
+ it should "treat the empty substring as matching every value (Java String.contains(\"\") == true)" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = ""))
+ val t = tuple("any non-empty text")
+ assert(exec.processTuple(t, port = 0).toList == List(t))
+ // Even an empty value contains the empty substring.
+ val empty = tuple("")
+ assert(exec.processTuple(empty, port = 0).toList == List(empty))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Repeated invocations — predicate stays stable
+ // ---------------------------------------------------------------------------
+
+ it should "produce stable results across repeated processTuple calls" in {
+ val exec = new SubstringSearchOpExec(descJson(substring = "match"))
+ val hit = tuple("match here")
+ val miss = tuple("no signal")
+ assert(exec.processTuple(hit, port = 0).toList == List(hit))
+ assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+ assert(exec.processTuple(hit, port = 0).toList == List(hit))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Descriptor parse failure
+ // ---------------------------------------------------------------------------
+
+ "SubstringSearchOpExec construction" should
+ "throw on malformed descriptor JSON" in {
+ intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+ new SubstringSearchOpExec("{not valid")
+ }
+ }
+}