This is an automated email from the ASF dual-hosted git repository. oehler pushed a commit to branch improve-streampipes-function-monitoring in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 7c19394d7c2f624acab967758015770597594dbf
Author: Sven Oehler <[email protected]>
AuthorDate: Fri Mar 20 13:29:37 2026 +0100

    Improve StreamPipesFunction monitoring
---
 .../standalone/function/StreamPipesFunction.java | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index fadf1fdff7..5878d25087 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -104,9 +104,13 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
   public void discardRuntime() {
     var functionId = this.getFunctionConfig().getFunctionId();
     LOG.info("Discarding function {}:{}", functionId.getId(), functionId.getVersion());
-    onServiceStopped();
-    unregisterConsumers();
     this.outputCollectors.forEach((key, value) -> value.disconnect());
+    unregisterConsumers();
+    try {
+      onServiceStopped();
+    } catch (Exception e) {
+      throw new SpRuntimeException("Custom stop behaviour failed with: " + e);
+    }
   }
 
   @Override
@@ -116,7 +120,7 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
       var event = EventFactory
           .fromMap(rawEvent, sourceInfo, schemaInfoMapper.get(topicName));
       this.onEvent(event, sourceInfo.getSourceId());
-      increaseCounter(sourceInfo.getSourceId(), size);
+      increaseInCounter(sourceInfo.getSourceId(), size);
     } catch (RuntimeException e) {
       addError(e);
     }
@@ -126,7 +130,7 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
     return stream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
   }
 
-  private void increaseCounter(String sourceInfo, long size) {
+  protected void increaseInCounter(String sourceInfo, long size) {
     var functionId = this.getFunctionConfig().getFunctionId();
     SpMonitoringManager.INSTANCE.increaseInCounter(
         functionId.getId(),
@@ -136,7 +140,15 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
     );
   }
 
-  private void addError(RuntimeException e) {
+  protected void increaseOutCounter(long size) {
+    var functionId = this.getFunctionConfig().getFunctionId();
+    SpMonitoringManager.INSTANCE.increaseOutCounter(
+        functionId.getId(),
+        size,
+        System.currentTimeMillis());
+  }
+
+  protected void addError(RuntimeException e) {
     var functionId = this.getFunctionConfig().getFunctionId();
     SpMonitoringManager.INSTANCE.addErrorMessage(
         functionId.getId(),
