Jackie-Jiang commented on a change in pull request #7781: URL: https://github.com/apache/pinot/pull/7781#discussion_r751816491
########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java ########## @@ -198,6 +202,18 @@ public static TransformFunction get(ExpressionContext expression, Map<String, Da return get(null, expression, dataSourceMap); } + public static ExpressionContext stripGapfill(ExpressionContext expression) { Review comment: I don't think this util method belongs in this class. I suggest adding a util class for gapfill and putting all the shared constants and methods there. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java ########## @@ -59,6 +59,10 @@ * Factory class for transformation functions. */ public class TransformFunctionFactory { + + private static final String AGGREGATE_GAP_FILL = "aggregategapfill"; Review comment: Any specific reason for naming it `aggregategapfill` instead of `gapfill`? ########## File path: pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java ########## @@ -223,8 +223,11 @@ private BrokerResponseNative getBrokerResponse(QueryContext queryContext, PlanMa byte[] serializedResponse = instanceResponse.toBytes(); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), DataTableFactory.getDataTable(serializedResponse)); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), - DataTableFactory.getDataTable(serializedResponse)); + // skip creating the realtime table for gapfill test case. + if (!queryContext.isAggregateGapfill()) { Review comment: This should not be required. 
This test doesn't actually query the realtime table; it just mimics the behavior of hitting 2 servers. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java ########## @@ -339,6 +339,43 @@ public static DataTable getDataTableFromRows(Collection<Object[]> rows, DataSche return dataTableBuilder.build(); } + /** + * The default value for each column type. + */ + public static Serializable getDefaultValue(ColumnDataType dataType) { Review comment: These formatters are already deprecated (they are used for the deprecated PQL query response). I think you can make this a private method in `GapFillGroupByDataTableReducer`, because the default value might not be the same for different queries. You may refer to `DataSchema.ColumnDataType.convertAndFormat()` for the expected return type of each column data type. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java ########## @@ -0,0 +1,706 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.reduce; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerGauge; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.response.broker.AggregationResult; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.GroupByResult; +import org.apache.pinot.common.response.broker.QueryProcessingException; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.data.table.ConcurrentIndexedTable; +import org.apache.pinot.core.data.table.IndexedTable; +import org.apache.pinot.core.data.table.Key; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SimpleIndexedTable; +import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; +import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService; +import 
org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.core.util.QueryOptionsUtils; +import org.apache.pinot.core.util.trace.TraceCallable; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.DateTimeGranularitySpec; + + +/** + * Helper class to reduce data tables and set group by results into the BrokerResponseNative + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class GapFillGroupByDataTableReducer implements DataTableReducer { + private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value. + + private final QueryContext _queryContext; + private final AggregationFunction[] _aggregationFunctions; + private final int _numAggregationFunctions; + private final List<ExpressionContext> _groupByExpressions; + private final int _numGroupByExpressions; + private final int _numColumns; + private final boolean _preserveType; + private final boolean _groupByModeSql; + private final boolean _responseFormatSql; + private final boolean _sqlQuery; + private final DateTimeGranularitySpec _dateTimeGranularity; + private final DateTimeFormatSpec _dateTimeFormatter; + private final long _startMs; + private final long _endMs; + private final Set<Key> _primaryKeys; + private final Map<Key, Object[]> _previous; + private final int _numOfKeyColumns; + + GapFillGroupByDataTableReducer(QueryContext queryContext) { + _queryContext = queryContext; + _aggregationFunctions = queryContext.getAggregationFunctions(); + assert _aggregationFunctions != null; + _numAggregationFunctions = _aggregationFunctions.length; + _groupByExpressions = queryContext.getGroupByExpressions(); + assert _groupByExpressions != null; + _numGroupByExpressions = _groupByExpressions.size(); + _numColumns = _numAggregationFunctions + 
_numGroupByExpressions; + Map<String, String> queryOptions = queryContext.getQueryOptions(); + _preserveType = QueryOptionsUtils.isPreserveType(queryOptions); + _groupByModeSql = QueryOptionsUtils.isGroupByModeSQL(queryOptions); + _responseFormatSql = QueryOptionsUtils.isResponseFormatSQL(queryOptions); + _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null; + + ExpressionContext firstExpressionContext = _queryContext.getSelectExpressions().get(0); + List<ExpressionContext> args = firstExpressionContext.getFunction().getArguments(); + _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral()); + _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral()); + String start = args.get(2).getLiteral(); + String end = args.get(3).getLiteral(); + _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start)); + _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end)); + _primaryKeys = new HashSet<>(); + _previous = new HashMap<>(); + _numOfKeyColumns = _queryContext.getGroupByExpressions().size() - 1; + } + + private long truncate(long epoch) { + int sz = _dateTimeGranularity.getSize(); + return epoch / sz * sz; + } + + /** + * Reduces and sets group by results into ResultTable, if responseFormat = sql + * By default, sets group by results into GroupByResults + */ + @Override + public void reduceAndSetResults(String tableName, DataSchema dataSchema, + Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative, + DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { + assert dataSchema != null; + int resultSize = 0; + Collection<DataTable> dataTables = dataTableMap.values(); + + // For group by, PQL behavior is different than the SQL behavior. In the PQL way, Review comment: Let's not worry about PQL behavior and simplify this class. PQL is already deprecated and will be cleaned up soon. 
Here we can assume `_groupByModeSql` and `_responseFormatSql` are always `true` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org