Jackie-Jiang commented on a change in pull request #7781:
URL: https://github.com/apache/pinot/pull/7781#discussion_r752779601



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
##########
@@ -59,6 +59,7 @@
  * Factory class for transformation functions.
  */
 public class TransformFunctionFactory {
+

Review comment:
       (minor) revert the unnecessary change?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtil.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+
+
+/**
+ * Util class to encapsulate all utilities required for gapfill.
+ */
+public class GapfillUtil {
+  private static final String AGGREGATE_GAP_FILL = "aggregategapfill";

Review comment:
       If it is meant for post-aggregate gap fill, I'd suggest making that clear by naming it `postaggregategapfill`. In the future we can add `preaggregategapfill`.
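   For example, a sketch of the rename (the `pre` constant is hypothetical for now):
   ```java
  private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
  // hypothetical future addition: private static final String PRE_AGGREGATE_GAP_FILL = "preaggregategapfill";
   ```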

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
##########
@@ -56,7 +57,10 @@ public TransformOperator(@Nullable QueryContext 
queryContext, ProjectionOperator
     _projectionOperator = projectionOperator;
     _dataSourceMap = projectionOperator.getDataSourceMap();
     for (ExpressionContext expression : expressions) {
-      TransformFunction transformFunction = 
TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
+      TransformFunction transformFunction = TransformFunctionFactory.get(

Review comment:
       (minor, code format) we usually put all the arguments on the same line
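   i.e. the same call, formatted on one line (as in the removed line above):
   ```java
      TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
   ```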

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;

Review comment:
       Let's add a `Preconditions` check here and directly fail the PQL query to avoid unexpected results, because gap fill does not work on PQL
   ```suggestion
    Preconditions.checkArgument(queryContext.getBrokerRequest().getPinotQuery() != null,
        "GapFill cannot be applied to PQL queries");
   ```
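   (This assumes `com.google.common.base.Preconditions` is imported in this class.)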

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = 
_queryContext.getSelectExpressions().get(0);

Review comment:
       IMO we should not force the time column to always be the first argument
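   As a rough sketch, the gapfill expression could be located instead of hard-coding index 0 (the helper name `findGapfillExpression` is hypothetical):
   ```java
  private static ExpressionContext findGapfillExpression(QueryContext queryContext) {
    // Scan every select expression instead of assuming the gapfill call comes first
    for (ExpressionContext expression : queryContext.getSelectExpressions()) {
      FunctionContext function = expression.getFunction();
      if (function != null && GapfillUtil.isAggregateGapfill(function.getFunctionName())) {
        return expression;
      }
    }
    throw new IllegalStateException("No gapfill function found in the select expressions");
  }
   ```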

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = 
_queryContext.getSelectExpressions().get(0);
+    List<ExpressionContext> args = 
firstExpressionContext.getFunction().getArguments();
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());

Review comment:
       Let's add argument checks here (both count and type) so that users get a proper exception message when providing wrong arguments
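   Something along these lines, as a sketch (argument positions taken from the code above; assumes Guava `Preconditions` is imported):
   ```java
    Preconditions.checkArgument(args.size() == 5, "Expected 5 arguments for gapfill, got: %s", args.size());
    Preconditions.checkArgument(args.get(1).getLiteral() != null, "The time format (2nd argument) must be a literal");
    Preconditions.checkArgument(args.get(2).getLiteral() != null && args.get(3).getLiteral() != null,
        "The start/end times (3rd/4th arguments) must be literals");
    Preconditions.checkArgument(args.get(4).getLiteral() != null, "The granularity (5th argument) must be a literal");
   ```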

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = 
_queryContext.getSelectExpressions().get(0);
+    List<ExpressionContext> args = 
firstExpressionContext.getFunction().getArguments();
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new 
DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _primaryKeys = new HashSet<>();
+    _previous = new HashMap<>();
+    _numOfKeyColumns = _queryContext.getGroupByExpressions().size() - 1;
+  }
+
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  /**
+   * Reduces and sets group by results into ResultTable, if responseFormat = 
sql
+   * By default, sets group by results into GroupByResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    // 1. groupByMode = sql, responseFormat = sql

Review comment:
       (minor) Remove the comments as they are no longer relevant

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtil.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+
+
+/**
+ * Util class to encapsulate all utilities required for gapfill.
+ */
+public class GapfillUtil {

Review comment:
       (minor)
   ```suggestion
   public class GapfillUtils {
   ```

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
##########
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Queries test for post-aggregation gapfill queries.
+ */
+@SuppressWarnings("rawtypes")
+public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {

Review comment:
       Can we also test the HAVING clause? It should automatically work if all 
the expressions are properly stripped
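   For example, a hypothetical test query (all identifiers and literals below are made up; the shape follows the `aggregategapfill(timeExpr, format, start, end, granularity)` signature used in this PR):
   ```java
    // Hypothetical: gapfill combined with a HAVING clause on the aggregated value
    String query = "SELECT AggregateGapFill(eventTime, '1:MILLISECONDS:EPOCH', '1000', '10000', '1:MILLISECONDS'), "
        + "status, SUM(metric) FROM testTable "
        + "GROUP BY AggregateGapFill(eventTime, '1:MILLISECONDS:EPOCH', '1000', '10000', '1:MILLISECONDS'), status "
        + "HAVING SUM(metric) > 0 LIMIT 100";
   ```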

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -201,6 +202,11 @@ public int getOffset() {
     return _offset;
   }
 
+  public boolean isAggregateGapfill() {
+    return !_selectExpressions.isEmpty()

Review comment:
       Should we check all the select expressions? I don't think we force 
gapfill to be the first expression
   
   (minor) the select expressions list is never empty
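   e.g. a sketch that scans the whole select list (assuming `FunctionContext` is available here):
   ```java
  public boolean isAggregateGapfill() {
    // Check every select expression rather than only the first one
    for (ExpressionContext expression : _selectExpressions) {
      FunctionContext function = expression.getFunction();
      if (function != null && GapfillUtil.isAggregateGapfill(function.getFunctionName())) {
        return true;
      }
    }
    return false;
  }
   ```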

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtil.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+
+
+/**
+ * Util class to encapsulate all utilities required for gapfill.
+ */
+public class GapfillUtil {
+  private static final String AGGREGATE_GAP_FILL = "aggregategapfill";
+  private static final String FILL = "fill";
+
+  private GapfillUtil() {
+  }
+
+  public static ExpressionContext stripGapfill(ExpressionContext expression) {
+    if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+      return expression;
+    }
+    FunctionContext function = expression.getFunction();
+    String functionName = StringUtils.remove(function.getFunctionName(), 
'_').toLowerCase();
+    if (functionName.equalsIgnoreCase(AGGREGATE_GAP_FILL) || 
functionName.equalsIgnoreCase(FILL)) {

Review comment:
       Since the name is already canonicalized, you may use `equals()`
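   i.e.:
   ```suggestion
    if (functionName.equals(AGGREGATE_GAP_FILL) || functionName.equals(FILL)) {
   ```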

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = 
_queryContext.getSelectExpressions().get(0);
+    List<ExpressionContext> args = 
firstExpressionContext.getFunction().getArguments();
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new 
DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));

Review comment:
       We should also truncate the `_startMs` and `_endMs`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtil.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+
+
+/**
+ * Util class to encapsulate all utilities required for gapfill.
+ */
+public class GapfillUtil {
+  private static final String AGGREGATE_GAP_FILL = "aggregategapfill";
+  private static final String FILL = "fill";
+
+  private GapfillUtil() {
+  }
+
+  public static ExpressionContext stripGapfill(ExpressionContext expression) {
+    if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+      return expression;
+    }
+    FunctionContext function = expression.getFunction();
+    String functionName = StringUtils.remove(function.getFunctionName(), 
'_').toLowerCase();
+    if (functionName.equalsIgnoreCase(AGGREGATE_GAP_FILL) || 
functionName.equalsIgnoreCase(FILL)) {
+      return function.getArguments().get(0);
+    }
+    return expression;
+  }
+
+  public static boolean isAggregateGapfill(String name) {
+    return AGGREGATE_GAP_FILL.equalsIgnoreCase(name);

Review comment:
       You should also canonicalize the function name here
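   e.g. reusing the same canonicalization as `stripGapfill()`:
   ```suggestion
    return AGGREGATE_GAP_FILL.equals(StringUtils.remove(name, '_').toLowerCase());
   ```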

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
##########
@@ -67,7 +68,8 @@ public PostAggregationHandler(QueryContext queryContext, 
DataSchema dataSchema)
     String[] columnNames = new String[numSelectExpressions];
     ColumnDataType[] columnDataTypes = new 
ColumnDataType[numSelectExpressions];
     for (int i = 0; i < numSelectExpressions; i++) {
-      ValueExtractor valueExtractor = 
getValueExtractor(selectExpressions.get(i));
+      ValueExtractor valueExtractor
+          = 
getValueExtractor(GapfillUtil.stripGapfill(selectExpressions.get(i)));

Review comment:
       I think `GapfillUtil.stripGapfill()` should be added to the `getValueExtractor()` method instead of here, since `getValueExtractor()` is used in multiple places
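   A sketch of the intent (only the first line of `getValueExtractor()` changes; the rest of its body stays as-is):
   ```java
  public ValueExtractor getValueExtractor(ExpressionContext expression) {
    // Strip the gapfill/fill wrapper up front so every caller of this method benefits
    expression = GapfillUtil.stripGapfill(expression);
    // ... existing extraction logic unchanged ...
  }
   ```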

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = 
_queryContext.getSelectExpressions().get(0);
+    List<ExpressionContext> args = 
firstExpressionContext.getFunction().getArguments();
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new 
DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _primaryKeys = new HashSet<>();
+    _previous = new HashMap<>();
+    _numOfKeyColumns = _queryContext.getGroupByExpressions().size() - 1;
+  }
+
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  /**
+   * Reduces and sets group by results into ResultTable, if responseFormat = 
sql
+   * By default, sets group by results into GroupByResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    // 1. groupByMode = sql, responseFormat = sql
+    // This is the primary SQL compliant group by
+
+    try {
+      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, 
reducerContext, tableName,
+          brokerMetrics);
+    } catch (TimeoutException e) {
+      brokerResponseNative.getProcessingExceptions()
+          .add(new 
QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, 
e.getMessage()));
+    }
+    int resultSize = brokerResponseNative.getResultTable().getRows().size();
+
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, 
resultSize);
+    }
+  }
+
+  private Key constructKey(Object[] row) {
+    return new Key(Arrays.copyOfRange(row, 1, _numOfKeyColumns + 1));
+  }
+
+  /**
+   * Extract group by order by results and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param dataTables Collection of data tables
+   * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
+   * @throws TimeoutException If unable complete within timeout.
+   */
+  private void setSQLGroupByInResultTable(BrokerResponseNative 
brokerResponseNative, DataSchema dataSchema,
+      Collection<DataTable> dataTables, DataTableReducerContext 
reducerContext, String rawTableName,
+      BrokerMetrics brokerMetrics)
+      throws TimeoutException {
+    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, 
reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    }
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    DataSchema prePostAggregationDataSchema = 
getPrePostAggregationDataSchema(dataSchema);
+    ColumnDataType[] columnDataTypes = 
prePostAggregationDataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    int limit = _queryContext.getLimit();
+    List<Object[]> rows = new ArrayList<>(limit);
+
+    if (_sqlQuery) {
+      // SQL query with SQL group-by mode and response format
+
+      PostAggregationHandler postAggregationHandler =
+          new PostAggregationHandler(_queryContext, 
prePostAggregationDataSchema);
+      FilterContext havingFilter = _queryContext.getHavingFilter();
+      if (havingFilter != null) {
+        HavingFilterHandler havingFilterHandler = new 
HavingFilterHandler(havingFilter, postAggregationHandler);
+        while (rows.size() < limit && sortedIterator.hasNext()) {
+          Object[] row = sortedIterator.next().getValues();
+          extractFinalAggregationResults(row);
+          for (int i = 0; i < numColumns; i++) {
+            row[i] = columnDataTypes[i].convert(row[i]);
+          }
+          if (havingFilterHandler.isMatch(row)) {
+            rows.add(row);
+          }
+        }
+      } else {
+        for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+          Object[] row = sortedIterator.next().getValues();
+          extractFinalAggregationResults(row);
+          for (int j = 0; j < numColumns; j++) {
+            row[j] = columnDataTypes[j].convert(row[j]);
+          }
+          rows.add(row);
+        }
+      }
+      DataSchema resultDataSchema = 
postAggregationHandler.getResultDataSchema();
+      ColumnDataType[] resultColumnDataTypes = 
resultDataSchema.getColumnDataTypes();
+      List<Object[]> resultRows = new ArrayList<>(rows.size());
+      for (Object[] row : rows) {
+        Object[] resultRow = postAggregationHandler.getResult(row);
+        for (int i = 0; i < resultColumnDataTypes.length; i++) {
+          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+        }
+        resultRows.add(resultRow);
+        _primaryKeys.add(constructKey(resultRow));
+      }
+      List<Object[]> gapfillResultRows = gapFill(resultRows, 
resultColumnDataTypes);
+      brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, 
gapfillResultRows));
+    } else {
+      // PQL query with SQL group-by mode and response format
+      // NOTE: For PQL query, keep the order of columns as is (group-by 
expressions followed by aggregations), no need
+      //       to perform post-aggregation or filtering.
+
+      for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+        Object[] row = sortedIterator.next().getValues();
+        extractFinalAggregationResults(row);
+        for (int j = 0; j < numColumns; j++) {
+          row[j] = columnDataTypes[j].convertAndFormat(row[j]);
+        }
+        rows.add(row);
+      }
+      brokerResponseNative.setResultTable(new 
ResultTable(prePostAggregationDataSchema, rows));
+    }
+  }
+
+  List<Object[]> gapFill(List<Object[]> resultRows, ColumnDataType[] 
resultColumnDataTypes) {
+    int limit = _queryContext.getLimit();
+    int numResultColumns = resultColumnDataTypes.length;
+    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
+    long step = _dateTimeGranularity.granularityToMillis();
+    int index = 0;
+    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
+      Set<Key> keys = new HashSet<>(_primaryKeys);
+      while (index < resultRows.size()) {
+        long timeCol = 
_dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRows.get(index)[0]));
+        if (timeCol < time) {
+          index++;
+        } else if (timeCol == time) {
+          gapfillResultRows.add(resultRows.get(index));
+          if (gapfillResultRows.size() == limit) {
+            return gapfillResultRows;
+          }
+          Key key = constructKey(resultRows.get(index));
+          keys.remove(key);
+          _previous.put(key, resultRows.get(index));
+          index++;
+        } else {
+          break;
+        }
+      }
+      for (Key key : keys) {
+        Object[] gapfillRow = new Object[numResultColumns];
+        if (resultColumnDataTypes[0] == ColumnDataType.LONG) {
+          gapfillRow[0] = 
Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
+        } else {
+          gapfillRow[0] = _dateTimeFormatter.fromMillisToFormat(time);
+        }
+        System.arraycopy(key.getValues(), 0, gapfillRow, 1, _numOfKeyColumns);
+
+        for (int i = _numOfKeyColumns + 1; i < numResultColumns; i++) {
+          gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
+        }
+        gapfillResultRows.add(gapfillRow);
+        if (gapfillResultRows.size() == limit) {
+          return gapfillResultRows;
+        }
+      }
+    }
+    return gapfillResultRows;
+  }
+
+  Object getFillValue(int columIndex, Object key, ColumnDataType dataType) {
+    ExpressionContext expressionContext = 
_queryContext.getSelectExpressions().get(columIndex);
+    if (expressionContext.getFunction() != null
+        && 
expressionContext.getFunction().getFunctionName().equalsIgnoreCase("fill")) {
+      List<ExpressionContext> args = 
expressionContext.getFunction().getArguments();
+      if (args.get(1).getLiteral() == null) {
+        throw new UnsupportedOperationException("Wrong Sql.");
+      }
+      FillType fillType = FillType.valueOf(args.get(1).getLiteral());
+      if (fillType == FillType.FILL_DEFAULT_VALUE) {
+        // TODO: may fill the default value from sql in the future.
+        return getDefaultValue(dataType);
+      } else if (fillType == FillType.FILL_PREVIOUS_VALUE) {
+        Object[] row = _previous.get(key);
+        if (row != null) {
+          return row[columIndex];
+        } else {
+          return getDefaultValue(dataType);
+        }
+      } else {
+        throw new UnsupportedOperationException("unsupported fill type.");
+      }
+    } else {
+      return getDefaultValue(dataType);
+    }
+  }
+
+  enum FillType {
+    FILL_DEFAULT_VALUE,
+    FILL_PREVIOUS_VALUE,
+  }
+
+  /**
+   * The default value for each column type.
+   */
+  private Serializable getDefaultValue(ColumnDataType dataType) {
+    switch (dataType) {
+      // Single-value column
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case TIMESTAMP:
+        return dataType.convertAndFormat(0);
+      case STRING:
+      case JSON:
+      case BYTES:
+        return "";
+      case INT_ARRAY:
+        return new int[0];
+      case LONG_ARRAY:
+        return new long[0];
+      case FLOAT_ARRAY:
+        return new float[0];
+      case DOUBLE_ARRAY:
+        return new double[0];
+      case STRING_ARRAY:
+      case TIMESTAMP_ARRAY:
+        return new String[0];
+      case BOOLEAN_ARRAY:
+        return new boolean[0];
+      case BYTES_ARRAY:
+        return new byte[0][0];
+      default:
+        throw new IllegalStateException(String.format("Cannot provide the 
default value for the type: %s", dataType));
+    }
+  }

Review comment:
       Suggest moving these two methods into the utils class since they can be shared by pre-aggregate gapfill
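   e.g. `getDefaultValue()` ports over directly (body copied from the reducer above), while `getFillValue()` would additionally need the query context and the previous-row map passed in as parameters, since it touches reducer state:
   ```java
public final class GapfillUtils {
  private GapfillUtils() {
  }

  /** Default value for each column type, shared by pre- and post-aggregate gapfill. */
  public static Serializable getDefaultValue(ColumnDataType dataType) {
    switch (dataType) {
      // Single-value column
      case INT:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case BOOLEAN:
      case TIMESTAMP:
        return dataType.convertAndFormat(0);
      case STRING:
      case JSON:
      case BYTES:
        return "";
      case INT_ARRAY:
        return new int[0];
      case LONG_ARRAY:
        return new long[0];
      case FLOAT_ARRAY:
        return new float[0];
      case DOUBLE_ARRAY:
        return new double[0];
      case STRING_ARRAY:
      case TIMESTAMP_ARRAY:
        return new String[0];
      case BOOLEAN_ARRAY:
        return new boolean[0];
      case BYTES_ARRAY:
        return new byte[0][0];
      default:
        throw new IllegalStateException(String.format("Cannot provide the default value for the type: %s", dataType));
    }
  }
}
   ```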

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final boolean _sqlQuery;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _primaryKeys;
+  private final Map<Key, Object[]> _previous;
+  private final int _numOfKeyColumns;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+    _sqlQuery = queryContext.getBrokerRequest().getPinotQuery() != null;
+
+    ExpressionContext firstExpressionContext = _queryContext.getSelectExpressions().get(0);
+    List<ExpressionContext> args = firstExpressionContext.getFunction().getArguments();
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _primaryKeys = new HashSet<>();
+    _previous = new HashMap<>();
+    _numOfKeyColumns = _queryContext.getGroupByExpressions().size() - 1;
+  }
+
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  /**
+   * Reduces and sets group by results into ResultTable, if responseFormat = sql
+   * By default, sets group by results into GroupByResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    // 1. groupByMode = sql, responseFormat = sql
+    // This is the primary SQL compliant group by
+
+    try {
+      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+          brokerMetrics);
+    } catch (TimeoutException e) {
+      brokerResponseNative.getProcessingExceptions()
+          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+    }
+    int resultSize = brokerResponseNative.getResultTable().getRows().size();
+
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
+    }
+  }
+
+  private Key constructKey(Object[] row) {
+    return new Key(Arrays.copyOfRange(row, 1, _numOfKeyColumns + 1));
+  }
+
+  /**
+   * Extract group by order by results and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param dataTables Collection of data tables
+   * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
+   * @throws TimeoutException If unable to complete within the timeout.
+   */
+  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
+      BrokerMetrics brokerMetrics)
+      throws TimeoutException {
+    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    }
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
+    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    int limit = _queryContext.getLimit();
+    List<Object[]> rows = new ArrayList<>(limit);
+
+    if (_sqlQuery) {
+      // SQL query with SQL group-by mode and response format
+
+      PostAggregationHandler postAggregationHandler =
+          new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
+      FilterContext havingFilter = _queryContext.getHavingFilter();
+      if (havingFilter != null) {
+        HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
+        while (rows.size() < limit && sortedIterator.hasNext()) {
+          Object[] row = sortedIterator.next().getValues();
+          extractFinalAggregationResults(row);
+          for (int i = 0; i < numColumns; i++) {
+            row[i] = columnDataTypes[i].convert(row[i]);
+          }
+          if (havingFilterHandler.isMatch(row)) {
+            rows.add(row);
+          }
+        }
+      } else {
+        for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+          Object[] row = sortedIterator.next().getValues();
+          extractFinalAggregationResults(row);
+          for (int j = 0; j < numColumns; j++) {
+            row[j] = columnDataTypes[j].convert(row[j]);
+          }
+          rows.add(row);
+        }
+      }
+      DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+      ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+      List<Object[]> resultRows = new ArrayList<>(rows.size());
+      for (Object[] row : rows) {
+        Object[] resultRow = postAggregationHandler.getResult(row);
+        for (int i = 0; i < resultColumnDataTypes.length; i++) {
+          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+        }
+        resultRows.add(resultRow);
+        _primaryKeys.add(constructKey(resultRow));
+      }
+      List<Object[]> gapfillResultRows = gapFill(resultRows, resultColumnDataTypes);
+      brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
+    } else {
+      // PQL query with SQL group-by mode and response format
+      // NOTE: For PQL query, keep the order of columns as is (group-by expressions followed by aggregations), no need
+      //       to perform post-aggregation or filtering.
+
+      for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+        Object[] row = sortedIterator.next().getValues();
+        extractFinalAggregationResults(row);
+        for (int j = 0; j < numColumns; j++) {
+          row[j] = columnDataTypes[j].convertAndFormat(row[j]);
+        }
+        rows.add(row);
+      }
+      brokerResponseNative.setResultTable(new ResultTable(prePostAggregationDataSchema, rows));
+    }
+  }
+
+  List<Object[]> gapFill(List<Object[]> resultRows, ColumnDataType[] resultColumnDataTypes) {
+    int limit = _queryContext.getLimit();
+    int numResultColumns = resultColumnDataTypes.length;
+    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
+    long step = _dateTimeGranularity.granularityToMillis();
+    int index = 0;
+    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {

Review comment:
       We are looping over the rows multiple times (once per time bucket). We
can optimize it to loop over the rows only once: one option is to track the
index where the previous bucket's scan ended and start the next scan from there.
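
       A minimal sketch of that single-pass idea (all names here are
illustrative, not the PR's code; it assumes rows are already sorted ascending
by an epoch-millis time value, truncated to the bucket boundary, in column 0):

```java
import java.util.ArrayList;
import java.util.List;

public class SinglePassGapFillSketch {
  // Sketch only: `rows` must be sorted ascending by the long epoch-millis
  // value in column 0, and `step` is the granularity in millis.
  static List<Object[]> gapFillSinglePass(List<Object[]> rows, long startMs, long endMs, long step) {
    List<Object[]> out = new ArrayList<>();
    int index = 0; // carried across buckets, so each row is visited exactly once
    for (long time = startMs; time < endMs; time += step) {
      // Consume only the rows that fall into the current time bucket.
      while (index < rows.size() && (long) rows.get(index)[0] == time) {
        out.add(rows.get(index++));
      }
      // here: emit gap-filled rows for keys missing from this bucket
    }
    return out;
  }
}
```

       This drops the reduce cost from O(buckets * rows) to O(buckets + rows).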

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
##########
@@ -0,0 +1,483 @@
+  List<Object[]> gapFill(List<Object[]> resultRows, ColumnDataType[] resultColumnDataTypes) {
+    int limit = _queryContext.getLimit();
+    int numResultColumns = resultColumnDataTypes.length;
+    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
+    long step = _dateTimeGranularity.granularityToMillis();
+    int index = 0;
+    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
+      Set<Key> keys = new HashSet<>(_primaryKeys);
+      while (index < resultRows.size()) {
+        long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRows.get(index)[0]));

Review comment:
       Here we assume the query is ordered by time in ascending order. We
should add a check somewhere to reject queries that are not ordered by time.
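
       As a hedged sketch of such a check (plain Java, no Pinot types assumed;
an alternative would be validating the ORDER BY clause up front in the query
context):

```java
import java.util.List;

public class TimeOrderingCheck {
  // Fails fast if the reduced rows (epoch millis in column 0) are not in
  // ascending time order, instead of silently producing wrong gapfill output.
  static void checkAscendingByTime(List<Object[]> rows) {
    for (int i = 1; i < rows.size(); i++) {
      long prev = (long) rows.get(i - 1)[0];
      long curr = (long) rows.get(i)[0];
      if (curr < prev) {
        throw new IllegalStateException(
            "Gapfill requires rows ordered by time ascending, but got " + curr + " after " + prev);
      }
    }
  }
}
```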




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


