ankitsultana commented on code in PR #13999: URL: https://github.com/apache/pinot/pull/13999#discussion_r1762432674
########## pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.timeseries; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +/** + * Segment level operator which converts data in relational model to a time-series model, by aggregating on a + * configured aggregation expression. + */ +public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResultsBlock> { + private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; + private final String _timeColumn; + private final TimeUnit _storedTimeUnit; + private final Long _timeOffset; + private final AggInfo _aggInfo; + private final ExpressionContext _valueExpression; + private final List<String> _groupByExpressions; + private final BaseProjectOperator<? extends ValueBlock> _projectOperator; + private final TimeBuckets _timeBuckets; + private final TimeSeriesBuilderFactory _seriesBuilderFactory; + + public TimeSeriesAggregationOperator( + String timeColumn, + TimeUnit timeUnit, + Long timeOffset, + AggInfo aggInfo, + ExpressionContext valueExpression, + List<String> groupByExpressions, + TimeBuckets timeBuckets, + BaseProjectOperator<? extends ValueBlock> projectOperator, + TimeSeriesBuilderFactory seriesBuilderFactory) { + _timeColumn = timeColumn; + _storedTimeUnit = timeUnit; + _timeOffset = timeOffset; + _aggInfo = aggInfo; + _valueExpression = valueExpression; + _groupByExpressions = groupByExpressions; + _projectOperator = projectOperator; + _timeBuckets = timeBuckets; + _seriesBuilderFactory = seriesBuilderFactory; + } + + @Override + protected TimeSeriesResultsBlock getNextBlock() { + ValueBlock transformBlock = _projectOperator.nextBlock(); + BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn); + long[] timeValues = blockValSet.getLongValuesSV(); + if (_timeOffset != null && _timeOffset != 0L) { + timeValues = applyTimeshift(_timeOffset, timeValues); + } + int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit); + Object[][] tagValues = new Object[_groupByExpressions.size()][]; + Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024); + for (int i = 0; i < _groupByExpressions.size(); i++) { + blockValSet = transformBlock.getBlockValueSet(_groupByExpressions.get(i)); + switch (blockValSet.getValueType()) { + case STRING: + tagValues[i] = blockValSet.getStringValuesSV(); + break; + case LONG: + tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV()); + break; + default: + throw new NotImplementedException("Can't handle types other than string and long"); + } + } + BlockValSet valueExpressionBlockValSet = transformBlock.getBlockValueSet(_valueExpression); + switch (valueExpressionBlockValSet.getValueType()) { + case LONG: + processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case INT: + processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case DOUBLE: + processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case STRING: + processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + default: + // TODO: Support other types? + throw new IllegalStateException( + "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType()); + } + Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); + for (var entry : seriesBuilderMap.entrySet()) { + List<TimeSeries> seriesList = new ArrayList<>(); + seriesList.add(entry.getValue().build()); + seriesMap.put(entry.getKey(), seriesList); + } + return new TimeSeriesResultsBlock(new TimeSeriesBlock(_timeBuckets, seriesMap)); + } + + @Override + @SuppressWarnings("rawtypes") + public List<? extends Operator> getChildOperators() { + return ImmutableList.of(_projectOperator); + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + // TODO: Implement this. + return new ExecutionStatistics(0, 0, 0, 0); + } + + private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit) { + if (timeUnit == TimeUnit.MILLISECONDS) { + return getTimeValueIndexMillis(actualTimeValues); + } + int[] timeIndexes = new int[actualTimeValues.length]; + for (int index = 0; index < actualTimeValues.length; index++) { + timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime()) + / _timeBuckets.getBucketSize().getSeconds()); + } + return timeIndexes; + } + + private int[] getTimeValueIndexMillis(long[] actualTimeValues) { + int[] timeIndexes = new int[actualTimeValues.length]; + for (int index = 0; index < actualTimeValues.length; index++) { + timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime() * 1000L) + / _timeBuckets.getBucketSize().toMillis()); + } + return timeIndexes; + } + + public void processLongExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + long[] valueColumnValues = blockValSet.getLongValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); + } + } + + public void processIntExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + int[] valueColumnValues = blockValSet.getIntValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); + } + } + + public void processDoubleExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + double[] valueColumnValues = blockValSet.getDoubleValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]); + } + } + + public void processStringExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, Review Comment: Yeah we could but only in the leaf stage. For instance, in M3 we would like to be able to support queries such as: ``` fetch table:foobar value:some_uuid_column | summarizeBy 1h countDistinct ... ``` This will essentially run a countDistinct in a 1 hour bucket on the `some_uuid_column`. This is a common use-case we have internally. This will require an extension to the current TimeSeries code though. Right now we have tied TimeSeries to Double[] values. We will make it generic in a subsequent PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org