This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ff66173  Deprecate the PostAggregationGapfill. (#8383)
ff66173 is described below

commit ff66173a27eda5d327eb06c669e76a6488c82045
Author: weixiangsun <91153405+weixiang...@users.noreply.github.com>
AuthorDate: Tue Mar 22 11:34:13 2022 -0700

    Deprecate the PostAggregationGapfill. (#8383)
---
 .../core/operator/transform/TransformOperator.java |   4 +-
 .../reduce/GapFillGroupByDataTableReducer.java     | 491 ----------------
 .../core/query/reduce/PostAggregationHandler.java  |   4 +-
 .../core/query/reduce/ResultReducerFactory.java    |   3 -
 .../org/apache/pinot/core/util/GapfillUtils.java   |  30 +-
 .../apache/pinot/queries/GapfillQueriesTest.java   |   1 -
 .../queries/PostAggregationGapfillQueriesTest.java | 616 ---------------------
 7 files changed, 8 insertions(+), 1141 deletions(-)
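
Context for reviewers: PostAggregateGapFill placed gap-filling directly in the select list of an aggregation query. A representative query, adapted from the deleted PostAggregationGapfillQueriesTest below (shown only to illustrate the syntax being removed; the granularity and fill mode are just the values the test used):

    SELECT
      PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH',
          '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'),
        '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
        '2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col,
      lotId,
      FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') AS status
    FROM parkingData
    WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000
    GROUP BY 1, 2
    ORDER BY 1
    LIMIT 200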

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index 08bfeb8..d3b752e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -36,7 +36,6 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 
@@ -63,8 +62,7 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
     _projectionOperator = projectionOperator;
     _dataSourceMap = projectionOperator.getDataSourceMap();
     for (ExpressionContext expression : expressions) {
-      TransformFunction transformFunction =
-          TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap);
+      TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
       _transformFunctionMap.put(expression, transformFunction);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
deleted file mode 100644
index 9f739b0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.reduce;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.BrokerGauge;
-import org.apache.pinot.common.metrics.BrokerMeter;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
-import org.apache.pinot.core.data.table.IndexedTable;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.core.util.GapfillUtils;
-import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-
-
-/**
- * Helper class to reduce data tables and set group by results into the BrokerResponseNative
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GapFillGroupByDataTableReducer implements DataTableReducer {
-  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
-
-  private final QueryContext _queryContext;
-  private final AggregationFunction[] _aggregationFunctions;
-  private final int _numAggregationFunctions;
-  private final List<ExpressionContext> _groupByExpressions;
-  private final int _numGroupByExpressions;
-  private final int _numColumns;
-  private final DateTimeGranularitySpec _dateTimeGranularity;
-  private final DateTimeFormatSpec _dateTimeFormatter;
-  private final long _startMs;
-  private final long _endMs;
-  private final Set<Key> _groupByKeys;
-  private final Map<Key, Object[]> _previousByGroupKey;
-  private final int _numOfGroupByKeys;
-  private final List<Integer> _groupByKeyIndexes;
-  private final boolean [] _isGroupBySelections;
-  private int _timeBucketIndex = -1;
-
-  GapFillGroupByDataTableReducer(QueryContext queryContext) {
-    Preconditions.checkArgument(
-        queryContext.getBrokerRequest().getPinotQuery() != null, "GapFill can only be applied to sql query");
-    _queryContext = queryContext;
-    _aggregationFunctions = queryContext.getAggregationFunctions();
-    assert _aggregationFunctions != null;
-    _numAggregationFunctions = _aggregationFunctions.length;
-    _groupByExpressions = queryContext.getGroupByExpressions();
-    assert _groupByExpressions != null;
-    _numGroupByExpressions = _groupByExpressions.size();
-    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
-
-    ExpressionContext gapFillSelection = null;
-    for (ExpressionContext expressionContext : _queryContext.getSelectExpressions()) {
-      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
-        gapFillSelection = expressionContext;
-        break;
-      }
-    }
-
-    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
-    Preconditions.checkArgument(
-        args.size() == 5, "PostAggregateGapFill does not have correct number of arguments.");
-    Preconditions.checkArgument(
-        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
-    Preconditions.checkArgument(
-        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
-    Preconditions.checkArgument(
-        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
-    Preconditions.checkArgument(
-        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
-
-    boolean orderByTimeBucket = false;
-    if (_queryContext.getOrderByExpressions() != null && !_queryContext.getOrderByExpressions().isEmpty()) {
-      OrderByExpressionContext firstOrderByExpression = _queryContext.getOrderByExpressions().get(0);
-      orderByTimeBucket =
-          firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
-    }
-
-    Preconditions.checkArgument(
-        orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
-
-    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
-    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
-    String start = args.get(2).getLiteral();
-    String end = args.get(3).getLiteral();
-    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
-    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
-    _groupByKeys = new HashSet<>();
-    _previousByGroupKey = new HashMap<>();
-    _numOfGroupByKeys = _queryContext.getGroupByExpressions().size() - 1;
-    _groupByKeyIndexes = new ArrayList<>();
-    _isGroupBySelections = new boolean[_queryContext.getSelectExpressions().size()];
-
-    for (ExpressionContext expressionContext : _groupByExpressions) {
-      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
-        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
-          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
-            _timeBucketIndex = i;
-            _isGroupBySelections[i] = true;
-            break;
-          }
-        }
-      } else {
-        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
-          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
-            _groupByKeyIndexes.add(i);
-            _isGroupBySelections[i] = true;
-            break;
-          }
-        }
-      }
-    }
-
-    Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
-  }
-
-  private long truncate(long epoch) {
-    int sz = _dateTimeGranularity.getSize();
-    return epoch / sz * sz;
-  }
-
-  /**
-   * Reduces and sets group by results into ResultTable, if responseFormat = sql
-   * By default, sets group by results into GroupByResults
-   */
-  @Override
-  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
-      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
-      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
-    Collection<DataTable> dataTables = dataTableMap.values();
-
-    try {
-      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
-          brokerMetrics);
-    } catch (TimeoutException e) {
-      brokerResponseNative.getProcessingExceptions()
-          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
-    }
-    int resultSize = brokerResponseNative.getResultTable().getRows().size();
-
-    if (brokerMetrics != null && resultSize > 0) {
-      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
-    }
-  }
-
-  private Key constructGroupKeys(Object[] row) {
-    Object [] groupKeys = new Object[_numOfGroupByKeys];
-    for (int i = 0; i < _numOfGroupByKeys; i++) {
-      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
-    }
-    return new Key(groupKeys);
-  }
-
-  /**
-   * Extract group by order by results and set into {@link ResultTable}
-   * @param brokerResponseNative broker response
-   * @param dataSchema data schema
-   * @param dataTables Collection of data tables
-   * @param reducerContext DataTableReducer context
-   * @param rawTableName table name
-   * @param brokerMetrics broker metrics (meters)
-   * @throws TimeoutException If unable complete within timeout.
-   */
-  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
-      BrokerMetrics brokerMetrics)
-      throws TimeoutException {
-    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
-    if (brokerMetrics != null) {
-      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
-      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
-    }
-    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
-    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
-
-    PostAggregationHandler postAggregationHandler =
-        new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
-    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-    Iterator<Record> sortedIterator = indexedTable.iterator();
-    while (sortedIterator.hasNext()) {
-      Object[] row = sortedIterator.next().getValues();
-      extractFinalAggregationResults(row);
-      for (int i = 0; i < columnDataTypes.length; i++) {
-        row[i] = columnDataTypes[i].convert(row[i]);
-      }
-      Object[] resultRow = postAggregationHandler.getResult(row);
-      for (int i = 0; i < resultColumnDataTypes.length; i++) {
-        resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
-      }
-
-      _groupByKeys.add(constructGroupKeys(resultRow));
-    }
-
-    List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler);
-    brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
-  }
-
-  List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) {
-    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-    int limit = _queryContext.getLimit();
-    int numResultColumns = resultColumnDataTypes.length;
-    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
-    long step = _dateTimeGranularity.granularityToMillis();
-    FilterContext havingFilter = _queryContext.getHavingFilter();
-    HavingFilterHandler havingFilterHandler = null;
-    if (havingFilter != null) {
-      havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
-    }
-    Record record = null;
-    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
-      Set<Key> keys = new HashSet<>(_groupByKeys);
-      if (record == null && sortedIterator.hasNext()) {
-        record = sortedIterator.next();
-      }
-
-      while (record != null) {
-        Object[] row = record.getValues();
-
-        Object[] resultRow = postAggregationHandler.getResult(row);
-        for (int i = 0; i < resultColumnDataTypes.length; i++) {
-          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
-        }
-
-        long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex]));
-        if (timeCol > time) {
-          break;
-        }
-        if (timeCol == time) {
-          if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) {
-            gapfillResultRows.add(resultRow);
-            if (gapfillResultRows.size() == limit) {
-              return gapfillResultRows;
-            }
-          }
-          Key key = constructGroupKeys(resultRow);
-          keys.remove(key);
-          _previousByGroupKey.put(key, resultRow);
-        }
-        if (sortedIterator.hasNext()) {
-          record = sortedIterator.next();
-        } else {
-          record = null;
-        }
-      }
-
-      for (Key key : keys) {
-        Object[] gapfillRow = new Object[numResultColumns];
-        int keyIndex = 0;
-        for (int i = 0; i < _isGroupBySelections.length; i++) {
-          if (_isGroupBySelections[i]) {
-            if (i == _timeBucketIndex) {
-              if (resultColumnDataTypes[i] == ColumnDataType.LONG) {
-                gapfillRow[_timeBucketIndex] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
-              } else {
-                gapfillRow[_timeBucketIndex] = _dateTimeFormatter.fromMillisToFormat(time);
-              }
-            } else {
-              gapfillRow[i] = key.getValues()[keyIndex++];
-            }
-          } else {
-            gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
-          }
-        }
-
-        if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
-          gapfillResultRows.add(gapfillRow);
-          if (gapfillResultRows.size() == limit) {
-            return gapfillResultRows;
-          }
-        }
-      }
-    }
-    return gapfillResultRows;
-  }
-
-  Object getFillValue(int columIndex, Object key, ColumnDataType dataType) {
-    ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columIndex);
-    if (expressionContext.getFunction() != null && GapfillUtils.isFill(expressionContext)) {
-      List<ExpressionContext> args = expressionContext.getFunction().getArguments();
-      if (args.get(1).getLiteral() == null) {
-        throw new UnsupportedOperationException("Wrong Sql.");
-      }
-      GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
-      if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
-        // TODO: may fill the default value from sql in the future.
-        return GapfillUtils.getDefaultValue(dataType);
-      } else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
-        Object[] row = _previousByGroupKey.get(key);
-        if (row != null) {
-          return row[columIndex];
-        } else {
-          return GapfillUtils.getDefaultValue(dataType);
-        }
-      } else {
-        throw new UnsupportedOperationException("unsupported fill type.");
-      }
-    } else {
-      return GapfillUtils.getDefaultValue(dataType);
-    }
-  }
-
-  /**
-   * Helper method to extract the final aggregation results for the given row (in-place).
-   */
-  private void extractFinalAggregationResults(Object[] row) {
-    for (int i = 0; i < _numAggregationFunctions; i++) {
-      int valueIndex = i + _numGroupByExpressions;
-      row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
-    }
-  }
-
-  /**
-   * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
-   */
-  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
-    System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
-    for (int i = 0; i < _numAggregationFunctions; i++) {
-      columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
-    }
-    return new DataSchema(columnNames, columnDataTypes);
-  }
-
-  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
-      DataTableReducerContext reducerContext)
-      throws TimeoutException {
-    long start = System.currentTimeMillis();
-    int numDataTables = dataTablesToReduce.size();
-
-    // Get the number of threads to use for reducing.
-    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
-    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
-    int limit = _queryContext.getLimit();
-    // TODO: Make minTrimSize configurable
-    int trimSize = GroupByUtils.getTableCapacity(limit);
-    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
-    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
-    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
-    int trimThreshold = reducerContext.getGroupByTrimThreshold();
-    IndexedTable indexedTable;
-    if (numReduceThreadsToUse <= 1) {
-      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
-    } else {
-      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
-        // special case of trim threshold where it is set to max value.
-        // there won't be any trimming during upsert in this case.
-        // thus we can avoid the overhead of read-lock and write-lock
-        // in the upsert method.
-        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
-      } else {
-        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
-      }
-    }
-
-    // Create groups of data tables that each thread can process concurrently.
-    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
-    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
-    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
-
-    for (int i = 0; i < numReduceThreadsToUse; i++) {
-      reduceGroups.add(new ArrayList<>());
-    }
-    for (int i = 0; i < numDataTables; i++) {
-      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
-    }
-
-    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
-    long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
-    try {
-      reducerContext.getExecutorService().invokeAll(reduceGroups.stream()
-          .map(reduceGroup -> new TraceCallable<Void>() {
-            @Override
-            public Void callJob() throws Exception {
-              for (DataTable dataTable : reduceGroup) {
-                int numRows = dataTable.getNumberOfRows();
-                for (int rowId = 0; rowId < numRows; rowId++) {
-                  Object[] values = new Object[_numColumns];
-                  for (int colId = 0; colId < _numColumns; colId++) {
-                    switch (storedColumnDataTypes[colId]) {
-                      case INT:
-                        values[colId] = dataTable.getInt(rowId, colId);
-                        break;
-                      case LONG:
-                        values[colId] = dataTable.getLong(rowId, colId);
-                        break;
-                      case FLOAT:
-                        values[colId] = dataTable.getFloat(rowId, colId);
-                        break;
-                      case DOUBLE:
-                        values[colId] = dataTable.getDouble(rowId, colId);
-                        break;
-                      case STRING:
-                        values[colId] = dataTable.getString(rowId, colId);
-                        break;
-                      case BYTES:
-                        values[colId] = dataTable.getBytes(rowId, colId);
-                        break;
-                      case OBJECT:
-                        values[colId] = dataTable.getObject(rowId, colId);
-                        break;
-                      // Add other aggregation intermediate result / group-by column type supports here
-                      default:
-                        throw new IllegalStateException();
-                    }
-                  }
-                  indexedTable.upsert(new Record(values));
-              }
-            }
-            return null;
-          }
-          }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      throw new TimeoutException("Timed out in broker reduce phase.");
-    }
-
-    indexedTable.finish(true);
-    return indexedTable;
-  }
-
-  /**
-   * Computes the number of reduce threads to use per query.
-   * <ul>
-   *   <li> Use single thread if number of data tables to reduce is less than
-   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
-   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
-   * </ul>
-   *
-   * @param numDataTables Number of data tables to reduce
-   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
-   * @return Number of reduce threads to use for the query
-   */
-  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
-    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
-    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
-      return Math.min(1, numDataTables); // Number of data tables can be zero.
-    }
-
-    return Math.min(maxReduceThreadsPerQuery, numDataTables);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
index 7fb878c..4c4bcaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
@@ -35,7 +35,6 @@ import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
 import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
 import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
 
 
 /**
@@ -58,7 +57,7 @@ public class PostAggregationHandler implements ValueExtractorFactory {
       _numGroupByExpressions = groupByExpressions.size();
       _groupByExpressionIndexMap = new HashMap<>();
       for (int i = 0; i < _numGroupByExpressions; i++) {
-        _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i);
+        _groupByExpressionIndexMap.put(groupByExpressions.get(i), i);
       }
     } else {
       _numGroupByExpressions = 0;
@@ -107,7 +106,6 @@ public class PostAggregationHandler implements ValueExtractorFactory {
    */
   @Override
   public ValueExtractor getValueExtractor(ExpressionContext expression) {
-    expression = GapfillUtils.stripGapfill(expression);
     if (expression.getType() == ExpressionContext.Type.LITERAL) {
       // Literal
       return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index a48a007..3631a3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -23,7 +23,6 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -58,8 +57,6 @@ public final class ResultReducerFactory {
         } else {
           return new AggregationDataTableReducer(queryContext);
         }
-      } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
-        return new GapFillGroupByDataTableReducer(queryContext);
       } else {
         // Aggregation group-by query
         return new GroupByDataTableReducer(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
index c40837e..988f780 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -39,7 +39,6 @@ import org.apache.pinot.core.query.request.context.QueryContext;
  * Util class to encapsulate all utilites required for gapfill.
  */
 public class GapfillUtils {
-  private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
   private static final String GAP_FILL = "gapfill";
   private static final String AS = "as";
   private static final String FILL = "fill";
@@ -56,29 +55,12 @@ public class GapfillUtils {
 
     FunctionContext function = expression.getFunction();
     String functionName = function.getFunctionName();
-    if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
+    if (functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
       return function.getArguments().get(0);
     }
     return expression;
   }
 
-  public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
-    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
-      return false;
-    }
-
-    return POST_AGGREGATE_GAP_FILL.equals(expressionContext.getFunction().getFunctionName());
-  }
-
-  public static boolean isPostAggregateGapfill(QueryContext queryContext) {
-    for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
-      if (isPostAggregateGapfill(expressionContext)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   public static boolean isFill(ExpressionContext expressionContext) {
     if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
       return false;
@@ -189,15 +171,15 @@ public class GapfillUtils {
     Preconditions.checkArgument(gapFillSelection != null && gapFillSelection.getFunction() != null,
         "Gapfill Expression should be function.");
     List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
-    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(args.size() > 5, "Gapfill does not have correct number of arguments.");
     Preconditions.checkArgument(args.get(1).getLiteral() != null,
-        "The second argument of PostAggregateGapFill should be TimeFormatter.");
+        "The second argument of Gapfill should be TimeFormatter.");
     Preconditions.checkArgument(args.get(2).getLiteral() != null,
-        "The third argument of PostAggregateGapFill should be start time.");
+        "The third argument of Gapfill should be start time.");
     Preconditions.checkArgument(args.get(3).getLiteral() != null,
-        "The fourth argument of PostAggregateGapFill should be end time.");
+        "The fourth argument of Gapfill should be end time.");
     Preconditions.checkArgument(args.get(4).getLiteral() != null,
-        "The fifth argument of PostAggregateGapFill should be time bucket size.");
+        "The fifth argument of Gapfill should be time bucket size.");
 
     ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
     Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expressions should be specified.");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
index bf0a28b..829a55d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
@@ -51,7 +51,6 @@ import org.testng.annotations.Test;
  * Queries test for Gapfill queries.
  */
 // TODO: Item 1. table alias for subquery in next PR
-// TODO: Item 2. Deprecate PostAggregateGapfill implementation in next PR
 @SuppressWarnings("rawtypes")
 public class GapfillQueriesTest extends BaseQueriesTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
deleted file mode 100644
index bfd7baa..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Queries test for PostAggregationGapfill queries.
- */
-@SuppressWarnings("rawtypes")
-public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
-  private static final String RAW_TABLE_NAME = "parkingData";
-  private static final String SEGMENT_NAME = "testSegment";
-  private static final Random RANDOM = new Random();
-
-  private static final int NUM_LOTS = 4;
-
-  private static final String IS_OCCUPIED_COLUMN = "isOccupied";
-  private static final String LOT_ID_COLUMN = "lotId";
-  private static final String EVENT_TIME_COLUMN = "eventTime";
-  private static final Schema SCHEMA = new Schema.SchemaBuilder()
-      .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN)
-      .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
-      .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
-      .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
-      .build();
-  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-      .build();
-
-  private IndexSegment _indexSegment;
-  private List<IndexSegment> _indexSegments;
-
-  @Override
-  protected String getFilter() {
-    // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
-    return " WHERE eventTime >= 0";
-  }
-
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegment;
-  }
-
-  @Override
-  protected List<IndexSegment> getIndexSegments() {
-    return _indexSegments;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteDirectory(INDEX_DIR);
-
-    long current = 1636286400000L; //November 7, 2021 12:00:00 PM
-    int duplicates = 16;
-    int interval = 1000 * 900; // 15 minutes
-    long start = current - duplicates * 2 * interval; //November 7, 2021 4:00:00 AM
-
-    List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2);
-    for (int i = 0; i < NUM_LOTS; i++) {
-      for (int j = 0; j < duplicates; j++) {
-        if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) {
-          continue;
-        }
-        long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval);
-        long departingTime = j == 3 ? start + interval * (2 * j + 6) + RANDOM.nextInt(interval) : start
-            + interval * (2 * j + 1) + RANDOM.nextInt(interval);
-
-        GenericRow parkingRow = new GenericRow();
-        parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime);
-        parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
-        parkingRow.putValue(IS_OCCUPIED_COLUMN, true);
-        records.add(parkingRow);
-
-        GenericRow departingRow = new GenericRow();
-        departingRow.putValue(EVENT_TIME_COLUMN, departingTime);
-        departingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
-        departingRow.putValue(IS_OCCUPIED_COLUMN, false);
-        records.add(departingRow);
-      }
-    }
-
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
-    driver.build();
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment);
-  }
-
-  @Test
-  public void datetimeconvertGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS 
time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter
-        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
-    for (int i = 0; i < 32; i += 4) {
-      String firstTimeCol = (String) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void toEpochHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "ToEpochHours(eventTime) AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
-        + "'454515',  '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void toEpochMinutesRoundedHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "ToEpochMinutesRounded(eventTime, 60) AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60), 
'1:HOURS:EPOCH', "
-        + "'454515',  '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void toEpochMinutesBucketHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "ToEpochMinutesBucket(eventTime, 60) AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60), 
'1:HOURS:EPOCH', "
-        + "'454515',  '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void dateTruncHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'), 
'1:HOURS:EPOCH', "
-        + "'454515',  '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() {
-    try {
-      String gapfillQuery = "SELECT "
-          + "PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-          + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') 
AS time_col, "
-          + "lotId, "
-          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-          + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-          + "FROM parkingData "
-          + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-          + "GROUP BY 1, 2 "
-          + "LIMIT 200";
-
-      getBrokerResponseForSqlQuery(gapfillQuery);
-      Assert.fail();
-    } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work if the time bucket is not ordered.");
-    }
-  }
-
-  @Test
-  public void datetimeconvertGapfillTestWithHavingClause() {
-    String dataTimeConvertQueryWithUnoccupied = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'false' "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied
-        = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithUnoccupied);
-
-    ResultTable dateTimeConvertResultTableWithUnoccupied = dateTimeConvertBrokerResponseWithUnoccupied.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(), 20);
-
-    String dataTimeConvertQueryWithOccupied = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'true' "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied
-        = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithOccupied);
-
-    ResultTable dateTimeConvertResultTableWithOccupied = dateTimeConvertBrokerResponseWithOccupied.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4);
-
-    String gapfillQueryWithOccupied = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS 
time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'true' "
-        + "ORDER BY 1 "
-        + "LIMIT 7";
-
-    BrokerResponseNative gapfillBrokerResponseWithOccupied = getBrokerResponseForSqlQuery(gapfillQueryWithOccupied);
-
-    ResultTable gapFillResultTableWithOccupied = gapfillBrokerResponseWithOccupied.getResultTable();
-    Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7);
-
-    for (Object [] row : gapFillResultTableWithOccupied.getRows()) {
-      Assert.assertEquals(row[2], true);
-    }
-
-    String gapfillQueryWithUnoccupied = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS 
time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'false' "
-        + "ORDER BY 1 "
-        + "LIMIT 24";
-
-    BrokerResponseNative gapfillBrokerResponseWithUnoccupied = getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied);
-
-    ResultTable gapFillResultTableWithUnoccupied = gapfillBrokerResponseWithUnoccupied.getResultTable();
-    Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24);
-    for (Object [] row : gapFillResultTableWithUnoccupied.getRows()) {
-      Assert.assertEquals(row[2], false);
-    }
-  }
-
-
-  @Test
-  public void datetimeconvertGapfillTestTimeBucketAsLastSelection() {
-    String gapfillQuery = "SELECT "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, "
-        + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS 
time_col "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 4, 5 "
-        + "ORDER BY 5 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter
-        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
-    for (int i = 0; i < 32; i += 4) {
-      String timeCol = (String) gapFillRows.get(i)[4];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(timeCol);
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[3]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[4], gapFillRows.get(i + j)[4]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[3]));
-        lots.add((String) gapFillRows.get(i + j)[3]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() {
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, 
'1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', 
'1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS 
time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 
'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1, 2 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter
-        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
-    for (int i = 0; i < 32; i += 4) {
-      String firstTimeCol = (String) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws IOException {
-    _indexSegment.destroy();
-    FileUtils.deleteDirectory(INDEX_DIR);
-  }
-}

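Note on the replacement: only the select-expression form is removed by this commit; the gapfill v2 path covered by GapfillQueriesTest remains, and its argument validation survives in GapfillUtils above (a time formatter, start time, end time, and bucket size, plus a TimeSeriesOn expression). A rough sketch of that surviving syntax, inferred from those retained checks rather than spelled out in this commit, so treat the exact shape as illustrative:

    SELECT GapFill(time_col, '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
        '2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS',
        FILL(status, 'FILL_PREVIOUS_VALUE'), TimeSeriesOn(lotId)) AS time_col,
      lotId, status
    FROM (
      SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH',
          '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,
        lotId,
        lastWithTime(isOccupied, eventTime, 'BOOLEAN') AS status
      FROM parkingData
      WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000
      GROUP BY 1, 2
      ORDER BY 1
      LIMIT 200)
    LIMIT 200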