This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch add-new-unit-tests-for-processing-elements
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit a408969d7a6c7454f21db77f8541e4332b89555c
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Mar 17 20:16:52 2026 +0100

    test: add processor framework test coverage
---
 .../pom.xml                                        |  13 +++
 .../jvm/welford/WelfordChangeDetectionTest.java    |  93 +++++++++++++++++++
 .../jvm/processor/jseval/JSEvalProcessorTest.java  |  93 +++++++++++++++++++
 .../valuechange/ValueChangeProcessorTest.java      |  28 ++++++
 .../enrich/MergeByEnrichProcessorTest.java         | 103 +++++++++++++++++++++
 .../processor/limit/RateLimitProcessorTest.java    |  28 ++++++
 .../movingaverage/MovingAverageProcessorTest.java  |  92 ++++++++++++++++++
 .../NumericalTextFilterProcessorTest.java          |  91 ++++++++++++++++++
 .../projection/TestProjectionProcessor.java        |  21 +++++
 .../schema/MergeBySchemaProcessorTest.java         |  99 ++++++++++++++++++++
 .../ThroughputMonitorProcessorTest.java            |  83 +++++++++++++++++
 .../executors/ProcessingElementTestExecutor.java   |  40 +++++++-
 12 files changed, 781 insertions(+), 3 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml b/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml
index 590f738a10..bac2f44b80 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml
@@ -39,6 +39,19 @@
             <artifactId>streampipes-wrapper-standalone</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-test-utils-executors</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java
