This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 159fc6716e [timeseries] Add Support for Passing Raw Time Values to Leaf Stage (#15000) 159fc6716e is described below commit 159fc6716efa1cd50dfa9d7f59ab4c7092e38935 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Thu Feb 6 23:22:31 2025 -0600 [timeseries] Add Support for Passing Raw Time Values to Leaf Stage (#15000) --- .../function/TimeSeriesAggregationFunction.java | 99 ++++++++++++++++------ .../core/query/executor/QueryExecutorTest.java | 6 +- .../PhysicalTimeSeriesServerPlanVisitor.java | 7 +- .../tsdb/spi/series/BaseTimeSeriesBuilder.java | 17 ++++ 4 files changed, 94 insertions(+), 35 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java index 79feb866cb..239fa4a556 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java @@ -24,7 +24,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.pinot.common.request.Literal; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FunctionContext; @@ -44,8 +46,33 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; /** - * Aggregation function used by the Time Series Engine. - * TODO: This can't be used with SQL because the Object Serde is not implemented. + * Aggregation function used by the Time Series Engine. This can't be used with SQL because the Object Serde is not yet + * implemented. 
That said, we don't plan on exposing this function anytime soon. + * <h2>Converting Time Values to Bucket Indexes</h2> + * <p> + * This aggregation function will map each scanned data point to a time bucket index. This is done using the + * formula: {@code ((timeValue + timeOffset) - timeReferencePoint - 1) / bucketSize}. The entire calculation is done + * in the Time Unit (seconds, ms, etc.) of the timeValue returned by the time expression chosen by the user. + * The method used to add values to the series builders is: + * {@link BaseTimeSeriesBuilder#addValueAtIndex(int, Double, long)}. + * </p> + * <p> + * The formula originates from the fact that we use half-open time intervals, which are open on the left. + * The timeReferencePoint is usually the start of the time-range being scanned. Assuming everything is in seconds, + * the time buckets can generally be thought of as looking like the following: + * <pre> + * (timeReferencePoint, timeReferencePoint + bucketSize] + * (timeReferencePoint + bucketSize, timeReferencePoint + 2 * bucketSize] + * ... 
+ * (timeReferencePoint + (numBuckets - 1) * bucketSize, timeReferencePoint + numBuckets * bucketSize] + * </pre> + * </p> + * <p> + * Also, note that the timeReferencePoint is simply calculated as follows: + * <pre> + * timeReferencePointInSeconds = firstBucketValue - bucketSizeInSeconds + * </pre> + * </p> */ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTimeSeriesBuilder, DoubleArrayList> { private final TimeSeriesBuilderFactory _factory; @@ -53,32 +80,40 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi private final ExpressionContext _valueExpression; private final ExpressionContext _timeExpression; private final TimeBuckets _timeBuckets; + private final long _timeReferencePoint; + private final long _timeOffset; + private final long _timeBucketDivisor; /** * Arguments are as shown below: * <pre> - * timeSeriesAggregate("m3ql", "MIN", valueExpr, timeBucketExpr, firstBucketValue, bucketLenSeconds, numBuckets, - * "aggParam1=value1") + * timeSeriesAggregate("m3ql", "MIN", valueExpr, timeExpr, timeUnit, offsetSeconds, firstBucketValue, + * bucketLenSeconds, numBuckets, "aggParam1=value1") * </pre> */ public TimeSeriesAggregationFunction(List<ExpressionContext> arguments) { - // Initialize everything - Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments for time-series agg"); + // Initialize temporary variables. 
+ Preconditions.checkArgument(arguments.size() == 10, "Expected 10 arguments for time-series agg"); String language = arguments.get(0).getLiteral().getStringValue(); String aggFunctionName = arguments.get(1).getLiteral().getStringValue(); ExpressionContext valueExpression = arguments.get(2); - ExpressionContext bucketIndexReturningExpr = arguments.get(3); - long firstBucketValue = arguments.get(4).getLiteral().getLongValue(); - long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue(); - int numBuckets = arguments.get(6).getLiteral().getIntValue(); - Map<String, String> aggParams = AggInfo.deserializeParams(arguments.get(7).getLiteral().getStringValue()); + ExpressionContext timeExpression = arguments.get(3); + TimeUnit timeUnit = TimeUnit.valueOf(arguments.get(4).getLiteral().getStringValue().toUpperCase(Locale.ENGLISH)); + long offsetSeconds = arguments.get(5).getLiteral().getLongValue(); + long firstBucketValue = arguments.get(6).getLiteral().getLongValue(); + long bucketWindowSeconds = arguments.get(7).getLiteral().getLongValue(); + int numBuckets = arguments.get(8).getLiteral().getIntValue(); + Map<String, String> aggParams = AggInfo.deserializeParams(arguments.get(9).getLiteral().getStringValue()); AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */, aggParams); // Set all values _factory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language); _valueExpression = valueExpression; - _timeExpression = bucketIndexReturningExpr; + _timeExpression = timeExpression; _timeBuckets = TimeBuckets.ofSeconds(firstBucketValue, Duration.ofSeconds(bucketWindowSeconds), numBuckets); _aggInfo = aggInfo; + _timeReferencePoint = timeUnit.convert(Duration.ofSeconds(firstBucketValue - bucketWindowSeconds)); + _timeOffset = timeUnit.convert(Duration.ofSeconds(offsetSeconds)); + _timeBucketDivisor = timeUnit.convert(_timeBuckets.getBucketSize()); } @Override @@ -109,16 +144,16 @@ public class TimeSeriesAggregationFunction implements 
AggregationFunction<BaseTi @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + final long[] timeValues = blockValSetMap.get(_timeExpression).getLongValuesSV(); BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression); switch (valueBlockValSet.getValueType()) { case DOUBLE: case LONG: case INT: - aggregateNumericValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet); + aggregateNumericValues(length, timeValues, aggregationResultHolder, valueBlockValSet); break; case STRING: - aggregateStringValues(length, timeIndexes, aggregationResultHolder, valueBlockValSet); + aggregateStringValues(length, timeValues, aggregationResultHolder, valueBlockValSet); break; default: throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate", @@ -129,16 +164,16 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi @Override public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - final int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV(); + final long[] timeValues = blockValSetMap.get(_timeExpression).getLongValuesSV(); BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression); switch (valueBlockValSet.getValueType()) { case DOUBLE: case LONG: case INT: - aggregateGroupByNumericValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet); + aggregateGroupByNumericValues(length, groupKeyArray, timeValues, groupByResultHolder, valueBlockValSet); break; case STRING: - aggregateGroupByStringValues(length, groupKeyArray, timeIndexes, groupByResultHolder, valueBlockValSet); + aggregateGroupByStringValues(length, groupKeyArray, timeValues, groupByResultHolder, valueBlockValSet); break; 
default: throw new UnsupportedOperationException(String.format("Unsupported type: %s in aggregate", @@ -189,7 +224,7 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi return "TIME_SERIES"; } - private void aggregateNumericValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder, + private void aggregateNumericValues(int length, long[] timeValues, AggregationResultHolder resultHolder, BlockValSet blockValSet) { double[] values = blockValSet.getDoubleValuesSV(); BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(); @@ -198,12 +233,14 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); resultHolder.setValue(currentSeriesBuilder); } + int timeIndex; for (int docIndex = 0; docIndex < length; docIndex++) { - currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor); + currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]); } } - private void aggregateStringValues(int length, int[] timeIndexes, AggregationResultHolder resultHolder, + private void aggregateStringValues(int length, long[] timeValues, AggregationResultHolder resultHolder, BlockValSet blockValSet) { String[] values = blockValSet.getStringValuesSV(); BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(); @@ -212,14 +249,17 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); resultHolder.setValue(currentSeriesBuilder); } + int timeIndex; for (int docIndex = 0; docIndex < length; docIndex++) { - currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + timeIndex = (int) (((timeValues[docIndex] + 
_timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor); + currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]); } } - private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, int[] timeIndexes, + private void aggregateGroupByNumericValues(int length, int[] groupKeyArray, long[] timeValues, GroupByResultHolder resultHolder, BlockValSet blockValSet) { final double[] values = blockValSet.getDoubleValuesSV(); + int timeIndex; for (int docIndex = 0; docIndex < length; docIndex++) { int groupId = groupKeyArray[docIndex]; BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId); @@ -228,13 +268,15 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); resultHolder.setValueForKey(groupId, currentSeriesBuilder); } - currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor); + currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]); } } - private void aggregateGroupByStringValues(int length, int[] groupKeyArray, int[] timeIndexes, + private void aggregateGroupByStringValues(int length, int[] groupKeyArray, long[] timeValues, GroupByResultHolder resultHolder, BlockValSet blockValSet) { final String[] values = blockValSet.getStringValuesSV(); + int timeIndex; for (int docIndex = 0; docIndex < length; docIndex++) { int groupId = groupKeyArray[docIndex]; BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult(groupId); @@ -243,18 +285,21 @@ public class TimeSeriesAggregationFunction implements AggregationFunction<BaseTi BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES, BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES); resultHolder.setValueForKey(groupId, currentSeriesBuilder); } - 
currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex], values[docIndex]); + timeIndex = (int) (((timeValues[docIndex] + _timeOffset) - _timeReferencePoint - 1) / _timeBucketDivisor); + currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex], timeValues[docIndex]); } } public static ExpressionContext create(String language, String valueExpressionStr, ExpressionContext timeExpression, - TimeBuckets timeBuckets, AggInfo aggInfo) { + TimeUnit timeUnit, long offsetSeconds, TimeBuckets timeBuckets, AggInfo aggInfo) { ExpressionContext valueExpression = RequestContextUtils.getExpression(valueExpressionStr); List<ExpressionContext> arguments = new ArrayList<>(); arguments.add(ExpressionContext.forLiteral(Literal.stringValue(language))); arguments.add(ExpressionContext.forLiteral(Literal.stringValue(aggInfo.getAggFunction()))); arguments.add(valueExpression); arguments.add(timeExpression); + arguments.add(ExpressionContext.forLiteral(Literal.stringValue(timeUnit.toString()))); + arguments.add(ExpressionContext.forLiteral(Literal.longValue(offsetSeconds))); arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0]))); arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds()))); arguments.add(ExpressionContext.forLiteral(Literal.intValue(timeBuckets.getNumBuckets()))); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 04a908e697..095a1c0a89 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -47,7 +47,6 @@ import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import 
org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils; -import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction; import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; @@ -311,10 +310,9 @@ public class QueryExecutorTest { AggInfo aggInfo, List<String> groupBy) { List<ExpressionContext> groupByExpList = groupBy.stream().map(RequestContextUtils::getExpression) .collect(Collectors.toList()); - ExpressionContext timeExpression = TimeSeriesBucketTransformFunction.create(TIME_SERIES_TIME_COL_NAME, - TimeUnit.SECONDS, timeBuckets, offsetSeconds); + ExpressionContext timeExpression = RequestContextUtils.getExpression(TIME_SERIES_TIME_COL_NAME); ExpressionContext aggregateExpr = TimeSeriesAggregationFunction.create(TIME_SERIES_LANGUAGE_NAME, valueExpression, - timeExpression, timeBuckets, aggInfo); + timeExpression, TimeUnit.SECONDS, offsetSeconds, timeBuckets, aggInfo); QueryContext.Builder builder = new QueryContext.Builder(); builder.setTableName(OFFLINE_TABLE_NAME); builder.setAliasList(Collections.emptyList()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java index 72857d403b..27918be35f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java @@ -30,7 +30,6 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.context.ExpressionContext; import 
org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction; import org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -106,10 +105,10 @@ public class PhysicalTimeSeriesServerPlanVisitor { List<ExpressionContext> groupByExpressions = leafNode.getGroupByExpressions().stream() .map(RequestContextUtils::getExpression).collect(Collectors.toList()); TimeBuckets timeBuckets = context.getInitialTimeBuckets(); - ExpressionContext timeTransform = TimeSeriesBucketTransformFunction.create(leafNode.getTimeColumn(), - leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds()); + ExpressionContext rawTimeValuesInLong = RequestContextUtils.getExpression(leafNode.getTimeColumn()); ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(), - leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo()); + leafNode.getValueExpression(), rawTimeValuesInLong, leafNode.getTimeUnit(), + leafNode.getOffsetSeconds() == null ? 
0 : leafNode.getOffsetSeconds(), timeBuckets, leafNode.getAggInfo()); Map<String, String> queryOptions = new HashMap<>(leafNode.getQueryOptions()); queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L, context.getRemainingTimeMs()))); return new QueryContext.Builder() diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index 84239e1217..0a5e4753fc 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; /** @@ -57,10 +58,26 @@ public abstract class BaseTimeSeriesBuilder { public abstract void addValueAtIndex(int timeBucketIndex, Double value); + /** + * This is the method called by Pinot's leaf stage to accumulate data in the series builders. Pinot's leaf stage + * passes the raw time value to allow languages to build complex series builders. For instance, PromQL relies on + * the first and last time value in each time bucket for certain functions. + * <p> + * The rawTimeValue is in the same Time Unit as that passed to the {@link LeafTimeSeriesPlanNode}. 
+ * </p> + */ + public void addValueAtIndex(int timeBucketIndex, Double value, long rawTimeValue) { + addValueAtIndex(timeBucketIndex, value); + } + public void addValueAtIndex(int timeBucketIndex, String value) { throw new UnsupportedOperationException("This aggregation function does not support string input"); } + public void addValueAtIndex(int timeBucketIndex, String value, long rawTimeValue) { + addValueAtIndex(timeBucketIndex, value); + } + public abstract void addValue(long timeValue, Double value); /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org