weixiangsun commented on a change in pull request #8029:
URL: https://github.com/apache/pinot/pull/8029#discussion_r829434243



##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -217,6 +218,10 @@ private BrokerResponseNative handleSQLRequest(long 
requestId, String query, Json
       requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
 e));
     }
+
+    BrokerRequest originalBrokerRequest = brokerRequest;
+    brokerRequest = GapfillUtils.stripGapfill(originalBrokerRequest);

Review comment:
       Fixed

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -2183,9 +2192,9 @@ private void attachTimeBoundary(String rawTableName, 
BrokerRequest brokerRequest
    * Processes the optimized broker requests for both OFFLINE and REALTIME 
table.
    */
   protected abstract BrokerResponseNative processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
-      @Nullable BrokerRequest offlineBrokerRequest, @Nullable 
Map<ServerInstance, List<String>> offlineRoutingTable,
-      @Nullable BrokerRequest realtimeBrokerRequest, @Nullable 
Map<ServerInstance, List<String>> realtimeRoutingTable,
-      long timeoutMs, ServerStats serverStats, RequestStatistics 
requestStatistics)
+      BrokerRequest brokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest, @Nullable Map<ServerInstance,

Review comment:
       Fixed

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -67,9 +67,7 @@
 
 
 public class CalciteSqlParser {
-  private CalciteSqlParser() {
-  }
-
+  public static final List<QueryRewriter> QUERY_REWRITERS = new 
ArrayList<>(QueryRewriterFactory.getQueryRewriters());

Review comment:
       Fixed

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -100,6 +95,9 @@ private CalciteSqlParser() {
   private static final Pattern OPTIONS_REGEX_PATTEN =
       Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);
 
+  private CalciteSqlParser() {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
##########
@@ -108,7 +108,12 @@ public BrokerResponseNative 
reduceOnDataTable(BrokerRequest brokerRequest,
     dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, 
dataTableMap, brokerResponseNative,
         new DataTableReducerContext(_reduceExecutorService, 
_maxReduceThreadsPerQuery, reduceTimeOutMs,
             _groupByTrimThreshold), brokerMetrics);
-    updateAlias(queryContext, brokerResponseNative);
+    QueryContext originalQueryContext = 
BrokerRequestToQueryContextConverter.convert(originalBrokerRequest);

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillProcessor.java
##########
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce and set gap fill results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillProcessor {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -188,6 +194,10 @@ public FilterContext getHavingFilter() {
     return _orderByExpressions;
   }
 
+  public QueryContext getSubQueryContext() {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -85,6 +86,7 @@
   // Keep the BrokerRequest to make incremental changes
   // TODO: Remove it once the whole query engine is using the QueryContext
   private final BrokerRequest _brokerRequest;
+  private final QueryContext _subQueryContext;

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -375,6 +393,57 @@ public String toString() {
     private Map<String, String> _queryOptions;
     private Map<String, String> _debugOptions;
     private BrokerRequest _brokerRequest;
+    private QueryContext _subQueryContext;

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -436,6 +505,11 @@ public Builder setBrokerRequest(BrokerRequest 
brokerRequest) {
       return this;
     }
 
+    public Builder setSubqueryContext(QueryContext subQueryContext) {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -42,23 +42,42 @@
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
 public class BrokerRequestToQueryContextConverter {
   private BrokerRequestToQueryContextConverter() {
   }
 
+  /**
+   * Validate the gapfill query.
+   */
+  public static void validateGapfillQuery(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery() != null) {
+      QueryContext queryContext = convertSQL(brokerRequest.getPinotQuery(), 
brokerRequest);
+      GapfillUtils.setGapfillType(queryContext);
+    }
+  }
+
   /**
    * Converts the given {@link BrokerRequest} into a {@link QueryContext}.
    */
   public static QueryContext convert(BrokerRequest brokerRequest) {
-    return brokerRequest.getPinotQuery() != null ? convertSQL(brokerRequest) : 
convertPQL(brokerRequest);
+    if (brokerRequest.getPinotQuery() != null) {
+      QueryContext queryContext = convertSQL(brokerRequest.getPinotQuery(), 
brokerRequest);
+      GapfillUtils.setGapfillType(queryContext);
+      return queryContext;
+    } else {
+      return convertPQL(brokerRequest);
+    }
   }
 
-  private static QueryContext convertSQL(BrokerRequest brokerRequest) {
-    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
-
+  private static QueryContext convertSQL(PinotQuery pinotQuery, BrokerRequest 
brokerRequest) {
+    QueryContext subQueryContext = null;

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -375,6 +393,57 @@ public String toString() {
     private Map<String, String> _queryOptions;
     private Map<String, String> _debugOptions;
     private BrokerRequest _brokerRequest;
+    private QueryContext _subQueryContext;
+
+    /**
+     * Helper method to extract AGGREGATION FunctionContexts and FILTER 
FilterContexts from the given expression.
+     */
+    private static void getAggregations(ExpressionContext expression,
+        List<Pair<FunctionContext, FilterContext>> filteredAggregations) {
+      FunctionContext function = expression.getFunction();
+      if (function == null) {
+        return;
+      }
+      if (function.getType() == FunctionContext.Type.AGGREGATION) {
+        // Aggregation
+        filteredAggregations.add(Pair.of(function, null));
+      } else {
+        List<ExpressionContext> arguments = function.getArguments();
+        if (function.getFunctionName().equalsIgnoreCase("filter")) {
+          // Filtered aggregation
+          Preconditions.checkState(arguments.size() == 2, "FILTER must contain 
2 arguments");
+          FunctionContext aggregation = arguments.get(0).getFunction();
+          Preconditions.checkState(aggregation != null && 
aggregation.getType() == FunctionContext.Type.AGGREGATION,
+              "First argument of FILTER must be an aggregation function");
+          ExpressionContext filterExpression = arguments.get(1);
+          Preconditions.checkState(filterExpression.getFunction() != null
+                  && filterExpression.getFunction().getType() == 
FunctionContext.Type.TRANSFORM,
+              "Second argument of FILTER must be a filter expression");
+          FilterContext filter = 
RequestContextUtils.getFilter(filterExpression);
+          filteredAggregations.add(Pair.of(aggregation, filter));
+        } else {
+          // Transform
+          for (ExpressionContext argument : arguments) {
+            getAggregations(argument, filteredAggregations);
+          }
+        }
+      }
+    }
+
+    /**
+     * Helper method to extract AGGREGATION FunctionContexts and FILTER 
FilterContexts from the given filter.
+     */
+    private static void getAggregations(FilterContext filter,
+        List<Pair<FunctionContext, FilterContext>> filteredAggregations) {
+      List<FilterContext> children = filter.getChildren();
+      if (children != null) {
+        for (FilterContext child : children) {
+          getAggregations(child, filteredAggregations);
+        }
+      } else {
+        getAggregations(filter.getPredicate().getLhs(), filteredAggregations);
+      }
+    }

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType 
columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType 
columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType 
columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_dataType == FieldSpec.DataType.INT) {
+      int[] result = new int[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Integer) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ColumnDataToBlockValSetConverter.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * As for Gapfilling Function, all raw data will be retrieved from the pinot
+ * server and merged on the pinot broker. The data will be in {@link DataTable}
+ * format.
+ * As part of Gapfilling Function execution plan, the aggregation function will
+ * work on the merged data on pinot broker. The aggregation function only takes
+ * the {@link BlockValSet} format.
+ * This is the Helper class to convert the data from {@link DataTable} to the
+ * block of values {@link BlockValSet} which used as input to the aggregation
+ * function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ColumnDataToBlockValSetConverter implements BlockValSet {
+
+  private final FieldSpec.DataType _dataType;
+  private final List<Object[]> _rows;
+  private final int _columnIndex;
+
+  public ColumnDataToBlockValSetConverter(DataSchema.ColumnDataType 
columnDataType, List<Object[]> rows,
+      int columnIndex) {
+    _dataType = columnDataType.toDataType();
+    _rows = rows;
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _dataType;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getDictionaryIdsSV() {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public int[] getIntValuesSV() {
+    if (_dataType == FieldSpec.DataType.INT) {
+      int[] result = new int[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Integer) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public long[] getLongValuesSV() {
+    if (_dataType == FieldSpec.DataType.LONG) {
+      long[] result = new long[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Long) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public float[] getFloatValuesSV() {
+    if (_dataType == FieldSpec.DataType.FLOAT) {
+      float[] result = new float[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Float) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public double[] getDoubleValuesSV() {
+    if (_dataType == FieldSpec.DataType.DOUBLE) {
+      double[] result = new double[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (Double) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    } else if (_dataType == FieldSpec.DataType.INT) {
+      double[] result = new double[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = ((Integer) _rows.get(i)[_columnIndex]).doubleValue();
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public String[] getStringValuesSV() {
+    if (_dataType == FieldSpec.DataType.STRING) {
+      String[] result = new String[_rows.size()];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = (String) _rows.get(i)[_columnIndex];
+      }
+      return result;
+    }
+    throw new UnsupportedOperationException("Not supported");

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/ValueExtractorFactory.java
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+
+
+/**
+ * Value extractor for the post-aggregation function or pre-aggregation gap 
fill.

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/RowMatcherFactory.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.FilterContext;
+
+
+/**
+ * Factory for RowMatcher.
+ */
+public interface RowMatcherFactory {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/ValueExtractorFactory.java
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+
+
+/**
+ * Value extractor for the post-aggregation function or pre-aggregation gap 
fill.
+ */
+public interface ValueExtractorFactory {
+  ValueExtractor getValueExtractor(ExpressionContext expression);

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapfillFilterHandler.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.RowMatcherFactory;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+
+/**
+ * Handler for Filter clause of GapFill.
+ */
+public class GapfillFilterHandler implements ValueExtractorFactory {
+  private final RowMatcher _rowMatcher;
+  private final DataSchema _dataSchema;
+  private final Map<String, Integer> _indexes;
+
+  public GapfillFilterHandler(FilterContext filter, DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _indexes = new HashMap<>();
+    for (int i = 0; i < _dataSchema.size(); i++) {
+      _indexes.put(_dataSchema.getColumnName(i), i);
+    }
+    _rowMatcher = RowMatcherFactory.getRowMatcher(filter, this);
+  }
+
+  /**
+   * Returns {@code true} if the given row matches the HAVING clause, {@code 
false} otherwise.
+   */
+  public boolean isMatch(Object[] row) {
+    return _rowMatcher.isMatch(row);
+  }
+
+  /**
+   * Returns a ValueExtractor based on the given expression.
+   */
+  @Override
+  public ValueExtractor getValueExtractor(ExpressionContext expression) {
+    expression = GapfillUtils.stripGapfill(expression);
+    if (expression.getType() == ExpressionContext.Type.LITERAL) {
+      // Literal
+      return new LiteralValueExtractor(expression.getLiteral());
+    }
+
+    if (expression.getType() == ExpressionContext.Type.IDENTIFIER) {
+      return new 
ColumnValueExtractor(_indexes.get(expression.getIdentifier()), _dataSchema);
+    } else {
+      return new 
ColumnValueExtractor(_indexes.get(expression.getFunction().toString()), 
_dataSchema);

Review comment:
       Add TODO

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapfillFilterHandler.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.RowMatcherFactory;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+
+/**
+ * Handler for Filter clause of GapFill.
+ */
+public class GapfillFilterHandler implements ValueExtractorFactory {
+  private final RowMatcher _rowMatcher;
+  private final DataSchema _dataSchema;
+  private final Map<String, Integer> _indexes;
+
+  public GapfillFilterHandler(FilterContext filter, DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _indexes = new HashMap<>();
+    for (int i = 0; i < _dataSchema.size(); i++) {
+      _indexes.put(_dataSchema.getColumnName(i), i);

Review comment:
       Add TODO

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
##########
@@ -88,6 +83,8 @@ private SelectionOperatorUtils() {
       ThreadLocal.withInitial(() -> new DecimalFormat(FLOAT_PATTERN, 
DECIMAL_FORMAT_SYMBOLS));
   private static final ThreadLocal<DecimalFormat> THREAD_LOCAL_DOUBLE_FORMAT =
       ThreadLocal.withInitial(() -> new DecimalFormat(DOUBLE_PATTERN, 
DECIMAL_FORMAT_SYMBOLS));
+  private SelectionOperatorUtils() {

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
##########
@@ -391,6 +388,9 @@ public static DataTable 
getDataTableFromRows(Collection<Object[]> rows, DataSche
           row[i] = dataTable.getStringArray(rowId, i);
           break;
 
+        case OBJECT:

Review comment:
       Revert it.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
##########
@@ -114,6 +115,7 @@ public AsyncQueryResponse submitQuery(long requestId, 
String rawTableName,
     Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
     if (offlineBrokerRequest != null) {
       assert offlineRoutingTable != null;
+      
BrokerRequestToQueryContextConverter.validateGapfillQuery(offlineBrokerRequest);

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -42,23 +42,42 @@
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
 public class BrokerRequestToQueryContextConverter {
   private BrokerRequestToQueryContextConverter() {
   }
 
+  /**
+   * Validate the gapfill query.
+   */
+  public static void validateGapfillQuery(BrokerRequest brokerRequest) {

Review comment:
       Fixed

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -71,12 +86,15 @@ public static boolean isFill(ExpressionContext 
expressionContext) {
       return false;
     }
 
-    return 
FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+    return 
FILL.equalsIgnoreCase(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));

Review comment:
       Fixed

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable 
getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static boolean isGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return 
GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  private static boolean isGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the gapfill type for queryContext. Also do the validation for gapfill 
request.
+   * @param queryContext
+   */
+  public static void setGapfillType(QueryContext queryContext) {
+    GapfillType gapfillType = null;
+    if (queryContext.getSubQueryContext() == null) {
+      if (isGapfill(queryContext)) {
+        Preconditions.checkArgument(queryContext.getAggregationFunctions() == 
null,
+            "Aggregation and Gapfill can not be in the same sql statement.");
+        gapfillType = GapfillType.GAP_FILL;
+      }
+    } else if (isGapfill(queryContext)) {
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getAggregationFunctions()
 != null,
+          "Select and Gapfill should be in the same sql statement.");
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getSubQueryContext()
 == null,
+          "There is no three levels nesting sql when the outer query is 
gapfill.");
+      gapfillType = GapfillType.AGGREGATE_GAP_FILL;
+    } else if (isGapfill(queryContext.getSubQueryContext())) {
+      if (queryContext.getAggregationFunctions() == null) {
+        gapfillType = GapfillType.GAP_FILL_SELECT;
+      } else if (queryContext.getSubQueryContext().getSubQueryContext() == 
null) {
+        gapfillType = GapfillType.GAP_FILL_AGGREGATE;
+      } else {
+        Preconditions
+            
.checkArgument(queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions()
 != null,
+                "Select cannot happen before gapfill.");
+        gapfillType = GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
+      }
+    }
+
+    queryContext.setGapfillType(gapfillType);
+    if (gapfillType == null) {
+      return;
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(gapFillSelection != null && 
gapFillSelection.getFunction() != null,
+        "Gapfill Expression should be function.");
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not 
have correct number of arguments.");
+    Preconditions.checkArgument(args.get(1).getLiteral() != null,
+        "The second argument of PostAggregateGapFill should be 
TimeFormatter.");
+    Preconditions.checkArgument(args.get(2).getLiteral() != null,
+        "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(args.get(3).getLiteral() != null,
+        "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(args.get(4).getLiteral() != null,
+        "The fifth argument of PostAggregateGapFill should be time bucket 
size.");
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn 
expressions should be specified.");
+
+    if (queryContext.getAggregationFunctions() == null) {
+      return;
+    }
+
+    List<ExpressionContext> groupbyExpressions = 
queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy 
Clause.");
+    List<ExpressionContext> innerSelections = 
queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = 
innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By 
timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private static ExpressionContext findGapfillExpressionContext(QueryContext 
queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return expressionContext;
+      }
+    }
+    return null;
+  }
+
+  public static ExpressionContext getGapfillExpressionContext(QueryContext 
queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.AGGREGATE_GAP_FILL || gapfillType == 
GapfillType.GAP_FILL) {
+      return findGapfillExpressionContext(queryContext);
+    } else if (gapfillType == GapfillType.GAP_FILL_AGGREGATE || gapfillType == 
GapfillType.AGGREGATE_GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT) {
+      return findGapfillExpressionContext(queryContext.getSubQueryContext());
+    } else {
+      return null;
+    }
+  }
+
+  public static int findTimeBucketColumnIndex(QueryContext queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT
+        || gapfillType == GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = queryContext.getSubQueryContext();
+    }
+    List<ExpressionContext> expressionContexts = 
queryContext.getSelectExpressions();
+    for (int i = 0; i < expressionContexts.size(); i++) {
+      if (isGapfill(expressionContexts.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static ExpressionContext 
getTimeSeriesOnExpressionContext(ExpressionContext gapFillSelection) {
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isTimeSeriesOn(args.get(i))) {
+        return args.get(i);
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ExpressionContext> 
getFillExpressions(ExpressionContext gapFillSelection) {
+    Map<String, ExpressionContext> fillExpressions = new HashMap<>();
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isFill(args.get(i))) {
+        ExpressionContext fillExpression = args.get(i);
+        
fillExpressions.put(fillExpression.getFunction().getArguments().get(0).getIdentifier(),
 fillExpression);
+      }
+    }
+    return fillExpressions;
+  }
+
+  public static String getTableName(PinotQuery pinotQuery) {
+    while (pinotQuery.getDataSource().getSubquery() != null) {
+      pinotQuery = pinotQuery.getDataSource().getSubquery();
+    }
+    return pinotQuery.getDataSource().getTableName();
+  }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    GapfillUtils.GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == null) {
+      return brokerRequest;
+    }
+    switch (gapfillType) {
+      // one sql query with gapfill only
+      case GAP_FILL:
+        return stripGapfill(brokerRequest.getPinotQuery());
+      // gapfill as subquery, the outer query may have the filter
+      case GAP_FILL_SELECT:
+        // gapfill as subquery, the outer query has the aggregation
+      case GAP_FILL_AGGREGATE:
+        // aggregation as subqery, the outer query is gapfill
+      case AGGREGATE_GAP_FILL:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery());
+      // aggegration as second nesting subquery, gapfill as first nesting 
subquery, different aggregation as outer query
+      case AGGREGATE_GAP_FILL_AGGREGATE:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery().getDataSource().getSubquery());
+      default:
+        return brokerRequest;
+    }
+  }
+
+  private static BrokerRequest stripGapfill(PinotQuery pinotQuery) {
+    PinotQuery copy = new PinotQuery(pinotQuery);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(copy);
+    // Set table name in broker request because it is used for access control, 
query routing etc.
+    DataSource dataSource = copy.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    List<Expression> selectList = copy.getSelectList();
+    for (int i = 0; i < selectList.size(); i++) {
+      Expression select = selectList.get(i);
+      if (select.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperator())) {

Review comment:
       Fixed

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable 
getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static boolean isGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return 
GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  private static boolean isGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the gapfill type for queryContext. Also do the validation for gapfill 
request.
+   * @param queryContext
+   */
+  public static void setGapfillType(QueryContext queryContext) {
+    GapfillType gapfillType = null;
+    if (queryContext.getSubQueryContext() == null) {
+      if (isGapfill(queryContext)) {
+        Preconditions.checkArgument(queryContext.getAggregationFunctions() == 
null,
+            "Aggregation and Gapfill can not be in the same sql statement.");
+        gapfillType = GapfillType.GAP_FILL;
+      }
+    } else if (isGapfill(queryContext)) {
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getAggregationFunctions()
 != null,
+          "Select and Gapfill should be in the same sql statement.");
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getSubQueryContext()
 == null,
+          "There is no three levels nesting sql when the outer query is 
gapfill.");
+      gapfillType = GapfillType.AGGREGATE_GAP_FILL;
+    } else if (isGapfill(queryContext.getSubQueryContext())) {
+      if (queryContext.getAggregationFunctions() == null) {
+        gapfillType = GapfillType.GAP_FILL_SELECT;
+      } else if (queryContext.getSubQueryContext().getSubQueryContext() == 
null) {
+        gapfillType = GapfillType.GAP_FILL_AGGREGATE;
+      } else {
+        Preconditions
+            
.checkArgument(queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions()
 != null,
+                "Select cannot happen before gapfill.");
+        gapfillType = GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
+      }
+    }
+
+    queryContext.setGapfillType(gapfillType);
+    if (gapfillType == null) {
+      return;
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(gapFillSelection != null && 
gapFillSelection.getFunction() != null,
+        "Gapfill Expression should be function.");
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not 
have correct number of arguments.");
+    Preconditions.checkArgument(args.get(1).getLiteral() != null,
+        "The second argument of PostAggregateGapFill should be 
TimeFormatter.");
+    Preconditions.checkArgument(args.get(2).getLiteral() != null,
+        "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(args.get(3).getLiteral() != null,
+        "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(args.get(4).getLiteral() != null,
+        "The fifth argument of PostAggregateGapFill should be time bucket 
size.");
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn 
expressions should be specified.");
+
+    if (queryContext.getAggregationFunctions() == null) {
+      return;
+    }
+
+    List<ExpressionContext> groupbyExpressions = 
queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy 
Clause.");
+    List<ExpressionContext> innerSelections = 
queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = 
innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By 
timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private static ExpressionContext findGapfillExpressionContext(QueryContext 
queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return expressionContext;
+      }
+    }
+    return null;
+  }
+
+  public static ExpressionContext getGapfillExpressionContext(QueryContext 
queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.AGGREGATE_GAP_FILL || gapfillType == 
GapfillType.GAP_FILL) {
+      return findGapfillExpressionContext(queryContext);
+    } else if (gapfillType == GapfillType.GAP_FILL_AGGREGATE || gapfillType == 
GapfillType.AGGREGATE_GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT) {
+      return findGapfillExpressionContext(queryContext.getSubQueryContext());
+    } else {
+      return null;
+    }
+  }
+
+  public static int findTimeBucketColumnIndex(QueryContext queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT
+        || gapfillType == GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = queryContext.getSubQueryContext();
+    }
+    List<ExpressionContext> expressionContexts = 
queryContext.getSelectExpressions();
+    for (int i = 0; i < expressionContexts.size(); i++) {
+      if (isGapfill(expressionContexts.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static ExpressionContext 
getTimeSeriesOnExpressionContext(ExpressionContext gapFillSelection) {
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isTimeSeriesOn(args.get(i))) {
+        return args.get(i);
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ExpressionContext> 
getFillExpressions(ExpressionContext gapFillSelection) {
+    Map<String, ExpressionContext> fillExpressions = new HashMap<>();
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isFill(args.get(i))) {
+        ExpressionContext fillExpression = args.get(i);
+        
fillExpressions.put(fillExpression.getFunction().getArguments().get(0).getIdentifier(),
 fillExpression);
+      }
+    }
+    return fillExpressions;
+  }
+
+  public static String getTableName(PinotQuery pinotQuery) {
+    while (pinotQuery.getDataSource().getSubquery() != null) {
+      pinotQuery = pinotQuery.getDataSource().getSubquery();
+    }
+    return pinotQuery.getDataSource().getTableName();
+  }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);

Review comment:
       Fixed

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -133,6 +137,8 @@ private QueryContext(String tableName, 
List<ExpressionContext> selectExpressions
     _queryOptions = queryOptions;
     _debugOptions = debugOptions;
     _brokerRequest = brokerRequest;
+    _gapfillType = null;

Review comment:
       Fixed

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
##########
@@ -119,4 +137,265 @@ static public Serializable 
getDefaultValue(DataSchema.ColumnDataType dataType) {
   private static String canonicalizeFunctionName(String functionName) {
     return StringUtils.remove(functionName, '_').toLowerCase();
   }
+
+  public static boolean isGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return 
GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  private static boolean isGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the gapfill type for queryContext. Also do the validation for gapfill 
request.
+   * @param queryContext
+   */
+  public static void setGapfillType(QueryContext queryContext) {
+    GapfillType gapfillType = null;
+    if (queryContext.getSubQueryContext() == null) {
+      if (isGapfill(queryContext)) {
+        Preconditions.checkArgument(queryContext.getAggregationFunctions() == 
null,
+            "Aggregation and Gapfill can not be in the same sql statement.");
+        gapfillType = GapfillType.GAP_FILL;
+      }
+    } else if (isGapfill(queryContext)) {
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getAggregationFunctions()
 != null,
+          "Select and Gapfill should be in the same sql statement.");
+      
Preconditions.checkArgument(queryContext.getSubQueryContext().getSubQueryContext()
 == null,
+          "There is no three levels nesting sql when the outer query is 
gapfill.");
+      gapfillType = GapfillType.AGGREGATE_GAP_FILL;
+    } else if (isGapfill(queryContext.getSubQueryContext())) {
+      if (queryContext.getAggregationFunctions() == null) {
+        gapfillType = GapfillType.GAP_FILL_SELECT;
+      } else if (queryContext.getSubQueryContext().getSubQueryContext() == 
null) {
+        gapfillType = GapfillType.GAP_FILL_AGGREGATE;
+      } else {
+        Preconditions
+            
.checkArgument(queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions()
 != null,
+                "Select cannot happen before gapfill.");
+        gapfillType = GapfillType.AGGREGATE_GAP_FILL_AGGREGATE;
+      }
+    }
+
+    queryContext.setGapfillType(gapfillType);
+    if (gapfillType == null) {
+      return;
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(gapFillSelection != null && 
gapFillSelection.getFunction() != null,
+        "Gapfill Expression should be function.");
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not 
have correct number of arguments.");
+    Preconditions.checkArgument(args.get(1).getLiteral() != null,
+        "The second argument of PostAggregateGapFill should be 
TimeFormatter.");
+    Preconditions.checkArgument(args.get(2).getLiteral() != null,
+        "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(args.get(3).getLiteral() != null,
+        "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(args.get(4).getLiteral() != null,
+        "The fifth argument of PostAggregateGapFill should be time bucket 
size.");
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn 
expressions should be specified.");
+
+    if (queryContext.getAggregationFunctions() == null) {
+      return;
+    }
+
+    List<ExpressionContext> groupbyExpressions = 
queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy 
Clause.");
+    List<ExpressionContext> innerSelections = 
queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = 
innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By 
timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private static ExpressionContext findGapfillExpressionContext(QueryContext 
queryContext) {
+    for (ExpressionContext expressionContext : 
queryContext.getSelectExpressions()) {
+      if (isGapfill(expressionContext)) {
+        return expressionContext;
+      }
+    }
+    return null;
+  }
+
+  public static ExpressionContext getGapfillExpressionContext(QueryContext 
queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.AGGREGATE_GAP_FILL || gapfillType == 
GapfillType.GAP_FILL) {
+      return findGapfillExpressionContext(queryContext);
+    } else if (gapfillType == GapfillType.GAP_FILL_AGGREGATE || gapfillType == 
GapfillType.AGGREGATE_GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT) {
+      return findGapfillExpressionContext(queryContext.getSubQueryContext());
+    } else {
+      return null;
+    }
+  }
+
+  public static int findTimeBucketColumnIndex(QueryContext queryContext) {
+    GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == GapfillType.GAP_FILL_AGGREGATE
+        || gapfillType == GapfillType.GAP_FILL_SELECT
+        || gapfillType == GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = queryContext.getSubQueryContext();
+    }
+    List<ExpressionContext> expressionContexts = 
queryContext.getSelectExpressions();
+    for (int i = 0; i < expressionContexts.size(); i++) {
+      if (isGapfill(expressionContexts.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static ExpressionContext 
getTimeSeriesOnExpressionContext(ExpressionContext gapFillSelection) {
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isTimeSeriesOn(args.get(i))) {
+        return args.get(i);
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ExpressionContext> 
getFillExpressions(ExpressionContext gapFillSelection) {
+    Map<String, ExpressionContext> fillExpressions = new HashMap<>();
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+    for (int i = STARTING_INDEX_OF_OPTIONAL_ARGS_FOR_PRE_AGGREGATE_GAP_FILL; i 
< args.size(); i++) {
+      if (GapfillUtils.isFill(args.get(i))) {
+        ExpressionContext fillExpression = args.get(i);
+        
fillExpressions.put(fillExpression.getFunction().getArguments().get(0).getIdentifier(),
 fillExpression);
+      }
+    }
+    return fillExpressions;
+  }
+
+  public static String getTableName(PinotQuery pinotQuery) {
+    while (pinotQuery.getDataSource().getSubquery() != null) {
+      pinotQuery = pinotQuery.getDataSource().getSubquery();
+    }
+    return pinotQuery.getDataSource().getTableName();
+  }
+
+  public static BrokerRequest stripGapfill(BrokerRequest brokerRequest) {
+    if (brokerRequest.getPinotQuery().getDataSource() == null) {
+      return brokerRequest;
+    }
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    GapfillUtils.GapfillType gapfillType = queryContext.getGapfillType();
+    if (gapfillType == null) {
+      return brokerRequest;
+    }
+    switch (gapfillType) {
+      // one sql query with gapfill only
+      case GAP_FILL:
+        return stripGapfill(brokerRequest.getPinotQuery());
+      // gapfill as subquery, the outer query may have the filter
+      case GAP_FILL_SELECT:
+        // gapfill as subquery, the outer query has the aggregation
+      case GAP_FILL_AGGREGATE:
+        // aggregation as subqery, the outer query is gapfill
+      case AGGREGATE_GAP_FILL:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery());
+      // aggegration as second nesting subquery, gapfill as first nesting 
subquery, different aggregation as outer query
+      case AGGREGATE_GAP_FILL_AGGREGATE:
+        return 
stripGapfill(brokerRequest.getPinotQuery().getDataSource().getSubquery().getDataSource().getSubquery());
+      default:
+        return brokerRequest;
+    }
+  }
+
+  private static BrokerRequest stripGapfill(PinotQuery pinotQuery) {
+    PinotQuery copy = new PinotQuery(pinotQuery);
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(copy);
+    // Set table name in broker request because it is used for access control, 
query routing etc.
+    DataSource dataSource = copy.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+    List<Expression> selectList = copy.getSelectList();
+    for (int i = 0; i < selectList.size(); i++) {
+      Expression select = selectList.get(i);
+      if (select.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperator())) {
+        selectList.set(i, select.getFunctionCall().getOperands().get(0));
+        break;
+      }
+      if (AS.equalsIgnoreCase(select.getFunctionCall().getOperator())
+          && select.getFunctionCall().getOperands().get(0).getType() == 
ExpressionType.FUNCTION
+          && 
GAP_FILL.equalsIgnoreCase(select.getFunctionCall().getOperands().get(0).getFunctionCall().getOperator()))
 {
+        select.getFunctionCall().getOperands().set(0,
+            
select.getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0));
+        break;
+      }
+    }
+
+    for (Expression orderBy : copy.getOrderByList()) {
+      if (orderBy.getType() != ExpressionType.FUNCTION) {
+        continue;
+      }
+      if (orderBy.getFunctionCall().getOperands().get(0).getType() == 
ExpressionType.FUNCTION
+          && GAP_FILL.equalsIgnoreCase(
+              
orderBy.getFunctionCall().getOperands().get(0).getFunctionCall().getOperator()))
 {
+        orderBy.getFunctionCall().getOperands().set(0,
+            
orderBy.getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0));
+        break;
+      }
+    }
+    return brokerRequest;
+  }
+
+  public enum GapfillType {

Review comment:
       Fixed.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillProcessor.java
##########
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce and set gap fill results into the 
BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillProcessor {
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+  private final int _numOfTimeBuckets;
+  private final List<Integer> _groupByKeyIndexes;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+  private int _limitForGapfilledResult;
+  private boolean[] _isGroupBySelections;
+  private final int _timeBucketColumnIndex;
+  private int[] _sourceColumnIndexForResultSchema = null;
+
+  GapFillProcessor(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = 
GapfillUtils.getGapfillExpressionContext(queryContext);
+    _timeBucketColumnIndex = 
GapfillUtils.findTimeBucketColumnIndex(queryContext);
+
+    List<ExpressionContext> args = 
gapFillSelection.getFunction().getArguments();
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new 
DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    String end = args.get(3).getLiteral();
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _timeBucketSize = _dateTimeGranularity.granularityToMillis();
+    _numOfTimeBuckets = (int) ((_endMs - _startMs) / _timeBucketSize);
+
+    _fillExpressions = GapfillUtils.getFillExpressions(gapFillSelection);
+
+    _previousByGroupKey = new HashMap<>();
+    _groupByKeyIndexes = new ArrayList<>();
+    _groupByKeys = new HashSet<>();
+
+    ExpressionContext timeseriesOn = 
GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    _timeSeries = timeseriesOn.getFunction().getArguments();
+  }
+
+  private int findBucketIndex(long time) {
+    return (int) ((time - _startMs) / _timeBucketSize);
+  }
+
+  private void replaceColumnNameWithAlias(DataSchema dataSchema) {
+    QueryContext queryContext;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) 
{
+      queryContext = _queryContext.getSubQueryContext().getSubQueryContext();
+    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      queryContext = _queryContext;
+    } else {
+      queryContext = _queryContext.getSubQueryContext();
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    Map<String, String> columnNameToAliasMap = new HashMap<>();
+    for (int i = 0; i < aliasList.size(); i++) {
+      if (aliasList.get(i) != null) {
+        ExpressionContext selection = 
queryContext.getSelectExpressions().get(i);
+        if (GapfillUtils.isGapfill(selection)) {
+          selection = selection.getFunction().getArguments().get(0);
+        }
+        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
+      }
+    }
+    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
+        dataSchema.getColumnNames()[i] = 
columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
+      }
+    }
+  }
+
+  /**
+   * Here are three things that happen
+   * 1. Sort the result sets from all pinot servers based on timestamp
+   * 2. Gapfill the data for missing entities per time bucket
+   * 3. Aggregate the dataset per time bucket.
+   */
+  public void process(BrokerResponseNative brokerResponseNative) {
+    DataSchema dataSchema = 
brokerResponseNative.getResultTable().getDataSchema();
+    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
+    if (brokerResponseNative.getResultTable().getRows().isEmpty()) {
+      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, 
Collections.emptyList()));
+      return;
+    }
+
+    String[] columns = dataSchema.getColumnNames();
+
+    Map<String, Integer> indexes = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      indexes.put(columns[i], i);
+    }
+
+    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
+
+    // The first one argument of timeSeries is time column. The left ones are 
defining entity.
+    for (ExpressionContext entityColum : _timeSeries) {
+      int index = indexes.get(entityColum.getIdentifier());
+      _isGroupBySelections[index] = true;
+    }
+
+    for (int i = 0; i < _isGroupBySelections.length; i++) {
+      if (_isGroupBySelections[i]) {
+        _groupByKeyIndexes.add(i);
+      }
+    }
+
+    List<Object[]>[] timeBucketedRawRows = 
putRawRowsIntoTimeBucket(brokerResponseNative.getResultTable().getRows());
+
+    List<Object[]> resultRows;
+    replaceColumnNameWithAlias(dataSchema);
+
+    if (_queryContext.getAggregationFunctions() == null) {
+
+      Map<String, Integer> sourceColumnsIndexes = new HashMap<>();
+      for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+        sourceColumnsIndexes.put(dataSchema.getColumnName(i), i);
+      }
+      _sourceColumnIndexForResultSchema = new 
int[resultTableSchema.getColumnNames().length];
+      for (int i = 0; i < _sourceColumnIndexForResultSchema.length; i++) {
+        _sourceColumnIndexForResultSchema[i] = 
sourceColumnsIndexes.get(resultTableSchema.getColumnName(i));
+      }
+    }
+
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE || 
_gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      List<Object[]> gapfilledRows = gapFillAndAggregate(timeBucketedRawRows, 
resultTableSchema, dataSchema);
+      if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+        resultRows = new ArrayList<>(gapfilledRows.size());
+        resultRows.addAll(gapfilledRows);
+      } else {
+        resultRows = gapfilledRows;
+      }
+    } else {
+      resultRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, 
dataSchema);
+    }

Review comment:
       Good catch, Fixed.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
##########
@@ -162,8 +163,7 @@ public String getQueryResponse(String query, String 
traceEnabled, String queryOp
       String inputTableName;
       switch (querySyntax) {
         case CommonConstants.Broker.Request.SQL:
-          inputTableName =
-              
SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery().getDataSource().getTableName();
+          inputTableName = 
GapfillUtils.getTableName(SQL_QUERY_COMPILER.compileToBrokerRequest(query).getPinotQuery());

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to