new file mode 100644
index 0000000000..5aff5fcd6c
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.changedetection.jvm.welford;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class WelfordChangeDetectionTest {
+
+  @Test
+  void keepsStableSeriesBelowAlarmThreshold() {
+    var configuration = createConfiguration(0.5d, 5.0d);
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("temperature", 10.0d),
+        event("temperature", 10.2d),
+        event("temperature", 9.8d),
+        event("temperature", 10.1d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 10.2d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 9.8d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 10.1d, "changeDetectedLow", false, "changeDetectedHigh", false)
+    );
+
+    new ProcessingElementTestExecutor(new WelfordChangeDetection(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void detectsPositiveLevelShift() {
+    var configuration = createConfiguration(0.1d, 0.5d);
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("temperature", 10.0d),
+        event("temperature", 10.0d),
+        event("temperature", 10.0d),
+        event("temperature", 20.0d),
+        event("temperature", 20.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false),
+        event("temperature", 20.0d, "changeDetectedLow", false, "changeDetectedHigh", true),
+        event("temperature", 20.0d, "changeDetectedLow", false, "changeDetectedHigh", false)
+    );
+
+    new ProcessingElementTestExecutor(new WelfordChangeDetection(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  private TestConfiguration createConfiguration(double k, double h) {
+    return TestConfiguration.builder()
+        .configWithDefaultPrefix("number-mapping", "temperature")
+        .config("param-k", k)
+        .config("param-h", h)
+        .build();
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) {
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java
new file mode 100644
index 0000000000..03afec1dd8
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.enricher.jvm.processor.jseval;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class JSEvalProcessorTest {
+
+  @Test
+  void transformsFieldsWithJavascriptFunction() {
+    var configuration = TestConfiguration.builder()
+        .config("jsFunction", """
+            function process(event) {
+              return {
+                id: event.id,
+                temperatureF: (event.temperatureC * 9 / 5) + 32
+              };
+            }
+            """)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("id", "sensor-1", "temperatureC", 20.0d),
+        event("id", "sensor-2", "temperatureC", 25.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("id", "sensor-1", "temperatureF", 68.0d),
+        event("id", "sensor-2", "temperatureF", 77.0d)
+    );
+
+    new ProcessingElementTestExecutor(new JSEvalProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void canReturnSubsetAndDerivedFields() {
+    var configuration = TestConfiguration.builder()
+        .config("jsFunction", """
+            function process(event) {
+              return {
+                label: event.device + "-" + event.status,
+                alert: event.value > 50
+              };
+            }
+            """)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("device", "sensor-1", "status", "ok", "value", 42.0d),
+        event("device", "sensor-2", "status", "warn", "value", 65.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("label", "sensor-1-ok", "alert", false),
+        event("label", "sensor-2-warn", "alert", true)
+    );
+
+    new ProcessingElementTestExecutor(new JSEvalProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) {
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java
index ecbec67bce..618d77b79f 100644
--- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java
+++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java
@@ -66,4 +66,32 @@ class ValueChangeProcessorTest {
 
     testExecutor.run(inputEvents, expectedEvents);
   }
+
+  @Test
+  void detectsDifferentTransitionConfiguration() {
+    TestConfiguration configuration = TestConfiguration.builder()
+        .configWithDefaultPrefix("change-value-mapping", "numberlist")
+        .config("from-property-value", 2.0f)
+        .config("to-property-value", 5.0f)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        Map.of("timestamp", 1L, "numberlist", 2.0f),
+        Map.of("timestamp", 2L, "numberlist", 5.0f),
+        Map.of("timestamp", 3L, "numberlist", 5.0f),
+        Map.of("timestamp", 4L, "numberlist", 2.0f),
+        Map.of("timestamp", 5L, "numberlist", 5.0f)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        Map.of("timestamp", 1L, "numberlist", 2.0f, "isChanged", false),
+        Map.of("timestamp", 2L, "numberlist", 5.0f, "isChanged", true),
+        Map.of("timestamp", 3L, "numberlist", 5.0f, "isChanged", false),
+        Map.of("timestamp", 4L, "numberlist", 2.0f, "isChanged", false),
+        Map.of("timestamp", 5L, "numberlist", 5.0f, "isChanged", true)
+    );
+
+    new ProcessingElementTestExecutor(new ValueChangeProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java
new file mode 100644
index 0000000000..70df917d8b
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.enrich;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+class MergeByEnrichProcessorTest {
+
+  @Test
+  void enrichesSelectedFirstStreamWithLastSecondStreamEvent() {
+    var configuration = TestConfiguration.builder()
+        .customPrefixStrategy(List.of("s1", "s0", "s1", "s0"))
+        .config("select-stream", "Stream 1")
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("location", "room-1"),
+        event("deviceId", "sensor-1", "temperature", 21.5d),
+        event("location", "room-2"),
+        event("deviceId", "sensor-2", "temperature", 22.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("deviceId", "sensor-1", "temperature", 21.5d, "location", "room-1"),
+        event("deviceId", "sensor-2", "temperature", 22.0d, "location", "room-2")
+    );
+
+    new ProcessingElementTestExecutor(
+        new MergeByEnrichProcessor(),
+        configuration,
+        setOutputKeys("s0::deviceId", "s0::temperature", "s1::location")
+    ).run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void enrichesSelectedSecondStreamWithLastFirstStreamEvent() {
+    var configuration = TestConfiguration.builder()
+        .customPrefixStrategy(List.of("s0", "s1", "s0", "s1"))
+        .config("select-stream", "Stream 2")
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("deviceId", "sensor-1"),
+        event("location", "room-1", "humidity", 40.0d),
+        event("deviceId", "sensor-2"),
+        event("location", "room-2", "humidity", 41.5d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("deviceId", "sensor-1", "location", "room-1", "humidity", 40.0d),
+        event("deviceId", "sensor-2", "location", "room-2", "humidity", 41.5d)
+    );
+
+    new ProcessingElementTestExecutor(
+        new MergeByEnrichProcessor(),
+        configuration,
+        setOutputKeys("s0::deviceId", "s1::location", "s1::humidity")
+    ).run(inputEvents, expectedEvents);
+  }
+
+  private Consumer<DataProcessorInvocation> setOutputKeys(String... selectors) {
+    return invocation -> invocation.getOutputStrategies()
+        .stream()
+        .filter(CustomOutputStrategy.class::isInstance)
+        .map(CustomOutputStrategy.class::cast)
+        .findFirst()
+        .ifPresent(strategy -> strategy.setSelectedPropertyKeys(List.of(selectors)));
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) {
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java
index eeefb3b84d..35a81bf9eb 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java
@@ -61,4 +61,32 @@ class RateLimitProcessorTest {
 
     testExecutor.run(inputEvents, expectedEvents);
   }
+
+  @Test
+  void forwardsFirstEventOfEachLengthWindow() {
+    TestConfiguration configuration = TestConfiguration.builder()
+        .config("grouping-enabled", "False")
+        .configWithDefaultPrefix("grouping-field", "randomnumber")
+        .config("window-type", "length-window")
+        .config("length-window-size", 3)
+        .config("event-selection", "First")
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        Map.of("timestamp", 1L, "randomnumber", 10.0d),
+        Map.of("timestamp", 2L, "randomnumber", 11.0d),
+        Map.of("timestamp", 3L, "randomnumber", 12.0d),
+        Map.of("timestamp", 4L, "randomnumber", 13.0d),
+        Map.of("timestamp", 5L, "randomnumber", 14.0d),
+        Map.of("timestamp", 6L, "randomnumber", 15.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        Map.of("timestamp", 1L, "randomnumber", 10.0d),
+        Map.of("timestamp", 4L, "randomnumber", 13.0d)
+    );
+
+    new ProcessingElementTestExecutor(new RateLimitProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java
new file mode 100644
index 0000000000..c7d3d4155f
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.movingaverage;
+
+import org.apache.streampipes.test.executors.Approx;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class MovingAverageProcessorTest {
+
+  @Test
+  void meanMethodUsesSlidingAverage() {
+    var configuration = createConfiguration(3, "mean");
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("timestamp", 1L, "temperature", 1.0d),
+        event("timestamp", 2L, "temperature", 2.0d),
+        event("timestamp", 3L, "temperature", 3.0d),
+        event("timestamp", 4L, "temperature", 6.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("timestamp", 1L, "temperature", 1.0d, "filterResult", new Approx(1.0d, 0.0000001d)),
+        event("timestamp", 2L, "temperature", 2.0d, "filterResult", new Approx(1.5d, 0.0000001d)),
+        event("timestamp", 3L, "temperature", 3.0d, "filterResult", new Approx(2.0d, 0.0000001d)),
+        event("timestamp", 4L, "temperature", 6.0d, "filterResult", new Approx(3.6666666667d, 0.0000001d))
+    );
+
+    new ProcessingElementTestExecutor(new MovingAverageProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void medianMethodHandlesOutliers() {
+    var configuration = createConfiguration(3, "median");
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("timestamp", 1L, "temperature", 1.0d),
+        event("timestamp", 2L, "temperature", 100.0d),
+        event("timestamp", 3L, "temperature", 2.0d),
+        event("timestamp", 4L, "temperature", 3.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("timestamp", 1L, "temperature", 1.0d, "filterResult", new Approx(1.0d, 0.0000001d)),
+        event("timestamp", 2L, "temperature", 100.0d, "filterResult", new Approx(50.5d, 0.0000001d)),
+        event("timestamp", 3L, "temperature", 2.0d, "filterResult", new Approx(2.0d, 0.0000001d)),
+        event("timestamp", 4L, "temperature", 3.0d, "filterResult", new Approx(3.0d, 0.0000001d))
+    );
+
+    new ProcessingElementTestExecutor(new MovingAverageProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  private TestConfiguration createConfiguration(int windowSize, String method) {
+    return TestConfiguration.builder()
+        .configWithDefaultPrefix("number", "temperature")
+        .config("n", windowSize)
+        .config("method", method)
+        .build();
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) {
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java
new file mode 100644
index 0000000000..15a36be0a3
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class NumericalTextFilterProcessorTest {
+
+  @Test
+  void matchesEventsWhenBothFiltersAreSatisfied() {
+    var configuration = createConfiguration(">", 20.0d, "MATCHES", "active");
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("temperature", 25.5d, "status", "active", "timestamp", 1L),
+        event("temperature", 25.5d, "status", "inactive", "timestamp", 2L),
+        event("temperature", 15.0d, "status", "active", "timestamp", 3L)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("temperature", 25.5d, "status", "active", "timestamp", 1L)
+    );
+
+    new ProcessingElementTestExecutor(new NumericalTextFilterProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void supportsContainsAndLessThanOrEqual() {
+    var configuration = createConfiguration("<=", 50.0d, "CONTAINS", "warn");
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("temperature", 45.0d, "status", "warn: high vibration", "timestamp", 1L),
+        event("temperature", 55.0d, "status", "warn: high vibration", "timestamp", 2L),
+        event("temperature", 45.0d, "status", "normal", "timestamp", 3L)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("temperature", 45.0d, "status", "warn: high vibration", "timestamp", 1L)
+    );
+
+    new ProcessingElementTestExecutor(new NumericalTextFilterProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  private TestConfiguration createConfiguration(
+      String numberOperation,
+      double numberValue,
+      String textOperation,
+      String keyword
+  ) {
+    return TestConfiguration.builder()
+        .configWithDefaultPrefix("number-mapping", "temperature")
+        .config("number-operation", numberOperation)
+        .config("number-value", numberValue)
+        .configWithDefaultPrefix("text-mapping", "status")
+        .config("text-operation", textOperation)
+        .config("text-keyword", keyword)
+        .build();
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) {
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java
index be831d1385..98dbd7c0e1 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java
@@ -90,4 +90,25 @@ class TestProjectionProcessor {
     testExecutor.run(events, outputEvents);
   }
 
+  @Test
+  public void projectsSingleField() {
+    var configuration = TestConfiguration
+        .builder()
+        .customOutputStrategy(List.of("remove"))
+        .build();
+
+    List<Map<String, Object>> events = List.of(
+        Map.of("timestamp", 1L, "remove", 62.0d, "a", "x"),
+        Map.of("timestamp", 2L, "remove", 56.0d, "a", "y")
+    );
+
+    List<Map<String, Object>> outputEvents = List.of(
+        Map.of("remove", 62.0d),
+        Map.of("remove", 56.0d)
+    );
+
+    new ProcessingElementTestExecutor(new ProjectionProcessor(), configuration)
+        .run(events, outputEvents);
+  }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java
new file mode 100644
index 0000000000..6ca9a67adb
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.schema;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.test.executors.PrefixStrategy;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests {@link MergeBySchemaProcessor} with two input streams whose property lists either match or differ.
+ */
+class MergeBySchemaProcessorTest {
+
+  /** Both streams list "timestamp" and "temperature": the expected output equals the input events. */
+  @Test
+  void forwardsEventsWhenSchemasMatch() {
+    var configuration = TestConfiguration.builder()
+        .prefixStrategy(PrefixStrategy.ALTERNATE)
+        .build();
+
+    var invocationConfig = withInputStreams(
+        EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature")),
+        EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature"))
+    );
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("timestamp", 1L, "temperature", 25.5d),
+        event("timestamp", 2L, "temperature", 26.0d)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("timestamp", 1L, "temperature", 25.5d),
+        event("timestamp", 2L, "temperature", 26.0d)
+    );
+
+    new ProcessingElementTestExecutor(new MergeBySchemaProcessor(), configuration, invocationConfig)
+        .run(inputEvents, expectedEvents);
+  }
+
+  /** Second stream lists "humidity" instead of "temperature": an {@link SpRuntimeException} is expected. */
+  @Test
+  void rejectsDifferentSchemas() {
+    var configuration = TestConfiguration.builder()
+        .prefixStrategy(PrefixStrategy.ALTERNATE)
+        .build();
+
+    var invocationConfig = withInputStreams(
+        EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature")),
+        EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "humidity"))
+    );
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("timestamp", 1L, "temperature", 25.5d),
+        event("timestamp", 2L, "humidity", 60.0d)
+    );
+
+    assertThrows(
+        SpRuntimeException.class,
+        () -> new ProcessingElementTestExecutor(new MergeBySchemaProcessor(), configuration, invocationConfig)
+            .run(inputEvents, List.of())
+    );
+  }
+
+  /** Returns a customizer that sets the invocation's input streams to the two given streams, in order. */
+  private Consumer<DataProcessorInvocation> withInputStreams(SpDataStream firstStream, SpDataStream secondStream) {
+    return invocation -> invocation.setInputStreams(List.of(firstStream, secondStream));
+  }
+
+  /** Builds an insertion-ordered event map from alternating key/value arguments. */
+  private Map<String, Object> event(Object... keyValuePairs) {
+    if (keyValuePairs.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "event() needs key/value pairs, got " + keyValuePairs.length + " arguments");
+    }
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java
new file mode 100644
index 0000000000..d508c3a76c
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.throughputmon;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests {@link ThroughputMonitorProcessor} with different values of the "batch-window-key" setting.
+ */
+class ThroughputMonitorProcessorTest {
+
+  /** "batch-window-key" = 2, three inputs: a single output with "eventcount" 2 is expected. */
+  @Test
+  void emitsStatisticsAfterConfiguredBatchSize() {
+    var configuration = TestConfiguration.builder()
+        .config("batch-window-key", 2)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("value", 1),
+        event("value", 2),
+        event("value", 3)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("eventcount", 2)
+    );
+
+    new ProcessingElementTestExecutor(new ThroughputMonitorProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  /** "batch-window-key" = 3, six inputs: two outputs, each with "eventcount" 3, are expected. */
+  @Test
+  void resetsAfterEachBatchWindow() {
+    var configuration = TestConfiguration.builder()
+        .config("batch-window-key", 3)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("value", 1),
+        event("value", 2),
+        event("value", 3),
+        event("value", 4),
+        event("value", 5),
+        event("value", 6)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("eventcount", 3),
+        event("eventcount", 3)
+    );
+
+    new ProcessingElementTestExecutor(new ThroughputMonitorProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  /** Builds an insertion-ordered event map from alternating key/value arguments. */
+  private Map<String, Object> event(Object... keyValuePairs) {
+    if (keyValuePairs.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "event() needs key/value pairs, got " + keyValuePairs.length + " arguments");
+    }
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) {
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
+    }
+    return event;
+  }
+}
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
index 0d7c686364..679fb8bf83 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.test.executors;
 
 import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
 import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.param.InputStreamParams;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.manager.template.DataProcessorTemplateHandler;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.output.CustomOutputStrategy;
 import org.apache.streampipes.model.runtime.Event;
@@ -45,7 +47,6 @@ import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 
 public class ProcessingElementTestExecutor {
@@ -98,8 +99,18 @@ public class ProcessingElementTestExecutor {
 
     var mockParams = mock(IDataProcessorParameters.class);
 
-    when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
-    when(mockParams.extractor()).thenReturn(extractor);
+    Mockito.doReturn(dataProcessorInvocation)
+        .when(mockParams)
+        .getModel();
+    Mockito.doReturn(extractor)
+        .when(mockParams)
+        .extractor();
+
+    if (canProvideInputStreamParams(dataProcessorInvocation)) {
+      Mockito.doReturn(getInputStreamParams(dataProcessorInvocation))
+          .when(mockParams)
+          .getInputStreamParams();
+    }
 
     // calls the onPipelineStarted method of the processor to initialize it
     processor.onPipelineStarted(mockParams, mockCollector, null);
@@ -225,4 +236,31 @@ public class ProcessingElementTestExecutor {
 
   }
 
+  /** Builds one {@link InputStreamParams} per input stream, indexed by the stream's position in the list. */
+  private List<InputStreamParams> getInputStreamParams(DataProcessorInvocation dataProcessorInvocation) {
+    var inputStreams = dataProcessorInvocation.getInputStreams();
+    return IntStream.range(0, inputStreams.size())
+        .mapToObj(index -> makeInputStreamParams(index, inputStreams.get(index)))
+        .toList();
+  }
+
+  /** Returns true when the invocation has at least one input stream and none of them is missing grounding. */
+  private boolean canProvideInputStreamParams(DataProcessorInvocation dataProcessorInvocation) {
+    var inputStreams = dataProcessorInvocation.getInputStreams();
+    return !inputStreams.isEmpty()
+           && inputStreams.stream()
+                          .noneMatch(this::isMissingGrounding);
+  }
+
+  /** Returns true when the stream has no event grounding or its grounding has no transport protocol. */
+  private boolean isMissingGrounding(SpDataStream inputStream) {
+    var grounding = inputStream.getEventGrounding();
+    return grounding == null || grounding.getTransportProtocol() == null;
+  }
+
+  /** Wraps a stream and its index into {@link InputStreamParams}, passing an empty list as third argument. */
+  private InputStreamParams makeInputStreamParams(Integer streamIndex, SpDataStream inputStream) {
+    return new InputStreamParams(streamIndex, inputStream, List.of());
+  }
+
 }


Reply via email to