This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 68899a30e4 [timeseries] Fix Time Series Query Correctness Issue (#14251)
68899a30e4 is described below

commit 68899a30e4848d787c85b3ae966f73fb755182f4
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Oct 21 11:03:07 2024 -0500

    [timeseries] Fix Time Series Query Correctness Issue (#14251)
---
 .../timeseries/TimeSeriesAggregationOperator.java  |  81 ++++++++++-----------
 .../core/query/executor/QueryExecutorTest.java     |  20 +++--
 .../src/test/resources/data/sampleEatsData.avro    | Bin 551915 -> 0 bytes
 .../src/test/resources/data/sampleEatsData30k.avro | Bin 0 -> 327494 bytes
 4 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index a47a05e83a..6202cf808c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -82,55 +82,54 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
 
   @Override
   protected TimeSeriesResultsBlock getNextBlock() {
-    ValueBlock transformBlock = _projectOperator.nextBlock();
-    if (transformBlock == null) {
-      TimeSeriesBuilderBlock builderBlock = new TimeSeriesBuilderBlock(_timeBuckets, new HashMap<>());
-      return new TimeSeriesResultsBlock(builderBlock);
-    }
-    BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn);
-    long[] timeValues = blockValSet.getLongValuesSV();
-    if (_timeOffset != null && _timeOffset != 0L) {
-      timeValues = applyTimeshift(_timeOffset, timeValues);
-    }
-    int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
-    Object[][] tagValues = new Object[_groupByExpressions.size()][];
+    ValueBlock valueBlock;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
-    for (int i = 0; i < _groupByExpressions.size(); i++) {
-      blockValSet = transformBlock.getBlockValueSet(_groupByExpressions.get(i));
-      switch (blockValSet.getValueType()) {
-        case JSON:
-        case STRING:
-          tagValues[i] = blockValSet.getStringValuesSV();
-          break;
+    while ((valueBlock = _projectOperator.nextBlock()) != null) {
+      // TODO: This is quite unoptimized and allocates liberally
+      BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
+      long[] timeValues = blockValSet.getLongValuesSV();
+      if (_timeOffset != null && _timeOffset != 0L) {
+        timeValues = applyTimeshift(_timeOffset, timeValues);
+      }
+      int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
+      Object[][] tagValues = new Object[_groupByExpressions.size()][];
+      for (int i = 0; i < _groupByExpressions.size(); i++) {
+        blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
+        switch (blockValSet.getValueType()) {
+          case JSON:
+          case STRING:
+            tagValues[i] = blockValSet.getStringValuesSV();
+            break;
+          case LONG:
+            tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+            break;
+          case INT:
+            tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+            break;
+          default:
+            throw new NotImplementedException("Can't handle types other than string and long");
+        }
+      }
+      BlockValSet valueExpressionBlockValSet = valueBlock.getBlockValueSet(_valueExpression);
+      switch (valueExpressionBlockValSet.getValueType()) {
         case LONG:
-          tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+          processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
           break;
         case INT:
-          tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+          processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
+          break;
+        case DOUBLE:
+          processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
+          break;
+        case STRING:
+          processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
           break;
         default:
-          throw new NotImplementedException("Can't handle types other than string and long");
+          // TODO: Support other types?
+          throw new IllegalStateException(
+              "Don't yet support value expression of type: " + 
valueExpressionBlockValSet.getValueType());
       }
     }
-    BlockValSet valueExpressionBlockValSet = transformBlock.getBlockValueSet(_valueExpression);
-    switch (valueExpressionBlockValSet.getValueType()) {
-      case LONG:
-        processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case INT:
-        processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case DOUBLE:
-        processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      case STRING:
-        processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues);
-        break;
-      default:
-        // TODO: Support other types?
-        throw new IllegalStateException(
-            "Don't yet support value expression of type: " + 
valueExpressionBlockValSet.getValueType());
-    }
     return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap));
   }
 
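For readers skimming the patch: the essence of the change above is that getNextBlock() now keeps calling _projectOperator.nextBlock() in a loop and folds every returned ValueBlock into the shared seriesBuilderMap, instead of aggregating only the first block, which under-counted whenever a segment produced more than one block. Below is a minimal, self-contained Java sketch of that pattern only; BlockSource and the sum helpers are hypothetical stand-ins for illustration, not Pinot APIs.

    import java.util.Iterator;
    import java.util.List;

    public class DrainAllBlocksSketch {
      /** Hypothetical stand-in for an upstream operator that emits blocks until exhausted. */
      interface BlockSource {
        long[] nextBlock(); // returns null once there are no more blocks
      }

      /** Old shape of the bug: only the first block is aggregated. */
      static long sumFirstBlockOnly(BlockSource source) {
        long sum = 0;
        long[] block = source.nextBlock();
        if (block != null) {
          for (long v : block) {
            sum += v;
          }
        }
        return sum;
      }

      /** Fixed shape: keep pulling blocks until the source is drained. */
      static long sumAllBlocks(BlockSource source) {
        long sum = 0;
        long[] block;
        while ((block = source.nextBlock()) != null) {
          for (long v : block) {
            sum += v;
          }
        }
        return sum;
      }

      public static void main(String[] args) {
        // A source that yields two blocks: the old shape sees only 6, the fixed shape sees 21.
        Iterator<long[]> first = List.of(new long[]{1, 2, 3}, new long[]{4, 5, 6}).iterator();
        Iterator<long[]> second = List.of(new long[]{1, 2, 3}, new long[]{4, 5, 6}).iterator();
        System.out.println(sumFirstBlockOnly(() -> first.hasNext() ? first.next() : null)); // 6
        System.out.println(sumAllBlocks(() -> second.hasNext() ? second.next() : null));    // 21
      }
    }

The new while loop in TimeSeriesAggregationOperator follows the second shape; the test changes below also swap in a larger sample data file, which is why the expected aggregate values change as well.
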
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index f7f747a170..457e916b6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -86,7 +86,7 @@ import static org.testng.Assert.assertTrue;
 
 
 public class QueryExecutorTest {
-  private static final String AVRO_DATA_PATH = "data/sampleEatsData.avro";
+  private static final String AVRO_DATA_PATH = "data/sampleEatsData30k.avro";
   private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
   private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
@@ -182,7 +182,7 @@ public class QueryExecutorTest {
     instanceRequest.setSearchSegments(_segmentNames);
     InstanceResponseBlock instanceResponse = 
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
AggregationResultsBlock);
-    assertEquals(((AggregationResultsBlock) 
instanceResponse.getResultsBlock()).getResults().get(0), 20000L);
+    assertEquals(((AggregationResultsBlock) 
instanceResponse.getResultsBlock()).getResults().get(0), 60000L);
   }
 
   @Test
@@ -192,7 +192,7 @@ public class QueryExecutorTest {
     instanceRequest.setSearchSegments(_segmentNames);
     InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
-    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 40102.0);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 120306.0);
   }
 
   @Test
@@ -217,19 +217,20 @@ public class QueryExecutorTest {
 
   @Test
   public void testTimeSeriesSumQuery() {
-    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1);
     ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
             0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
-    QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
+    QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
     InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock);
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
     TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build();
-    assertEquals(5, timeSeriesBlock.getSeriesMap().size());
+    assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
+    assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0], 29885544.0);
   }
 
   @Test
@@ -309,12 +310,17 @@ public class QueryExecutorTest {
   }
 
   private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context) {
+    return getQueryContextForTimeSeries(context, Collections.singletonList(
+        ExpressionContext.forIdentifier("cityName")));
+  }
+
+  private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context, List<ExpressionContext> groupBy) {
     QueryContext.Builder builder = new QueryContext.Builder();
     builder.setTableName(OFFLINE_TABLE_NAME);
     builder.setTimeSeriesContext(context);
     builder.setAliasList(Collections.emptyList());
     builder.setSelectExpressions(Collections.emptyList());
-    builder.setGroupByExpressions(Collections.singletonList(ExpressionContext.forIdentifier("cityName")));
+    builder.setGroupByExpressions(groupBy);
     return builder.build();
   }
 }
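
A side note on the updated testTimeSeriesSumQuery: with a single two-hour bucket and an empty group-by list, every matching row maps to the same bucket and the same (empty) series key, which appears to be why the assertion now expects getSeriesMap().size() == 1. A rough, self-contained sketch of that collapse, in plain Java with made-up data and simplified bucket math; none of these names are Pinot classes:

    import java.util.HashMap;
    import java.util.Map;

    public class SingleBucketSumSketch {
      public static void main(String[] args) {
        long startSeconds = 0L;          // stand-in for TIME_SERIES_TEST_START_TIME
        long bucketSizeSeconds = 7200L;  // one bucket of Duration.ofHours(2)
        int numBuckets = 1;

        // With no group-by expressions the series key is a constant, so the map
        // ends up with exactly one entry no matter how many rows are aggregated.
        Map<String, double[]> seriesMap = new HashMap<>();
        long[] timestamps = {10L, 3_600L, 7_000L};  // all fall inside the single bucket
        double[] orderAmounts = {1.5, 2.0, 3.5};    // made-up values, not the real test data
        for (int i = 0; i < timestamps.length; i++) {
          int bucket = (int) ((timestamps[i] - startSeconds) / bucketSizeSeconds);
          seriesMap.computeIfAbsent("", k -> new double[numBuckets])[bucket] += orderAmounts[i];
        }
        System.out.println(seriesMap.size());      // 1, mirroring the assertion on getSeriesMap().size()
        System.out.println(seriesMap.get("")[0]);  // 7.0 for this toy data
      }
    }
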
diff --git a/pinot-core/src/test/resources/data/sampleEatsData.avro b/pinot-core/src/test/resources/data/sampleEatsData.avro
deleted file mode 100644
index 0dcc7928ef..0000000000
Binary files a/pinot-core/src/test/resources/data/sampleEatsData.avro and /dev/null differ
diff --git a/pinot-core/src/test/resources/data/sampleEatsData30k.avro b/pinot-core/src/test/resources/data/sampleEatsData30k.avro
new file mode 100644
index 0000000000..e63388614c
Binary files /dev/null and b/pinot-core/src/test/resources/data/sampleEatsData30k.avro differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
