heisenbergs-uncertainty opened a new pull request, #3669:
URL: https://github.com/apache/streampipes/pull/3669
### Purpose
The purpose of this processor is to handle use cases where some logic must
run only after a condition has held for a given amount of time.
My specific use case stems from my team's OEE automation efforts. We are
tracking downtime on CNC machines and want reason codes from our operators
that tell us why a machine is down. The problem with CNC downtime is that the
machine is often instructed to intentionally drop into a "not running" state
for short bursts. To avoid spamming our operators when machines leave the
running state for short periods, we wanted to send notifications only after
x amount of time, which would indicate that the machine is down for some
unknown reason. This is what led to this processor.
The processor is fairly configurable and should address many use cases
beyond our own.
I have also noticed that StreamPipes currently lacks processors that use
time as a condition, which I think is a generally useful capability. With some
more effort and refactoring, the logic implemented in this processor could
serve as a base for other processors that depend on the timestamp to determine
their state.
### Logic of Processor
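
As a rough illustration of the "On Event Arrival" behaviour that the tests in the Remarks section exercise, the following is a minimal sketch, not the processor's actual implementation. The class `DurationConditionSketch`, its `OutputMode` enum, and the `onEvent(boolean, long)` signature are hypothetical names chosen for this example. It assumes that a non-matching event resets the timer and that "Fire Once" emits a single event per uninterrupted run of matching events.

```java
/** Hypothetical sketch; names and structure are assumptions, not the PR's code. */
final class DurationConditionSketch {
  enum OutputMode { FIRE_ONCE, FIRE_CONTINUOUSLY }

  private final boolean conditionToMeet;
  private final long durationMillis;
  private final OutputMode outputMode;

  private Long conditionStart; // timestamp of the first matching event, null while not matching
  private boolean fired;       // whether the current run of matching events has already emitted

  DurationConditionSketch(boolean conditionToMeet, long durationMillis, OutputMode outputMode) {
    this.conditionToMeet = conditionToMeet;
    this.durationMillis = durationMillis;
    this.outputMode = outputMode;
  }

  /** Returns true if the event carrying this condition value and timestamp should be forwarded. */
  boolean onEvent(boolean condition, long timestamp) {
    if (condition != conditionToMeet) {
      // The condition was interrupted: forget the running timer.
      conditionStart = null;
      fired = false;
      return false;
    }
    if (conditionStart == null) {
      conditionStart = timestamp; // first matching event starts the timer
    }
    if (timestamp - conditionStart < durationMillis) {
      return false; // condition has not held long enough yet
    }
    if (outputMode == OutputMode.FIRE_ONCE && fired) {
      return false; // already emitted for this run
    }
    fired = true;
    return true;
  }

  public static void main(String[] args) {
    var sketch = new DurationConditionSketch(true, 2000, OutputMode.FIRE_ONCE);
    for (long ts : new long[] {10000, 11000, 12000, 13000}) {
      System.out.println(ts + " -> " + sketch.onEvent(true, ts));
    }
  }
}
```

Elapsed time here comes from the event timestamps rather than the wall clock, which is what lets the event-arrival tests run without any scheduler.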

### Examples
| Configuration | Results |
|:-------------------------:|:-------------------------:|
| | |
| | |
### Remarks
I would be interested in feedback on my use of the
ScheduledExecutorService and whether it could introduce strange behaviour in
the pipeline due to it synthetically generating events.
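
To make the concurrency question concrete, here is a minimal sketch of one way a timer-driven trigger could guard against emitting after a reset. It is not the code in this PR: `TimerTriggerSketch`, its `generation` counter, and the `Consumer<String>` standing in for the output collector are all assumptions made for illustration. The sketch also uses `cancel(false)`, whereas the tests further down verify `cancel(true)`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of a timer-driven trigger; not the code in this PR. */
final class TimerTriggerSketch {
  // Daemon thread so a forgotten shutdown cannot keep the JVM alive.
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "condition-timer");
        t.setDaemon(true);
        return t;
      });
  private final long durationMillis;
  private final Consumer<String> output; // stand-in for the output collector
  private ScheduledFuture<?> pending;    // timer of the current run, null when idle
  private long generation;               // bumped on every reset to invalidate stale timers

  TimerTriggerSketch(long durationMillis, Consumer<String> output) {
    this.durationMillis = durationMillis;
    this.output = output;
  }

  /** Called on the pipeline thread for every incoming event. */
  synchronized void onEvent(boolean conditionMet, String eventId) {
    if (!conditionMet) {
      generation++; // any timer task scheduled before this point becomes stale
      if (pending != null) {
        pending.cancel(false);
        pending = null;
      }
      return;
    }
    if (pending == null) {
      long scheduledGeneration = generation;
      pending = scheduler.schedule(
          () -> fire(scheduledGeneration, eventId), durationMillis, TimeUnit.MILLISECONDS);
    }
  }

  /** Called on the timer thread; emits only if no reset happened in between. */
  private synchronized void fire(long scheduledGeneration, String eventId) {
    if (scheduledGeneration == generation) {
      output.accept(eventId);
    }
  }

  void shutdown() {
    scheduler.shutdownNow();
  }
}
```

The generation check covers the window in which a timer task has already started running when the reset arrives, so cancelling the future alone would come too late. Synchronizing both methods keeps the timer thread and the pipeline thread from touching the state at the same time.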
I am also unsure how to unit test this in the repository. I was able to add
unit tests in my own custom processor repo, but that runs against JDK 23,
which allows mocking the ScheduledExecutorService, whereas JDK 17 does not.
The tests also introduce a new dependency on mockito-junit-jupiter for
MockitoExtension.class, which is another reason why I didn't want to include
the unit tests in this PR until further review.
Below are the tests used in my custom component.
```java
package com.spmoilandgas.streampipes.conditionduration;

import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class ConditionalTimeFilterProcessorTest {

  // region Constants
  private static final String FILTER_FIELD_ID = "filterField";
  private static final String TIMESTAMP_FIELD_ID = "timestampField";
  private static final String DURATION_ID = "duration";
  private static final String DURATION_UNIT_ID = "durationUnit";
  private static final String CONDITION_VALUE_ID = "conditionValue";
  private static final String OUTPUT_MODE_ID = "outputMode";
  private static final String TRIGGER_MODE_ID = "triggerMode";
  private static final String TRIGGER_MODE_ON_EVENT = "On Event Arrival";
  private static final String TRIGGER_MODE_ON_TIMER = "On Timer";
  private static final String ORIGINAL_TIMESTAMP_FIELD = "originalTimestamp";
  private static final String PROCESSING_TIMESTAMP_FIELD = "processingTimestamp";
  private static final String TIME_DIFFERENCE_FIELD = "timeDifference";
  // endregion

  @Mock
  private SpOutputCollector collector;

  @Mock
  private ScheduledExecutorService executorService;

  @Mock
  private ScheduledFuture<?> scheduledFuture;

  // ###################################
  // ## Tests for "On Event Arrival" ##
  // ###################################

  @Test
  public void onEventArrival_fireOnceOnTrue() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 2, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, true);

    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    processor.onEvent(makeEvent(true, 13000, "id-4"), collector);

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, 12000L, "id-3", true);
  }

  @Test
  public void onEventArrival_fireOnceOnFalse() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 2, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, false);

    processor.onEvent(makeEvent(false, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
    processor.onEvent(makeEvent(false, 12000, "id-3"), collector);
    processor.onEvent(makeEvent(false, 14000, "id-4"), collector);

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 12000L, 14000L, "id-4", false);
  }

  @Test
  public void onEventArrival_fireOnceThenResetAndFireAgain() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, true);

    // First sequence
    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector); // Fires
    verify(collector, times(1)).collect(any(Event.class));

    // This event should be ignored
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    verify(collector, times(1)).collect(any(Event.class));

    // Reset sequence
    processor.onEvent(makeEvent(false, 13000, "id-4"), collector);

    // Second sequence
    processor.onEvent(makeEvent(true, 14000, "id-5"), collector); // New timer starts
    processor.onEvent(makeEvent(true, 15000, "id-6"), collector); // Fires again
    verify(collector, times(2)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getAllValues().get(1), 14000L, 15000L, "id-6", true);
  }

  @Test
  public void onEventArrival_fireContinuouslyAfterDelay() {
    var processor = new ConditionalTimeFilterProcessor();
    configureProcessor(processor, 10, "Seconds", "Fire Continuously", TRIGGER_MODE_ON_EVENT, true);

    // Timer starts, but no event is fired
    processor.onEvent(makeEvent(true, 0, "id-1"), collector);
    verify(collector, never()).collect(any());

    // Still not enough time
    processor.onEvent(makeEvent(true, 9000, "id-2"), collector);
    verify(collector, never()).collect(any());

    // Duration met, first event is fired
    processor.onEvent(makeEvent(true, 10000, "id-3"), collector);
    verify(collector, times(1)).collect(any(Event.class));

    // Subsequent true events should be fired immediately
    processor.onEvent(makeEvent(true, 11000, "id-4"), collector);
    verify(collector, times(2)).collect(any(Event.class));
    processor.onEvent(makeEvent(true, 12000, "id-5"), collector);
    verify(collector, times(3)).collect(any(Event.class));
  }

  // ###########################
  // ## Tests for "On Timer" ##
  // ###########################

  @Test
  public void onTimer_fireWithoutSubsequentEvents() {
    var processor = new ConditionalTimeFilterProcessor();
    var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_TIMER, true);
    processor.executorService = this.executorService;

    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    verify(executorService).schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, "id-1", true);
  }

  @Test
  public void onTimer_fireOnceThenResetAndFireAgain() {
    var processor = new ConditionalTimeFilterProcessor();
    var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_TIMER, true);
    processor.executorService = this.executorService;
    doReturn(scheduledFuture).when(executorService)
        .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));

    // --- First Sequence ---
    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    verify(executorService, times(1))
        .schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();
    verify(collector, times(1)).collect(any(Event.class));

    // --- Reset ---
    processor.onEvent(makeEvent(false, 11000, "id-2"), collector);
    verify(scheduledFuture, times(1)).cancel(true);

    // --- Second Sequence ---
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    verify(executorService, times(2))
        .schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();
    verify(collector, times(2)).collect(any(Event.class));
  }

  // region Helper methods
  private void configureProcessor(ConditionalTimeFilterProcessor processor,
                                  int duration,
                                  String timeUnit,
                                  String outputMode,
                                  String triggerMode,
                                  boolean conditionToMeet) {
    DataProcessorInvocation graph = mock(DataProcessorInvocation.class);
    IDataProcessorParameters params = mock(IDataProcessorParameters.class);
    ProcessingElementParameterExtractor extractor =
        mock(ProcessingElementParameterExtractor.class);

    when(params.extractor()).thenReturn(extractor);
    when(extractor.mappingPropertyValue(FILTER_FIELD_ID)).thenReturn("s0::condition");
    when(extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID)).thenReturn("s0::timestamp");
    when(extractor.singleValueParameter(DURATION_ID, Integer.class)).thenReturn(duration);
    when(extractor.selectedSingleValue(DURATION_UNIT_ID, String.class)).thenReturn(timeUnit);
    when(extractor.selectedSingleValue(OUTPUT_MODE_ID, String.class)).thenReturn(outputMode);
    when(extractor.selectedSingleValue(TRIGGER_MODE_ID, String.class)).thenReturn(triggerMode);
    when(extractor.slideToggleValue(CONDITION_VALUE_ID)).thenReturn(conditionToMeet);

    processor.onPipelineStarted(params, collector, null);
  }

  private Event makeEvent(boolean condition, long timestamp, String eventId) {
    Map<String, Object> map = new HashMap<>();
    map.put("condition", condition);
    map.put("timestamp", timestamp);
    map.put("eventId", eventId);
    return EventFactory.fromMap(map,
        new SourceInfo("test-topic", "s0"),
        new SchemaInfo(new EventSchema(), new ArrayList<>()));
  }

  private void assertEventIsEnrichedCorrectly(Event event,
                                              long expectedOriginalTs,
                                              long expectedProcessingTs,
                                              String expectedOriginalEventId,
                                              boolean expectedCondition) {
    assertNotNull(event, "Event should not be null");
    assertEquals(expectedOriginalEventId,
        event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
    assertEquals(expectedCondition,
        event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
    assertEquals(expectedOriginalTs,
        event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertEquals(expectedProcessingTs,
        event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertEquals(expectedProcessingTs - expectedOriginalTs,
        event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong());
  }

  private void assertEventIsEnrichedCorrectly(Event event,
                                              long expectedOriginalTs,
                                              String expectedOriginalEventId,
                                              boolean expectedCondition) {
    assertNotNull(event, "Event should not be null");
    assertEquals(expectedOriginalEventId,
        event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
    assertEquals(expectedCondition,
        event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
    assertEquals(expectedOriginalTs,
        event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertTrue(
        event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong()
            >= expectedOriginalTs);
    assertTrue(
        event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong() >= 0);
  }
  // endregion
}
```
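
One possible way around mocking the ScheduledExecutorService, and the extra mockito-junit-jupiter dependency, would be to hide scheduling behind a small interface and hand the processor a manual test double. The sketch below is only an illustration under that assumption: `SchedulerSeamSketch`, `DelayedTaskScheduler`, `ExecutorBackedScheduler`, and `ManualScheduler` are hypothetical names that do not exist in StreamPipes or in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a scheduling seam; none of these types exist in StreamPipes. */
final class SchedulerSeamSketch {

  /** The only scheduling capability the processor would need. */
  interface DelayedTaskScheduler {
    /** Schedules the task and returns a handle that cancels it when run. */
    Runnable schedule(Runnable task, long delayMillis);
  }

  /** Production adapter backed by a real ScheduledExecutorService. */
  static final class ExecutorBackedScheduler implements DelayedTaskScheduler {
    private final ScheduledExecutorService executor;

    ExecutorBackedScheduler(ScheduledExecutorService executor) {
      this.executor = executor;
    }

    @Override
    public Runnable schedule(Runnable task, long delayMillis) {
      ScheduledFuture<?> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
      return () -> future.cancel(false);
    }
  }

  /** Test double: records tasks and runs them only when the test asks for it. */
  static final class ManualScheduler implements DelayedTaskScheduler {
    private final List<Runnable> pending = new ArrayList<>();
    final List<Long> requestedDelays = new ArrayList<>();

    @Override
    public Runnable schedule(Runnable task, long delayMillis) {
      pending.add(task);
      requestedDelays.add(delayMillis);
      return () -> pending.remove(task);
    }

    /** Runs every task that has not been cancelled, as if its delay had elapsed. */
    void runPending() {
      List<Runnable> due = new ArrayList<>(pending);
      pending.clear();
      due.forEach(Runnable::run);
    }
  }

  private SchedulerSeamSketch() {
  }
}
```

With a seam like this, a timer test would call `runPending()` where the tests above call `taskCaptor.getValue().run()`, and would check `requestedDelays` instead of verifying `schedule(...)` on a mock.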
PR introduces (a) breaking change(s): no
PR introduces (a) deprecation(s): no