This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 68899a30e4 [timeseries] Fix Time Series Query Correctness Issue (#14251)
68899a30e4 is described below

commit 68899a30e4848d787c85b3ae966f73fb755182f4
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Oct 21 11:03:07 2024 -0500

    [timeseries] Fix Time Series Query Correctness Issue (#14251)
---
 .../timeseries/TimeSeriesAggregationOperator.java  | 81 ++++++++++-----------
 .../core/query/executor/QueryExecutorTest.java     | 20 +++--
 .../src/test/resources/data/sampleEatsData.avro    | Bin 551915 -> 0 bytes
 .../src/test/resources/data/sampleEatsData30k.avro | Bin 0 -> 327494 bytes
 4 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index a47a05e83a..6202cf808c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -82,55 +82,54 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   @Override
   protected TimeSeriesResultsBlock getNextBlock() {
-    ValueBlock transformBlock = _projectOperator.nextBlock();
-    if (transformBlock == null) {
-      TimeSeriesBuilderBlock builderBlock = new TimeSeriesBuilderBlock(_timeBuckets, new HashMap<>());
-      return new TimeSeriesResultsBlock(builderBlock);
-    }
-    BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn);
-    long[] timeValues = blockValSet.getLongValuesSV();
-    if (_timeOffset != null && _timeOffset != 0L) {
-      timeValues = applyTimeshift(_timeOffset, timeValues);
-    }
-    int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
-    Object[][] tagValues = new Object[_groupByExpressions.size()][];
+    ValueBlock valueBlock;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
-    for (int i = 0; i < _groupByExpressions.size(); i++) {
-      blockValSet = transformBlock.getBlockValueSet(_groupByExpressions.get(i));
-      switch (blockValSet.getValueType()) {
-        case JSON:
-        case STRING:
-          tagValues[i] = blockValSet.getStringValuesSV();
-          break;
+    while ((valueBlock = _projectOperator.nextBlock()) != null) {
+      // TODO: This is quite unoptimized and allocates liberally
+      BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
+      long[] timeValues = blockValSet.getLongValuesSV();
+      if (_timeOffset != null && _timeOffset != 0L) {
+        timeValues = applyTimeshift(_timeOffset, timeValues);
+      }
+      int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
+      Object[][] tagValues = new Object[_groupByExpressions.size()][];
+      for (int i = 0; i < _groupByExpressions.size(); i++) {
+        blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
+        switch (blockValSet.getValueType()) {
+          case JSON:
+          case STRING:
+            tagValues[i] = blockValSet.getStringValuesSV();
+            break;
+          case LONG:
+            tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+            break;
+          case INT:
+            tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+            break;
+          default:
+            throw new NotImplementedException("Can't handle types other than string and long");
+        }
+      }
+      BlockValSet valueExpressionBlockValSet = valueBlock.getBlockValueSet(_valueExpression);
+      switch (valueExpressionBlockValSet.getValueType()) {
         case LONG:
-          tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+          processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
           break;
         case INT:
-          tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+          processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
+          break;
+        case DOUBLE:
+          processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
+          break;
+        case STRING:
+          processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
           break;
         default:
-          throw new NotImplementedException("Can't handle types other than string and long");
+          // TODO: Support other types?
+          throw new IllegalStateException(
+              "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
       }
     }
-    BlockValSet valueExpressionBlockValSet = transformBlock.getBlockValueSet(_valueExpression);
-    switch (valueExpressionBlockValSet.getValueType()) {
-      case LONG:
-        processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case INT:
-        processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case DOUBLE:
-        processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case STRING:
-        processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      default:
-        // TODO: Support other types?
-        throw new IllegalStateException(
-            "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
-    }
     return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap));
   }
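The heart of the change above: getNextBlock() previously consumed only the first block returned by _projectOperator, so any rows beyond that first block were never aggregated; the fixed version keeps calling nextBlock() until it returns null and folds every block into seriesBuilderMap. A minimal sketch of that before/after pattern follows; BlockSource and the sum methods below are illustrative stand-ins, not Pinot's actual ProjectOperator/ValueBlock API.

// Illustrative sketch only: BlockSource stands in for a project operator that returns
// one block of values per call to nextBlock() and null once the segment is exhausted.
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DrainAllBlocksSketch {

  static final class BlockSource {
    private final Iterator<int[]> _blocks;

    BlockSource(List<int[]> blocks) {
      _blocks = blocks.iterator();
    }

    // Returns the next block of values, or null when there are no more blocks.
    int[] nextBlock() {
      return _blocks.hasNext() ? _blocks.next() : null;
    }
  }

  // Old (buggy) shape: only the first block is aggregated, so rows in later blocks are dropped.
  static long sumFirstBlockOnly(BlockSource source) {
    int[] block = source.nextBlock();
    long sum = 0;
    if (block != null) {
      for (int v : block) {
        sum += v;
      }
    }
    return sum;
  }

  // New shape: keep calling nextBlock() until it returns null, folding every block into
  // the running aggregate (the commit does this with seriesBuilderMap).
  static long sumAllBlocks(BlockSource source) {
    long sum = 0;
    int[] block;
    while ((block = source.nextBlock()) != null) {
      for (int v : block) {
        sum += v;
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    List<int[]> blocks = Arrays.asList(new int[]{1, 2, 3}, new int[]{4, 5, 6});
    System.out.println(sumFirstBlockOnly(new BlockSource(blocks))); // 6  -> undercounts
    System.out.println(sumAllBlocks(new BlockSource(blocks)));      // 21 -> correct
  }
}

With more than one block per segment, the old shape silently undercounts, which appears to be the correctness issue the commit title refers to; draining every block restores the full aggregate.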
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index f7f747a170..457e916b6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -86,7 +86,7 @@ import static org.testng.Assert.assertTrue;
 
 public class QueryExecutorTest {
-  private static final String AVRO_DATA_PATH = "data/sampleEatsData.avro";
+  private static final String AVRO_DATA_PATH = "data/sampleEatsData30k.avro";
   private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
   private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
@@ -182,7 +182,7 @@ public class QueryExecutorTest {
     instanceRequest.setSearchSegments(_segmentNames);
     InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
-    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 20000L);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 60000L);
   }
 
   @Test
@@ -192,7 +192,7 @@ public class QueryExecutorTest {
     instanceRequest.setSearchSegments(_segmentNames);
     InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
-    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 40102.0);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 120306.0);
   }
 
   @Test
@@ -217,19 +217,20 @@ public class QueryExecutorTest {
 
   @Test
   public void testTimeSeriesSumQuery() {
-    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1);
     ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount");
     TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
         TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
-    QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
+    QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
     ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(),
         ServerMetrics.get());
     InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock);
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
     TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build();
-    assertEquals(5, timeSeriesBlock.getSeriesMap().size());
+    assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
+    assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0], 29885544.0);
   }
 
   @Test
@@ -309,12 +310,17 @@ public class QueryExecutorTest {
   }
 
   private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context) {
+    return getQueryContextForTimeSeries(context, Collections.singletonList(
+        ExpressionContext.forIdentifier("cityName")));
+  }
+
+  private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context, List<ExpressionContext> groupBy) {
     QueryContext.Builder builder = new QueryContext.Builder();
     builder.setTableName(OFFLINE_TABLE_NAME);
     builder.setTimeSeriesContext(context);
     builder.setAliasList(Collections.emptyList());
     builder.setSelectExpressions(Collections.emptyList());
-    builder.setGroupByExpressions(Collections.singletonList(ExpressionContext.forIdentifier("cityName")));
+    builder.setGroupByExpressions(groupBy);
     return builder.build();
   }
 }
diff --git a/pinot-core/src/test/resources/data/sampleEatsData.avro b/pinot-core/src/test/resources/data/sampleEatsData.avro
deleted file mode 100644
index 0dcc7928ef..0000000000
Binary files a/pinot-core/src/test/resources/data/sampleEatsData.avro and /dev/null differ
diff --git a/pinot-core/src/test/resources/data/sampleEatsData30k.avro b/pinot-core/src/test/resources/data/sampleEatsData30k.avro
new file mode 100644
index 0000000000..e63388614c
Binary files /dev/null and b/pinot-core/src/test/resources/data/sampleEatsData30k.avro differ

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org