This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 159fc6716e [timeseries] Add Support for Passing Raw Time Values to Leaf Stage (#15000)
159fc6716e is described below

commit 159fc6716efa1cd50dfa9d7f59ab4c7092e38935
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Thu Feb 6 23:22:31 2025 -0600

    [timeseries] Add Support for Passing Raw Time Values to Leaf Stage (#15000)
---
 .../function/TimeSeriesAggregationFunction.java    | 99 ++++++++++++++++------
 .../core/query/executor/QueryExecutorTest.java     |  6 +-
 .../PhysicalTimeSeriesServerPlanVisitor.java       |  7 +-
 .../tsdb/spi/series/BaseTimeSeriesBuilder.java     | 17 ++++
 4 files changed, 94 insertions(+), 35 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
index 79feb866cb..239fa4a556 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
@@ -24,7 +24,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
@@ -44,8 +46,33 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
 
 
 /**
- * Aggregation function used by the Time Series Engine.
- * TODO: This can't be used with SQL because the Object Serde is not implemented.
+ * Aggregation function used by the Time Series Engine. This can't be used with SQL because the Object Serde is not yet
+ * implemented. That said, we don't plan on exposing this function anytime soon.
+ * <h2>Converting Time Values to Bucket Indexes</h2>
+ * <p>
+ *   This aggregation function will map each scanned data point to a time bucket index. This is done using the
+ *   formula: {@code ((timeValue + timeOffset) - timeReferencePoint - 1) / bucketSize}. The entire calculation is done
+ *   in the Time Unit (seconds, ms, etc.) of the timeValue returned by the time expression chosen by the user.
+ *   The method used to add values to the series builders is:
+ *   {@link BaseTimeSeriesBuilder#addValueAtIndex(int, Double, long)}.
+ * </p>
+ * <p>
+ *   The formula originates from the fact that we use half-open time intervals, which are open on the left.
+ *   The timeReferencePoint is usually the start of the time-range being scanned. Assuming everything is in seconds,
+ *   the time buckets can generally be thought to look something like the following:
+ *   <pre>
+ *     (timeReferencePoint, timeReferencePoint + bucketSize]
+ *     (timeReferencePoint + bucketSize, timeReferencePoint + 2 * bucketSize]
+ *     ...
+ *     (timeReferencePoint + (numBuckets - 1) * bucketSize, timeReferencePoint + numBuckets * bucketSize]
+ *   </pre>
+ * </p>
+ * <p>
+ *   Also, note that the timeReferencePoint is simply calculated as follows:
+ *   <pre>
+ *     timeReferencePointInSeconds = firstBucketValue - bucketSizeInSeconds
+ *   </pre>
+ * </p>
  */
 public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTimeSeriesBuilder, DoubleArrayList> {
   private final TimeSeriesBuilderFactory _factory;
@@ -53,32 +80,40 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
   private final ExpressionContext _valueExpression;
   private final ExpressionContext _timeExpression;
   private final TimeBuckets _timeBuckets;
+  private final long _timeReferencePoint;
+  private final long _timeOffset;
+  private final long _timeBucketDivisor;
 
   /**
    * Arguments are as shown below:
    * <pre>
-   *   timeSeriesAggregate("m3ql", "MIN", valueExpr, timeBucketExpr, 
firstBucketValue, bucketLenSeconds, numBuckets,
-   *      "aggParam1=value1")
+   *   timeSeriesAggregate("m3ql", "MIN", valueExpr, timeExpr, timeUnit, 
offsetSeconds, firstBucketValue,
+   *       bucketLenSeconds, numBuckets, "aggParam1=value1")
    * </pre>
    */
   public TimeSeriesAggregationFunction(List<ExpressionContext> arguments) {
-    // Initialize everything
-    Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments for time-series agg");
+    // Initialize temporary variables.
+    Preconditions.checkArgument(arguments.size() == 10, "Expected 10 arguments for time-series agg");
     String language = arguments.get(0).getLiteral().getStringValue();
     String aggFunctionName = arguments.get(1).getLiteral().getStringValue();
     ExpressionContext valueExpression = arguments.get(2);
-    ExpressionContext bucketIndexReturningExpr = arguments.get(3);
-    long firstBucketValue = arguments.get(4).getLiteral().getLongValue();
-    long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue();
-    int numBuckets = arguments.get(6).getLiteral().getIntValue();
-    Map<String, String> aggParams = AggInfo.deserializeParams(arguments.get(7).getLiteral().getStringValue());
+    ExpressionContext timeExpression = arguments.get(3);
+    TimeUnit timeUnit = TimeUnit.valueOf(arguments.get(4).getLiteral().getStringValue().toUpperCase(Locale.ENGLISH));
+    long offsetSeconds = arguments.get(5).getLiteral().getLongValue();
+    long firstBucketValue = arguments.get(6).getLiteral().getLongValue();
+    long bucketWindowSeconds = arguments.get(7).getLiteral().getLongValue();
+    int numBuckets = arguments.get(8).getLiteral().getIntValue();
+    Map<String, String> aggParams = AggInfo.deserializeParams(arguments.get(9).getLiteral().getStringValue());
     AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */, aggParams);
     // Set all values
     _factory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language);
     _valueExpression = valueExpression;
-    _timeExpression = bucketIndexReturningExpr;
+    _timeExpression = timeExpression;
     _timeBuckets = TimeBuckets.ofSeconds(firstBucketValue, Duration.ofSeconds(bucketWindowSeconds), numBuckets);
     _aggInfo = aggInfo;
+    _timeReferencePoint = timeUnit.convert(Duration.ofSeconds(firstBucketValue - bucketWindowSeconds));
+    _timeOffset = timeUnit.convert(Duration.ofSeconds(offsetSeconds));
+    _timeBucketDivisor = timeUnit.convert(_timeBuckets.getBucketSize());
   }
 
   @Override
@@ -109,16 +144,16 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV();
+    final long[] timeValues = blockValSetMap.get(_timeExpression).getLongValuesSV();
     BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
     switch (valueBlockValSet.getValueType()) {
       case DOUBLE:
       case LONG:
       case INT:
-        aggregateNumericValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet);
+        aggregateNumericValues(length, timeValues, aggregationResultHolder, valueBlockValSet);
         break;
       case STRING:
-        aggregateStringValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet);
+        aggregateStringValues(length, timeValues, aggregationResultHolder, valueBlockValSet);
         break;
       default:
         throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate",
@@ -129,16 +164,16 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV();
+    final long[] timeValues = blockValSetMap.get(_timeExpression).getLongValuesSV();
     BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
     switch (valueBlockValSet.getValueType()) {
       case DOUBLE:
       case LONG:
       case INT:
-        aggregateGroupByNumericValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet);
+        aggregateGroupByNumericValues(length, groupKeyArray, timeValues, groupByResultHolder, valueBlockValSet);
         break;
       case STRING:
-        aggregateGroupByStringValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet);
+        aggregateGroupByStringValues(length, groupKeyArray, timeValues, groupByResultHolder, valueBlockValSet);
         break;
       default:
         throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate",
@@ -189,7 +224,7 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
     return "TIME_SERIES";
   }
 
-  private void aggregateNumericValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder,
+  private void aggregateNumericValues(int length, long[] timeValues, AggregationResultHolder resultHolder,
       BlockValSet blockValSet) {
     double[] values = blockValSet.getDoubleValuesSV();
     BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
@@ -198,12 +233,14 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
           BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
       resultHolder.setValue(currentSeriesBuilder);
     }
+    int timeIndex;
     for (int docIndex = 0; docIndex < length; docIndex++) {
-      currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
+      timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor);
+      currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]);
     }
   }
 
-  private void aggregateStringValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder,
+  private void aggregateStringValues(int length, long[] timeValues, AggregationResultHolder resultHolder,
       BlockValSet blockValSet) {
     String[] values = blockValSet.getStringValuesSV();
     BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
@@ -212,14 +249,17 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
           BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
       resultHolder.setValue(currentSeriesBuilder);
     }
+    int timeIndex;
     for (int docIndex = 0; docIndex < length; docIndex++) {
-      currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
+      timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor);
+      currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]);
     }
   }
 
-  private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, int[] timeIndexes,
+  private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, long[] timeValues,
       GroupByResultHolder resultHolder, BlockValSet blockValSet) {
     final double[] values = blockValSet.getDoubleValuesSV();
+    int timeIndex;
     for (int docIndex = 0; docIndex < length; docIndex++) {
       int groupId = groupKeyArray[docIndex];
       BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId);
@@ -228,13 +268,15 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
             BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
         resultHolder.setValueForKey(groupId, currentSeriesBuilder);
       }
-      currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
+      timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor);
+      currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]);
     }
   }
 
-  private void aggregateGroupByStringValues(int length, int[] groupKeyArray, int[] timeIndexes,
+  private void aggregateGroupByStringValues(int length, int[] groupKeyArray, long[] timeValues,
       GroupByResultHolder resultHolder, BlockValSet blockValSet) {
     final String[] values = blockValSet.getStringValuesSV();
+    int timeIndex;
     for (int docIndex = 0; docIndex < length; docIndex++) {
       int groupId = groupKeyArray[docIndex];
       BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId);
@@ -243,18 +285,21 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi
             BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
         resultHolder.setValueForKey(groupId, currentSeriesBuilder);
       }
-      currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]);
+      timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor);
+      currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]);
     }
   }
 
   public static ExpressionContext create(String language, String valueExpressionStr, ExpressionContext timeExpression,
-    TimeBuckets timeBuckets, AggInfo aggInfo) {
+    TimeUnit timeUnit, long offsetSeconds, TimeBuckets timeBuckets, AggInfo aggInfo) {
     ExpressionContext valueExpression = RequestContextUtils.getExpression(valueExpressionStr);
     List<ExpressionContext> arguments = new ArrayList<>();
     arguments.add(ExpressionContext.forLiteral(Literal.stringValue(language)));
     arguments.add(ExpressionContext.forLiteral(Literal.stringValue(aggInfo.getAggFunction())));
     arguments.add(valueExpression);
     arguments.add(timeExpression);
+    arguments.add(ExpressionContext.forLiteral(Literal.stringValue(timeUnit.toString())));
+    arguments.add(ExpressionContext.forLiteral(Literal.longValue(offsetSeconds)));
     arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0])));
     arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds())));
     arguments.add(ExpressionContext.forLiteral(Literal.intValue(timeBuckets.getNumBuckets())));
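
The bucketing arithmetic added above is easiest to check with concrete numbers. Below is a small standalone sketch (not part of this commit; the class name and the sample values are made up) that derives the reference point, offset and divisor the same way the new constructor does, and applies the formula ((timeValue + timeOffset) - timeReferencePoint - 1) / bucketSize to a few raw epoch-millisecond values:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class BucketIndexSketch {
  public static void main(String[] args) {
    // Hypothetical query parameters: 60-second buckets whose first bucket ends at 1_700_000_060.
    long firstBucketValue = 1_700_000_060L;
    long bucketWindowSeconds = 60;
    long offsetSeconds = 0;
    TimeUnit timeUnit = TimeUnit.MILLISECONDS;  // unit of the raw time column

    // Same derivation as the new constructor fields: convert everything to the time column's unit.
    long timeReferencePoint = timeUnit.convert(Duration.ofSeconds(firstBucketValue - bucketWindowSeconds));
    long timeOffset = timeUnit.convert(Duration.ofSeconds(offsetSeconds));
    long timeBucketDivisor = timeUnit.convert(Duration.ofSeconds(bucketWindowSeconds));

    // Raw values as the leaf stage would read them from the time column (epoch millis here).
    long[] rawTimeValues = {1_700_000_000_001L, 1_700_000_060_000L, 1_700_000_060_001L};
    for (long timeValue : rawTimeValues) {
      // Left-open buckets: (ref, ref + size], (ref + size, ref + 2 * size], ...
      int timeIndex = (int) (((timeValue + timeOffset) - timeReferencePoint - 1) / timeBucketDivisor);
      System.out.println(timeValue + " -> bucket " + timeIndex);  // prints 0, 0, 1
    }
  }
}

The right edge of a bucket maps into that bucket while the next millisecond falls into the following one, matching the left-open intervals described in the Javadoc.
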
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 04a908e697..095a1c0a89 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -47,7 +47,6 @@ import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils;
-import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction;
 import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -311,10 +310,9 @@ public class QueryExecutorTest {
       AggInfo aggInfo, List<String> groupBy) {
     List<ExpressionContext> groupByExpList = groupBy.stream().map(RequestContextUtils::getExpression)
         .collect(Collectors.toList());
-    ExpressionContext timeExpression = TimeSeriesBucketTransformFunction.create(TIME_SERIES_TIME_COL_NAME,
-        TimeUnit.SECONDS, timeBuckets, offsetSeconds);
+    ExpressionContext timeExpression = RequestContextUtils.getExpression(TIME_SERIES_TIME_COL_NAME);
     ExpressionContext aggregateExpr = TimeSeriesAggregationFunction.create(TIME_SERIES_LANGUAGE_NAME, valueExpression,
-        timeExpression, timeBuckets, aggInfo);
+        timeExpression, TimeUnit.SECONDS, offsetSeconds, timeBuckets, aggInfo);
     QueryContext.Builder builder = new QueryContext.Builder();
     builder.setTableName(OFFLINE_TABLE_NAME);
     builder.setAliasList(Collections.emptyList());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
index 72857d403b..27918be35f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
-import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction;
 import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -106,10 +105,10 @@ public class PhysicalTimeSeriesServerPlanVisitor {
     List<ExpressionContext> groupByExpressions = leafNode.getGroupByExpressions().stream()
         .map(RequestContextUtils::getExpression).collect(Collectors.toList());
     TimeBuckets timeBuckets = context.getInitialTimeBuckets();
-    ExpressionContext timeTransform = TimeSeriesBucketTransformFunction.create(leafNode.getTimeColumn(),
-        leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds());
+    ExpressionContext rawTimeValuesInLong = RequestContextUtils.getExpression(leafNode.getTimeColumn());
     ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(),
-        leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo());
+        leafNode.getValueExpression(), rawTimeValuesInLong, leafNode.getTimeUnit(),
+        leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds(), timeBuckets, leafNode.getAggInfo());
     Map<String, String> queryOptions = new HashMap<>(leafNode.getQueryOptions());
     queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L, context.getRemainingTimeMs())));
     return new QueryContext.Builder()
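
For reference, here is a hedged usage sketch of the new create() signature. The table/column names, bucket values and the empty aggregation params are placeholders, and the AggInfo/TimeBuckets import locations are assumed from the timeseries SPI; only the call shape mirrors what the plan visitor above and QueryExecutorTest now do:

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction;
import org.apache.pinot.tsdb.spi.AggInfo;      // assumed SPI package
import org.apache.pinot.tsdb.spi.TimeBuckets;

public class CreateCallSketch {
  public static void main(String[] args) {
    // The leaf stage now passes the raw time column; no bucketing transform is wrapped around it.
    ExpressionContext rawTimeValues = RequestContextUtils.getExpression("ts");
    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1_700_000_060L, Duration.ofSeconds(60), 10);
    AggInfo aggInfo = new AggInfo("MIN", true /* is partial agg */, Collections.emptyMap());
    ExpressionContext aggregation = TimeSeriesAggregationFunction.create(
        "m3ql", "latencyMs", rawTimeValues, TimeUnit.MILLISECONDS, 0L /* offsetSeconds */, timeBuckets, aggInfo);
    System.out.println(aggregation);
  }
}
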
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
index 84239e1217..0a5e4753fc 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 /**
@@ -57,10 +58,26 @@ public abstract class BaseTimeSeriesBuilder {
 
   public abstract void addValueAtIndex(int timeBucketIndex, Double value);
 
+  /**
+   * This is the method called by Pinot's leaf stage to accumulate data in the series builders. Pinot's leaf stage
+   * passes the raw time value to allow languages to build complex series builders. For instance, PromQL relies on
+   * the first and last time value in each time bucket for certain functions.
+   * <p>
+   *   The rawTimeValue is in the same Time Unit as that passed to the {@link LeafTimeSeriesPlanNode}.
+   * </p>
+   */
+  public void addValueAtIndex(int timeBucketIndex, Double value, long rawTimeValue) {
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
   public void addValueAtIndex(int timeBucketIndex, String value) {
     throw new UnsupportedOperationException("This aggregation function does not support string input");
   }
 
+  public void addValueAtIndex(int timeBucketIndex, String value, long rawTimeValue) {
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
   public abstract void addValue(long timeValue, Double value);
 
   /**
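
To illustrate the new raw-time overloads, here is a hypothetical, self-contained sketch. It does not extend the real SPI class (its remaining abstract methods are not shown in this diff); it only mirrors the delegation pattern above with a builder that keeps the latest raw timestamp seen per bucket, the kind of per-bucket first/last information the Javadoc mentions for PromQL-style functions:

public class RawTimeOverloadSketch {

  // Simplified stand-in for BaseTimeSeriesBuilder: the raw-time overload defaults to the
  // existing index-only method, so builders that don't care about raw time keep working.
  abstract static class SketchSeriesBuilder {
    public abstract void addValueAtIndex(int timeBucketIndex, Double value);

    public void addValueAtIndex(int timeBucketIndex, Double value, long rawTimeValue) {
      addValueAtIndex(timeBucketIndex, value);  // default: ignore the raw time value
    }
  }

  // A builder that opts in and keeps only the value with the latest raw timestamp per bucket.
  static class LastValueBuilder extends SketchSeriesBuilder {
    private final double[] _values;
    private final long[] _lastRawTime;

    LastValueBuilder(int numBuckets) {
      _values = new double[numBuckets];
      _lastRawTime = new long[numBuckets];
    }

    @Override
    public void addValueAtIndex(int timeBucketIndex, Double value) {
      _values[timeBucketIndex] = value;
    }

    @Override
    public void addValueAtIndex(int timeBucketIndex, Double value, long rawTimeValue) {
      if (rawTimeValue >= _lastRawTime[timeBucketIndex]) {
        _lastRawTime[timeBucketIndex] = rawTimeValue;
        _values[timeBucketIndex] = value;
      }
    }
  }

  public static void main(String[] args) {
    LastValueBuilder builder = new LastValueBuilder(4);
    builder.addValueAtIndex(0, 1.0, 1_700_000_010L);
    builder.addValueAtIndex(0, 2.0, 1_700_000_005L);  // older sample within the same bucket: ignored
    System.out.println(builder._values[0]);           // prints 1.0
  }
}

Because the default implementation delegates to the index-only method, existing series builders compile unchanged and only override the new overloads when they actually need the raw value.
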

