This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch add-new-unit-tests-for-processing-elements in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit a408969d7a6c7454f21db77f8541e4332b89555c Author: Philipp Zehnder <[email protected]> AuthorDate: Tue Mar 17 20:16:52 2026 +0100 test: add processor framework test coverage --- .../pom.xml | 13 +++ .../jvm/welford/WelfordChangeDetectionTest.java | 93 +++++++++++++++++++ .../jvm/processor/jseval/JSEvalProcessorTest.java | 93 +++++++++++++++++++ .../valuechange/ValueChangeProcessorTest.java | 28 ++++++ .../enrich/MergeByEnrichProcessorTest.java | 103 +++++++++++++++++++++ .../processor/limit/RateLimitProcessorTest.java | 28 ++++++ .../movingaverage/MovingAverageProcessorTest.java | 92 ++++++++++++++++++ .../NumericalTextFilterProcessorTest.java | 91 ++++++++++++++++++ .../projection/TestProjectionProcessor.java | 21 +++++ .../schema/MergeBySchemaProcessorTest.java | 99 ++++++++++++++++++++ .../ThroughputMonitorProcessorTest.java | 83 +++++++++++++++++ .../executors/ProcessingElementTestExecutor.java | 40 +++++++- 12 files changed, 781 insertions(+), 3 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml b/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml index 590f738a10..bac2f44b80 100644 --- a/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/pom.xml @@ -39,6 +39,19 @@ <artifactId>streampipes-wrapper-standalone</artifactId> <version>0.99.0-SNAPSHOT</version> </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-test-utils-executors</artifactId> + <version>0.99.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git 
a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java new file mode 100644 index 0000000000..5aff5fcd6c --- /dev/null +++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/test/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetectionTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.processors.changedetection.jvm.welford; + +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +class WelfordChangeDetectionTest { + + @Test + void keepsStableSeriesBelowAlarmThreshold() { + var configuration = createConfiguration(0.5d, 5.0d); + + List<Map<String, Object>> inputEvents = List.of( + event("temperature", 10.0d), + event("temperature", 10.2d), + event("temperature", 9.8d), + event("temperature", 10.1d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 10.2d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 9.8d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 10.1d, "changeDetectedLow", false, "changeDetectedHigh", false) + ); + + new ProcessingElementTestExecutor(new WelfordChangeDetection(), configuration) + .run(inputEvents, expectedEvents); + } + + @Test + void detectsPositiveLevelShift() { + var configuration = createConfiguration(0.1d, 0.5d); + + List<Map<String, Object>> inputEvents = List.of( + event("temperature", 10.0d), + event("temperature", 10.0d), + event("temperature", 10.0d), + event("temperature", 20.0d), + event("temperature", 20.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 10.0d, "changeDetectedLow", false, "changeDetectedHigh", false), + event("temperature", 20.0d, "changeDetectedLow", false, "changeDetectedHigh", true), + event("temperature", 20.0d, "changeDetectedLow", false, 
"changeDetectedHigh", false) + ); + + new ProcessingElementTestExecutor(new WelfordChangeDetection(), configuration) + .run(inputEvents, expectedEvents); + } + + private TestConfiguration createConfiguration(double k, double h) { + return TestConfiguration.builder() + .configWithDefaultPrefix("number-mapping", "temperature") + .config("param-k", k) + .config("param-h", h) + .build(); + } + + private Map<String, Object> event(Object... keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java new file mode 100644 index 0000000000..03afec1dd8 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessorTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.enricher.jvm.processor.jseval; + +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +class JSEvalProcessorTest { + + @Test + void transformsFieldsWithJavascriptFunction() { + var configuration = TestConfiguration.builder() + .config("jsFunction", """ + function process(event) { + return { + id: event.id, + temperatureF: (event.temperatureC * 9 / 5) + 32 + }; + } + """) + .build(); + + List<Map<String, Object>> inputEvents = List.of( + event("id", "sensor-1", "temperatureC", 20.0d), + event("id", "sensor-2", "temperatureC", 25.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("id", "sensor-1", "temperatureF", 68.0d), + event("id", "sensor-2", "temperatureF", 77.0d) + ); + + new ProcessingElementTestExecutor(new JSEvalProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + @Test + void canReturnSubsetAndDerivedFields() { + var configuration = TestConfiguration.builder() + .config("jsFunction", """ + function process(event) { + return { + label: event.device + "-" + event.status, + alert: event.value > 50 + }; + } + """) + .build(); + + List<Map<String, Object>> inputEvents = List.of( + event("device", "sensor-1", "status", "ok", "value", 42.0d), + event("device", "sensor-2", "status", "warn", "value", 65.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("label", "sensor-1-ok", "alert", false), + event("label", "sensor-2-warn", "alert", true) + ); + + new ProcessingElementTestExecutor(new JSEvalProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + private Map<String, Object> event(Object... 
keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java index ecbec67bce..618d77b79f 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessorTest.java @@ -66,4 +66,32 @@ class ValueChangeProcessorTest { testExecutor.run(inputEvents, expectedEvents); } + + @Test + void detectsDifferentTransitionConfiguration() { + TestConfiguration configuration = TestConfiguration.builder() + .configWithDefaultPrefix("change-value-mapping", "numberlist") + .config("from-property-value", 2.0f) + .config("to-property-value", 5.0f) + .build(); + + List<Map<String, Object>> inputEvents = List.of( + Map.of("timestamp", 1L, "numberlist", 2.0f), + Map.of("timestamp", 2L, "numberlist", 5.0f), + Map.of("timestamp", 3L, "numberlist", 5.0f), + Map.of("timestamp", 4L, "numberlist", 2.0f), + Map.of("timestamp", 5L, "numberlist", 5.0f) + ); + + List<Map<String, Object>> expectedEvents = List.of( + Map.of("timestamp", 1L, "numberlist", 2.0f, "isChanged", false), + Map.of("timestamp", 2L, "numberlist", 5.0f, "isChanged", true), + Map.of("timestamp", 3L, "numberlist", 5.0f, "isChanged", false), + Map.of("timestamp", 4L, "numberlist", 2.0f, "isChanged", false), + Map.of("timestamp", 5L, "numberlist", 
5.0f, "isChanged", true) + ); + + new ProcessingElementTestExecutor(new ValueChangeProcessor(), configuration) + .run(inputEvents, expectedEvents); + } } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java new file mode 100644 index 0000000000..70df917d8b --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessorTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.enrich; + +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +class MergeByEnrichProcessorTest { + + @Test + void enrichesSelectedFirstStreamWithLastSecondStreamEvent() { + var configuration = TestConfiguration.builder() + .customPrefixStrategy(List.of("s1", "s0", "s1", "s0")) + .config("select-stream", "Stream 1") + .build(); + + List<Map<String, Object>> inputEvents = List.of( + event("location", "room-1"), + event("deviceId", "sensor-1", "temperature", 21.5d), + event("location", "room-2"), + event("deviceId", "sensor-2", "temperature", 22.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("deviceId", "sensor-1", "temperature", 21.5d, "location", "room-1"), + event("deviceId", "sensor-2", "temperature", 22.0d, "location", "room-2") + ); + + new ProcessingElementTestExecutor( + new MergeByEnrichProcessor(), + configuration, + setOutputKeys("s0::deviceId", "s0::temperature", "s1::location") + ).run(inputEvents, expectedEvents); + } + + @Test + void enrichesSelectedSecondStreamWithLastFirstStreamEvent() { + var configuration = TestConfiguration.builder() + .customPrefixStrategy(List.of("s0", "s1", "s0", "s1")) + .config("select-stream", "Stream 2") + .build(); + + List<Map<String, Object>> inputEvents = List.of( + event("deviceId", "sensor-1"), + event("location", "room-1", "humidity", 40.0d), + event("deviceId", "sensor-2"), + event("location", "room-2", "humidity", 41.5d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("deviceId", "sensor-1", "location", "room-1", 
"humidity", 40.0d), + event("deviceId", "sensor-2", "location", "room-2", "humidity", 41.5d) + ); + + new ProcessingElementTestExecutor( + new MergeByEnrichProcessor(), + configuration, + setOutputKeys("s0::deviceId", "s1::location", "s1::humidity") + ).run(inputEvents, expectedEvents); + } + + private Consumer<DataProcessorInvocation> setOutputKeys(String... selectors) { + return invocation -> invocation.getOutputStrategies() + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(CustomOutputStrategy.class::cast) + .findFirst() + .ifPresent(strategy -> strategy.setSelectedPropertyKeys(List.of(selectors))); + } + + private Map<String, Object> event(Object... keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java index eeefb3b84d..35a81bf9eb 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessorTest.java @@ -61,4 +61,32 @@ class RateLimitProcessorTest { testExecutor.run(inputEvents, expectedEvents); } + + @Test + void forwardsFirstEventOfEachLengthWindow() { + TestConfiguration configuration = TestConfiguration.builder() + .config("grouping-enabled", "False") + .configWithDefaultPrefix("grouping-field", "randomnumber") + .config("window-type", "length-window") + .config("length-window-size", 
3) + .config("event-selection", "First") + .build(); + + List<Map<String, Object>> inputEvents = List.of( + Map.of("timestamp", 1L, "randomnumber", 10.0d), + Map.of("timestamp", 2L, "randomnumber", 11.0d), + Map.of("timestamp", 3L, "randomnumber", 12.0d), + Map.of("timestamp", 4L, "randomnumber", 13.0d), + Map.of("timestamp", 5L, "randomnumber", 14.0d), + Map.of("timestamp", 6L, "randomnumber", 15.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + Map.of("timestamp", 1L, "randomnumber", 10.0d), + Map.of("timestamp", 4L, "randomnumber", 13.0d) + ); + + new ProcessingElementTestExecutor(new RateLimitProcessor(), configuration) + .run(inputEvents, expectedEvents); + } } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java new file mode 100644 index 0000000000..c7d3d4155f --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/movingaverage/MovingAverageProcessorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.movingaverage; + +import org.apache.streampipes.test.executors.Approx; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +class MovingAverageProcessorTest { + + @Test + void meanMethodUsesSlidingAverage() { + var configuration = createConfiguration(3, "mean"); + + List<Map<String, Object>> inputEvents = List.of( + event("timestamp", 1L, "temperature", 1.0d), + event("timestamp", 2L, "temperature", 2.0d), + event("timestamp", 3L, "temperature", 3.0d), + event("timestamp", 4L, "temperature", 6.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("timestamp", 1L, "temperature", 1.0d, "filterResult", new Approx(1.0d, 0.0000001d)), + event("timestamp", 2L, "temperature", 2.0d, "filterResult", new Approx(1.5d, 0.0000001d)), + event("timestamp", 3L, "temperature", 3.0d, "filterResult", new Approx(2.0d, 0.0000001d)), + event("timestamp", 4L, "temperature", 6.0d, "filterResult", new Approx(3.6666666667d, 0.0000001d)) + ); + + new ProcessingElementTestExecutor(new MovingAverageProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + @Test + void medianMethodHandlesOutliers() { + var configuration = createConfiguration(3, "median"); + + List<Map<String, Object>> inputEvents = List.of( + event("timestamp", 1L, "temperature", 1.0d), + 
event("timestamp", 2L, "temperature", 100.0d), + event("timestamp", 3L, "temperature", 2.0d), + event("timestamp", 4L, "temperature", 3.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("timestamp", 1L, "temperature", 1.0d, "filterResult", new Approx(1.0d, 0.0000001d)), + event("timestamp", 2L, "temperature", 100.0d, "filterResult", new Approx(50.5d, 0.0000001d)), + event("timestamp", 3L, "temperature", 2.0d, "filterResult", new Approx(2.0d, 0.0000001d)), + event("timestamp", 4L, "temperature", 3.0d, "filterResult", new Approx(3.0d, 0.0000001d)) + ); + + new ProcessingElementTestExecutor(new MovingAverageProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + private TestConfiguration createConfiguration(int windowSize, String method) { + return TestConfiguration.builder() + .configWithDefaultPrefix("number", "temperature") + .config("n", windowSize) + .config("method", method) + .build(); + } + + private Map<String, Object> event(Object... keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java new file mode 100644 index 0000000000..15a36be0a3 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter; + +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +class NumericalTextFilterProcessorTest { + + @Test + void matchesEventsWhenBothFiltersAreSatisfied() { + var configuration = createConfiguration(">", 20.0d, "MATCHES", "active"); + + List<Map<String, Object>> inputEvents = List.of( + event("temperature", 25.5d, "status", "active", "timestamp", 1L), + event("temperature", 25.5d, "status", "inactive", "timestamp", 2L), + event("temperature", 15.0d, "status", "active", "timestamp", 3L) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("temperature", 25.5d, "status", "active", "timestamp", 1L) + ); + + new ProcessingElementTestExecutor(new NumericalTextFilterProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + @Test + void supportsContainsAndLessThanOrEqual() { + var configuration = createConfiguration("<=", 50.0d, "CONTAINS", "warn"); + + List<Map<String, Object>> inputEvents = List.of( + event("temperature", 45.0d, "status", "warn: high 
vibration", "timestamp", 1L), + event("temperature", 55.0d, "status", "warn: high vibration", "timestamp", 2L), + event("temperature", 45.0d, "status", "normal", "timestamp", 3L) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("temperature", 45.0d, "status", "warn: high vibration", "timestamp", 1L) + ); + + new ProcessingElementTestExecutor(new NumericalTextFilterProcessor(), configuration) + .run(inputEvents, expectedEvents); + } + + private TestConfiguration createConfiguration( + String numberOperation, + double numberValue, + String textOperation, + String keyword + ) { + return TestConfiguration.builder() + .configWithDefaultPrefix("number-mapping", "temperature") + .config("number-operation", numberOperation) + .config("number-value", numberValue) + .configWithDefaultPrefix("text-mapping", "status") + .config("text-operation", textOperation) + .config("text-keyword", keyword) + .build(); + } + + private Map<String, Object> event(Object... keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java index be831d1385..98dbd7c0e1 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/projection/TestProjectionProcessor.java @@ -90,4 +90,25 @@ class TestProjectionProcessor { 
testExecutor.run(events, outputEvents); } + @Test + public void projectsSingleField() { + var configuration = TestConfiguration + .builder() + .customOutputStrategy(List.of("remove")) + .build(); + + List<Map<String, Object>> events = List.of( + Map.of("timestamp", 1L, "remove", 62.0d, "a", "x"), + Map.of("timestamp", 2L, "remove", 56.0d, "a", "y") + ); + + List<Map<String, Object>> outputEvents = List.of( + Map.of("remove", 62.0d), + Map.of("remove", 56.0d) + ); + + new ProcessingElementTestExecutor(new ProjectionProcessor(), configuration) + .run(events, outputEvents); + } + } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java new file mode 100644 index 0000000000..6ca9a67adb --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/schema/MergeBySchemaProcessorTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.schema; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; +import org.apache.streampipes.test.generator.EventStreamGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class MergeBySchemaProcessorTest { + + @Test + void forwardsEventsWhenSchemasMatch() { + var configuration = TestConfiguration.builder() + .prefixStrategy(PrefixStrategy.ALTERNATE) + .build(); + + var invocationConfig = withInputStreams( + EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature")), + EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature")) + ); + + List<Map<String, Object>> inputEvents = List.of( + event("timestamp", 1L, "temperature", 25.5d), + event("timestamp", 2L, "temperature", 26.0d) + ); + + List<Map<String, Object>> expectedEvents = List.of( + event("timestamp", 1L, "temperature", 25.5d), + event("timestamp", 2L, "temperature", 26.0d) + ); + + new ProcessingElementTestExecutor(new MergeBySchemaProcessor(), configuration, invocationConfig) + .run(inputEvents, expectedEvents); + } + + @Test + void rejectsDifferentSchemas() { + var configuration = TestConfiguration.builder() + .prefixStrategy(PrefixStrategy.ALTERNATE) + .build(); + + var invocationConfig = withInputStreams( + EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "temperature")), + EventStreamGenerator.makeStreamWithProperties(List.of("timestamp", "humidity")) 
+ ); + + List<Map<String, Object>> inputEvents = List.of( + event("timestamp", 1L, "temperature", 25.5d), + event("timestamp", 2L, "humidity", 60.0d) + ); + + assertThrows( + SpRuntimeException.class, + () -> new ProcessingElementTestExecutor(new MergeBySchemaProcessor(), configuration, invocationConfig) + .run(inputEvents, List.of()) + ); + } + + private Consumer<DataProcessorInvocation> withInputStreams(SpDataStream firstStream, SpDataStream secondStream) { + return invocation -> invocation.setInputStreams(List.of(firstStream, secondStream)); + } + + private Map<String, Object> event(Object... keyValuePairs) { + var event = new LinkedHashMap<String, Object>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); + } + return event; + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java new file mode 100644 index 0000000000..d508c3a76c --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/throughputmon/ThroughputMonitorProcessorTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.throughputmon;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class ThroughputMonitorProcessorTest { // checks the "eventcount" output for a configured "batch-window-key"
+
+  @Test
+  void emitsStatisticsAfterConfiguredBatchSize() { // three inputs, window 2 -> one output expected
+    var configuration = TestConfiguration.builder()
+        .config("batch-window-key", 2) // per the test name, the batch size
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("value", 1),
+        event("value", 2),
+        event("value", 3)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of( // NOTE(review): only "eventcount" listed - confirm how run() treats other fields
+        event("eventcount", 2)
+    );
+
+    new ProcessingElementTestExecutor(new ThroughputMonitorProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  @Test
+  void resetsAfterEachBatchWindow() { // six inputs, window 3 -> two outputs expected, each with eventcount 3
+    var configuration = TestConfiguration.builder()
+        .config("batch-window-key", 3)
+        .build();
+
+    List<Map<String, Object>> inputEvents = List.of(
+        event("value", 1),
+        event("value", 2),
+        event("value", 3),
+        event("value", 4),
+        event("value", 5),
+        event("value", 6)
+    );
+
+    List<Map<String, Object>> expectedEvents = List.of(
+        event("eventcount", 3),
+        event("eventcount", 3)
+    );
+
+    new ProcessingElementTestExecutor(new ThroughputMonitorProcessor(), configuration)
+        .run(inputEvents, expectedEvents);
+  }
+
+  private Map<String, Object> event(Object... keyValuePairs) { // alternating key, value arguments -> ordered map
+    var event = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < keyValuePairs.length; i += 2) { // an odd argument count fails at index i + 1
+      event.put((String) keyValuePairs[i], keyValuePairs[i + 1]); // keys must be Strings
+    }
+    return event;
+  }
+}
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
index 0d7c686364..679fb8bf83 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
@@ -20,8 +20,10 @@ package org.apache.streampipes.test.executors;
 
 import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
 import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.param.InputStreamParams;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.manager.template.DataProcessorTemplateHandler;
+import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.output.CustomOutputStrategy;
 import org.apache.streampipes.model.runtime.Event;
@@ -45,7 +47,6 @@ import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class ProcessingElementTestExecutor {
 
@@ -98,8 +99,18 @@ public class ProcessingElementTestExecutor {
 
     var mockParams = mock(IDataProcessorParameters.class);
 
-    when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
-    when(mockParams.extractor()).thenReturn(extractor);
+    Mockito.doReturn(dataProcessorInvocation)
+        .when(mockParams)
+        .getModel();
+
Mockito.doReturn(extractor)
+        .when(mockParams)
+        .extractor();
+
+    if (canProvideInputStreamParams(dataProcessorInvocation)) { // stubbed only when every stream has grounding
+      Mockito.doReturn(getInputStreamParams(dataProcessorInvocation))
+          .when(mockParams)
+          .getInputStreamParams();
+    }
 
     // calls the onPipelineStarted method of the processor to initialize it
     processor.onPipelineStarted(mockParams, mockCollector, null);
@@ -225,4 +236,27 @@ public class ProcessingElementTestExecutor {
   }
 
 
+  private List<InputStreamParams> getInputStreamParams(DataProcessorInvocation dataProcessorInvocation) {
+    return IntStream.range(0, dataProcessorInvocation.getInputStreams().size()) // one entry per input stream
+        .mapToObj(index -> makeInputStreamParams(index, dataProcessorInvocation.getInputStreams().get(index)))
+        .toList();
+  }
+
+  private boolean canProvideInputStreamParams(DataProcessorInvocation dataProcessorInvocation) {
+    return !dataProcessorInvocation.getInputStreams() // needs at least one input stream ...
+        .isEmpty()
+        && dataProcessorInvocation.getInputStreams() // ... and no stream that is missing grounding
+        .stream()
+        .noneMatch(this::isMissingGrounding);
+  }
+
+  private boolean isMissingGrounding(SpDataStream inputStream) { // no event grounding, or no transport protocol on it
+    return inputStream.getEventGrounding() == null || inputStream.getEventGrounding()
+        .getTransportProtocol() == null;
+  }
+
+  private InputStreamParams makeInputStreamParams(Integer streamIndex, SpDataStream inputStream) {
+    return new InputStreamParams(streamIndex, inputStream, List.of()); // NOTE(review): confirm meaning of the empty list
+  }
+
 }
