This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch fix-auto-aggregate in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d4d28e98a6d26416c646a475e19036184bac9b06 Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Mar 20 22:07:58 2026 +0100 fix: redesign auto aggregation for client limits --- .../dataexplorer/AutoAggregationHandler.java | 207 ++++++++++++++------- .../dataexplorer/QueryResultProvider.java | 3 +- .../dataexplorer/param/model/SelectColumn.java | 4 + .../dataexplorer/AutoAggregationHandlerTest.java | 170 +++++++++++++++++ 4 files changed, 312 insertions(+), 72 deletions(-) diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java index c0751c7fee..a885b4effd 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java @@ -19,7 +19,6 @@ package org.apache.streampipes.dataexplorer; import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement; import org.apache.streampipes.dataexplorer.param.model.SelectColumn; -import org.apache.streampipes.model.datalake.DataLakeQueryOrdering; import org.apache.streampipes.model.datalake.SpQueryResult; import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams; import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams; @@ -37,7 +36,7 @@ import java.util.stream.Collectors; public class AutoAggregationHandler { private static final Logger LOG = LoggerFactory.getLogger(AutoAggregationHandler.class); - private static final double MAX_RETURN_LIMIT = 2000; + private static final int DEFAULT_MAX_RETURN_LIMIT = 2000; private static final String TIMESTAMP_FIELD = "time"; private static final String COMMA = ","; @@ -48,38 +47,45 @@ public class AutoAggregationHandler { private final IDataExplorerQueryManagement dataLakeQueryManagement; private final ProvidedRestQueryParams queryParams; + private final boolean ignoreMissingData; public AutoAggregationHandler(ProvidedRestQueryParams params, - IDataExplorerQueryManagement dataExplorerQueryManagement) { + IDataExplorerQueryManagement dataExplorerQueryManagement, + boolean ignoreMissingData) { this.queryParams = params; this.dataLakeQueryManagement = dataExplorerQueryManagement; + this.ignoreMissingData = ignoreMissingData; } public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException { + ProvidedRestQueryParams rewrittenParams = disableAutoAgg(new ProvidedRestQueryParams(queryParams)); + if (queryHasExplicitTimeAggregation() || !hasColumns()) { + return rewrittenParams; + } + try { - SpQueryResult newest = getSingleRecord(DataLakeQueryOrdering.DESC); - SpQueryResult oldest = getSingleRecord(DataLakeQueryOrdering.ASC); - if (newest.getTotal() > 0) { - String sampleField = getSampleField(newest); - Integer count = getCount(sampleField); - if (count <= MAX_RETURN_LIMIT) { - LOG.debug("Auto-Aggregation disabled as {} results <= max return limit {}", count, MAX_RETURN_LIMIT); - return disableAutoAgg(this.queryParams); - } else { - LOG.debug("Performing auto-aggregation"); - - int aggValue = getAggregationValue(newest, oldest); - LOG.debug("Setting auto-aggregation value to {} ms", aggValue); - queryParams.update(SupportedRestQueryParams.QP_TIME_INTERVAL, aggValue + "ms"); - return disableAutoAgg(queryParams); - } - } else { - return disableAutoAgg(this.queryParams); + QueryBoundary boundary = getQueryBoundary(); + if (!boundary.hasData()) { + return rewrittenParams; } + + if (!usesAggregation()) { + return rewrittenParams; + } + + int maxRows = getMaxReturnLimit(); + int rawRowCount = getRawRowCountUpTo(maxRows + 1); + long interval = rawRowCount <= maxRows + ? 1L + : getAggregationValue(boundary.newest(), boundary.oldest(), maxRows); + + LOG.debug("Auto-aggregation selected time interval {} ms for max {} rows", interval, maxRows); + rewrittenParams.update(SupportedRestQueryParams.QP_TIME_INTERVAL, interval + "ms"); + return rewrittenParams; } catch (ParseException e) { LOG.error("Parsing of timestamp failed during auto aggregation of query parameters: {}", e.getMessage()); + return rewrittenParams; } - return null; } private ProvidedRestQueryParams disableAutoAgg(ProvidedRestQueryParams params) { @@ -87,74 +93,98 @@ public class AutoAggregationHandler { return params; } - public Integer getCount(String fieldName) { - ProvidedRestQueryParams countParams = disableAutoAgg(new ProvidedRestQueryParams(queryParams)); - countParams.remove(SupportedRestQueryParams.QP_TIME_INTERVAL); - countParams.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION); - countParams.update(SupportedRestQueryParams.QP_COUNT_ONLY, true); - countParams.update(SupportedRestQueryParams.QP_COLUMNS, fieldName); + private SpQueryResult fireQuery(ProvidedRestQueryParams params) { + return dataLakeQueryManagement.getData(params, ignoreMissingData); + } - SpQueryResult result = dataLakeQueryManagement.getData(countParams, true); + private long getAggregationValue(SpQueryResult newest, + SpQueryResult oldest, + int maxRows) throws ParseException { + long timeRange = Math.max(0, extractTimestamp(newest) - extractTimestamp(oldest)); + long coveredRange = timeRange + 1; + return Math.max(1L, ceilDiv(coveredRange, maxRows)); + } - return result.getTotal() > 0 ? ( - (Double) result.getAllDataSeries() - .get(0) - .getRows() - .get(0) - .get(1) - ).intValue() : 0; + private QueryBoundary getQueryBoundary() { + ProvidedRestQueryParams boundaryQuery = makeRawHelperQueryParams(); + boundaryQuery.update(SupportedRestQueryParams.QP_LIMIT, 1); + SpQueryResult boundaryResult = fireQuery(boundaryQuery); + if (boundaryResult.getTotal() == 0) { + return QueryBoundary.empty(); + } + + ProvidedRestQueryParams newestQuery = makeRawHelperQueryParams(); + newestQuery.update(SupportedRestQueryParams.QP_LIMIT, 1); + newestQuery.update(SupportedRestQueryParams.QP_ORDER, "DESC"); + return new QueryBoundary(boundaryResult, fireQuery(newestQuery)); } - private SpQueryResult fireQuery(ProvidedRestQueryParams params) { - return dataLakeQueryManagement.getData(params, true); + private int getRawRowCountUpTo(int maxRows) { + ProvidedRestQueryParams sampleQuery = makeRawHelperQueryParams(); + sampleQuery.update(SupportedRestQueryParams.QP_LIMIT, maxRows); + return fireQuery(sampleQuery).getTotal(); } - private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException { - long timerange = extractTimestamp(newest) - extractTimestamp(oldest); - double v = timerange / MAX_RETURN_LIMIT; - return Double.valueOf(v) - .intValue(); + private List<SelectColumn> getSelectedColumns() { + return Arrays.stream(queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS).split(COMMA)) + .map(String::trim) + .map(SelectColumn::fromApiQueryString) + .toList(); } - private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseException { - ProvidedRestQueryParams singleEvent = disableAutoAgg(new ProvidedRestQueryParams(queryParams)); - singleEvent.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION); - singleEvent.update(SupportedRestQueryParams.QP_LIMIT, 1); - singleEvent.update(SupportedRestQueryParams.QP_ORDER, order.name()); - singleEvent.update(SupportedRestQueryParams.QP_COLUMNS, transformColumns(singleEvent.getAsString( - SupportedRestQueryParams.QP_COLUMNS))); + private boolean usesAggregation() { + if (queryParams.has(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION)) { + return true; + } + + return getSelectedColumns().stream().anyMatch(SelectColumn::isAggregated); + } - return fireQuery(singleEvent); + private boolean hasColumns() { + return queryParams.has(SupportedRestQueryParams.QP_COLUMNS) + && queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS) != null + && !queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS).isBlank(); } - private String transformColumns(String rawQuery) { - List<SelectColumn> columns = - Arrays.stream(rawQuery.split(COMMA)) - .map(SelectColumn::fromApiQueryString) - .toList(); - return columns.stream() - .map(SelectColumn::getOriginalField) - .collect(Collectors.joining(COMMA)); + private boolean queryHasExplicitTimeAggregation() { + return queryParams.has(SupportedRestQueryParams.QP_TIME_INTERVAL); } - private String getSampleField(SpQueryResult result) { - for (String column : result.getHeaders()) { - if (!column.equals(TIMESTAMP_FIELD)) { - return column; - } + private int getMaxReturnLimit() { + Integer providedLimit = queryParams.getAsInt(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS); + if (providedLimit == null || providedLimit <= 0) { + return DEFAULT_MAX_RETURN_LIMIT; } - throw new IllegalArgumentException("No columns present"); + return providedLimit; + } + + private String transformColumnsToRawSelection(String rawQuery) { + return Arrays.stream(rawQuery.split(COMMA)) + .map(String::trim) + .map(SelectColumn::fromApiQueryString) + .map(SelectColumn::getOriginalField) + .collect(Collectors.joining(COMMA)); } private long extractTimestamp(SpQueryResult result) throws ParseException { int timestampIndex = result.getHeaders() .indexOf(TIMESTAMP_FIELD); - return tryParseDate(result.getAllDataSeries() - .get(0) - .getRows() - .get(0) - .get(timestampIndex) - .toString()).getTime(); + Object timestampValue = result.getAllDataSeries() + .get(0) + .getRows() + .get(0) + .get(timestampIndex); + + if (timestampValue instanceof Number timestampNumber) { + return timestampNumber.longValue(); + } + + String timestampString = timestampValue.toString(); + try { + return Long.parseLong(timestampString); + } catch (NumberFormatException e) { + return tryParseDate(timestampString).getTime(); + } } private Date tryParseDate(String v) throws ParseException { @@ -164,4 +194,39 @@ public class AutoAggregationHandler { return isoFormatOnlySeconds.parse(v); } } + + private ProvidedRestQueryParams makeBaseHelperQueryParams() { + ProvidedRestQueryParams helperParams = disableAutoAgg(new ProvidedRestQueryParams(queryParams)); + helperParams.remove(SupportedRestQueryParams.QP_TIME_INTERVAL); + helperParams.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION); + helperParams.remove(SupportedRestQueryParams.QP_GROUP_BY); + helperParams.remove(SupportedRestQueryParams.QP_LIMIT); + helperParams.remove(SupportedRestQueryParams.QP_OFFSET); + helperParams.remove(SupportedRestQueryParams.QP_PAGE); + helperParams.remove(SupportedRestQueryParams.QP_ORDER); + helperParams.remove(SupportedRestQueryParams.QP_COUNT_ONLY); + helperParams.remove(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS); + return helperParams; + } + + private ProvidedRestQueryParams makeRawHelperQueryParams() { + ProvidedRestQueryParams helperParams = makeBaseHelperQueryParams(); + helperParams.update(SupportedRestQueryParams.QP_COLUMNS, + transformColumnsToRawSelection(helperParams.getAsString(SupportedRestQueryParams.QP_COLUMNS))); + return helperParams; + } + + private long ceilDiv(long dividend, long divisor) { + return (dividend + divisor - 1) / divisor; + } + + private record QueryBoundary(SpQueryResult oldest, SpQueryResult newest) { + private static QueryBoundary empty() { + return new QueryBoundary(new SpQueryResult(), new SpQueryResult()); + } + + private boolean hasData() { + return oldest.getTotal() > 0 && newest.getTotal() > 0; + } + } } diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java index a05fb871cd..945043c360 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java @@ -54,7 +54,8 @@ public class QueryResultProvider { public SpQueryResult getData() { if (queryParams.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)) { queryParams = new AutoAggregationHandler(queryParams, - dataExplorerQueryManagement).makeAutoAggregationQueryParams(); + dataExplorerQueryManagement, + ignoreMissingData).makeAutoAggregationQueryParams(); } SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(queryParams); diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java index 3e4fd1048c..c0fa517bbd 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java @@ -81,6 +81,10 @@ public class SelectColumn implements IQueryStatement { return originalField; } + public boolean isAggregated() { + return !simpleField; + } + public void setAggregationFunction(AggregationFunction aggregationFunction) { this.aggregationFunction = aggregationFunction; } diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java new file mode 100644 index 0000000000..1ff74310d4 --- /dev/null +++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer; + +import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement; +import org.apache.streampipes.model.datalake.DataSeries; +import org.apache.streampipes.model.datalake.SpQueryResult; +import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams; +import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.HashMap; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AutoAggregationHandlerTest { + + @Test + public void testAutoAggregationAddsOneMillisecondBucketsWhenRawRowsFitClientLimit() { + IDataExplorerQueryManagement queryManagement = mock(IDataExplorerQueryManagement.class); + ArgumentCaptor<ProvidedRestQueryParams> queryCaptor = ArgumentCaptor.forClass(ProvidedRestQueryParams.class); + + when(queryManagement.getData(any(ProvidedRestQueryParams.class), eq(false))) + .thenReturn(makeSingleEventResult(1000L)) + .thenReturn(makeSingleEventResult(3001L)) + .thenReturn(makeRowsResult(3)); + + ProvidedRestQueryParams result = new AutoAggregationHandler(makeAggregateParams(), queryManagement, false) + .makeAutoAggregationQueryParams(); + + verify(queryManagement, times(3)).getData(queryCaptor.capture(), eq(false)); + + List<ProvidedRestQueryParams> capturedQueries = queryCaptor.getAllValues(); + ProvidedRestQueryParams oldestQuery = capturedQueries.get(0); + assertEquals("1", oldestQuery.getAsString(SupportedRestQueryParams.QP_LIMIT)); + assertNull(oldestQuery.getAsString(SupportedRestQueryParams.QP_ORDER)); + assertEquals("density,mass_flow,sensor_fault_flags", + oldestQuery.getAsString(SupportedRestQueryParams.QP_COLUMNS)); + assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)); + assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_GROUP_BY)); + assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_TIME_INTERVAL)); + assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_PAGE)); + assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_OFFSET)); + + ProvidedRestQueryParams newestQuery = capturedQueries.get(1); + assertEquals("DESC", newestQuery.getAsString(SupportedRestQueryParams.QP_ORDER)); + + ProvidedRestQueryParams sampleQuery = capturedQueries.get(2); + assertEquals("10001", sampleQuery.getAsString(SupportedRestQueryParams.QP_LIMIT)); + + assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)); + assertEquals("1ms", result.getAsString(SupportedRestQueryParams.QP_TIME_INTERVAL)); + } + + @Test + public void testAutoAggregationComputesSmallestBoundedBucketSizeAboveLimit() { + IDataExplorerQueryManagement queryManagement = mock(IDataExplorerQueryManagement.class); + ArgumentCaptor<ProvidedRestQueryParams> queryCaptor = ArgumentCaptor.forClass(ProvidedRestQueryParams.class); + + when(queryManagement.getData(any(ProvidedRestQueryParams.class), eq(true))) + .thenReturn(makeSingleEventResult(1_000L)) + .thenReturn(makeSingleEventResult(10_000L)) + .thenReturn(makeRowsResult(6)); + + ProvidedRestQueryParams result = new AutoAggregationHandler(makeAggregateParamsWithClientLimit(5), + queryManagement, + true).makeAutoAggregationQueryParams(); + + verify(queryManagement, times(3)).getData(queryCaptor.capture(), eq(true)); + assertEquals("6", queryCaptor.getAllValues().get(2).getAsString(SupportedRestQueryParams.QP_LIMIT)); + assertEquals("1801ms", result.getAsString(SupportedRestQueryParams.QP_TIME_INTERVAL)); + assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)); + } + + @Test + public void testAutoAggregationLeavesSimpleColumnQueryUnchanged() { + IDataExplorerQueryManagement queryManagement = mock(IDataExplorerQueryManagement.class); + + when(queryManagement.getData(any(ProvidedRestQueryParams.class), eq(true))) + .thenReturn(makeSingleEventResult(1_000L)) + .thenReturn(makeSingleEventResult(10_000L)); + + ProvidedRestQueryParams result = new AutoAggregationHandler(makeSimpleParams(), queryManagement, true) + .makeAutoAggregationQueryParams(); + + assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)); + assertFalse(result.has(SupportedRestQueryParams.QP_TIME_INTERVAL)); + verify(queryManagement, times(2)).getData(any(ProvidedRestQueryParams.class), eq(true)); + } + + private ProvidedRestQueryParams makeAggregateParams() { + return makeAggregateParamsWithClientLimit(10_000); + } + + private ProvidedRestQueryParams makeAggregateParamsWithClientLimit(int maxEvents) { + var params = new HashMap<String, String>(); + params.put(SupportedRestQueryParams.QP_START_DATE, "1000"); + params.put(SupportedRestQueryParams.QP_END_DATE, "10000"); + params.put(SupportedRestQueryParams.QP_COLUMNS, + "[density;MEAN;mean_density],[mass_flow;MEAN;mean_mass_flow],[sensor_fault_flags;MODE;mode_sensor_fault_flags]"); + params.put(SupportedRestQueryParams.QP_AUTO_AGGREGATE, "true"); + params.put(SupportedRestQueryParams.QP_GROUP_BY, "plant"); + params.put(SupportedRestQueryParams.QP_LIMIT, "50"); + params.put(SupportedRestQueryParams.QP_PAGE, "2"); + params.put(SupportedRestQueryParams.QP_OFFSET, "100"); + params.put(SupportedRestQueryParams.QP_ORDER, "DESC"); + params.put(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS, String.valueOf(maxEvents)); + return new ProvidedRestQueryParams("Test", params); + } + + private ProvidedRestQueryParams makeSimpleParams() { + var params = new HashMap<String, String>(); + params.put(SupportedRestQueryParams.QP_START_DATE, "1000"); + params.put(SupportedRestQueryParams.QP_END_DATE, "10000"); + params.put(SupportedRestQueryParams.QP_COLUMNS, "density,mass_flow"); + params.put(SupportedRestQueryParams.QP_AUTO_AGGREGATE, "true"); + return new ProvidedRestQueryParams("Test", params); + } + + private SpQueryResult makeSingleEventResult(long timestamp) { + SpQueryResult result = new SpQueryResult(); + result.setHeaders(List.of("time", "density")); + result.addDataResult(new DataSeries(1, + List.of(List.of(timestamp, 42.0)), + List.of("time", "density"), + null)); + result.setTotal(1); + return result; + } + + private SpQueryResult makeRowsResult(int rows) { + SpQueryResult result = new SpQueryResult(); + result.setHeaders(List.of("time", "density")); + result.addDataResult(new DataSeries(rows, + java.util.stream.IntStream.range(0, rows) + .mapToObj(index -> List.of((Object) (long) index, 42.0)) + .toList(), + List.of("time", "density"), + null)); + result.setTotal(rows); + return result; + } +}
