Jackie-Jiang commented on a change in pull request #8029:
URL: https://github.com/apache/pinot/pull/8029#discussion_r828299743



##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -217,6 +218,10 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
       requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
       return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
     }
+
+    BrokerRequest originalBrokerRequest = brokerRequest;
+    brokerRequest = GapfillUtils.stripGapfill(originalBrokerRequest);

Review comment:
       Let's name it `serverBrokerRequest`, since it is the broker request sent to the servers.
   
   In `logBrokerResponse`, we should probably pass in the original broker request.
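   
   For illustration, a minimal sketch of the rename (assuming `stripGapfill` returns the request unchanged when the query has no gapfill):
   ```java
   // Keep the original request for response logging; send the stripped one to the servers.
   BrokerRequest serverBrokerRequest = GapfillUtils.stripGapfill(brokerRequest);
   ```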

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -2183,9 +2192,9 @@ private void attachTimeBoundary(String rawTableName, BrokerRequest brokerRequest
    * Processes the optimized broker requests for both OFFLINE and REALTIME table.
    */
   protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
-      @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
-      @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
-      long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
+      BrokerRequest brokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance,

Review comment:
       Suggest renaming it to `serverBrokerRequest`. Same for the child classes
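   
   For illustration, the renamed signature might read as follows (a sketch only; the `+` lines of the diff are truncated, so the remaining parameters are assumed to mirror the removed ones):
   ```java
   protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
       @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
       @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
       RequestStatistics requestStatistics);
   ```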

##########
File path: pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -67,9 +67,7 @@
 
 
 public class CalciteSqlParser {
-  private CalciteSqlParser() {
-  }
-
+  public static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(QueryRewriterFactory.getQueryRewriters());

Review comment:
       (minor) Revert these reordering changes

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
##########
@@ -108,7 +108,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
     dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
         new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
             _groupByTrimThreshold), brokerMetrics);
-    updateAlias(queryContext, brokerResponseNative);
+    QueryContext originalQueryContext = BrokerRequestToQueryContextConverter.convert(originalBrokerRequest);

Review comment:
       Check whether `originalBrokerRequest` and `serverBrokerRequest` are the same reference before applying this logic, to avoid the overhead for regular queries.
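   
   For illustration, a minimal sketch of the check (assuming `stripGapfill` returns the original instance for non-gapfill queries):
   ```java
   if (originalBrokerRequest == serverBrokerRequest) {
     // Regular query: no gapfill rewrite happened, so skip the extra conversion.
     updateAlias(queryContext, brokerResponseNative);
   } else {
     QueryContext originalQueryContext = BrokerRequestToQueryContextConverter.convert(originalBrokerRequest);
     // ... gapfill-specific handling from the diff continues here ...
   }
   ```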

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
##########
@@ -162,8 +163,7 @@ public String getQueryResponse(String query, String traceEnabled, String queryOp
       String inputTableName;
       switch (querySyntax) {
         case CommonConstants.Broker.Request.SQL:
-          inputTableName =
-              SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery().getDataSource().getTableName();
+          inputTableName = GapfillUtils.getTableName(SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery());

Review comment:
       Let's move this util into `RequestUtils`, as it doesn't only apply to gapfill.
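   
   For illustration, the call site would then read (hypothetical `RequestUtils.getTableName`, assumed to keep the body of the current `GapfillUtils` helper):
   ```java
   inputTableName = RequestUtils.getTableName(SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery());
   ```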

##########
File path: pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -100,6 +95,9 @@ private CalciteSqlParser() {
   private static final Pattern OPTIONS_REGEX_PATTEN =
       Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);
 
+  private CalciteSqlParser() {

Review comment:
       (minor) Revert these reordering changes

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})

Review comment:
       These 2 warnings can be removed

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {

Review comment:
       Rename it to `RowBasedBlockValSet`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       ```suggestion
       return null;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot

Review comment:
       This class is common for all sub-query handling, so let's remove the gapfill part from the javadoc. We may also add a TODO here to support BYTES and MV in the future.
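   
   For illustration, a sketch of an updated javadoc (the wording is an assumption):
   ```java
   /**
    * A {@link BlockValSet} implementation backed by a list of rows, used on the broker to feed
    * merged server results into aggregation functions during sub-query handling.
    *
    * TODO: Support BYTES and multi-value (MV) columns.
    */
   ```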

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       (minor) Remove the `"Not supported"` from the exception message; same for other places.
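   
   For illustration:
   ```java
   @Override
   public int[] getDictionaryIdsSV() {
     throw new UnsupportedOperationException();
   }
   ```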

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/ValueExtractorFactory.java
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+
+
+/**
+ * Value extractor for the post-aggregation function or pre-aggregation gap fill.

Review comment:
       Update the javadoc
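   
   For illustration, a sketch of an updated javadoc (the wording is an assumption):
   ```java
   /**
    * Factory that creates a {@link ValueExtractor} for a given expression, used by the
    * broker-side filter handling (e.g. HAVING and gapfill filter evaluation).
    */
   ```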

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_dataType == FieldSpec.DataType.INT) {
+      int[] result = new int[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Integer) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       We should support reading ints from any numeric type and from the string type; same for other places.
    ```suggestion
       int length = _rows.size();
       int[] values = new int[length];
       if (_dataType.isNumeric()) {
         for (int i = 0; i < length; i++) {
           values[i] = ((Number) _rows.get(i)[_columnIndex]).intValue();
         }
       } else if (_dataType == FieldSpec.DataType.STRING) {
         for (int i = 0; i < length; i++) {
           values[i] = Integer.parseInt((String) _rows.get(i)[_columnIndex]);
         }
       } else {
         throw new IllegalStateException("Cannot read int values from data type: " + _dataType);
       }
       return values;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillProcessor.java
##########
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce and set gap fill results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillProcessor {

Review comment:
       (minor) Rename to `GapfillProcessor` to be consistent

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillProcessor.java
##########
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce and set gap fill results into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillProcessor {
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+  private final int _numOfTimeBuckets;
+  private final List<Integer> _groupByKeyIndexes;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+  private int _limitForGapfilledResult;
+  private boolean[] _isGroupBySelections;
+  private final int _timeBucketColumnIndex;
+  private int[] _sourceColumnIndexForResultSchema = null;
+
+  GapFillProcessor(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext);
+    _timeBucketColumnIndex = GapfillUtils.findTimeBucketColumnIndex(queryContext);
+
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    String end = args.get(3).getLiteral();
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _timeBucketSize = _dateTimeGranularity.granularityToMillis();
+    _numOfTimeBuckets = (int) ((_endMs - _startMs) / _timeBucketSize);
+
+    _fillExpressions = GapfillUtils.getFillExpressions(gapFillSelection);
+
+    _previousByGroupKey = new HashMap<>();
+    _groupByKeyIndexes = new ArrayList<>();
+    _groupByKeys = new HashSet<>();
+
+    ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    _timeSeries = timeseriesOn.getFunction().getArguments();
+  }
+
+  private int findBucketIndex(long time) {
+    return (int) ((time - _startMs) / _timeBucketSize);
+  }
+
+  private void replaceColumnNameWithAlias(DataSchema dataSchema) {
+    QueryContext queryContext;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = _queryContext.getSubQueryContext().getSubQueryContext();
+    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      queryContext = _queryContext;
+    } else {
+      queryContext = _queryContext.getSubQueryContext();
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    Map<String, String> columnNameToAliasMap = new HashMap<>();
+    for (int i = 0; i < aliasList.size(); i++) {
+      if (aliasList.get(i) != null) {
+        ExpressionContext selection = queryContext.getSelectExpressions().get(i);
+        if (GapfillUtils.isGapfill(selection)) {
+          selection = selection.getFunction().getArguments().get(0);
+        }
+        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
+      }
+    }
+    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
+        dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
+      }
+    }
+  }
+
+  /**
+   * Here are three things that happen
+   * 1. Sort the result sets from all pinot servers based on timestamp
+   * 2. Gapfill the data for missing entities per time bucket
+   * 3. Aggregate the dataset per time bucket.
+   */
+  public void process(BrokerResponseNative brokerResponseNative) {
+    DataSchema dataSchema = brokerResponseNative.getResultTable().getDataSchema();
+    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
+    if (brokerResponseNative.getResultTable().getRows().isEmpty()) {
+      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
+      return;
+    }
+
+    String[] columns = dataSchema.getColumnNames();
+
+    Map<String, Integer> indexes = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      indexes.put(columns[i], i);
+    }
+
+    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
+
+    // The first one argument of timeSeries is time column. The left ones are defining entity.
+    for (ExpressionContext entityColum : _timeSeries) {
+      int index = indexes.get(entityColum.getIdentifier());
+      _isGroupBySelections[index] = true;
+    }
+
+    for (int i = 0; i < _isGroupBySelections.length; i++) {
+      if (_isGroupBySelections[i]) {
+        _groupByKeyIndexes.add(i);
+      }
+    }
+
+    List<Object[]>[] timeBucketedRawRows = putRawRowsIntoTimeBucket(brokerResponseNative.getResultTable().getRows());
+
+    List<Object[]> resultRows;
+    replaceColumnNameWithAlias(dataSchema);
+
+    if (_queryContext.getAggregationFunctions() == null) {
+
+      Map<String, Integer> sourceColumnsIndexes = new HashMap<>();
+      for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+        sourceColumnsIndexes.put(dataSchema.getColumnName(i), i);
+      }
+      _sourceColumnIndexForResultSchema = new int[resultTableSchema.getColumnNames().length];
+      for (int i = 0; i < _sourceColumnIndexForResultSchema.length; i++) {
+        _sourceColumnIndexForResultSchema[i] = sourceColumnsIndexes.get(resultTableSchema.getColumnName(i));
+      }
+    }
+
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE || _gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      List<Object[]> gapfilledRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, dataSchema);
+      if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+        resultRows = new ArrayList<>(gapfilledRows.size());
+        resultRows.addAll(gapfilledRows);
+      } else {
+        resultRows = gapfilledRows;
+      }
+    } else {
+      resultRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, dataSchema);
+    }

Review comment:
       I don't follow this part. Is this the same as `resultRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, dataSchema);`?
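   
   If the two branches are indeed equivalent (the `GAP_FILL_SELECT` path only copies `gapfilledRows` into a new list), the block would collapse to:
   ```java
   List<Object[]> resultRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, dataSchema);
   ```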

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_dataType == FieldSpec.DataType.INT) {
+      int[] result = new int[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Integer) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public long[] getLongValuesSV() {
+    if (_dataType == FieldSpec.DataType.LONG) {
+      long[] result = new long[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Long) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public float[] getFloatValuesSV() {
+    if (_dataType == FieldSpec.DataType.FLOAT) {
+      float[] result = new float[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Float) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public double[] getDoubleValuesSV() {
+    if (_dataType == FieldSpec.DataType.DOUBLE) {
+      double[] result = new double[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Double) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    } else if (_dataType == FieldSpec.DataType.INT) {
+      double[] result = new double[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = ((Integer) _rows.get(i)[_columnIndex]).doubleValue();
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public String[] getStringValuesSV() {
+    if (_dataType == FieldSpec.DataType.STRING) {
+      String[] result = new String[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (String) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       We should support reading strings from all data types
   ```suggestion
       int length = _rows.size();
       String[] values = new String[length];
       for (int i = 0; i < length; i++) {
         values[i] = _rows.get(i)[_columnIndex].toString();
       }
       return values;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapfillFilterHandler.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.RowMatcherFactory;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+
+/**
+ * Handler for Filter clause of GapFill.
+ */
+public class GapfillFilterHandler implements ValueExtractorFactory {
+  private final RowMatcher _rowMatcher;
+  private final DataSchema _dataSchema;
+  private final Map<String, Integer> _indexes;
+
+  public GapfillFilterHandler(FilterContext filter, DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _indexes = new HashMap<>();
+    for (int i = 0; i < _dataSchema.size(); i++) {
+      _indexes.put(_dataSchema.getColumnName(i), i);
+    }
+    _rowMatcher = RowMatcherFactory.getRowMatcher(filter, this);
+  }
+
+  /**
+   * Returns {@code true} if the given row matches the HAVING clause, {@code false} otherwise.
+   */
+  public boolean isMatch(Object[] row) {
+    return _rowMatcher.isMatch(row);
+  }
+
+  /**
+   * Returns a ValueExtractor based on the given expression.
+   */
+  @Override
+  public ValueExtractor getValueExtractor(ExpressionContext expression) {
+    expression = GapfillUtils.stripGapfill(expression);
+    if (expression.getType() == ExpressionContext.Type.LITERAL) {
+      // Literal
+      return new LiteralValueExtractor(expression.getLiteral());
+    }
+
+    if (expression.getType() == ExpressionContext.Type.IDENTIFIER) {
+      return new ColumnValueExtractor(_indexes.get(expression.getIdentifier()), _dataSchema);
+    } else {
+      return new ColumnValueExtractor(_indexes.get(expression.getFunction().toString()), _dataSchema);

Review comment:
       This does not handle transforms properly (e.g. `colA - colB` where the gapfill selects `colA` and `colB`). This is handled within the `PostAggregationValueExtractor`, and we may also extract that out to be shared. (Or add a TODO to fix later.)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapfillFilterHandler.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.RowMatcherFactory;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+
+/**
+ * Handler for Filter clause of GapFill.
+ */
+public class GapfillFilterHandler implements ValueExtractorFactory {
+  private final RowMatcher _rowMatcher;
+  private final DataSchema _dataSchema;
+  private final Map<String, Integer> _indexes;
+
+  public GapfillFilterHandler(FilterContext filter, DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _indexes = new HashMap<>();
+    for (int i = 0; i < _dataSchema.size(); i++) {
+      _indexes.put(_dataSchema.getColumnName(i), i);

Review comment:
       This won't work for certain aggregations because the column name in the schema is not `expression.toString()`. You may refer to `PostAggregationHandler` for how to handle the index for aggregation queries. (Or add a TODO to fix later.)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/RowMatcherFactory.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.FilterContext;
+
+
+/**
+ * Factory for RowMatcher.
+ */
+public interface RowMatcherFactory {

Review comment:
       This should be a concrete util class instead of an interface
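   
   For illustration, a sketch of the concrete util form (the body of `getRowMatcher` is elided and assumed to keep the existing dispatch logic):
   ```java
   /**
    * Factory for {@link RowMatcher}s.
    */
   public final class RowMatcherFactory {
     private RowMatcherFactory() {
     }

     public static RowMatcher getRowMatcher(FilterContext filter, ValueExtractorFactory valueExtractorFactory) {
       // The existing static dispatch on the filter type moves here unchanged.
       throw new UnsupportedOperationException("Sketch only; body elided");
     }
   }
   ```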

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -133,6 +137,8 @@ private QueryContext(String tableName, List<ExpressionContext> selectExpressions
     _queryOptions = queryOptions;
     _debugOptions = debugOptions;
     _brokerRequest = brokerRequest;
+    _gapfillType = null;

Review comment:
       (minor) this line is redundant

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -188,6 +194,10 @@ public FilterContext getHavingFilter() {
     return _orderByExpressions;
   }
 
+  public QueryContext getSubQueryContext() {

Review comment:
       ```suggestion
     public QueryContext getSubquery() {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -85,6 +86,7 @@
   // Keep the BrokerRequest to make incremental changes
   // TODO: Remove it once the whole query engine is using the QueryContext
   private final BrokerRequest _brokerRequest;
+  private final QueryContext _subQueryContext;

Review comment:
       Rename it to `_subquery` to be consistent with other variables

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/ValueExtractorFactory.java
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+
+
+/**
+ * Value extractor for the post-aggregation function or pre-aggregation gap fill.
+ */
+public interface ValueExtractorFactory {
+  ValueExtractor getValueExtractor(ExpressionContext expression);

Review comment:
       Add some javadoc
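   
   For illustration, a minimal javadoc sketch:
   ```java
   /**
    * Returns a {@link ValueExtractor} that extracts the value of the given expression from a row.
    */
   ValueExtractor getValueExtractor(ExpressionContext expression);
   ```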

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -375,6 +393,57 @@ public String toString() {
     private Map<String, String> _queryOptions;
     private Map<String, String> _debugOptions;
     private BrokerRequest _brokerRequest;
+    private QueryContext _subQueryContext;

Review comment:
       ```suggestion
       private QueryContext _subquery;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -436,6 +505,11 @@ public Builder setBrokerRequest(BrokerRequest brokerRequest) {
       return this;
     }
 
+    public Builder setSubqueryContext(QueryContext subQueryContext) {

Review comment:
       ```suggestion
       public Builder setSubquery(QueryContext subquery) {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -42,23 +42,42 @@
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
 public class BrokerRequestToQueryContextConverter {
   private BrokerRequestToQueryContextConverter() {
   }
 
+  /**
+   * Validate the gapfill query.
+   */
+  public static void validateGapfillQuery(BrokerRequest brokerRequest) {

Review comment:
       Why do we need this? It can add quite a bit of overhead to regular queries.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -71,12 +86,15 @@ public static boolean isFill(ExpressionContext expressionContext) {
       return false;
     }
 
-    return FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+    return FILL.equalsIgnoreCase(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));

Review comment:
       After #8341, all function names in `FunctionContext` are already canonical, so there is no need to canonicalize again. Same for other places.
   ```suggestion
       return expressionContext.getFunction().getFunctionName().equals(FILL);
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -42,23 +42,42 @@
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
 public class BrokerRequestToQueryContextConverter {
   private BrokerRequestToQueryContextConverter() {
   }
 
+  /**
+   * Validate the gapfill query.
+   */
+  public static void validateGapfillQuery(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery() != null) {
+      QueryContext queryContext = convertSQL(brokerRequest.getPinotQuery(), brokerRequest);
+      GapfillUtils.setGapfillType(queryContext);
+    }
+  }
+
   /**
    * Converts the given {@link BrokerRequest} into a {@link QueryContext}.
    */
   public static QueryContext convert(BrokerRequest brokerRequest) {
-    return brokerRequest.getPinotQuery() != null ? convertSQL(brokerRequest) : convertPQL(brokerRequest);
+    if (brokerRequest.getPinotQuery() != null) {
+      QueryContext queryContext = convertSQL(brokerRequest.getPinotQuery(), brokerRequest);
+      GapfillUtils.setGapfillType(queryContext);
+      return queryContext;
+    } else {
+      return convertPQL(brokerRequest);
+    }
   }
 
-  private static QueryContext convertSQL(BrokerRequest brokerRequest) {
-    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
-
+  private static QueryContext convertSQL(PinotQuery pinotQuery, BrokerRequest brokerRequest) {
+    QueryContext subQueryContext = null;

Review comment:
       (minor)
   ```suggestion
       QueryContext subquery = null;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -375,6 +393,57 @@ public String toString() {
     private Map<String, String> _queryOptions;
     private Map<String, String> _debugOptions;
     private BrokerRequest _brokerRequest;
+    private QueryContext _subQueryContext;
+
+    /**
+     * Helper method to extract AGGREGATION FunctionContexts and FILTER FilterContexts from the given expression.
+     */
+    private static void getAggregations(ExpressionContext expression,
+        List<Pair<FunctionContext, FilterContext>> filteredAggregations) {
+      FunctionContext function = expression.getFunction();
+      if (function == null) {
+        return;
+      }
+      if (function.getType() == FunctionContext.Type.AGGREGATION) {
+        // Aggregation
+        filteredAggregations.add(Pair.of(function, null));
+      } else {
+        List<ExpressionContext> arguments = function.getArguments();
+        if (function.getFunctionName().equalsIgnoreCase("filter")) {
+          // Filtered aggregation
+          Preconditions.checkState(arguments.size() == 2, "FILTER must contain 2 arguments");
+          FunctionContext aggregation = arguments.get(0).getFunction();
+          Preconditions.checkState(aggregation != null && aggregation.getType() == FunctionContext.Type.AGGREGATION,
+              "First argument of FILTER must be an aggregation function");
+          ExpressionContext filterExpression = arguments.get(1);
+          Preconditions.checkState(filterExpression.getFunction() != null
+                  && filterExpression.getFunction().getType() == FunctionContext.Type.TRANSFORM,
+              "Second argument of FILTER must be a filter expression");
+          FilterContext filter = RequestContextUtils.getFilter(filterExpression);
+          filteredAggregations.add(Pair.of(aggregation, filter));
+        } else {
+          // Transform
+          for (ExpressionContext argument : arguments) {
+            getAggregations(argument, filteredAggregations);
+          }
+        }
+      }
+    }
+
+    /**
+     * Helper method to extract AGGREGATION FunctionContexts and FILTER FilterContexts from the given filter.
+     */
+    private static void getAggregations(FilterContext filter,
+        List<Pair<FunctionContext, FilterContext>> filteredAggregations) {
+      List<FilterContext> children = filter.getChildren();
+      if (children != null) {
+        for (FilterContext child : children) {
+          getAggregations(child, filteredAggregations);
+        }
+      } else {
+        getAggregations(filter.getPredicate().getLhs(), filteredAggregations);
+      }
+    }

Review comment:
       Revert the reordering change

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
##########
@@ -88,6 +83,8 @@ private SelectionOperatorUtils() {
       ThreadLocal.withInitial(() -> new DecimalFormat(FLOAT_PATTERN, DECIMAL_FORMAT_SYMBOLS));
   private static final ThreadLocal<DecimalFormat> THREAD_LOCAL_DOUBLE_FORMAT =
       ThreadLocal.withInitial(() -> new DecimalFormat(DOUBLE_PATTERN, DECIMAL_FORMAT_SYMBOLS));
+  private SelectionOperatorUtils() {

Review comment:
       Revert the unrelated changes

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
##########
@@ -114,6 +115,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
     Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
     if (offlineBrokerRequest != null) {
       assert offlineRoutingTable != null;
+      BrokerRequestToQueryContextConverter.validateGapfillQuery(offlineBrokerRequest);

Review comment:
       Let's not validate gapfill here, as it will add overhead to all queries (converting the broker request to a query context).

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static boolean isGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  private static boolean isGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the gapfill type for queryContext. Also do the validation for gapfill request.
+   * @param queryContext
+   */
+  public static void setGapfillType(QueryContext queryContext) {
+    GapfillType gapfillType = null;
+    if (queryContext.getSubQueryContext() == null) {
+      if (isGapfill(queryContext)) {
+        Preconditions.checkArgument(queryContext.getAggregationFunctions() == 
null,
+            "Aggregation and Gapfill can not be in the same sql statement.");
+        gapfillType = GapfillType.GAP_FILL;
+      }
+    } else if (isGapfill(queryContext)) {
+      Preconditions.checkArgument(queryContext.getSubQueryContext().getAggregationFunctions() != null,
+          "Select and Gapfill should be in the same sql statement.");
+      Preconditions.checkArgument(queryContext.getSubQueryContext().getSubQueryContext() == null,
+          "There is no three levels nesting sql when the outer query is gapfill.");
+      gapfillType = GapfillType.AGGREGATE_GAP_FILL;
+    } else if (isGapfill(queryContext.getSubQueryContext())) {
+      if (queryContext.getAggregationFunctions() == null) {
+        gapfillType = GapfillType.GAP_FILL_SELECT;
+      } else if (queryContext.getSubQueryContext().getSubQueryContext() == 
null) {
+        gapfillType = GapfillType.GAP_FILL_AGGREGATE;
+      } else {
+        Preconditions
+            
.checkArgument(queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions()
 != null,
+                "Select cannot happen before gapfill.");
+        gapfillType = GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
+      }
+    }
+
+    queryContext.setGapfillType(gapfillType);
+    if (gapfillType == null) {
+      return;
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(gapFillSelection != null && 
gapFillSelection.getFunction() != null,
+        "Gapfill Expression should be function.");
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not 
have correct number of arguments.");
+    Preconditions.checkArgument(args.get(1).getLiteral() != null,
+        "The second argument of PostAggregateGapFill should be 
TimeFormatter.");
+    Preconditions.checkArgument(args.get(2).getLiteral() != null,
+        "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(args.get(3).getLiteral() != null,
+        "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(args.get(4).getLiteral() != null,
+        "The fifth argument of PostAggregateGapFill should be time bucket 
size.");
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn 
expressions should be specified.");
+
+    if (queryContext.getAggregationFunctions() == null) {
+      return;
+    }
+
+    List<ExpressionContext> groupbyExpressions = 
queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy 
Clause.");
+    List<ExpressionContext> innerSelections = 
queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = 
innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By 
timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private static ExpressionContext findGapfillExpressionContext(QueryContext 
queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return expressionContext;
+      }
+    }
+    return null;
+  }
+
+  public static ExpressionContext getGapfillExpressionContext(QueryContext 
queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.AGGREGATE_GAP_FILL || gapfillType == 
GapfillType.GAP_FILL) {
+      return findGapfillExpressionContext(queryContext);
+    } else if (gapfillType == GapfillType.GAP_FILL_AGGREGATE || gapfillType == 
GapfillType.AGGREGATE_GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT) {
+      return findGapfillExpressionContext(queryContext.getSubQueryContext());
+    } else {
+      return null;
+    }
+  }
+
+  public static int findTimeBucketColumnIndex(QueryContext queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT
+        || gapfillType == GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = queryContext.getSubQueryContext();
+    }
+    List<ExpressionContext> expressionContexts = 
queryContext.getSelectExpressions();
+    for (int i = 0; i < expressionContexts.size(); i++) {
+      if (isGapfill(expressionContexts.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static ExpressionContext 
getTimeSeriesOnExpressionContext(ExpressionContext gapFillSelection) {
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isTimeSeriesOn(args.get(i))) {
+        return args.get(i);
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ExpressionContext> 
getFillExpressions(ExpressionContext gapFillSelection) {
+    Map<String, ExpressionContext> fillExpressions = new HashMap<>();
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isFill(args.get(i))) {
+        ExpressionContext fillExpression = args.get(i);
+        
fillExpressions.put(fillExpression.getFunction().getArguments().get(0).getIdentifier(),
 fillExpression);
+      }
+    }
+    return fillExpressions;
+  }
+
+  public static String getTableName(PinotQuery pinotQuery) {
+    while (pinotQuery.getDataSource().getSubquery() != null) {
+      pinotQuery = pinotQuery.getDataSource().getSubquery();
+    }
+    return pinotQuery.getDataSource().getTableName();
+  }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);

Review comment:
       Avoid this conversion. We may loop over the select list to see whether it contains `gapfill`, and directly rewrite the query when `gapfill` is found.
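
   A minimal sketch of that approach, reusing only thrift accessors that already appear in this diff (`getSelectList`, `getFunctionCall`, `getDataSource`, `getSubquery`); the helper name `containsGapfill` is hypothetical:

   ```java
   // Sketch only: detect gapfill by scanning the select list (including the
   // aliased AS(gapfill(...), alias) form) and recursing into subqueries,
   // without converting the broker request into a QueryContext.
   private static boolean containsGapfill(PinotQuery pinotQuery) {
     for (Expression select : pinotQuery.getSelectList()) {
       if (select.getType() != ExpressionType.FUNCTION) {
         continue;
       }
       Function function = select.getFunctionCall();
       if (GAP_FILL.equalsIgnoreCase(function.getOperator())) {
         return true;
       }
       if (AS.equalsIgnoreCase(function.getOperator())
           && function.getOperands().get(0).getType() == ExpressionType.FUNCTION
           && GAP_FILL.equalsIgnoreCase(function.getOperands().get(0).getFunctionCall().getOperator())) {
         return true;
       }
     }
     DataSource dataSource = pinotQuery.getDataSource();
     return dataSource != null && dataSource.getSubquery() != null && containsGapfill(dataSource.getSubquery());
   }
   ```

   `stripGapfill` could then return the request unchanged when this scan finds nothing, and only rewrite the query otherwise.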

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
##########
@@ -391,6 +388,9 @@ public static DataTable getDataTableFromRows(Collection<Object[]> rows, DataSche
           row[i] = dataTable.getStringArray(rowId, i);
           break;
 
+        case OBJECT:

Review comment:
       Does this change still apply?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    GapfillUtils.GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == null) {
+      return brokerRequest;
+    }
+    switch (gapfillType) {
+      // one sql query with gapfill only
+      case GAP_FILL:
+        return stripGapfill(brokerRequest.getPinotQuery());
+      // gapfill as subquery, the outer query may have the filter
+      case GAP_FILL_SELECT:
+        // gapfill as subquery, the outer query has the aggregation
+      case GAP_FILL_AGGREGATE:
+        // aggregation as subquery, the outer query is gapfill
+      case AGGREGATE_GAP_FILL:
+        return stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery());
+      // aggregation as second nesting subquery, gapfill as first nesting subquery, different aggregation as outer query
+      case AGGREGATE_GAP_FILL_AGGREGATE:
+        return stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery().getDataSource().getSubquery());
+      default:
+        return brokerRequest;
+    }
+  }
+
+  private static BrokerRequest stripGapfill(PinotQuery pinotQuery) {
+    PinotQuery copy = new PinotQuery(pinotQuery);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(copy);
+    // Set table name in broker request because it is used for access control, query routing etc.
+    DataSource dataSource = copy.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    List<Expression> selectList = copy.getSelectList();
+    for (int i = 0; i < selectList.size(); i++) {
+      Expression select = selectList.get(i);
+      if (select.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperator())) {

Review comment:
       (minor) function name is canonical, so you may use `equals` here

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  private static BrokerRequest stripGapfill(PinotQuery pinotQuery) {
+    PinotQuery copy = new PinotQuery(pinotQuery);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(copy);
+    // Set table name in broker request because it is used for access control, query routing etc.
+    DataSource dataSource = copy.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    List<Expression> selectList = copy.getSelectList();
+    for (int i = 0; i < selectList.size(); i++) {
+      Expression select = selectList.get(i);
+      if (select.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperator())) {
+        selectList.set(i, select.getFunctionCall().getOperands().get(0));
+        break;
+      }
+      if (AS.equalsIgnoreCase(select.getFunctionCall().getOperator())
+          && select.getFunctionCall().getOperands().get(0).getType() == ExpressionType.FUNCTION
+          && GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperands().get(0).getFunctionCall().getOperator())) {
+        select.getFunctionCall().getOperands().set(0,
+            select.getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0));
+        break;
+      }
+    }
+
+    for (Expression orderBy : copy.getOrderByList()) {
+      if (orderBy.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (orderBy.getFunctionCall().getOperands().get(0).getType() == ExpressionType.FUNCTION
+          && GAP_FILL.equalsIgnoreCase(
+              orderBy.getFunctionCall().getOperands().get(0).getFunctionCall().getOperator())) {
+        orderBy.getFunctionCall().getOperands().set(0,
+            orderBy.getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0));
+        break;
+      }
+    }
+    return brokerRequest;
+  }
+
+  public enum GapfillType {

Review comment:
       Going over all the classes, I feel `GapfillType` might not be required at all. All the handling can be based on the levels of subqueries.
   If you feel `GapfillType` makes the code cleaner, we may add a function `GapfillType getGapfillType(QueryContext queryContext)` and maintain it within the `GapfillProcessor`. There is no need to embed it into the `QueryContext` (in `QueryContext`, we try to maintain only common properties, not feature-specific ones).
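
   A minimal sketch of that shape, mirroring the branching in `setGapfillType` from this PR (placement in `GapfillProcessor` and the method name follow the suggestion; nothing here is committed code):

   ```java
   // Sketch only: derive the gapfill type on demand from the nesting of
   // subqueries instead of storing it on QueryContext. isGapfill(QueryContext)
   // is the select-list scan introduced in this PR.
   static GapfillType getGapfillType(QueryContext queryContext) {
     QueryContext subQuery = queryContext.getSubQueryContext();
     if (subQuery == null) {
       return isGapfill(queryContext) ? GapfillType.GAP_FILL : null;
     }
     if (isGapfill(queryContext)) {
       // gapfill on top of an aggregation subquery
       return GapfillType.AGGREGATE_GAP_FILL;
     }
     if (isGapfill(subQuery)) {
       if (queryContext.getAggregationFunctions() == null) {
         return GapfillType.GAP_FILL_SELECT;
       }
       return subQuery.getSubQueryContext() == null ? GapfillType.GAP_FILL_AGGREGATE
           : GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
     }
     return null;
   }
   ```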



