This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b2869385d fix: redesign auto aggregation for client limits (#4274)
6b2869385d is described below
commit 6b2869385d5426309467503425fd8ec3c4d88e07
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Mar 23 16:26:10 2026 +0100
fix: redesign auto aggregation for client limits (#4274)
---
.../dataexplorer/AutoAggregationHandler.java | 207 ++++++++++++++-------
.../dataexplorer/QueryResultProvider.java | 3 +-
.../dataexplorer/param/model/SelectColumn.java | 4 +
.../dataexplorer/AutoAggregationHandlerTest.java | 170 +++++++++++++++++
ui/cypress/support/utils/chart/ChartBtns.ts | 16 ++
ui/cypress/support/utils/chart/ChartUtils.ts | 18 ++
.../support/utils/chart/ChartWidgetTableUtils.ts | 10 +
.../support/utils/dataset/DataLakeSeedUtils.ts | 52 ++++--
ui/cypress/tests/chart/autoAggregateTable.spec.ts | 100 ++++++++++
.../lib/query/data-view-query-generator.service.ts | 6 +-
.../charts/table/table-widget.component.html | 1 +
.../chart-data-settings.component.html | 13 +-
12 files changed, 504 insertions(+), 96 deletions(-)
diff --git
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
index c0751c7fee..a885b4effd 100644
---
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
+++
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
@@ -19,7 +19,6 @@ package org.apache.streampipes.dataexplorer;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.param.model.SelectColumn;
-import org.apache.streampipes.model.datalake.DataLakeQueryOrdering;
import org.apache.streampipes.model.datalake.SpQueryResult;
import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
@@ -37,7 +36,7 @@ import java.util.stream.Collectors;
public class AutoAggregationHandler {
private static final Logger LOG =
LoggerFactory.getLogger(AutoAggregationHandler.class);
- private static final double MAX_RETURN_LIMIT = 2000;
+ private static final int DEFAULT_MAX_RETURN_LIMIT = 2000;
private static final String TIMESTAMP_FIELD = "time";
private static final String COMMA = ",";
@@ -48,38 +47,45 @@ public class AutoAggregationHandler {
private final IDataExplorerQueryManagement dataLakeQueryManagement;
private final ProvidedRestQueryParams queryParams;
+ private final boolean ignoreMissingData;
public AutoAggregationHandler(ProvidedRestQueryParams params,
- IDataExplorerQueryManagement
dataExplorerQueryManagement) {
+ IDataExplorerQueryManagement
dataExplorerQueryManagement,
+ boolean ignoreMissingData) {
this.queryParams = params;
this.dataLakeQueryManagement = dataExplorerQueryManagement;
+ this.ignoreMissingData = ignoreMissingData;
}
public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws
IllegalArgumentException {
+ ProvidedRestQueryParams rewrittenParams = disableAutoAgg(new
ProvidedRestQueryParams(queryParams));
+ if (queryHasExplicitTimeAggregation() || !hasColumns()) {
+ return rewrittenParams;
+ }
+
try {
- SpQueryResult newest = getSingleRecord(DataLakeQueryOrdering.DESC);
- SpQueryResult oldest = getSingleRecord(DataLakeQueryOrdering.ASC);
- if (newest.getTotal() > 0) {
- String sampleField = getSampleField(newest);
- Integer count = getCount(sampleField);
- if (count <= MAX_RETURN_LIMIT) {
- LOG.debug("Auto-Aggregation disabled as {} results <= max return
limit {}", count, MAX_RETURN_LIMIT);
- return disableAutoAgg(this.queryParams);
- } else {
- LOG.debug("Performing auto-aggregation");
-
- int aggValue = getAggregationValue(newest, oldest);
- LOG.debug("Setting auto-aggregation value to {} ms", aggValue);
- queryParams.update(SupportedRestQueryParams.QP_TIME_INTERVAL,
aggValue + "ms");
- return disableAutoAgg(queryParams);
- }
- } else {
- return disableAutoAgg(this.queryParams);
+ QueryBoundary boundary = getQueryBoundary();
+ if (!boundary.hasData()) {
+ return rewrittenParams;
}
+
+ if (!usesAggregation()) {
+ return rewrittenParams;
+ }
+
+ int maxRows = getMaxReturnLimit();
+ int rawRowCount = getRawRowCountUpTo(maxRows + 1);
+ long interval = rawRowCount <= maxRows
+ ? 1L
+ : getAggregationValue(boundary.newest(), boundary.oldest(), maxRows);
+
+ LOG.debug("Auto-aggregation selected time interval {} ms for max {}
rows", interval, maxRows);
+ rewrittenParams.update(SupportedRestQueryParams.QP_TIME_INTERVAL,
interval + "ms");
+ return rewrittenParams;
} catch (ParseException e) {
LOG.error("Parsing of timestamp failed during auto aggregation of query
parameters: {}", e.getMessage());
+ return rewrittenParams;
}
- return null;
}
private ProvidedRestQueryParams disableAutoAgg(ProvidedRestQueryParams
params) {
@@ -87,74 +93,98 @@ public class AutoAggregationHandler {
return params;
}
- public Integer getCount(String fieldName) {
- ProvidedRestQueryParams countParams = disableAutoAgg(new
ProvidedRestQueryParams(queryParams));
- countParams.remove(SupportedRestQueryParams.QP_TIME_INTERVAL);
- countParams.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION);
- countParams.update(SupportedRestQueryParams.QP_COUNT_ONLY, true);
- countParams.update(SupportedRestQueryParams.QP_COLUMNS, fieldName);
+ private SpQueryResult fireQuery(ProvidedRestQueryParams params) {
+ return dataLakeQueryManagement.getData(params, ignoreMissingData);
+ }
- SpQueryResult result = dataLakeQueryManagement.getData(countParams, true);
+ private long getAggregationValue(SpQueryResult newest,
+ SpQueryResult oldest,
+ int maxRows) throws ParseException {
+ long timeRange = Math.max(0, extractTimestamp(newest) -
extractTimestamp(oldest));
+ long coveredRange = timeRange + 1;
+ return Math.max(1L, ceilDiv(coveredRange, maxRows));
+ }
- return result.getTotal() > 0 ? (
- (Double) result.getAllDataSeries()
- .get(0)
- .getRows()
- .get(0)
- .get(1)
- ).intValue() : 0;
+ private QueryBoundary getQueryBoundary() {
+ ProvidedRestQueryParams boundaryQuery = makeRawHelperQueryParams();
+ boundaryQuery.update(SupportedRestQueryParams.QP_LIMIT, 1);
+ SpQueryResult boundaryResult = fireQuery(boundaryQuery);
+ if (boundaryResult.getTotal() == 0) {
+ return QueryBoundary.empty();
+ }
+
+ ProvidedRestQueryParams newestQuery = makeRawHelperQueryParams();
+ newestQuery.update(SupportedRestQueryParams.QP_LIMIT, 1);
+ newestQuery.update(SupportedRestQueryParams.QP_ORDER, "DESC");
+ return new QueryBoundary(boundaryResult, fireQuery(newestQuery));
}
- private SpQueryResult fireQuery(ProvidedRestQueryParams params) {
- return dataLakeQueryManagement.getData(params, true);
+ private int getRawRowCountUpTo(int maxRows) {
+ ProvidedRestQueryParams sampleQuery = makeRawHelperQueryParams();
+ sampleQuery.update(SupportedRestQueryParams.QP_LIMIT, maxRows);
+ return fireQuery(sampleQuery).getTotal();
}
- private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest)
throws ParseException {
- long timerange = extractTimestamp(newest) - extractTimestamp(oldest);
- double v = timerange / MAX_RETURN_LIMIT;
- return Double.valueOf(v)
- .intValue();
+ private List<SelectColumn> getSelectedColumns() {
+ return
Arrays.stream(queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS).split(COMMA))
+ .map(String::trim)
+ .map(SelectColumn::fromApiQueryString)
+ .toList();
}
- private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws
ParseException {
- ProvidedRestQueryParams singleEvent = disableAutoAgg(new
ProvidedRestQueryParams(queryParams));
- singleEvent.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION);
- singleEvent.update(SupportedRestQueryParams.QP_LIMIT, 1);
- singleEvent.update(SupportedRestQueryParams.QP_ORDER, order.name());
- singleEvent.update(SupportedRestQueryParams.QP_COLUMNS,
transformColumns(singleEvent.getAsString(
- SupportedRestQueryParams.QP_COLUMNS)));
+ private boolean usesAggregation() {
+ if (queryParams.has(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION)) {
+ return true;
+ }
+
+ return getSelectedColumns().stream().anyMatch(SelectColumn::isAggregated);
+ }
- return fireQuery(singleEvent);
+ private boolean hasColumns() {
+ return queryParams.has(SupportedRestQueryParams.QP_COLUMNS)
+ && queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS) != null
+ &&
!queryParams.getAsString(SupportedRestQueryParams.QP_COLUMNS).isBlank();
}
- private String transformColumns(String rawQuery) {
- List<SelectColumn> columns =
- Arrays.stream(rawQuery.split(COMMA))
- .map(SelectColumn::fromApiQueryString)
- .toList();
- return columns.stream()
- .map(SelectColumn::getOriginalField)
- .collect(Collectors.joining(COMMA));
+ private boolean queryHasExplicitTimeAggregation() {
+ return queryParams.has(SupportedRestQueryParams.QP_TIME_INTERVAL);
}
- private String getSampleField(SpQueryResult result) {
- for (String column : result.getHeaders()) {
- if (!column.equals(TIMESTAMP_FIELD)) {
- return column;
- }
+ private int getMaxReturnLimit() {
+ Integer providedLimit =
queryParams.getAsInt(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS);
+ if (providedLimit == null || providedLimit <= 0) {
+ return DEFAULT_MAX_RETURN_LIMIT;
}
- throw new IllegalArgumentException("No columns present");
+ return providedLimit;
+ }
+
+ private String transformColumnsToRawSelection(String rawQuery) {
+ return Arrays.stream(rawQuery.split(COMMA))
+ .map(String::trim)
+ .map(SelectColumn::fromApiQueryString)
+ .map(SelectColumn::getOriginalField)
+ .collect(Collectors.joining(COMMA));
}
private long extractTimestamp(SpQueryResult result) throws ParseException {
int timestampIndex = result.getHeaders()
.indexOf(TIMESTAMP_FIELD);
- return tryParseDate(result.getAllDataSeries()
- .get(0)
- .getRows()
- .get(0)
- .get(timestampIndex)
- .toString()).getTime();
+ Object timestampValue = result.getAllDataSeries()
+ .get(0)
+ .getRows()
+ .get(0)
+ .get(timestampIndex);
+
+ if (timestampValue instanceof Number timestampNumber) {
+ return timestampNumber.longValue();
+ }
+
+ String timestampString = timestampValue.toString();
+ try {
+ return Long.parseLong(timestampString);
+ } catch (NumberFormatException e) {
+ return tryParseDate(timestampString).getTime();
+ }
}
private Date tryParseDate(String v) throws ParseException {
@@ -164,4 +194,39 @@ public class AutoAggregationHandler {
return isoFormatOnlySeconds.parse(v);
}
}
+
+ private ProvidedRestQueryParams makeBaseHelperQueryParams() {
+ ProvidedRestQueryParams helperParams = disableAutoAgg(new
ProvidedRestQueryParams(queryParams));
+ helperParams.remove(SupportedRestQueryParams.QP_TIME_INTERVAL);
+ helperParams.remove(SupportedRestQueryParams.QP_AGGREGATION_FUNCTION);
+ helperParams.remove(SupportedRestQueryParams.QP_GROUP_BY);
+ helperParams.remove(SupportedRestQueryParams.QP_LIMIT);
+ helperParams.remove(SupportedRestQueryParams.QP_OFFSET);
+ helperParams.remove(SupportedRestQueryParams.QP_PAGE);
+ helperParams.remove(SupportedRestQueryParams.QP_ORDER);
+ helperParams.remove(SupportedRestQueryParams.QP_COUNT_ONLY);
+ helperParams.remove(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS);
+ return helperParams;
+ }
+
+ private ProvidedRestQueryParams makeRawHelperQueryParams() {
+ ProvidedRestQueryParams helperParams = makeBaseHelperQueryParams();
+ helperParams.update(SupportedRestQueryParams.QP_COLUMNS,
+
transformColumnsToRawSelection(helperParams.getAsString(SupportedRestQueryParams.QP_COLUMNS)));
+ return helperParams;
+ }
+
+ private long ceilDiv(long dividend, long divisor) {
+ return (dividend + divisor - 1) / divisor;
+ }
+
+ private record QueryBoundary(SpQueryResult oldest, SpQueryResult newest) {
+ private static QueryBoundary empty() {
+ return new QueryBoundary(new SpQueryResult(), new SpQueryResult());
+ }
+
+ private boolean hasData() {
+ return oldest.getTotal() > 0 && newest.getTotal() > 0;
+ }
+ }
}
diff --git
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
index a05fb871cd..945043c360 100644
---
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
+++
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
@@ -54,7 +54,8 @@ public class QueryResultProvider {
public SpQueryResult getData() {
if (queryParams.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)) {
queryParams = new AutoAggregationHandler(queryParams,
-
dataExplorerQueryManagement).makeAutoAggregationQueryParams();
+ dataExplorerQueryManagement,
+
ignoreMissingData).makeAutoAggregationQueryParams();
}
SelectQueryParams qp =
ProvidedRestQueryParamConverter.getSelectQueryParams(queryParams);
diff --git
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
index 3e4fd1048c..c0fa517bbd 100644
---
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
+++
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
@@ -81,6 +81,10 @@ public class SelectColumn implements IQueryStatement {
return originalField;
}
+ public boolean isAggregated() {
+ return !simpleField;
+ }
+
public void setAggregationFunction(AggregationFunction aggregationFunction) {
this.aggregationFunction = aggregationFunction;
}
diff --git
a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java
b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java
new file mode 100644
index 0000000000..1ff74310d4
--- /dev/null
+++
b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/AutoAggregationHandlerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
+import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AutoAggregationHandlerTest {
+
+ @Test
+ public void
testAutoAggregationAddsOneMillisecondBucketsWhenRawRowsFitClientLimit() {
+ IDataExplorerQueryManagement queryManagement =
mock(IDataExplorerQueryManagement.class);
+ ArgumentCaptor<ProvidedRestQueryParams> queryCaptor =
ArgumentCaptor.forClass(ProvidedRestQueryParams.class);
+
+ when(queryManagement.getData(any(ProvidedRestQueryParams.class),
eq(false)))
+ .thenReturn(makeSingleEventResult(1000L))
+ .thenReturn(makeSingleEventResult(3001L))
+ .thenReturn(makeRowsResult(3));
+
+ ProvidedRestQueryParams result = new
AutoAggregationHandler(makeAggregateParams(), queryManagement, false)
+ .makeAutoAggregationQueryParams();
+
+ verify(queryManagement, times(3)).getData(queryCaptor.capture(),
eq(false));
+
+ List<ProvidedRestQueryParams> capturedQueries = queryCaptor.getAllValues();
+ ProvidedRestQueryParams oldestQuery = capturedQueries.get(0);
+ assertEquals("1",
oldestQuery.getAsString(SupportedRestQueryParams.QP_LIMIT));
+ assertNull(oldestQuery.getAsString(SupportedRestQueryParams.QP_ORDER));
+ assertEquals("density,mass_flow,sensor_fault_flags",
+ oldestQuery.getAsString(SupportedRestQueryParams.QP_COLUMNS));
+ assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE));
+ assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_GROUP_BY));
+ assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_TIME_INTERVAL));
+ assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_PAGE));
+ assertFalse(oldestQuery.has(SupportedRestQueryParams.QP_OFFSET));
+
+ ProvidedRestQueryParams newestQuery = capturedQueries.get(1);
+ assertEquals("DESC",
newestQuery.getAsString(SupportedRestQueryParams.QP_ORDER));
+
+ ProvidedRestQueryParams sampleQuery = capturedQueries.get(2);
+ assertEquals("10001",
sampleQuery.getAsString(SupportedRestQueryParams.QP_LIMIT));
+
+ assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE));
+ assertEquals("1ms",
result.getAsString(SupportedRestQueryParams.QP_TIME_INTERVAL));
+ }
+
+ @Test
+ public void testAutoAggregationComputesSmallestBoundedBucketSizeAboveLimit()
{
+ IDataExplorerQueryManagement queryManagement =
mock(IDataExplorerQueryManagement.class);
+ ArgumentCaptor<ProvidedRestQueryParams> queryCaptor =
ArgumentCaptor.forClass(ProvidedRestQueryParams.class);
+
+ when(queryManagement.getData(any(ProvidedRestQueryParams.class), eq(true)))
+ .thenReturn(makeSingleEventResult(1_000L))
+ .thenReturn(makeSingleEventResult(10_000L))
+ .thenReturn(makeRowsResult(6));
+
+ ProvidedRestQueryParams result = new
AutoAggregationHandler(makeAggregateParamsWithClientLimit(5),
+ queryManagement,
+ true).makeAutoAggregationQueryParams();
+
+ verify(queryManagement, times(3)).getData(queryCaptor.capture(), eq(true));
+ assertEquals("6",
queryCaptor.getAllValues().get(2).getAsString(SupportedRestQueryParams.QP_LIMIT));
+ assertEquals("1801ms",
result.getAsString(SupportedRestQueryParams.QP_TIME_INTERVAL));
+ assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE));
+ }
+
+ @Test
+ public void testAutoAggregationLeavesSimpleColumnQueryUnchanged() {
+ IDataExplorerQueryManagement queryManagement =
mock(IDataExplorerQueryManagement.class);
+
+ when(queryManagement.getData(any(ProvidedRestQueryParams.class), eq(true)))
+ .thenReturn(makeSingleEventResult(1_000L))
+ .thenReturn(makeSingleEventResult(10_000L));
+
+ ProvidedRestQueryParams result = new
AutoAggregationHandler(makeSimpleParams(), queryManagement, true)
+ .makeAutoAggregationQueryParams();
+
+ assertFalse(result.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE));
+ assertFalse(result.has(SupportedRestQueryParams.QP_TIME_INTERVAL));
+ verify(queryManagement,
times(2)).getData(any(ProvidedRestQueryParams.class), eq(true));
+ }
+
+ private ProvidedRestQueryParams makeAggregateParams() {
+ return makeAggregateParamsWithClientLimit(10_000);
+ }
+
+ private ProvidedRestQueryParams makeAggregateParamsWithClientLimit(int
maxEvents) {
+ var params = new HashMap<String, String>();
+ params.put(SupportedRestQueryParams.QP_START_DATE, "1000");
+ params.put(SupportedRestQueryParams.QP_END_DATE, "10000");
+ params.put(SupportedRestQueryParams.QP_COLUMNS,
+
"[density;MEAN;mean_density],[mass_flow;MEAN;mean_mass_flow],[sensor_fault_flags;MODE;mode_sensor_fault_flags]");
+ params.put(SupportedRestQueryParams.QP_AUTO_AGGREGATE, "true");
+ params.put(SupportedRestQueryParams.QP_GROUP_BY, "plant");
+ params.put(SupportedRestQueryParams.QP_LIMIT, "50");
+ params.put(SupportedRestQueryParams.QP_PAGE, "2");
+ params.put(SupportedRestQueryParams.QP_OFFSET, "100");
+ params.put(SupportedRestQueryParams.QP_ORDER, "DESC");
+ params.put(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS,
String.valueOf(maxEvents));
+ return new ProvidedRestQueryParams("Test", params);
+ }
+
+ private ProvidedRestQueryParams makeSimpleParams() {
+ var params = new HashMap<String, String>();
+ params.put(SupportedRestQueryParams.QP_START_DATE, "1000");
+ params.put(SupportedRestQueryParams.QP_END_DATE, "10000");
+ params.put(SupportedRestQueryParams.QP_COLUMNS, "density,mass_flow");
+ params.put(SupportedRestQueryParams.QP_AUTO_AGGREGATE, "true");
+ return new ProvidedRestQueryParams("Test", params);
+ }
+
+ private SpQueryResult makeSingleEventResult(long timestamp) {
+ SpQueryResult result = new SpQueryResult();
+ result.setHeaders(List.of("time", "density"));
+ result.addDataResult(new DataSeries(1,
+ List.of(List.of(timestamp, 42.0)),
+ List.of("time", "density"),
+ null));
+ result.setTotal(1);
+ return result;
+ }
+
+ private SpQueryResult makeRowsResult(int rows) {
+ SpQueryResult result = new SpQueryResult();
+ result.setHeaders(List.of("time", "density"));
+ result.addDataResult(new DataSeries(rows,
+ java.util.stream.IntStream.range(0, rows)
+ .mapToObj(index -> List.of((Object) (long) index, 42.0))
+ .toList(),
+ List.of("time", "density"),
+ null));
+ result.setTotal(rows);
+ return result;
+ }
+}
diff --git a/ui/cypress/support/utils/chart/ChartBtns.ts
b/ui/cypress/support/utils/chart/ChartBtns.ts
index 95c09cd292..ddd404bb03 100644
--- a/ui/cypress/support/utils/chart/ChartBtns.ts
+++ b/ui/cypress/support/utils/chart/ChartBtns.ts
@@ -258,6 +258,22 @@ export class ChartBtns {
return cy.dataCy('design-panel-data-settings-filter-value', {}, true);
}
+ public static aggregatedQueryTypeButton() {
+ return cy.dataCy('data-explorer-query-type-aggregated');
+ }
+
+ public static autoAggregateCheckbox() {
+ return cy.dataCy('data-explorer-auto-aggregate-checkbox');
+ }
+
+ public static ignoreTooMuchDataWarningCheckbox() {
+ return
cy.dataCy('data-explorer-ignore-too-much-data-warning-checkbox');
+ }
+
+ public static dataExplorerTablePaginator() {
+ return cy.dataCy('data-explorer-table-paginator');
+ }
+
public static matOptionByText(text: string | RegExp) {
return cy.get('mat-option').contains(text);
}
diff --git a/ui/cypress/support/utils/chart/ChartUtils.ts
b/ui/cypress/support/utils/chart/ChartUtils.ts
index 8498b045be..33752cf994 100644
--- a/ui/cypress/support/utils/chart/ChartUtils.ts
+++ b/ui/cypress/support/utils/chart/ChartUtils.ts
@@ -532,6 +532,24 @@ export class ChartUtils {
}
}
+ public static selectAggregatedQueryType() {
+ ChartBtns.aggregatedQueryTypeButton()
+ .find('input[type="radio"]')
+ .first()
+ .check({ force: true });
+ }
+
+ public static enableAutoAggregate() {
+ ChartBtns.autoAggregateCheckbox()
+ .find('input[type="checkbox"]')
+ .first()
+ .then($checkbox => {
+ if (!$checkbox.prop('checked')) {
+ cy.wrap($checkbox).check({ force: true });
+ }
+ });
+ }
+
/**
* Select visualization type
*/
diff --git a/ui/cypress/support/utils/chart/ChartWidgetTableUtils.ts
b/ui/cypress/support/utils/chart/ChartWidgetTableUtils.ts
index 96f2540001..9c58916837 100644
--- a/ui/cypress/support/utils/chart/ChartWidgetTableUtils.ts
+++ b/ui/cypress/support/utils/chart/ChartWidgetTableUtils.ts
@@ -35,4 +35,14 @@ export class ChartWidgetTableUtils {
public static checkAmountOfRows(amount: number) {
this.chartTableRows().should('have.length', amount);
}
+
+ public static paginatorRangeLabel() {
+ return cy
+ .dataCy('data-explorer-table-paginator')
+ .find('.mat-mdc-paginator-range-label');
+ }
+
+ public static checkTotalAmountOfRows(amount: number) {
+ this.paginatorRangeLabel().should('contain.text', amount.toString());
+ }
}
diff --git a/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts
b/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts
index 45fc8eaeed..a1806d6c68 100644
--- a/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts
+++ b/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts
@@ -86,6 +86,13 @@ interface JsonArrayFixtureImportOptions {
columnOverrides?: Record<string, ColumnOverride>;
}
+interface JsonArrayImportOptions {
+ records: Array<any>;
+ measurementName: string;
+ timestampColumn?: string;
+ columnOverrides?: Record<string, ColumnOverride>;
+}
+
interface ImportRequest {
csvConfig: CsvImportConfiguration;
headers: string[];
@@ -152,29 +159,40 @@ export class DataLakeSeedUtils {
options: JsonArrayFixtureImportOptions,
): Cypress.Chainable<CsvImportResult> {
return cy.fixture(options.fixture).then((records: Array<any>) => {
- const headers = this.extractHeaders(records);
- const rows = records.map(record =>
- headers.map(header =>
- this.serializeCell(record ? record[header] : undefined),
- ),
- );
- const timestampColumn = options.timestampColumn ?? headers[0];
-
- return this.previewAndImport({
- headers,
- rows,
- csvConfig: {
- delimiter: ';',
- decimalSeparator: '.',
- hasHeader: true,
- },
+ return this.importJsonArrayRecords({
+ records,
measurementName: options.measurementName,
- timestampColumn,
+ timestampColumn: options.timestampColumn,
columnOverrides: options.columnOverrides,
});
});
}
+ public static importJsonArrayRecords(
+ options: JsonArrayImportOptions,
+ ): Cypress.Chainable<CsvImportResult> {
+ const headers = this.extractHeaders(options.records);
+ const rows = options.records.map(record =>
+ headers.map(header =>
+ this.serializeCell(record ? record[header] : undefined),
+ ),
+ );
+ const timestampColumn = options.timestampColumn ?? headers[0];
+
+ return this.previewAndImport({
+ headers,
+ rows,
+ csvConfig: {
+ delimiter: ';',
+ decimalSeparator: '.',
+ hasHeader: true,
+ },
+ measurementName: options.measurementName,
+ timestampColumn,
+ columnOverrides: options.columnOverrides,
+ });
+ }
+
private static previewAndImport(options: {
headers: string[];
rows: string[][];
diff --git a/ui/cypress/tests/chart/autoAggregateTable.spec.ts
b/ui/cypress/tests/chart/autoAggregateTable.spec.ts
new file mode 100644
index 0000000000..d65ed8344f
--- /dev/null
+++ b/ui/cypress/tests/chart/autoAggregateTable.spec.ts
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { ChartUtils } from '../../support/utils/chart/ChartUtils';
+import { ChartWidgetTableUtils } from
'../../support/utils/chart/ChartWidgetTableUtils';
+import { DataLakeSeedUtils } from
'../../support/utils/dataset/DataLakeSeedUtils';
+
+describe('Test auto aggregate table result size', () => {
+ const eventCount = 10050;
+ const maximumAmountOfEvents = 10000;
+ const expectedAutoAggregatedRows = 2001;
+
+ beforeEach('Setup Test', () => {
+ cy.initStreamPipesTest();
+ DataLakeSeedUtils.importJsonArrayRecords({
+ records: makeAutoAggregateRecords(
+ eventCount,
+ maximumAmountOfEvents,
+ ),
+ measurementName: ChartUtils.ADAPTER_NAME,
+ timestampColumn: 'timestamp',
+ });
+ });
+
+ it('Aggregates large one-second series to the client limit', () => {
+ cy.intercept(
+ 'GET',
+
'**/streampipes-backend/api/v4/datalake/measurements/datalake_configuration*',
+ req => {
+ if (req.query.autoAggregate === 'true') {
+ req.alias = 'autoAggregateQuery';
+ }
+ },
+ );
+
+ ChartUtils.addDataViewAndTableWidget(
+ 'Auto aggregate table',
+ ChartUtils.ADAPTER_NAME,
+ );
+ ChartUtils.selectDataConfig();
+ ChartUtils.selectAggregatedQueryType();
+ ChartUtils.enableAutoAggregate();
+
+ cy.wait('@autoAggregateQuery').then(({ request, response }) => {
+ const query = request.query as Record<string, string>;
+ expect(query.autoAggregate).to.equal('true');
+ expect(query.maximumAmountOfEvents).to.be.undefined;
+ expect(response?.body.total).to.equal(expectedAutoAggregatedRows);
+ });
+
+ ChartWidgetTableUtils.checkAmountOfRows(20);
+ ChartWidgetTableUtils.checkTotalAmountOfRows(
+ expectedAutoAggregatedRows,
+ );
+ });
+});
+
+function makeAutoAggregateRecords(
+ totalEvents: number,
+ maxAmountOfEvents: number,
+) {
+ const eventIntervalMs = 1000;
+ const coveredRange = (totalEvents - 1) * eventIntervalMs + 1;
+ const aggregationInterval = Math.max(
+ 1,
+ ceilDiv(coveredRange, maxAmountOfEvents),
+ );
+ const baseTimestamp =
+ Math.floor(Date.UTC(2025, 0, 1, 0, 0, 0) / aggregationInterval) *
+ aggregationInterval;
+
+ return Array.from({ length: totalEvents }, (_, index) => ({
+ timestamp: baseTimestamp + index * eventIntervalMs,
+ density: Number((40 + (index % 25) * 0.1).toFixed(3)),
+ mass_flow: Number((5 + (index % 40) * 0.01).toFixed(3)),
+ sensor_fault_flags: index % 1200 === 0,
+ sensorId: `flowrate-${String((index % 4) + 1).padStart(2, '0')}`,
+ temperature: Number((65 + (index % 30) * 0.2).toFixed(3)),
+ volume_flow: Number((5.5 + (index % 50) * 0.015).toFixed(3)),
+ }));
+}
+
+function ceilDiv(dividend: number, divisor: number) {
+ return Math.floor((dividend + divisor - 1) / divisor);
+}
diff --git
a/ui/projects/streampipes/platform-services/src/lib/query/data-view-query-generator.service.ts
b/ui/projects/streampipes/platform-services/src/lib/query/data-view-query-generator.service.ts
index 8a08e535d0..ccb3e33bcc 100644
---
a/ui/projects/streampipes/platform-services/src/lib/query/data-view-query-generator.service.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/query/data-view-query-generator.service.ts
@@ -146,12 +146,12 @@ export class DataViewQueryGeneratorService {
queryBuilder.withMissingValueBehaviour('empty');
}
- const dataLakeQueryParameter = queryBuilder.build();
-
- if (maximumResultingEvents !== -1) {
+ if (maximumResultingEvents !== -1 && !queryConfig.autoAggregate) {
queryBuilder.withMaximumAmountOfEvents(maximumResultingEvents);
}
+ const dataLakeQueryParameter = queryBuilder.build();
+
if (includeMeasureName) {
dataLakeQueryParameter.measureName = sourceConfig.measureName;
}
diff --git
a/ui/src/app/chart-shared/components/charts/table/table-widget.component.html
b/ui/src/app/chart-shared/components/charts/table/table-widget.component.html
index ceb7795847..397ac9c776 100644
---
a/ui/src/app/chart-shared/components/charts/table/table-widget.component.html
+++
b/ui/src/app/chart-shared/components/charts/table/table-widget.component.html
@@ -171,6 +171,7 @@
</div>
<mat-paginator
+ data-cy="data-explorer-table-paginator"
color="accent"
[length]="filteredRows.length"
[pageIndex]="pageIndex"
diff --git
a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/chart-data-settings.component.html
b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/chart-data-settings.component.html
index 5b23222e1e..115e1f8656 100644
---
a/ui/src/app/chart/components/chart-view/designer-panel/data-settings/chart-data-settings.component.html
+++
b/ui/src/app/chart/components/chart-view/designer-panel/data-settings/chart-data-settings.component.html
@@ -272,6 +272,7 @@
sourceConfig.queryType === 'aggregated'
) {
<div
+
data-cy="data-explorer-query-type-aggregated"
class="radio-option-box"
fxLayout="column"
[ngClass]="{
@@ -301,7 +302,9 @@
class="selection-form-field"
fxFlex="100"
>
- <div>
+ <div
+
data-cy="data-explorer-auto-aggregate-checkbox"
+ >
<mat-checkbox
color="accent"
[(ngModel)]="
@@ -409,9 +412,11 @@
</button>
</div>
<div class="p-10">
- <mat-checkbox [(ngModel)]="dataConfig.ignoreTooMuchDataWarning">
- {{ 'Deactivate browser overload warning' | translate }}
- </mat-checkbox>
+ <div data-cy="data-explorer-ignore-too-much-data-warning-checkbox">
+ <mat-checkbox [(ngModel)]="dataConfig.ignoreTooMuchDataWarning">
+ {{ 'Deactivate browser overload warning' | translate }}
+ </mat-checkbox>
+ </div>
<mat-checkbox
[(ngModel)]="dataConfig.ignoreMissingValues"
(change)="triggerDataRefresh()"