raghavyadav01 commented on code in PR #13999: URL: https://github.com/apache/pinot/pull/13999#discussion_r1761553293
########## pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java: ########## @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.request.context; + +import java.util.concurrent.TimeUnit; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; + + +public class TimeSeriesContext { + private final String _engine; + private final String _timeColumn; + private final TimeUnit _timeUnit; + private final TimeBuckets _timeBuckets; + private final Long _offsetSeconds; + private final ExpressionContext _valueExpression; Review Comment: What are the expressions allowed for valueExpression? I thought this was the name of the value column. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.timeseries; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +/** + * Segment level operator which converts data in relational model to a time-series model, by aggregating on a + * configured aggregation expression. 
+ */ +public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResultsBlock> { + private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; + private final String _timeColumn; + private final TimeUnit _storedTimeUnit; + private final Long _timeOffset; + private final AggInfo _aggInfo; + private final ExpressionContext _valueExpression; + private final List<String> _groupByExpressions; + private final BaseProjectOperator<? extends ValueBlock> _projectOperator; + private final TimeBuckets _timeBuckets; + private final TimeSeriesBuilderFactory _seriesBuilderFactory; + + public TimeSeriesAggregationOperator( + String timeColumn, + TimeUnit timeUnit, + Long timeOffset, + AggInfo aggInfo, + ExpressionContext valueExpression, + List<String> groupByExpressions, + TimeBuckets timeBuckets, + BaseProjectOperator<? extends ValueBlock> projectOperator, + TimeSeriesBuilderFactory seriesBuilderFactory) { + _timeColumn = timeColumn; + _storedTimeUnit = timeUnit; + _timeOffset = timeOffset; + _aggInfo = aggInfo; + _valueExpression = valueExpression; + _groupByExpressions = groupByExpressions; + _projectOperator = projectOperator; + _timeBuckets = timeBuckets; + _seriesBuilderFactory = seriesBuilderFactory; + } + + @Override + protected TimeSeriesResultsBlock getNextBlock() { + ValueBlock transformBlock = _projectOperator.nextBlock(); + BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn); + long[] timeValues = blockValSet.getLongValuesSV(); + if (_timeOffset != null && _timeOffset != 0L) { + timeValues = applyTimeshift(_timeOffset, timeValues); + } + int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit); + Object[][] tagValues = new Object[_groupByExpressions.size()][]; Review Comment: We are thinking of storing the labels from Prometheus messages as JSON blobs. groupByExpressions will then be used to filter on specific label values. Will this work for such columns? For example:
http_requests_total{status="200", method="GET"} ########## pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.timeseries; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +/** + * Segment level operator which converts data in relational model to a time-series model, by aggregating on a + * configured aggregation expression. + */ +public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResultsBlock> { + private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; + private final String _timeColumn; + private final TimeUnit _storedTimeUnit; + private final Long _timeOffset; + private final AggInfo _aggInfo; + private final ExpressionContext _valueExpression; + private final List<String> _groupByExpressions; + private final BaseProjectOperator<? 
extends ValueBlock> _projectOperator; + private final TimeBuckets _timeBuckets; + private final TimeSeriesBuilderFactory _seriesBuilderFactory; + + public TimeSeriesAggregationOperator( + String timeColumn, + TimeUnit timeUnit, + Long timeOffset, + AggInfo aggInfo, + ExpressionContext valueExpression, + List<String> groupByExpressions, + TimeBuckets timeBuckets, + BaseProjectOperator<? extends ValueBlock> projectOperator, + TimeSeriesBuilderFactory seriesBuilderFactory) { + _timeColumn = timeColumn; + _storedTimeUnit = timeUnit; + _timeOffset = timeOffset; + _aggInfo = aggInfo; + _valueExpression = valueExpression; + _groupByExpressions = groupByExpressions; + _projectOperator = projectOperator; + _timeBuckets = timeBuckets; + _seriesBuilderFactory = seriesBuilderFactory; + } + + @Override + protected TimeSeriesResultsBlock getNextBlock() { + ValueBlock transformBlock = _projectOperator.nextBlock(); + BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn); + long[] timeValues = blockValSet.getLongValuesSV(); + if (_timeOffset != null && _timeOffset != 0L) { + timeValues = applyTimeshift(_timeOffset, timeValues); + } + int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit); + Object[][] tagValues = new Object[_groupByExpressions.size()][]; + Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024); + for (int i = 0; i < _groupByExpressions.size(); i++) { + blockValSet = transformBlock.getBlockValueSet(_groupByExpressions.get(i)); + switch (blockValSet.getValueType()) { + case STRING: + tagValues[i] = blockValSet.getStringValuesSV(); + break; + case LONG: + tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV()); + break; + default: + throw new NotImplementedException("Can't handle types other than string and long"); + } + } + BlockValSet valueExpressionBlockValSet = transformBlock.getBlockValueSet(_valueExpression); + switch (valueExpressionBlockValSet.getValueType()) { + case LONG: + 
processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case INT: + processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case DOUBLE: + processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + case STRING: + processStringExpression(valueExpressionBlockValSet, seriesBuilderMap, timeValueIndexes, tagValues); + break; + default: + // TODO: Support other types? + throw new IllegalStateException( + "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType()); + } + Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); + for (var entry : seriesBuilderMap.entrySet()) { + List<TimeSeries> seriesList = new ArrayList<>(); + seriesList.add(entry.getValue().build()); + seriesMap.put(entry.getKey(), seriesList); + } + return new TimeSeriesResultsBlock(new TimeSeriesBlock(_timeBuckets, seriesMap)); + } + + @Override + @SuppressWarnings("rawtypes") + public List<? extends Operator> getChildOperators() { + return ImmutableList.of(_projectOperator); + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + // TODO: Implement this. 
+ return new ExecutionStatistics(0, 0, 0, 0); + } + + private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit) { + if (timeUnit == TimeUnit.MILLISECONDS) { + return getTimeValueIndexMillis(actualTimeValues); + } + int[] timeIndexes = new int[actualTimeValues.length]; + for (int index = 0; index < actualTimeValues.length; index++) { + timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime()) + / _timeBuckets.getBucketSize().getSeconds()); + } + return timeIndexes; + } + + private int[] getTimeValueIndexMillis(long[] actualTimeValues) { + int[] timeIndexes = new int[actualTimeValues.length]; + for (int index = 0; index < actualTimeValues.length; index++) { + timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime() * 1000L) + / _timeBuckets.getBucketSize().toMillis()); + } + return timeIndexes; + } + + public void processLongExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + long[] valueColumnValues = blockValSet.getLongValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); + } + } + + public void processIntExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + int[] valueColumnValues = blockValSet.getIntValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; 
docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], (double) valueColumnValues[docIdIndex]); + } + } + + public void processDoubleExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, + int[] timeValueIndexes, Object[][] tagValues) { + double[] valueColumnValues = blockValSet.getDoubleValuesSV(); + for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; docIdIndex++) { + Object[] tagValuesForDoc = new Object[_groupByExpressions.size()]; + for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) { + tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex]; + } + long hash = TimeSeries.hash(tagValuesForDoc); + seriesBuilderMap.computeIfAbsent(hash, + k -> _seriesBuilderFactory.newTimeSeriesBuilder(_aggInfo, Long.toString(hash), _timeBuckets, + _groupByExpressions, + tagValuesForDoc)) + .addValueAtIndex(timeValueIndexes[docIdIndex], valueColumnValues[docIdIndex]); + } + } + + public void processStringExpression(BlockValSet blockValSet, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap, Review Comment: Will we have time series value as string? ########## pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.timeseries; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +/** + * Segment level operator which converts data in relational model to a time-series model, by aggregating on a + * configured aggregation expression. 
+ */ +public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResultsBlock> { + private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; + private final String _timeColumn; + private final TimeUnit _storedTimeUnit; + private final Long _timeOffset; + private final AggInfo _aggInfo; + private final ExpressionContext _valueExpression; + private final List<String> _groupByExpressions; + private final BaseProjectOperator<? extends ValueBlock> _projectOperator; + private final TimeBuckets _timeBuckets; + private final TimeSeriesBuilderFactory _seriesBuilderFactory; + + public TimeSeriesAggregationOperator( + String timeColumn, + TimeUnit timeUnit, + Long timeOffset, + AggInfo aggInfo, + ExpressionContext valueExpression, + List<String> groupByExpressions, + TimeBuckets timeBuckets, + BaseProjectOperator<? extends ValueBlock> projectOperator, + TimeSeriesBuilderFactory seriesBuilderFactory) { + _timeColumn = timeColumn; + _storedTimeUnit = timeUnit; + _timeOffset = timeOffset; + _aggInfo = aggInfo; + _valueExpression = valueExpression; + _groupByExpressions = groupByExpressions; + _projectOperator = projectOperator; + _timeBuckets = timeBuckets; + _seriesBuilderFactory = seriesBuilderFactory; + } + + @Override + protected TimeSeriesResultsBlock getNextBlock() { + ValueBlock transformBlock = _projectOperator.nextBlock(); + BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn); + long[] timeValues = blockValSet.getLongValuesSV(); + if (_timeOffset != null && _timeOffset != 0L) { + timeValues = applyTimeshift(_timeOffset, timeValues); + } + int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit); + Object[][] tagValues = new Object[_groupByExpressions.size()][]; + Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024); Review Comment: Why is the initial capacity hard-coded to 1024? Can it overflow?
########## pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java: ########## @@ -441,7 +444,15 @@ private InstanceResponseBlock executeInternal(TableDataManager tableDataManager, if (queryContext.isExplain()) { instanceResponse = getExplainResponseForNoMatchingSegment(numTotalSegments, queryContext); } else { - instanceResponse = new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext)); + if (QueryContextUtils.isTimeSeriesQuery(queryContext)) { + // TODO: handle invalid segments Review Comment: What will happen now if we get invalid segments? In what cases can invalid segments reach here? ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java: ########## @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.operator.combine.merger; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeSeriesResultsBlock> { + private final TimeSeriesBuilderFactory _seriesBuilderFactory; + private final AggInfo _aggInfo; + + public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) { + _seriesBuilderFactory = seriesBuilderFactory; + _aggInfo = aggInfo; + } + + @Override + public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesResultsBlock blockToMerge) { + TimeSeriesBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBlock(); + TimeSeriesBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBlock(); + for (var entry : seriesBlockToMerge.getSeriesMap().entrySet()) { + long seriesHash = entry.getKey(); + List<TimeSeries> currentTimeSeriesList = currentTimeSeriesBlock.getSeriesMap().get(seriesHash); + TimeSeries currentTimeSeries = null; + if (currentTimeSeriesList != null && !currentTimeSeriesList.isEmpty()) { + currentTimeSeries = currentTimeSeriesList.get(0); + } + TimeSeries newTimeSeriesToMerge = entry.getValue().get(0); + if (currentTimeSeries == null) { Review Comment: Can you please explain the currentTimeSeries == null condition? If it is null, then why are we adding a new entry?
########## pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java: ########## @@ -107,6 +107,28 @@ public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serv _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); } + /** + * To be used by Time Series Query Engine. + */ + public ServerQueryRequest(QueryContext queryContext, List<String> segmentsToQuery, Map<String, String> metadata, + ServerMetrics serverMetrics) { + long queryArrivalTimeMs = System.currentTimeMillis(); + _queryContext = queryContext; + + // Initialize metadata + _requestId = Long.parseLong(metadata.getOrDefault(Request.MetadataKeys.REQUEST_ID, "0")); + _brokerId = metadata.getOrDefault(Request.MetadataKeys.BROKER_ID, "unknown"); + _enableTrace = Boolean.parseBoolean(metadata.getOrDefault(Request.MetadataKeys.ENABLE_TRACE, "false")); + _enableStreaming = Boolean.parseBoolean(metadata.getOrDefault(Request.MetadataKeys.ENABLE_STREAMING, "false")); + _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId, + TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName())); + + _segmentsToQuery = segmentsToQuery; + _optionalSegments = null; + + _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); Review Comment: We should be able to use the existing Pinot server metrics to troubleshoot any latency or other issues in the time series engine, correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org